一、MapReduce框架概述
1、MapReduce概述
关于MapReduce的简单介绍可以参考官方文档的说明:
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte>3、MapReduce的核心思想
以大数据场景最为典型的WordCount计算为例,在计算任务需要统计某个文本中各个单词出现的次数。MapReduce框架的典型计算思想是这样的:
整体分为Mapper,Shuffle,和Reduce计算三个阶段。其中Mapper和Reduce是需要应用自行实现的部分,而中间的Shuffle则是由Hadoop来实现,不需要进行管理。
二、官方示例解析
1、官方示例解析
下面从一个官方示例来入手。
在本地编辑一个简单的文本文件。
bash
[root@hadoop01 ]# vi wdtest.txt my name is roy this is my bigdata lesson welcome to my lesson使用
hdfs dfs -put wdtest.txt /input指令,将这个文件上传到hdfs的input目录下。执行以下指令,启动一个官方提供的MapReduce计算任务。
bash
[root@hadoop01 ~]# hadoop jar /app/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.2.jar wordcount /input /output 2022-01-23 17:22:01,211 INFO client.RMProxy: Connecting to ResourceManager at hadoop02/192.168.65.204:8032 # ---》以下这几行日志就是MapReduce计算的标志性日志 2022-01-23 17:22:03,774 INFO mapreduce.Job: The url to track the job: http://hadoop02:8088/proxy/application_1642929672088_0001/ 2022-01-23 17:22:03,775 INFO mapreduce.Job: Running job: job_1642929672088_0001 ... 2022-01-23 17:22:18,227 INFO mapreduce.Job: map 0% reduce 0% 2022-01-23 17:22:25,431 INFO mapreduce.Job: map 100% reduce 0% 2022-01-23 17:22:32,527 INFO mapreduce.Job: map 100% reduce 100%计算结果会输出到hdfs上的Output目录下。(需要启动HDFS和YARN)
获取源码:可以通过反编译任务jar包,或从Hadoop源码中获取。3.2.2版本Hadoop源码下载地址:
https://archive.apache.org/dist/hadoop/common/hadoop-3.2.2/hadoop-3.2.2-src.tar.gz。导入IDEA后,直接搜索WordCount,即可找到任务的入口执行类。重点关注其main方法:java
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); //寻找执行的jar包 job.setJarByClass(WordCount.class); //Mapper任务类 job.setMapperClass(TokenizerMapper.class); //可选任务,对Mapper的数据结果先进行一次本地合并 job.setCombinerClass(IntSumReducer.class); //Reduce任务类 job.setReducerClass(IntSumReducer.class); //设置输出Key类型 job.setOutputKeyClass(Text.class); //设置输出Value类型 job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }从这个示例中可以了解到MapReduce计算程序的基础开发过程。例如对Job可以设置哪些额外的属性,对Mapper任务、Reduce任务有个基本的了解。重点是理解Mapper的输出结果和Reduce的输入结果。
注意:输入地址和输出地址是需要作为参数配置进去的。例如/input /output。如果在本地执行,这两个地址都将是本地文件中的地址。如果要访问HDFS,可以配置hdfs://hadoop01:8020/input hdfs://hadoop01:8020/output。2、搭建开发环境
这是一个典型的MapReduce任务。这个任务即可以像之前那样打个包,提交到Hadoop集群中执行,也可以在IDEA直接本地执行。当然本地执行前需要完成Maven编译的整个过程。这个过程很容易出错,所以还是建议单独搭建一个Hadoop的MapReduce测试工程。
先创建一个Maven项目,在
pom.xml中引入相关依赖:xml
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> </dependency>然后就可以将WordCount相关的类移入到该测试项目中。这个测试项目将作为我们深入理解MapReduce的演示项目以及调试入口。
三、MapReduce计算过程详解
1、Mapper任务的执行流程
对于Map阶段整体流程如下图:
阶段一:客户端提交任务阶段
这一阶段主要做两件事情,一是预先对文件进行切片,二是提交任务信息到HDFS。
文件切片:客户端会在提交任务前,对待处理的数据(主要是文本)进行预处理,文本数据按照BlockSize大小进行拆分。
关键源码调试点:
源码调试从
Job的waitForCompletion方法入手。其中submit()方法就是对任务进行提交。文件切割的关键代码在
JobSubmitter的writeNewSplits方法。客户端通过
InputFormat来计算文件分割。其中isSplitable方法判断文件是否可以拆分。对某些压缩过的文件,是不支持拆分的。而getSplits方法实际进行文件拆分。拆分的文件大小,默认与HDFS的Block大小一致。默认行为:默认会加载
TextInputFormat,他是FileInputFormat的一个实现类。对于文本都是可以切割的,所以直接返回true。然后getSplits方法对文件进行拆分时,并不是对文件进行实际切割,只是记录一系列InputSplit对象。其中包含的是每个切分块在文件当中的偏移量。这些文件切割信息最终会生成.split文件上传到任务目录中。输入格式:
InputFormat拆分的结果是(K,V)型的键值对。对于FileInputFormat拆分出的结果中value就是每一行文本,而Key就是当前这一行在整个文件中的起始字节偏移量。例如,在MapReduce的执行日志中会看到一行这样的日志:INFO [org.apache.hadoop.mapred.MapTask] - Processing split:hdfs://hadoop01:8020/input/wdtest.txt:0+62这里的0和62就是文件切割出来的偏移量。其他InputFormat:还有其他的
InputFormat的实现类,可以通过job.setInputFormatClass();方法进行设置。另一种比较常用的实现类是CombineTextInputFormat。他的切片方式就是只按照文件大小切片,不管多少个文件,都会是一个单独的切片,也就会交给一个MapTask进行后续的处理。切片大小可以通过CombineTextInputFormat.setMaxInputSplitSize方法自行指定。这种切片方式适合于有很多小文件的场景。因为如果小文件很多,默认的FileInputFormat实现类就会产生非常多的小分片,后续也就启动非常多的MapTask,对性能是一种浪费。提交任务信息:客户端会将任务信息提交到HDFS当中。这个任务信息会在MapReduce任务执行完成后删除。
关键源码调试点:
在
JobSubmitter类的submitJobInternal方法中,会获得一个提交任务的地址jobStagingArea,指向一个.stage文件。如果是本地执行,会指向一个file:开头的本地目录,如果是Yarn集群执行,会指向一个hdfs:开头的HDFS地址。客户端会负责将MapReduce执行的任务文件上传到HDFS中,并通知Runner执行。如果是本地执行,会通知
LocalJobRunner。Yarn集群则会通知YarnRunner。在这个地方就完成了客户端与Hadoop集群的任务交接。客户端提交的
.stage文件夹中主要包含三个文件:1)job.split文件,包含目标文件的切割信息。2)job.xml,包含当前任务执行的所有配置属性。3)wc.jar,任务的jar包文件。本地执行时,由于所有的类就在当前JVM中,所以不需要jar包。通知Runner执行:客户端将MapReduce执行的任务文件上传到HDFS后,就会通知Runner执行。客户端会通知一个
ClientProtocol实现类来执行任务。根据任务执行环境不同,会通知不同的实现类。如果是本地执行,会通知LocalJobRunner。Yarn集群则会通知YarnRunner。而这两个ClientProtocol实现,在接收到任务后,会返回给客户端一个集群状态,然后在构建任务时,就会自动启动任务进行计算。提交完成后,将任务状态调整为JobState.RUNNING。到这一步就完成了客户端的整个任务提交过程。后续客户端可以持续跟踪任务的执行进度,并打印日志。阶段二:MapTask任务执行阶段
这一阶段会启动多个MapTask,执行Mapper任务。MapTask的数量取决于上一阶段分片的个数。
4.读取数据:MapTask会先将文件读取成K-V类型的元数据,再交由Mapper执行。
*.stage文件夹中的job.xml文件,实际上是在任务执行阶段,由ClientProtocol类在执行阶段生成的。这也比较好理解,只有在执行阶段才会去获取所有的执行参数。
* Job执行过程中,会根据.split文件中的切片个数,启动对应的MapTask。
5.执行Map方法:每一个Block会启动一个MapperTask来处理,并最终通过OutputCollector将分散的处理结果收集起来。
* MapperTask执行map方法时的(K,V)参数,都是由RecordReader对象根据之前的.split文件读取到的内容。RecordReader是一个抽象类,具体的实现子类是由之前的InputFormat实现类的createRecordReader方法产生。默认的TextInputFormat就会产生一个LineRecordReader,这个实现类会一行一行的读取文件。而CombineTextInputFormat对象会产生一个CombineFileRecordReader。
6.后续计算:Mapper计算过程中会定时的收集任务完成率,反馈给客户端。2、Shuffle阶段的执行流程
在Map方法执行之后,到Reduce方法执行之前,还有一段数据处理的过程。这个过程称为Shuffle。这是整个MapReduce最为核心的一段过程。
整个Shuffle的过程如下:
分区 (Partition):Mapper阶段输出的数据,会经过Partition进行分区,给每个数据标记上他的分区信息。
作用:将数据按照条件输出到不同的文件(分区)当中。这些数据将进入Reduce中处理,所以每个Reduce接收到的数据都是key相同的。
默认实现:分区操作是通过
Partitioner接口来实现的。默认加载的是HashPartitioner这个实现类,会按key的hashcode分组,相同的key会分到一个组。但是用户没法控制某一个key具体会分到哪个分区。自定义分区:用户可以通过实现
Partitioner的方式自定义分区实现,然后通过job.setPartitionerClass方法指定。自定义分区时,要注意,生成的分区号必须从0开始,逐一增加,不能有跳过。环形缓冲区与溢写:分区后的数据会收集进入到环形缓冲区进行缓存。数据收集到一定数量后,会从缓冲区溢写到磁盘当中。
环形缓冲区:是Hadoop中的一块核心内存,由
MapOutputBuffer进行管理。默认大小是100M。他的作用就是对Mapper产生的数据进行缓存,缓存到一定数量后,按照分区,将数据写到磁盘中。环形缓冲区有一个默认的容量阈值capacity,默认是80%。工作机制:
环形缓冲区在收集数据时,会从内存块的头部位置开始,同时记录数据的序列化结果(
kvbuffer中)和数据的索引信息(kvmeta中)。当环形缓冲区中的数据容量达到他的容量阈值
(80%)后,环形缓冲区就会改为从内存块的尾部位置开始反向记录数据。而原有的前80%内存块中的数据,就会开始往磁盘中进行溢写,并逐步开始释放内存。同样,当从尾部开始反写的数据容量超过
80%后,又会再次反过来从内存头部开始写数据,然后重复之前的过程。内存块循环使用,这也正是环形缓冲区这个名称的来源。如果数据反向写入的速度比内存溢写的速度快,那么这时数据写入操作就会暂停,等待释放内存。
排序:溢写出来的数据是按照分区整合到一起的。这时会将各个分区数据进行排序。
从环形缓冲区溢写出来的数据是按照分区进行了划分的。相同分区的数据会写入到同一个文件当中。
刚从环形缓冲区溢写出来的数据,是按分区进行了分组,但是组内并不是有序的。这时会使用快排算法对分区内的数据进行排序。
合并 (Combine) 与归并:
Combine操作:是Shuffle过程中的一个可选操作,由
job.setCombinerClass方法指定。在环形缓冲区MapOutputBuffer中也会记录Combiner。Combine是对已经局部有序的多个溢写文件按照key再次进行整合排序,这个过程中使用的是归并排序的算法。归并:归并后的数据文件可以选择进行第二次Combine操作,进行局部合并。形成最终的局部有序的分区数据集合。
Combine的作用:只是让数据在进入Reduce处理之前,先进行一下局部数据合并。这样做并不会影响最终的计算结果。其最大的作用在于减少网络传输的数据量,提升计算的性能。
压缩:分区数据集合可以选择进行压缩,最终保存到磁盘当中,完成整个Shuffle过程。
作用:减少网络传输以及减少存盘文件大小,提升IO。压缩过程主要有两个执行点,一个是Mapper任务输出,另一个是Reduce任务输出。
配置:这两个阶段默认都是不启用压缩的,如果要启用数据压缩,需要在
mapred-site.xml中配置参数,例如mapreduce.map.output.compress、mapreduce.map.output.compress.codec等。压缩算法:Hadoop中提供了多种数据压缩的算法,他们都是
CompressionCodec接口的实现类。
GZip:压缩比率最高,但是不支持Split,压缩解压的速度一般。
BZip2:压缩比率也挺高,支持Split,压缩解压速度最慢。
Snappy:压缩比率一般,不支持Split,压缩解压的速度最快。
自定义:企业也可以按照Hadoop的接口要求自行定制数据压缩算法。不过通常来说,数据压缩都会带来额外的计算开销,所以如果是计算密集型的任务,就不太建议增加数据压缩。让计算资源更多的投入到业务计算中。
数据落盘:Shuffle过程完成后,数据只负责落盘到磁盘文件当中。后续由Reduce任务主动来拉取数据。
3、Reduce阶段的执行流程
Reduce阶段主要是会对之前Mapper阶段的任务进行聚合。对于一个MapReduce计算任务来说,其实Reduce也是一个可选的任务阶段。
整体步骤如下:
启动时机:Reduce通常在Mapper任务执行完成后再启动。在MapReduce的执行日志中,能够跟踪到Mapper任务和Reduce任务的执行进度。通常情况下,Reduce需要收集Mapper输出完的结果,所以需要等所有的Mapper任务执行完成后再启动。但是,Mapreduce框架也会根据计算情况进行优化。在某些Reduce任务所依赖的Mapper任务全部完成了之后,这些Reduce可以提前启动,而不用等待所有的Mapper任务执行完成。
拉取数据:Reduce主动去磁盘中拉取相关的数据。
MapTask的并行度是由切片个数决定的,而ReduceTask的并行度则不同,可以通过
setNumReduceTasks方法手动指定,默认是1个。MapTask将数据输出到磁盘,ReduceTask主动去磁盘上远程拷贝一片数据。如果数据大小超过了一定阈值,就会将一部分数据写入到磁盘当中。
排序之后,Reduce会对数据进行分组,将相同key的数据整合到一起。之后才进入用户自定义的Reduce方法中进行聚合。
合并数据:Reduce对获取到的数据进行合并。
ReduceTask在获取到数据之后,会对内存和磁盘中的数据进行合并。将相同key的数据整合在一起,而Value整合成一个数组。
而在合并过程中,为了将相同的key快速合并到一起,ReduceTask还会对数据再进行一次排序,排序之后的数据针对相同的key,局部有序。
执行Reduce方法与输出:进入用户自定义的
reduce()方法。每个Reduce任务各自对数据进行计算,并将收集到的结果输出到最终文件中。
每个Reduce接收到的数据是按key局部有序的,但是多个Reduce任务之间的key并不能保证是有序的。
每个Reduce都将结果输出到自己对应的目标文件中,输出的结果同样是按照key在每个文件内局部有序,整体并不保证有序。
Reduce的计算结果,最终交由
FileOutputFormat的实现类来输出。默认加载的是TextOutputFormat。他会产生一个LineRecordWriter,将每一个KV对,向目标文件输出一行。用户同样可以使用job.setOutputFormatClass()方法选择其他的结果输出器,也可以定制自定义的结果输出格式。4、MapReduce流程整体总结
这样,我们将MapReduce任务的各个阶段详细进行拆解后,就形成了MapReduce任务更加详细的整体细节流程图:
接下来我们重点梳理一下有哪些过程是可以由用户参与的。
输入数据接口:InputFormat
默认使用的实现类是:
TextInputFormat
TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map(),setup(),cleanup()Partitioner分区
默认的实现方式是
HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode() & Integer.MAXVALUE % numReduces。数据分配均匀,但是由于HashCode是很难预估的,所以预先无法知道具体的分区结果。如果业务上有特别的需求,可以自定义分区。
Comparable排序
当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。数据压缩算法CompressionCodec
用户可以定义数据压缩算法,但是需要注意压缩算法是否支持数据分片。逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce(),setup(),cleanup()输出数据接口:OutputFormat
默认实现类是
TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。用户还可以自定义
OutputFormat。四、深入理解
1、数据倾斜问题
数据倾斜是大数据计算中非常常见的一个问题。其原因就是由于数据分布不均匀,可能造成某些partition的数据非常多,而某些partition的数据非常少。分配给不同的ReduceTask处理后,数据多的ReduceTask计算任务就非常繁忙,而数据少的ReduceTask计算任务就非常简单。这对于集群资源其实是一种浪费。体现出来的效果就是Reduce阶段的进展非常不平衡,前期进度比较快,后期进度非常慢。
解决数据倾斜的问题,核心思想还是要调整数据的分区逻辑,让各个分区的数据尽量平均一点。调整的方式主要有两种:
调整Reduce的数量:这样可以一定程度上分担ReduceTask的计算压力。并且,默认的
HashPartitioner实现,在调整了Reduce的数量后,也会对数据重新进行分区。但是这种方法的坏处是,HashPartitioner根据key的hashCode和ReduceTask来计算分区,由于hashCode是很难把握的,所以,并不能保证调整后的分区就一定能够更加平均。可以在简单的WordCount计算过程中调整numReduceTask属性,观察结果:job.setNumReduceTasks(2);自定义分区:给MapReduce计算设置一个自定义的
Partitioner实现,让数据按照自定义的逻辑进行分区。通过自定义分区,应用程序可以根据数据的具体情况灵活调整分区逻辑,保证数据分区更加平均。另外,自定义分区最终会影响到ReduceTask的输出文件,所以也可以让MapReduce的输出文件更有意义。例如,可以按照英文单词的首个字母分组。首字母相同的单词就会固定输出到同一个文件当中。2、自定义对象序列化
示例中每个计算步骤,比如MapTask和ReduceTask,他们的输入和输出类型是一些基础类型的数据,如果想要传输一些自定义的数据类型,比如自己的POJO对象,那就需要了解自定义序列化。这也是开发中非常常见的功能。
MapReduce对计算过程中的Key和Value都需要通过泛型进行严格的定义。默认对各种基础数据类型都提供了对应的封装类型(如
IntWritable,LongWritable,FloatWritable,DoubleWritable,BooleanWritable,ByteWritable,Text等)。其中String比较特殊一点,对应的封装类是Text。可以看到,MapReduce对于基础数据类型都提供了对应的
Writable封装类型。这些封装类型都是实现自WritableComparable接口。java
public interface WritableComparable<T> extends Writable, Comparable<T> { }这个接口实现了两个接口,Hadoop定义的
Writable接口以及JDK提供的Comparable接口。其中,Comparable接口就是给数据提供比较的功能。这是因为MapReduce计算中需要多次对数据进行排序,而排序就需要有Comparable接口支持。Writable接口则是Hadoop定义的一个序列化实现接口。这个接口的作用就是将数据转换成二进制的数据流,这些数据流才可以在网络上进行传输。java
public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
Writable提供了两个方法,write方法定义如何将数据对象写成二进制数据流,readFields定义如何将二进制数据流读取成数据对象。如果我们需要计算自定义的数据对象,那么数据对象就需要自行实现Writable接口的这两个方法。在Writable接口的源码注释当中,就提供了一个简单的示例:java
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; // Default constructor to allow (de)serialization MyWritable() { } public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }3、自定义输出文件
MapReduce的输出文件都收Part-xxx这样的格式,他是按照Reduce拆分输出的
结果。这样的输出结果通常没有太多的意义。如果想要自定义输出文件,也可以通
过自定义OutputFormat的方式进行自定义。4、Hadoop中的RPC框架
如果想要深入Hadoop的源码,从MapReduce计算任务为入口,其实是一个比较好的方法。整个Hadoop的源码相当庞大,并且有很多的内部任务,所以通常很难找到调试代码的主线。而从MapReduce计算任务的源码开始调试,就比较容易找到主线。
在跟踪Hadoop源码的时候,他的RPC框架需要提前了解下,要不然Hadoop集群内部的大量RPC请求会非常难以跟踪。而Hadoop基于Netty实现了一套非常简洁小巧的RPC框架。这个小框架,脱离Hadoop框架也是很有借鉴意义的。
使用Hadoop的RPC框架,只需要引入Hadoop的
common模块即可。引入如下hadoop-client的maven依赖就会包含common模块。xml
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency>然后,Hadoop的RPC框架整体由三个部分构成:服务端,客户端,协议。
Hadoop中的协议层是一个非常简单灵活的层面,只需要定义一个带版本的接口以及服务实现类即可。java
public interface ClientNamenodeProtocol { public long versionID = 1L; public String getMetaData(String path); } //模拟NameNode,根据文件路径,返回文件所在的Block位置 public class MyNamenode implements ClientNamenodeProtocol { @Override public String getMetaData(String path) { return path + ": 3 - {BLK_1,BLK_2} ......"; } }Hadoop中的协议是很自由的接口。内部提供了一个
VersionedProtocol,但是并不强制要求继承。在服务端,就可以根据这个协议接口和实现类,直接创建一个服务端。
java
public class RPCServer { public static void main(String[] args) throws IOException { final RPC.Builder builder = new RPC.Builder(new Configuration()); builder.setBindAddress("localhost"); builder.setPort(8888); builder.setProtocol(ClientNamenodeProtocol.class) .setInstance(new MyNamenode()); final RPC.Server server = builder.build(); server.start(); } }启动后,就会绑定本机的8888端口,等待客户端响应。而客户端就可以直接请求协议中对应的方法,调用服务端的实现类。
java
public class RPCClient { public static void main(String[] args) throws IOException { final ClientNamenodeProtocol proxy = RPC.getProxy(ClientNamenodeProtocol.class, 1L, new InetSocketAddress("localhost", 8888), new Configuration()); final String metaData = proxy.getMetaData("/myNameNode"); System.out.println(metaData); } }这个RPC框架在Hadoop底层应用非常广泛,例如在NameNode上,就会启动一个RPC服务端,来接收DataNode的心跳请求。在
NameNodeRpcServer.java源码中,在他的构造方法中就有这样一段源码:java
serviceRpcServer = new RPC.Builder(conf) .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()) .setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build();5、Hadoop Streaming
Hadoop Streaming是Hadoop新推出的一个工具集。这个工具集并不是提供流式计算的功能,而是允许以命令行方式代替千篇一律的Driver代码。通过Streaming工具可以简化MapReduce任务的部署。
其实从之前的编码过程可以看到,对于一个完整的MapReduce计算任务,重要的业务逻辑都是由Mapper、Reducer、Partitioner、OutputFormat等这些组件来提供的,而启动任务的Driver其实只是相当于一个组件粘合剂,并没有实际的业务逻辑。Hadoop Streaming就允许以命令行的方式来启动一个Driver。这样可以减少提交任务的Jar包大小(因为Driver类就不需要了)。并且,通过Hadoop Streaming工具,可以扩充各个功能组件的具体实现。Linux的指令工具以及Python语言开发的组件也都可以自由组合到MapReduce计算任务当中。不过,企业一般都是习惯于用一个完整jar包的方式来提交任务,所以这个工具其实用得并不是很多。这里就只做简单的介绍。
5.1 零开发完成一个WordCount计算。
直接执行以下指令,就可以启动一个MapReduce任务。
bash
[root@hadoop01 ~]# mapred streaming \ > -input /input2 \ > -output /output2 \ > -mapper /bin/cat \ > -reducer /usr/bin/wc这个指令就会启动一个MapReduce计算,统计HDFS上
/input2目录下的文件,输出到/output2目录。其中mapper直接使用Linux的cat指令完成,而reduce直接使用Linux的wc指令完成,所以统计完成的输出结果是这样的:36 620 4228。这表示/input2下面的一个文件,行数为36、单词数620、字节数4228。5.2 Hadoop Streaming指令介绍
Hadoop Streaming的基础指令格式是这样的:
bash
mapred streaming [genericOptions] [streamingOptions]其中
genericOptions表示一些任务相关的配置信息。主要有以下几个配置:
Parameter Optional/Required Description -conf configuration_fileOptional Specify an application configuration file -D property=valueOptional Use value for given property -fs host:port or localOptional Specify a namenode -filesOptional Specify comma-separated files to be copied to the Map/Reduce cluster -libjarsOptional Specify comma-separated jar files to include in the classpath -archivesOptional Specify comma-separated archives to be unarchived on the compute machines 例如
-D mapreduce.job.reduces = 0指定reduceTask个数。-files允许访问HDFS上的文件。-archives可以引入HDFS上的一个任务jar包(就不用提交到Hadoop了。)
streamingOptions则是一组具体的执行指令。streamingOptions需要在genericOptions之后指定。可选的指令如下:
Parameter Optional/Required Description -input directoryname or filenameRequired Input location for mapper -output directorynameRequired Output location for reducer -mapper executable or JavaClassNameOptional Mapper executable. If not specified, IdentityMapper is used as the default -reducer executable or JavaClassNameOptional Reducer executable. If not specified, IdentityReducer is used as the default ... (其他参数详见文档表格) 其实对照Driver的实现过程,很多的配置都一目了然。像
combiner,partitioner,mapper,reducer等。与Driver的不同之处在于,这些功能组件都可以指定shell指令或者python脚本。
例如:以下指令可以引入一个Parititoner。而这个Partition类所在的jar包,即可以上传到Hadoop的安装目录下,也可以上传到HDFS中,通过-archives引入。bash
mapred streaming \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ -D mapreduce.partition.keypartitioner.options=-k1,2 \ -D mapreduce.job.reduces=12 \ -input /input \ -output /output3 \ -mapper /bin/cat \ -reducer /bin/cat \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner