Hadoop翻译文档(五) Map/Reduce 开发指南 | 张恒镭的博客

Hadoop翻译文档(五) Map/Reduce 开发指南

时间:13-12-26 栏目:MapReduce 作者:恒镭, 张 评论:0 点击: 3,221 次

  目的
  先决条件
  概述
  输入与输出
  例子:WordCount v1.0
o  源代码
o  用法
o  解释
  Map/Reduce -  用户界面
o  核心功能描述
  Mapper
  Reducer
  Partitioner
  Reporter
  OutputCollector
o  作业配置
o  任务的执行和环境
o  作业的提交与监控
  作业的控制
o  作业的输入
  InputSplit
  RecordReader
o  作业的输出
  任务的 Side-Effect File
  RecordWriter
o  其他有用的特性
  Counters
  DistributedCache
  Tool
  IsolationRunner
  Profiling
  调试
  JobControl
  数据压缩
  例子:WordCount v2.0
o  源代码
o  运行样例
o  程序要点
目的
这篇教程从用户的角度出发,全面地介绍了 Hadoop Map/Reduce 框架的各个方面。
先决条件
请先确认 Hadoop 被正确安装、配置和正常运行中。更多信息见:
  Hadoop 快速入门对初次使用者。
  Hadoop 集群搭建对大规模分布式集群。
概述
Hadoop Map/Reduce 是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群
上,并以一种可靠容错的方式并行处理上 T 级别的数据集。
一个 Map/Reduce  作业(job)  通常会把输入的数据集切分为若干独立的数据块,由  map 任务(task)以完全并行的方式
处理它们。框架会对 map 的输出先进行排序,  然后把结果输入给 reduce 任务。通常作业的输入和输出都会被存储在文件系
统中。  整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
通常,Map/Reduce 框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这
种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。
Map/Reduce 框架由一个单独的 master JobTracker  和每个集群节点一个 slave TaskTracker 共同组成。master 负责调
度构成一个作业的所有任务,这些任务分布在不同的 slave 上,master 监控它们的执行,重新执行已经失败的任务。而 slave
仅负责执行由 master 指派的任务。
应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供 map 和 reduce 函数。再加上其他作
业的参数,就构成了作业配置(job configuration)。然后,Hadoop 的  job client 提交作业(jar 包/可执行程序等)和配置
信息给 JobTracker,后者负责分发这些软件和配置信息给 slave、调度任务并监控它们的执行,同时提供状态和诊断信息
给 job-client。
虽然 Hadoop 框架是用 Java
TM
实现的,但 Map/Reduce 应用程序则不一定要用  Java 来写  。
  Hadoop Streaming 是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序  (例如:Shell 工具)来做
为 mapper 和 reducer。
  Hadoop Pipes 是一个与 SWIG 兼容的 C++ API  (没有基于 JNI
TM
技术),它也可用于实现 Map/Reduce 应用程序。
输入与输出
Map/Reduce 框架运转在<key, value>  键值对上,也就是说,  框架把作业的输入看为是一组<key, value>  键值对,
同样也产出一组  <key, value>  键值对做为作业的输出,这两组键值对的类型可能不同。
框架需要对 key 和 value 的类(classes)进行序列化操作,  因此,这些类需要实现  Writable 接口。  另外,为了方便框架执
行排序操作,key 类必须实现  WritableComparable 接口。
一个 Map/Reduce  作业的输入和输出类型如下所示:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)  
例子:WordCount v1.0
在深入细节之前,让我们先看一个 Map/Reduce 的应用示例,以便对它们的工作方式有一个初步的认识。
WordCount 是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。
这个应用适用于  单机模式,  伪分布式模式  或  完全分布式模式  三种 Hadoop 安装方式。
源代码
WordCount.java
1.  package org.myorg;
2.
3.  import java.io.IOException;
4.  import java.util.*;
5.
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.conf.*;
8.  import org.apache.hadoop.io.*;
9.  import org.apache.hadoop.mapred.*;
10.  import org.apache.hadoop.util.*;
11.
12.  public class WordCount {
13.
14.      public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text,
IntWritable> {
15.        private final static IntWritable one = new IntWritable(1);
16.        private Text word = new Text();
17.
18.        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
19.          String line = value.toString();
20.          StringTokenizer tokenizer = new StringTokenizer(line);
21.          while (tokenizer.hasMoreTokens()) {
22.            word.set(tokenizer.nextToken());
23.            output.collect(word, one);
24.          }
25.        }
26.      }
27.
28.      public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
Text, IntWritable> {
29.        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException {
30.          int sum = 0;
31.          while (values.hasNext()) {
32.            sum += values.next().get();
33.          }
34.          output.collect(key, new IntWritable(sum));
35.        }
36.      }
37.
38.      public static void main(String[] args) throws Exception {
39.        JobConf conf = new JobConf(WordCount.class);
40.        conf.setJobName("wordcount");  
41.
42.        conf.setOutputKeyClass(Text.class);
43.        conf.setOutputValueClass(IntWritable.class);
44.
45.        conf.setMapperClass(Map.class);
46.        conf.setCombinerClass(Reduce.class);
47.        conf.setReducerClass(Reduce.class);
48.
49.        conf.setInputFormat(TextInputFormat.class);
50.        conf.setOutputFormat(TextOutputFormat.class);
51.
52.        FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.        JobClient.runJob(conf);
57.      }
58.  }
59.
用法
假设环境变量 HADOOP_HOME 对应安装时的根目录,HADOOP_VERSION 对应 Hadoop 的当前安装版本,编译
WordCount.java 来创建 jar 包,可如下操作:
$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes
WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
假设:
  /usr/joe/wordcount/input -  是 HDFS 中的输入路径
  /usr/joe/wordcount/output -  是 HDFS 中的输出路径
用示例文本文件做为输入:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行应用程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input
/usr/joe/wordcount/output
输出是:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用程序能够使用-files 选项来指定一个由逗号分隔的路径列表,这些路径是 task 的当前工作目录。使用选项-libjars
可以向 map 和 reduce 的 classpath 中添加 jar 包。使用-archives 选项程序可以传递档案文件做为参数,这些档案文件会
被解压并且在 task 的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。  有关命令行选
项的更多细节请参考  Commands manual。
使用-libjars 和-files 运行 wordcount 例子:
hadoop jar hadoop-examples.jar wordcount  -files cachefile.txt  -libjars mylib.jar input output
解释
WordCount 应用程序非常直截了当。
Mapper(14-26 行)中的 map 方法(18-25 行)通过指定的  TextInputFormat(49 行)一次处理一行。然后,它通过
StringTokenizer  以空格为分隔符将一行切分为若干 tokens,之后,输出< <word>, 1>  形式的键值对。
对于示例中的第一个输入,map 输出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个输入,map 输出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
关于组成一个指定作业的 map 数目的确定,以及如何以更精细的方式去控制这些 map,我们将在教程的后续部分学习到更
多的内容。
WordCount 还指定了一个 combiner (46 行)。因此,每次 map 运行之后,会对输出按照 key 进行排序,然后把输出传递给
本地的 combiner(按照作业的配置与 Reducer 一样),进行本地聚合。
第一个 map 的输出是:
< Bye, 1>
< Hello, 1>
< World, 2>
第二个 map 的输出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
Reducer(28-36 行)中的 reduce 方法(29-35 行)  仅是将每个 key(本例中就是单词)出现的次数求和。
因此这个作业的输出就是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>  
代码中的 run 方法中指定了作业的几个方面,  例如:通过命令行传递过来的输入/输出路径、key/value 的类型、输入/输出
的格式等等 JobConf 中的配置信息。随后程序调用了 JobClient.runJob(55 行)来提交作业并且监控它的执行。
我们将在本教程的后续部分学习更多的关于 JobConf,  JobClient,  Tool 和其他接口及类(class)。
Map/Reduce -  用户界面
这部分文档为用户将会面临的 Map/Reduce 框架中的各个环节提供了适当的细节。这应该会帮助用户更细粒度地去实现、配
置和调优作业。然而,请注意每个类/接口的 javadoc 文档提供最全面的文档;本文只是想起到指南的作用。
我们会先看看 Mapper 和 Reducer 接口。应用程序通常会通过提供 map 和 reduce 方法来实现它们。
然后,我们会讨论其他的核心接口,其中包括:   JobConf, JobClient, Partitioner,   OutputCollector, Reporter,
InputFormat,OutputFormat 等等。
最后,我们将通过讨论框架中一些有用的功能点(例如:DistributedCache,  IsolationRunner 等等)来收尾。
核心功能描述
应用程序通常会通过提供 map 和 reduce 来实现  Mapper 和 Reducer 接口,它们组成作业的核心。
Mapper
Mapper 将输入键值对(key/value pair)映射到一组中间格式的键值对集合。
Map 是一类将输入记录集转换为中间格式记录集的独立任务。   这种转换的中间格式记录集不需要与输入记录集的类型一致。
一个给定的输入键值对可以映射成 0 个或多个输出键值对。
Hadoop Map/Reduce 框架为每一个 InputSplit 产生一个 map 任务,而每个 InputSplit 是由该作业的 InputFormat
产生的。
概括地说,对 Mapper 的实现者需要重写  JobConfigurable.configure(JobConf)方法,这个方法需要传递一个 JobConf 参数,
目的是完成 Mapper 的初始化工作。然后,框架为这个任务的 InputSplit 中每个键值对调用一次  map(WritableComparable,
Writable, OutputCollector, Reporter)操作。应用程序可以通过重写 Closeable.close()方法来执行相应的清理工作。
输出键值对不需要与输入键值对的类型一致。一个给定的输入键值对可以映射成 0 个或多个输出键值对。通过调用
OutputCollector.collect(WritableComparable,Writable)可以收集输出的键值对。
应用程序可以使用 Reporter 报告进度,设定应用级别的状态消息,更新 Counters(计数器),或者仅是表明自己运行正
常。
框架随后会把与一个特定 key 关联的所有中间过程的值(value)分成组,然后把它们传给 Reducer 以产出最终的结果。用
户可以通过  JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的  Comparator。
Mapper 的输出被排序后,就被划分给每个 Reducer。分块的总数目和一个作业的 reduce 任务的数目是一样的。用户可以
通过实现自定义的  Partitioner 来控制哪个 key 被分配给哪个  Reducer。
用户可选择通过  JobConf.setCombinerClass(Class)指定一个 combiner,它负责对中间过程的输出进行本地的聚集,这会有
助于降低从 Mapper 到  Reducer 数据传输量。
这些被排好序的中间过程的输出结果保存的格式是(key-len, key, value-len, value),应用程序可以通过 JobConf 控制对这些
中间结果是否进行压缩以及怎么压缩,使用哪种  CompressionCodec。
需要多少个 Map?
Map 的数目通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。
Map 正常的并行规模大致是每个节点(node)大约 10 到 100 个 map,对于 CPU  消耗较小的 map 任务可以设到 300 个左
右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是 map 执行的时间至少超过 1 分钟。
这样,如果你输入 10TB 的数据,每个块(block)的大小是 128MB,你将需要大约 82,000 个 map 来完成任务,除非使用
setNumMapTasks(int)(注意:这里仅仅是对框架进行了一个提示(hint),实际决定因素见这里)将这个数值设置得更高。
Reducer
Reducer 将与一个 key 关联的一组中间数值集归约(reduce)为一个更小的数值集。
用户可以通过  JobConf.setNumReduceTasks(int)设定一个作业中 reduce 任务的数目。
概括地说,对 Reducer 的实现者需要重写  JobConfigurable.configure(JobConf)方法,这个方法需要传递一个 JobConf 参
数,目的是完成 Reducer 的初始化工作。然后,框架为成组的输入数据中的每个<key, (list of values)>对调用一次
reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,应用程序可以通过重写 Closeable.close()来
执行相应的清理工作。
Reducer 有 3 个主要阶段:shuffle、sort 和 reduce。
Shuffle
Reducer 的输入就是 Mapper 已经排好序的输出。在这个阶段,框架通过 HTTP 为每个 Reducer 获得所有 Mapper 输出中
与之相关的分块。
Sort
这个阶段,框架将按照 key 的值对 Reducer 的输入进行分组  (因为不同 mapper 的输出中可能会有相同的 key)。
Shuffle 和 Sort 两个阶段是同时进行的;map 的输出也是一边被取回一边被合并的。
Secondary Sort
如果需要中间过程对 key 的分组规则和 reduce 前对 key 的分组规则不同,那么可以通过
JobConf.setOutputValueGroupingComparator(Class)来指定一个 Comparator。再加上
JobConf.setOutputKeyComparatorClass(Class)可用于控制中间过程的 key 如何被分组,所以结合两者可以实现按值的二次
排序。
Reduce
在这个阶段,框架为已分组的输入数据中的每个  <key, (list of values)>对调用一次  reduce(WritableComparable,
Iterator, OutputCollector, Reporter)方法。
Reduce 任务的输出通常是通过调用  OutputCollector.collect(WritableComparable, Writable)写入  文件系统的。
应用程序可以使用 Reporter 报告进度,设定应用程序级别的状态消息,更新 Counters(计数器),或者仅是表明自己运
行正常。
Reducer 的输出是没有排序的。
需要多少个 Reduce?
Reduce 的数目建议是 0.95 或 1.75 乘以  (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。
用 0.95,所有 reduce 可以在 maps 一完成时就立刻启动,开始传输 map 的输出结果。用 1.75,速度快的节点可以在完成第
一轮 reduce 任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
增加 reduce 的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks)   或失败的任务预留一些 reduce 的资源。
无 Reducer
如果没有归约要进行,那么设置 reduce 任务的数目为零是合法的。
这种情况下, map 任务的输出会直接被写入由  setOutputPath(Path)指定的输出路径。框架在把它们写入 FileSystem 之前
没有对它们进行排序。
Partitioner
Partitioner 用于划分键值空间(key space)。
Partitioner 负责控制 map 输出结果 key 的分割。Key(或者一个 key 子集)被用于产生分区,通常使用的是 Hash 函数。分
区的数目与一个作业的 reduce 任务的数目是一样的。因此,它控制将中间过程的 key (也就是这条记录)应该发送给 m 个 reduce
任务中的哪一个来进行 reduce 操作。
HashPartitioner 是默认的  Partitioner。  
Reporter
Reporter 是用于 Map/Reduce 应用程序报告进度,设定应用级别的状态消息,  更新 Counters(计数器)的机制。
Mapper 和 Reducer 的实现可以利用 Reporter  来报告进度,或者仅是表明自己运行正常。在那种应用程序需要花很长时
间处理个别键值对的场景中,这种机制是很关键的,因为框架可能会以为这个任务超时了,从而将它强行杀死。另一个避免
这种情况发生的方式是,将配置参数 mapred.task.timeout 设置为一个足够高的值(或者干脆设置为零,则没有超时限
制了)。
应用程序可以用 Reporter 来更新 Counter(计数器)。
OutputCollector
OutputCollector 是一个 Map/Reduce 框架提供的用于收集  Mapper 或 Reducer 输出数据的通用机制  (包括中间输出结果
和作业的输出结果)。
Hadoop Map/Reduce 框架附带了一个包含许多实用型的 mapper、reducer 和 partitioner  的类库。
作业配置
JobConf 代表一个 Map/Reduce 作业的配置。
JobConf 是用户向 Hadoop 框架描述一个 Map/Reduce 作业如何执行的主要接口。框架会按照 JobConf 描述的信息忠实地
去尝试完成这个作业,然而:
  一些参数可能会被管理者标记为  final,这意味它们不能被更改。
  一些作业的参数可以被直截了当地进行设置(例如:  setNumReduceTasks(int)),而另一些参数则与框架或者作
业的其他参数之间微妙地相互影响,并且设置起来比较复杂(例如:  setNumMapTasks(int))。
通常, JobConf 会指明 Mapper、 Combiner(如果有的话)、   Partitioner、 Reducer、 InputFormat 和  OutputFormat
的具体实现。JobConf 还能指定一组输入文件  (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path))  和
(setInputPaths(JobConf, String) /addInputPaths(JobConf, String))  以及输出文件应该写在哪儿  (setOutputPath(Path))。
JobConf 可选择地对作业设置一些高级选项,例如:设置 Comparator;  放到 DistributedCache 上的文件;中间结果
或者作业输出结果是否需要压缩以及怎么压缩;  利用用户提供的脚本
(setMapDebugScript(String)/setReduceDebugScript(String))  进行调试;作业是否允许预防性(speculative)任务的执行
(setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean))  ;每个任务最大的尝试次数
(setMaxMapAttempts(int)/setMaxReduceAttempts(int))  ;一个作业能容忍的任务失败的百分比
(setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int))  ;等等。
当然,用户能使用  set(String, String)/get(String, String)  来设置或者取得应用程序需要的任意参数。然而,
DistributedCache 的使用是面向大规模只读数据的。
任务的执行和环境
TaskTracker 是在一个单独的 jvm 上以子进程的形式执行  Mapper/Reducer 任务(Task)的。
子任务会继承父 TaskTracker 的环境。用户可以通过 JobConf 中的  mapred.child.java.opts 配置参数来设定子 jvm
上的附加选项,例如:  通过-Djava.library.path=<>  将一个非标准路径设为运行时的链接用以搜索共享库,等等。如
果 mapred.child.java.opts 包含一个符号@taskid@,  它会被替换成 map/reduce 的 taskid 的值。
下面是一个包含多个参数和替换的例子,其中包括:记录 jvm GC 日志;  JVM JMX 代理程序以无密码的方式启动,这样它
就能连接到 jconsole 上,从而可以查看子进程的内存和线程,得到线程的 dump;还把子 jvm 的最大堆尺寸设置为 512MB,
并为子 jvm 的 java.library.path 添加了一个附加路径。
<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
用户或管理员也可以使用 mapred.child.ulimit 设定运行的子任务的最大虚拟内存。mapred.child.ulimit 的值以
(KB)为单位,并且必须大于或等于-Xmx 参数传给 JavaVM 的值,否则 VM 会无法启动。
注意:mapred.child.java.opts 只用于设置 task tracker 启动的子任务。为守护进程设置内存选项请查看
cluster_setup.html
${mapred.local.dir}/taskTracker/是 task tracker 的本地目录,   用于创建本地缓存和 job。它可以指定多个目录(跨
越多个磁盘),文件会半随机的保存到本地路径下的某个目录。当 job 启动时, task tracker 根据配置文档创建本地 job 目录,
目录结构如以下所示:
  ${mapred.local.dir}/taskTracker/archive/ :分布式缓存。这个目录保存本地的分布式缓存。因此本地分
布式缓存是在所有 task 和 job 间共享的。
  ${mapred.local.dir}/taskTracker/jobcache/$jobid/ :  本地 job 目录。
o  ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job 指定的共享目录。各个任务
可以使用这个空间做为暂存空间,用于它们之间共享文件。这个目录通过 job.local.dir  参数暴露给用
户。这个路径可以通过 API JobConf.getJobLocalDir()来访问。它也可以被做为系统属性获得。因此,用
户(比如运行 streaming)可以调用 System.getProperty("job.local.dir")获得该目录。
o  ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/:  存放 jar 包的路径,用于存放作
业的 jar 文件和展开的 jar。 job.jar 是应用程序的 jar 文件,它会被自动分发到各台机器,在 task 启动前
会被自动展开。使用 api JobConf.getJar()  函数可以得到 job.jar 的位置。使用 JobConf.getJar().getParent()
可以访问存放展开的 jar 包的目录。  
o  ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml:  一个 job.xml 文件,本地的
通用的作业配置文件。
o  ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid:  每个任务有一个目录
task-id,它里面有如下的目录结构:
  ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml:  一个
job.xml 文件,本地化的任务作业配置文件。任务本地化是指为该 task 设定特定的属性值。这些
值会在下面具体说明。
  ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output  一个存放
中间过程的输出文件的目录。它保存了由 framwork 产生的临时 map reduce 数据,比如 map 的
输出文件等。
  ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work:   task 的当前
工作目录。
  ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp:  task
的临时目录。(用户可以设定属性 mapred.child.tmp  来为 map 和 reduce task 设定临时目录。
缺省值是./tmp。如果这个值不是绝对路径,  它会把 task 的工作路径加到该路径前面作为 task
的临时文件路径。如果这个值是绝对路径则直接使用这个值。  如果指定的目录不存在,会自动创
建该目录。之后,按照选项  -Djava.io.tmpdir='临时文件的绝对路径'执行 java 子任务。
pipes 和 streaming 的临时文件路径是通过环境变量 TMPDIR='the absolute path of the
tmp dir'设定的)。  如果 mapred.child.tmp 有./tmp 值,这个目录会被创建。
下面的属性是为每个 task 执行时使用的本地参数,它们保存在本地化的任务作业配置文件里:
名称  类型  描述
mapred.job.id  String  job id
mapred.jar  String  job 目录下 job.jar 的位置
job.local.dir  String  job 指定的共享存储空间
mapred.tip.id  String  task id
mapred.task.id  String  task 尝试 id
mapred.task.is.map  boolean    是否是 map task
mapred.task.partition  int    task 在 job 中的 id
map.input.file  String  map 读取的文件名
map.input.start  long  map 输入的数据块的起始位置偏移
map.input.length    long    map 输入的数据块的字节数
mapred.work.output.dir  String    task 临时输出目录
task 的标准输出和错误输出流会被读到 TaskTracker 中,并且记录到  ${HADOOP_LOG_DIR}/userlogs  
DistributedCache  可用于 map 或 reduce task 中分发 jar 包和本地库。子 jvm 总是把  当前工作目录  加到
java.library.path  和  LD_LIBRARY_PATH。  因此,可以通过  System.loadLibrary 或  System.load 装载缓存的库。有
关使用分布式缓存加载共享库的细节请参考  native_libraries.html
作业的提交与监控
JobClient 是用户提交的作业与 JobTracker 交互的主要接口。
JobClient  提供提交作业,追踪进程,访问子任务的日志记录,获得 Map/Reduce 集群状态信息等功能。
作业提交过程包括:
1.  检查作业输入输出样式细节
2.  为作业计算 InputSplit 值。
3.  如果需要的话,为作业的 DistributedCache 建立必须的统计信息。
4.  拷贝作业的 jar 包和配置文件到 FileSystem 上的 Map/Reduce 系统目录下。
5.  提交作业到 JobTracker 并且监控它的状态。
作业的历史文件记录到指定目录的"_logs/history/"子目录下。这个指定目录由 hadoop.job.history.user.location
设定,默认是作业输出的目录。因此默认情况下,文件会存放在 mapred.output.dir/_logs/history 目录下。用户可以设置
hadoop.job.history.user.location 为 none 来停止日志记录。
用户使用下面的命令可以看到在指定目录下的历史日志记录的摘要。
$ bin/hadoop job -history output-dir
这个命令会打印出作业的细节,以及失败的和被杀死的任务细节。
要查看有关作业的更多细节例如成功的任务、每个任务尝试的次数(task attempt)等,可以使用下面的命令
$ bin/hadoop job -history all output-dir
用户可以使用  OutputLogFilter  从输出目录列表中筛选日志文件。
一般情况,用户利用 JobConf 创建应用程序并配置作业属性,  然后用  JobClient  提交作业并监视它的进程。
作业的控制
有时候,用一个单独的 Map/Reduce 作业并不能完成一个复杂的任务,用户也许要链接多个 Map/Reduce 作业才行。这是容
易实现的,因为作业通常输出到分布式文件系统上的,所以可以把这个作业的输出作为下一个作业的输入实现串联。
然而,这也意味着,确保每一作业完成(成功或失败)的责任就直接落在了客户身上。在这种情况下,可以用的控制作业的选
项有:
  runJob(JobConf):提交作业,仅当作业完成时返回。
  submitJob(JobConf):只提交作业,之后需要你轮询它返回的  RunningJob 句柄的状态,并根据情况调度。  
  JobConf.setJobEndNotificationURI(String):设置一个作业完成通知,可避免轮询。
作业的输入
InputFormat  为 Map/Reduce 作业描述输入的细节规范。
Map/Reduce 框架根据作业的 InputFormat 来:
1.  检查作业输入的有效性。
2.  把输入文件切分成多个逻辑 InputSplit 实例,  并把每一实例分别分发给一个  Mapper。
3.  提供 RecordReader 的实现,这个 RecordReader 从逻辑 InputSplit 中获得输入记录,  这些记录将由 Mapper
处理。
基于文件的 InputFormat 实现(通常是  FileInputFormat 的子类)  默认行为是按照输入文件的字节大小,把输入数据切
分成逻辑分块(logical InputSplit  )。  其中输入文件所在的 FileSystem 的数据块尺寸是分块大小的上限。下限可以
设置 mapred.min.split.size  的值。
考虑到边界情况,对于很多应用程序来说,很明显按照文件大小进行逻辑分割是不能满足需求的。  在这种情况下,应用程序
需要实现一个 RecordReader 来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。
TextInputFormat  是默认的 InputFormat。
如果一个作业的 Inputformat 是 TextInputFormat,  并且框架检测到输入文件的后缀是.gz 和.lzo,就会使用对应的
CompressionCodec 自动解压缩这些文件。  但是需要注意,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给
一个 mapper 来处理。
InputSplit
InputSplit  是一个单独的 Mapper 要处理的数据块。
一般的 InputSplit  是字节样式输入,然后由 RecordReader 处理并转化成记录样式。
FileSplit  是默认的 InputSplit。  它把  map.input.file  设定为输入文件的路径,输入文件是逻辑分块文件。
RecordReader
RecordReader  从 InputSlit 读入<key, value>对。
一般的,RecordReader  把由 InputSplit  提供的字节样式的输入文件,转化成由 Mapper 处理的记录样式的文件。  因
此 RecordReader 负责处理记录的边界情况和把数据表示成 keys/values 对形式。
作业的输出
OutputFormat  描述 Map/Reduce 作业的输出样式。
Map/Reduce 框架根据作业的 OutputFormat 来:  
1.  检验作业的输出,例如检查输出路径是否已经存在。
2.  提供一个 RecordWriter 的实现,用来输出作业结果。  输出文件保存在 FileSystem 上。
TextOutputFormat 是默认的  OutputFormat。
任务的 Side-Effect File
在一些应用程序中,子任务需要产生一些 side-file,这些文件与作业实际输出结果的文件不同。
在这种情况下,同一个 Mapper 或者 Reducer 的两个实例(比如预防性任务)同时打开或者写  FileSystem 上的同一文件
就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(不仅仅是每次任务,每个任务可以尝试执行很多次)选
取一个独一无二的文件名(使用 attemptid,例如 task_200709221812_0001_m_000000_0)。
为了避免冲突,Map/Reduce 框架为每次尝试执行任务都建立和维护一个特殊的
${mapred.output.dir}/_temporary/_${taskid}子目录,这个目录位于本次尝试执行任务输出结果所在的
FileSystem 上,可以通过  ${mapred.work.output.dir}来访问这个子目录。  对于成功完成的任务尝试,只有
${mapred.output.dir}/_temporary/_${taskid}下的文件会移动到${mapred.output.dir}。当然,框架会丢弃
那些失败的任务尝试的子目录。这种处理过程对于应用程序来说是完全透明的。
在任务执行期间,应用程序在写文件时可以利用这个特性,比如  通过  FileOutputFormat.getWorkOutputPath()获得
${mapred.work.output.dir}目录,  并在其下创建任意任务执行时所需的 side-file,框架在任务尝试成功时会马上移动
这些文件,因此不需要在程序内为每次任务尝试选取一个独一无二的名字。
注意:在每次任务尝试执行期间,${mapred.work.output.dir}  的值实际上是
${mapred.output.dir}/_temporary/_{$taskid},这个值是 Map/Reduce 框架创建的。  所以使用这个特性的方法
是,在  FileOutputFormat.getWorkOutputPath()  路径下创建 side-file 即可。
对于只使用 map 不使用 reduce 的作业,这个结论也成立。这种情况下,map 的输出结果直接生成到 HDFS 上。
RecordWriter
RecordWriter  生成<key, value>  对到输出文件。
RecordWriter 的实现把作业的输出结果写到  FileSystem。
其他有用的特性
Counters
Counters  是多个由 Map/Reduce 框架或者应用程序定义的全局计数器。  每一个 Counter 可以是任何一种  Enum 类型。
同一特定 Enum 类型的 Counter 可以汇集到一个组,其类型为 Counters.Group。
应用程序可以定义任意(Enum 类型)的 Counters 并且可以通过  map  或者  reduce 方法中的  Reporter.incrCounter(Enum,
long)或者  Reporter.incrCounter(String, String, long)  更新。之后框架会汇总这些全局 counters。  
DistributedCache
DistributedCache  可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。
DistributedCache  是 Map/Reduce 框架提供的功能,能够缓存应用程序所需的文件  (包括文本,档案文件, jar 文件等)。
应用程序在 JobConf 中通过 url(hdfs://)指定需要被缓存的文件。  DistributedCache 假定由 hdfs://格式 url 指定的文
件已经在  FileSystem 上了。
Map-Redcue 框架在作业所有任务执行之前会把必要的文件拷贝到 slave 节点上。  它运行高效是因为每个作业的文件只拷贝
一次并且为那些没有文档的 slave 节点缓存文档。
DistributedCache  根据缓存文档修改的时间戳进行追踪。   在作业执行期间,当前应用程序或者外部程序不能修改缓存文
件。
distributedCache 可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和 jar 文件。归档文件
(zip,tar,tgz 和 tar.gz 文件)在 slave 节点上会被解档(un-archived)。  这些文件可以设置执行权限。
用户可以通过设置 mapred.cache.{files|archives}来分发文件。  如果要分发多个文件,可以使用逗号分隔文件所在
路径。也可以利用 API 来设置该属性:  DistributedCache.addCacheFile(URI,conf)/
DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/
DistributedCache.setCacheArchives(URIs,conf)  其中 URI 的形式是  hdfs://host:port/absolute-path#link-name
在 Streaming 程序中,可以通过命令行选项  -cacheFile/-cacheArchive  分发文件。
用户可以通过  DistributedCache.createSymlink(Configuration)方法让 DistributedCache  在当前工作目录下创建到缓存
文件的符号链接。   或者通过设置配置文件属性 mapred.create.symlink 为 yes。   分布式缓存会截取 URI 的片段作为链
接的名字。  例如,URI 是  hdfs://namenode:port/lib.so.1#lib.so,  则在 task 当前工作目录会有名为 lib.so
的链接,  它会链接分布式缓存中的 lib.so.1。
DistributedCache 可在 map/reduce 任务中作为  一种基础软件分发机制使用。它可以被用于分发 jar 包和本地库(native
libraries)。   DistributedCache.addArchiveToClassPath(Path, Configuration)和  DistributedCache.addFileToClassPath(Path,
Configuration) API 能够被用于  缓存文件和 jar 包,并把它们加入子 jvm 的 classpath。也可以通过设置配置文档里的属性
mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。
Tool
Tool  接口支持处理常用的 Hadoop 命令行选项。
Tool  是 Map/Reduce 工具或应用的标准。应用程序应只处理其定制参数,   要把标准命令行选项通过  ToolRunner.run(Tool,
String[])  委托给  GenericOptionsParser 处理。
Hadoop 命令行的常用选项有:
-conf <configuration file>  
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>
IsolationRunner
IsolationRunner  是帮助调试 Map/Reduce 程序的工具。
使用 IsolationRunner 的方法是,首先设置  keep.failed.tasks.files 属性为 true  (同时参考
keep.tasks.files.pattern)。
然后,登录到任务运行失败的节点上,进入  TaskTracker 的本地路径运行  IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner 会把失败的任务放在单独的一个能够调试的 jvm 上运行,并且采用和之前完全一样的输入数据。
Profiling
Profiling 是一个工具,它使用内置的 java profiler 工具进行分析获得(2-3 个)map 或 reduce 样例运行分析报告。
用户可以通过设置属性 mapred.task.profile 指定系统是否采集 profiler 信息。  利用 api
JobConf.setProfileEnabled(boolean)可以修改属性值。如果设为 true,  则开启 profiling 功能。profiler 信息保存在用户日
志目录下。缺省情况,profiling 功能是关闭的。
如果用户设定使用 profiling 功能,可以使用配置文档里的属性  mapred.task.profile.{maps|reduces}  设置要 profile
map/reduce task 的范围。设置该属性值的 api 是  JobConf.setProfileTaskRange(boolean,String)。  范围的缺省值是 0-2。
用户可以通过设定配置文档里的属性 mapred.task.profile.params  来指定 profiler 配置参数。修改属性要使用 api
JobConf.setProfileParams(String)。当运行 task 时,如果字符串包含%s。  它会被替换成 profileing 的输出文件名。这些参
数会在命令行里传递到子 JVM 中。缺省的 profiling  参数是
-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
调试
Map/Reduce 框架能够运行用户提供的用于调试的脚本程序。   当 map/reduce 任务失败时,用户可以通过运行脚本在任务日
志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)上做后续处理工作。用户提供的调试脚本程序的标准输
出和标准错误会输出为诊断文件。如果需要的话这些输出结果也可以打印在用户界面上。
在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本,  首先要把这个脚本分发出去,而且还要在配
置文件里设置。
如何分发脚本文件:
用户要用  DistributedCache  机制来分发和链接脚本文件
如何提交脚本:
一个快速提交调试脚本的方法是分别为需要调试的 map 任务和 reduce 任务设置  "mapred.map.task.debug.script"  和
"mapred.reduce.task.debug.script"  属性的值。这些属性也可以通过  JobConf.setMapDebugScript(String)  和
JobConf.setReduceDebugScript(String) API 来设置。对于 streaming,  可以分别为需要调试的 map 任务和 reduce 任务使
用命令行选项-mapdebug  和  -reducedegug 来提交调试脚本。
脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行 map/reduce 失败的节点上运行调试命令是:
$script $stdout $stderr $syslog $jobconf
Pipes  程序根据第五个参数获得 c++程序名。  因此调试 pipes 程序的命令是
$script $stdout $stderr $syslog $jobconf $program
默认行为
对于 pipes,默认的脚本会用 gdb 处理 core dump,  打印  stack trace 并且给出正在运行线程的信息。
JobControl
JobControl 是一个工具,它封装了一组 Map/Reduce 作业以及他们之间的依赖关系。
数据压缩
Hadoop Map/Reduce 框架为应用程序的写入文件操作提供压缩工具,这些工具可以为 map 输出的中间数据和作业最终输出
数据(例如 reduce 的输出)提供支持。它还附带了一些  CompressionCodec 的实现,比如实现了  zlib 和 lzo 压缩算法。   Hadoop
同样支持 gzip 文件格式。
考虑到性能问题(zlib)以及 Java 类库的缺失(lzo)等因素,Hadoop 也为上述压缩解压算法提供本地库的实现。更多的细
节请参考  这里。
中间输出
应用程序可以通过  JobConf.setCompressMapOutput(boolean)api 控制 map 输出的中间结果,并且可以通过
JobConf.setMapOutputCompressorClass(Class)api 指定  CompressionCodec。
作业输出
应用程序可以通过  FileOutputFormat.setCompressOutput(JobConf, boolean) api 控制输出是否需要压缩并且可以使用
FileOutputFormat.setOutputCompressorClass(JobConf, Class)api 指定 CompressionCodec。
如果作业输出要保存成  SequenceFileOutputFormat 格式,需要使用
SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定
SequenceFile.CompressionType (i.e. RECORD / BLOCK -  默认是 RECORD)。  
例子:WordCount v2.0
这里是一个更全面的 WordCount 例子,它使用了我们已经讨论过的很多 Map/Reduce 框架提供的功能。
运行这个例子需要 HDFS 的某些功能,特别是  DistributedCache 相关功能。因此这个例子只能运行在  伪分布式  或者  完
全分布式模式的  Hadoop 上。
源代码
WordCount.java
1.  package org.myorg;
2.
3.  import java.io.*;
4.  import java.util.*;
5.
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.filecache.DistributedCache;
8.  import org.apache.hadoop.conf.*;
9.  import org.apache.hadoop.io.*;
10.  import org.apache.hadoop.mapred.*;
11.  import org.apache.hadoop.util.*;
12.
13.  public class WordCount extends Configured implements Tool {
14.
15.      public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text,
IntWritable> {
16.
17.        static enum Counters { INPUT_WORDS }
18.  
19.        private final static IntWritable one = new IntWritable(1);
20.        private Text word = new Text();
21.
22.        private boolean caseSensitive = true;
23.        private Set<String> patternsToSkip = new HashSet<String>();
24.
25.        private long numRecords = 0;
26.        private String inputFile;
27.
28.        public void configure(JobConf job) {
29.          caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
30.          inputFile = job.get("map.input.file");
31.
32.          if (job.getBoolean("wordcount.skip.patterns", false)) {
33.            Path[] patternsFiles = new Path[0];
34.            try {
35.              patternsFiles = DistributedCache.getLocalCacheFiles(job);
36.            } catch (IOException ioe) {
37.              System.err.println("Caught exception while getting cached files: " +
StringUtils.stringifyException(ioe));
38.            }
39.            for (Path patternsFile : patternsFiles) {
40.              parseSkipFile(patternsFile);
41.            }
42.          }  
43.        }
44.
45.        private void parseSkipFile(Path patternsFile) {
46.          try {
47.            BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
48.            String pattern = null;
49.            while ((pattern = fis.readLine()) != null) {
50.              patternsToSkip.add(pattern);
51.            }
52.          } catch (IOException ioe) {
53.            System.err.println("Caught exception while parsing the cached file '" + patternsFile
+ "' : " + StringUtils.stringifyException(ioe));
54.          }
55.        }
56.
57.        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable>
output, Reporter reporter) throws IOException {
58.          String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
59.
60.          for (String pattern : patternsToSkip) {
61.            line = line.replaceAll(pattern, "");
62.          }
63.
64.          StringTokenizer tokenizer = new StringTokenizer(line);
65.          while (tokenizer.hasMoreTokens()) {  
66.            word.set(tokenizer.nextToken());
67.            output.collect(word, one);
68.            reporter.incrCounter(Counters.INPUT_WORDS, 1);
69.          }
70.
71.          if ((++numRecords % 100) == 0) {
72.            reporter.setStatus("Finished processing " + numRecords + " records " + "from the input
file: " + inputFile);
73.          }
74.        }
75.      }
76.
77.      public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
Text, IntWritable> {
78.        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException {
79.          int sum = 0;
80.          while (values.hasNext()) {
81.            sum += values.next().get();
82.          }
83.          output.collect(key, new IntWritable(sum));
84.        }
85.      }
86.
87.      public int run(String[] args) throws Exception {
88.        JobConf conf = new JobConf(getConf(), WordCount.class);  
89.        conf.setJobName("wordcount");
90.
91.        conf.setOutputKeyClass(Text.class);
92.        conf.setOutputValueClass(IntWritable.class);
93.
94.        conf.setMapperClass(Map.class);
95.        conf.setCombinerClass(Reduce.class);
96.        conf.setReducerClass(Reduce.class);
97.
98.        conf.setInputFormat(TextInputFormat.class);
99.        conf.setOutputFormat(TextOutputFormat.class);
100.
101.        List<String> other_args = new ArrayList<String>();
102.        for (int i=0; i < args.length; ++i) {
103.          if ("-skip".equals(args[i])) {
104.            DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
105.            conf.setBoolean("wordcount.skip.patterns", true);
106.          } else {
107.            other_args.add(args[i]);
108.          }
109.        }
110.
111.        FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
112.        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));  
113.
114.        JobClient.runJob(conf);
115.        return 0;
116.      }
117.
118.      public static void main(String[] args) throws Exception {
119.        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
120.        System.exit(res);
121.      }
122.  }
123.
运行样例
输入样例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
运行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input
/usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
注意此时的输入与第一个版本的不同,输出的结果也有不同。
现在通过 DistributedCache 插入一个模式文件,文件中保存了要被忽略的单词模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
.
,
!
to
再运行一次,这次使用更多的选项:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount  -Dwordcount.case.sensitive=true
/usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
应该得到这样的输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再运行一次,这一次关闭大小写敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount  -Dwordcount.case.sensitive=false
/usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
程序要点
通过使用一些 Map/Reduce 框架提供的功能,WordCount 的第二个版本在原始版本基础上有了如下的改进:
  展示了应用程序如何在 Mapper (和 Reducer)中通过 configure 方法  修改配置参数(28-43 行)。
  展示了作业如何使用 DistributedCache  来分发只读数据。  这里允许用户指定单词的模式,在计数时忽略那些
符合模式的单词(104 行)。
  展示 Tool 接口和 GenericOptionsParser 处理 Hadoop 命令行选项的功能  (87-116, 119 行)。
  展示了应用程序如何使用 Counters(68 行),如何通过传递给 map(和 reduce)  方法的 Reporter 实例来设置
应用程序的状态信息(72 行)。
Java 和 JNI 是 Sun Microsystems, Inc.在美国和其它国家的注册商标。

声明: 本文由( 恒镭, 张 )原创编译,转载请保留链接: Hadoop翻译文档(五) Map/Reduce 开发指南

Hadoop翻译文档(五) Map/Reduce 开发指南:等您坐沙发呢!

发表评论




------====== 本站公告 ======------
欢迎关注我的博客。

其他