news 2026/4/18 6:31:19

Hadoop MapReduce 计算框架详解

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Hadoop MapReduce 计算框架详解

一、MapReduce框架概述

1、MapReduce概述

关于MapReduce的简单介绍可以参考官方文档的说明:

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte>3、MapReduce的核心思想

以大数据场景最为典型的WordCount计算为例,在计算任务需要统计某个文本中各个单词出现的次数。MapReduce框架的典型计算思想是这样的:
整体分为Mapper,Shuffle,和Reduce计算三个阶段。其中Mapper和Reduce是需要应用自行实现的部分,而中间的Shuffle则是由Hadoop来实现,不需要进行管理。

二、官方示例解析

1、官方示例解析

下面从一个官方示例来入手。

  1. 在本地编辑一个简单的文本文件。

    bash

    [root@hadoop01 ]# vi wdtest.txt my name is roy this is my bigdata lesson welcome to my lesson
  2. 使用hdfs dfs -put wdtest.txt /input指令,将这个文件上传到hdfs的input目录下。

  3. 执行以下指令,启动一个官方提供的MapReduce计算任务。

    bash

    [root@hadoop01 ~]# hadoop jar /app/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.2.jar wordcount /input /output 2022-01-23 17:22:01,211 INFO client.RMProxy: Connecting to ResourceManager at hadoop02/192.168.65.204:8032 # ---》以下这几行日志就是MapReduce计算的标志性日志 2022-01-23 17:22:03,774 INFO mapreduce.Job: The url to track the job: http://hadoop02:8088/proxy/application_1642929672088_0001/ 2022-01-23 17:22:03,775 INFO mapreduce.Job: Running job: job_1642929672088_0001 ... 2022-01-23 17:22:18,227 INFO mapreduce.Job: map 0% reduce 0% 2022-01-23 17:22:25,431 INFO mapreduce.Job: map 100% reduce 0% 2022-01-23 17:22:32,527 INFO mapreduce.Job: map 100% reduce 100%

    计算结果会输出到hdfs上的Output目录下。(需要启动HDFS和YARN)

  4. 获取源码:可以通过反编译任务jar包,或从Hadoop源码中获取。3.2.2版本Hadoop源码下载地址:https://archive.apache.org/dist/hadoop/common/hadoop-3.2.2/hadoop-3.2.2-src.tar.gz。导入IDEA后,直接搜索WordCount,即可找到任务的入口执行类。重点关注其main方法:

    java

    public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); //寻找执行的jar包 job.setJarByClass(WordCount.class); //Mapper任务类 job.setMapperClass(TokenizerMapper.class); //可选任务,对Mapper的数据结果先进行一次本地合并 job.setCombinerClass(IntSumReducer.class); //Reduce任务类 job.setReducerClass(IntSumReducer.class); //设置输出Key类型 job.setOutputKeyClass(Text.class); //设置输出Value类型 job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }

从这个示例中可以了解到MapReduce计算程序的基础开发过程。例如对Job可以设置哪些额外的属性,对Mapper任务、Reduce任务有个基本的了解。重点是理解Mapper的输出结果和Reduce的输入结果。
注意:输入地址和输出地址是需要作为参数配置进去的。例如/input /output。如果在本地执行,这两个地址都将是本地文件中的地址。如果要访问HDFS,可以配置hdfs://hadoop01:8020/input hdfs://hadoop01:8020/output

2、搭建开发环境

这是一个典型的MapReduce任务。这个任务即可以像之前那样打个包,提交到Hadoop集群中执行,也可以在IDEA直接本地执行。当然本地执行前需要完成Maven编译的整个过程。这个过程很容易出错,所以还是建议单独搭建一个Hadoop的MapReduce测试工程。

先创建一个Maven项目,在pom.xml中引入相关依赖:

xml

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> </dependency>

然后就可以将WordCount相关的类移入到该测试项目中。这个测试项目将作为我们深入理解MapReduce的演示项目以及调试入口。

三、MapReduce计算过程详解

1、Mapper任务的执行流程

对于Map阶段整体流程如下图:

阶段一:客户端提交任务阶段
这一阶段主要做两件事情,一是预先对文件进行切片,二是提交任务信息到HDFS。

  1. 文件切片:客户端会在提交任务前,对待处理的数据(主要是文本)进行预处理,文本数据按照BlockSize大小进行拆分。

    • 关键源码调试点

      • 源码调试从JobwaitForCompletion方法入手。其中submit()方法就是对任务进行提交。

      • 文件切割的关键代码在JobSubmitterwriteNewSplits方法。

      • 客户端通过InputFormat来计算文件分割。其中isSplitable方法判断文件是否可以拆分。对某些压缩过的文件,是不支持拆分的。而getSplits方法实际进行文件拆分。拆分的文件大小,默认与HDFS的Block大小一致。

    • 默认行为:默认会加载TextInputFormat,他是FileInputFormat的一个实现类。对于文本都是可以切割的,所以直接返回true。然后getSplits方法对文件进行拆分时,并不是对文件进行实际切割,只是记录一系列InputSplit对象。其中包含的是每个切分块在文件当中的偏移量。这些文件切割信息最终会生成.split文件上传到任务目录中。

    • 输入格式InputFormat拆分的结果是(K,V)型的键值对。对于FileInputFormat拆分出的结果中value就是每一行文本,而Key就是当前这一行在整个文件中的起始字节偏移量。例如,在MapReduce的执行日志中会看到一行这样的日志:INFO [org.apache.hadoop.mapred.MapTask] - Processing split:hdfs://hadoop01:8020/input/wdtest.txt:0+62这里的062就是文件切割出来的偏移量。

    • 其他InputFormat:还有其他的InputFormat的实现类,可以通过job.setInputFormatClass();方法进行设置。另一种比较常用的实现类是CombineTextInputFormat。他的切片方式就是只按照文件大小切片,不管多少个文件,都会是一个单独的切片,也就会交给一个MapTask进行后续的处理。切片大小可以通过CombineTextInputFormat.setMaxInputSplitSize方法自行指定。这种切片方式适合于有很多小文件的场景。因为如果小文件很多,默认的FileInputFormat实现类就会产生非常多的小分片,后续也就启动非常多的MapTask,对性能是一种浪费。

  2. 提交任务信息:客户端会将任务信息提交到HDFS当中。这个任务信息会在MapReduce任务执行完成后删除。

    • 关键源码调试点

      • JobSubmitter类的submitJobInternal方法中,会获得一个提交任务的地址jobStagingArea,指向一个.stage文件。如果是本地执行,会指向一个file:开头的本地目录,如果是Yarn集群执行,会指向一个hdfs:开头的HDFS地址。

      • 客户端会负责将MapReduce执行的任务文件上传到HDFS中,并通知Runner执行。如果是本地执行,会通知LocalJobRunner。Yarn集群则会通知YarnRunner。在这个地方就完成了客户端与Hadoop集群的任务交接。

      • 客户端提交的.stage文件夹中主要包含三个文件:1)job.split文件,包含目标文件的切割信息。2)job.xml,包含当前任务执行的所有配置属性。3)wc.jar,任务的jar包文件。本地执行时,由于所有的类就在当前JVM中,所以不需要jar包。

  3. 通知Runner执行:客户端将MapReduce执行的任务文件上传到HDFS后,就会通知Runner执行。客户端会通知一个ClientProtocol实现类来执行任务。根据任务执行环境不同,会通知不同的实现类。如果是本地执行,会通知LocalJobRunner。Yarn集群则会通知YarnRunner。而这两个ClientProtocol实现,在接收到任务后,会返回给客户端一个集群状态,然后在构建任务时,就会自动启动任务进行计算。提交完成后,将任务状态调整为JobState.RUNNING。到这一步就完成了客户端的整个任务提交过程。后续客户端可以持续跟踪任务的执行进度,并打印日志。

阶段二:MapTask任务执行阶段
这一阶段会启动多个MapTask,执行Mapper任务。MapTask的数量取决于上一阶段分片的个数。
4.读取数据:MapTask会先将文件读取成K-V类型的元数据,再交由Mapper执行。
*.stage文件夹中的job.xml文件,实际上是在任务执行阶段,由ClientProtocol类在执行阶段生成的。这也比较好理解,只有在执行阶段才会去获取所有的执行参数。
* Job执行过程中,会根据.split文件中的切片个数,启动对应的MapTask。
5.执行Map方法:每一个Block会启动一个MapperTask来处理,并最终通过OutputCollector将分散的处理结果收集起来。
* MapperTask执行map方法时的(K,V)参数,都是由RecordReader对象根据之前的.split文件读取到的内容。RecordReader是一个抽象类,具体的实现子类是由之前的InputFormat实现类的createRecordReader方法产生。默认的TextInputFormat就会产生一个LineRecordReader,这个实现类会一行一行的读取文件。而CombineTextInputFormat对象会产生一个CombineFileRecordReader
6.后续计算:Mapper计算过程中会定时的收集任务完成率,反馈给客户端。

2、Shuffle阶段的执行流程

在Map方法执行之后,到Reduce方法执行之前,还有一段数据处理的过程。这个过程称为Shuffle。这是整个MapReduce最为核心的一段过程。

整个Shuffle的过程如下:

  1. 分区 (Partition):Mapper阶段输出的数据,会经过Partition进行分区,给每个数据标记上他的分区信息。

    • 作用:将数据按照条件输出到不同的文件(分区)当中。这些数据将进入Reduce中处理,所以每个Reduce接收到的数据都是key相同的。

    • 默认实现:分区操作是通过Partitioner接口来实现的。默认加载的是HashPartitioner这个实现类,会按key的hashcode分组,相同的key会分到一个组。但是用户没法控制某一个key具体会分到哪个分区。

    • 自定义分区:用户可以通过实现Partitioner的方式自定义分区实现,然后通过job.setPartitionerClass方法指定。自定义分区时,要注意,生成的分区号必须从0开始,逐一增加,不能有跳过。

  2. 环形缓冲区与溢写:分区后的数据会收集进入到环形缓冲区进行缓存。数据收集到一定数量后,会从缓冲区溢写到磁盘当中。

    • 环形缓冲区:是Hadoop中的一块核心内存,由MapOutputBuffer进行管理。默认大小是100M。他的作用就是对Mapper产生的数据进行缓存,缓存到一定数量后,按照分区,将数据写到磁盘中。环形缓冲区有一个默认的容量阈值capacity,默认是80%

    • 工作机制

      • 环形缓冲区在收集数据时,会从内存块的头部位置开始,同时记录数据的序列化结果(kvbuffer中)和数据的索引信息(kvmeta中)。

      • 当环形缓冲区中的数据容量达到他的容量阈值(80%)后,环形缓冲区就会改为从内存块的尾部位置开始反向记录数据。而原有的前80%内存块中的数据,就会开始往磁盘中进行溢写,并逐步开始释放内存。

      • 同样,当从尾部开始反写的数据容量超过80%后,又会再次反过来从内存头部开始写数据,然后重复之前的过程。内存块循环使用,这也正是环形缓冲区这个名称的来源。

      • 如果数据反向写入的速度比内存溢写的速度快,那么这时数据写入操作就会暂停,等待释放内存。

  3. 排序:溢写出来的数据是按照分区整合到一起的。这时会将各个分区数据进行排序。

    • 从环形缓冲区溢写出来的数据是按照分区进行了划分的。相同分区的数据会写入到同一个文件当中。

    • 刚从环形缓冲区溢写出来的数据,是按分区进行了分组,但是组内并不是有序的。这时会使用快排算法对分区内的数据进行排序。

  4. 合并 (Combine) 与归并

    • Combine操作:是Shuffle过程中的一个可选操作,由job.setCombinerClass方法指定。在环形缓冲区MapOutputBuffer中也会记录Combiner。Combine是对已经局部有序的多个溢写文件按照key再次进行整合排序,这个过程中使用的是归并排序的算法。

    • 归并:归并后的数据文件可以选择进行第二次Combine操作,进行局部合并。形成最终的局部有序的分区数据集合。

    • Combine的作用:只是让数据在进入Reduce处理之前,先进行一下局部数据合并。这样做并不会影响最终的计算结果。其最大的作用在于减少网络传输的数据量,提升计算的性能。

  5. 压缩:分区数据集合可以选择进行压缩,最终保存到磁盘当中,完成整个Shuffle过程。

    • 作用:减少网络传输以及减少存盘文件大小,提升IO。压缩过程主要有两个执行点,一个是Mapper任务输出,另一个是Reduce任务输出。

    • 配置:这两个阶段默认都是不启用压缩的,如果要启用数据压缩,需要在mapred-site.xml中配置参数,例如mapreduce.map.output.compressmapreduce.map.output.compress.codec等。

    • 压缩算法:Hadoop中提供了多种数据压缩的算法,他们都是CompressionCodec接口的实现类。

      • GZip:压缩比率最高,但是不支持Split,压缩解压的速度一般。

      • BZip2:压缩比率也挺高,支持Split,压缩解压速度最慢。

      • Snappy:压缩比率一般,不支持Split,压缩解压的速度最快。

    • 自定义:企业也可以按照Hadoop的接口要求自行定制数据压缩算法。不过通常来说,数据压缩都会带来额外的计算开销,所以如果是计算密集型的任务,就不太建议增加数据压缩。让计算资源更多的投入到业务计算中。

  6. 数据落盘:Shuffle过程完成后,数据只负责落盘到磁盘文件当中。后续由Reduce任务主动来拉取数据。

3、Reduce阶段的执行流程

Reduce阶段主要是会对之前Mapper阶段的任务进行聚合。对于一个MapReduce计算任务来说,其实Reduce也是一个可选的任务阶段。

整体步骤如下:

  1. 启动时机:Reduce通常在Mapper任务执行完成后再启动。在MapReduce的执行日志中,能够跟踪到Mapper任务和Reduce任务的执行进度。通常情况下,Reduce需要收集Mapper输出完的结果,所以需要等所有的Mapper任务执行完成后再启动。但是,Mapreduce框架也会根据计算情况进行优化。在某些Reduce任务所依赖的Mapper任务全部完成了之后,这些Reduce可以提前启动,而不用等待所有的Mapper任务执行完成。

  2. 拉取数据:Reduce主动去磁盘中拉取相关的数据。

    • MapTask的并行度是由切片个数决定的,而ReduceTask的并行度则不同,可以通过setNumReduceTasks方法手动指定,默认是1个。

    • MapTask将数据输出到磁盘,ReduceTask主动去磁盘上远程拷贝一片数据。如果数据大小超过了一定阈值,就会将一部分数据写入到磁盘当中。

    • 排序之后,Reduce会对数据进行分组,将相同key的数据整合到一起。之后才进入用户自定义的Reduce方法中进行聚合。

  3. 合并数据:Reduce对获取到的数据进行合并。

    • ReduceTask在获取到数据之后,会对内存和磁盘中的数据进行合并。将相同key的数据整合在一起,而Value整合成一个数组。

    • 而在合并过程中,为了将相同的key快速合并到一起,ReduceTask还会对数据再进行一次排序,排序之后的数据针对相同的key,局部有序。

  4. 执行Reduce方法与输出:进入用户自定义的reduce()方法。每个Reduce任务各自对数据进行计算,并将收集到的结果输出到最终文件中。

    • 每个Reduce接收到的数据是按key局部有序的,但是多个Reduce任务之间的key并不能保证是有序的。

    • 每个Reduce都将结果输出到自己对应的目标文件中,输出的结果同样是按照key在每个文件内局部有序,整体并不保证有序。

    • Reduce的计算结果,最终交由FileOutputFormat的实现类来输出。默认加载的是TextOutputFormat。他会产生一个LineRecordWriter,将每一个KV对,向目标文件输出一行。用户同样可以使用job.setOutputFormatClass()方法选择其他的结果输出器,也可以定制自定义的结果输出格式。

4、MapReduce流程整体总结

这样,我们将MapReduce任务的各个阶段详细进行拆解后,就形成了MapReduce任务更加详细的整体细节流程图:

接下来我们重点梳理一下有哪些过程是可以由用户参与的。

  1. 输入数据接口:InputFormat

    • 默认使用的实现类是:TextInputFormat

    • TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。

    • CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

  2. 逻辑处理接口:Mapper
    用户根据业务需求实现其中三个方法:map()setup()cleanup()

  3. Partitioner分区

    • 默认的实现方式是HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode() & Integer.MAXVALUE % numReduces。数据分配均匀,但是由于HashCode是很难预估的,所以预先无法知道具体的分区结果。

    • 如果业务上有特别的需求,可以自定义分区。

  4. Comparable排序
    当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。

  5. Combiner合并
    Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

  6. 数据压缩算法CompressionCodec
    用户可以定义数据压缩算法,但是需要注意压缩算法是否支持数据分片。

  7. 逻辑处理接口:Reducer
    用户根据业务需求实现其中三个方法:reduce()setup()cleanup()

  8. 输出数据接口:OutputFormat

    • 默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。

    • 用户还可以自定义OutputFormat

四、深入理解

1、数据倾斜问题

数据倾斜是大数据计算中非常常见的一个问题。其原因就是由于数据分布不均匀,可能造成某些partition的数据非常多,而某些partition的数据非常少。分配给不同的ReduceTask处理后,数据多的ReduceTask计算任务就非常繁忙,而数据少的ReduceTask计算任务就非常简单。这对于集群资源其实是一种浪费。体现出来的效果就是Reduce阶段的进展非常不平衡,前期进度比较快,后期进度非常慢。

解决数据倾斜的问题,核心思想还是要调整数据的分区逻辑,让各个分区的数据尽量平均一点。调整的方式主要有两种:

  1. 调整Reduce的数量:这样可以一定程度上分担ReduceTask的计算压力。并且,默认的HashPartitioner实现,在调整了Reduce的数量后,也会对数据重新进行分区。但是这种方法的坏处是,HashPartitioner根据key的hashCode和ReduceTask来计算分区,由于hashCode是很难把握的,所以,并不能保证调整后的分区就一定能够更加平均。可以在简单的WordCount计算过程中调整numReduceTask属性,观察结果:job.setNumReduceTasks(2);

  2. 自定义分区:给MapReduce计算设置一个自定义的Partitioner实现,让数据按照自定义的逻辑进行分区。通过自定义分区,应用程序可以根据数据的具体情况灵活调整分区逻辑,保证数据分区更加平均。另外,自定义分区最终会影响到ReduceTask的输出文件,所以也可以让MapReduce的输出文件更有意义。例如,可以按照英文单词的首个字母分组。首字母相同的单词就会固定输出到同一个文件当中。

2、自定义对象序列化

示例中每个计算步骤,比如MapTask和ReduceTask,他们的输入和输出类型是一些基础类型的数据,如果想要传输一些自定义的数据类型,比如自己的POJO对象,那就需要了解自定义序列化。这也是开发中非常常见的功能。

MapReduce对计算过程中的Key和Value都需要通过泛型进行严格的定义。默认对各种基础数据类型都提供了对应的封装类型(如IntWritable,LongWritable,FloatWritable,DoubleWritable,BooleanWritable,ByteWritable,Text等)。其中String比较特殊一点,对应的封装类是Text

可以看到,MapReduce对于基础数据类型都提供了对应的Writable封装类型。这些封装类型都是实现自WritableComparable接口。

java

public interface WritableComparable<T> extends Writable, Comparable<T> { }

这个接口实现了两个接口,Hadoop定义的Writable接口以及JDK提供的Comparable接口。其中,Comparable接口就是给数据提供比较的功能。这是因为MapReduce计算中需要多次对数据进行排序,而排序就需要有Comparable接口支持。Writable接口则是Hadoop定义的一个序列化实现接口。这个接口的作用就是将数据转换成二进制的数据流,这些数据流才可以在网络上进行传输。

java

public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }

Writable提供了两个方法,write方法定义如何将数据对象写成二进制数据流,readFields定义如何将二进制数据流读取成数据对象。如果我们需要计算自定义的数据对象,那么数据对象就需要自行实现Writable接口的这两个方法。在Writable接口的源码注释当中,就提供了一个简单的示例:

java

public class MyWritable implements Writable { // Some data private int counter; private long timestamp; // Default constructor to allow (de)serialization MyWritable() { } public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }

3、自定义输出文件

MapReduce的输出文件都收Part-xxx这样的格式,他是按照Reduce拆分输出的
结果。这样的输出结果通常没有太多的意义。如果想要自定义输出文件,也可以通
过自定义OutputFormat的方式进行自定义。

4、Hadoop中的RPC框架

如果想要深入Hadoop的源码,从MapReduce计算任务为入口,其实是一个比较好的方法。整个Hadoop的源码相当庞大,并且有很多的内部任务,所以通常很难找到调试代码的主线。而从MapReduce计算任务的源码开始调试,就比较容易找到主线。

在跟踪Hadoop源码的时候,他的RPC框架需要提前了解下,要不然Hadoop集群内部的大量RPC请求会非常难以跟踪。而Hadoop基于Netty实现了一套非常简洁小巧的RPC框架。这个小框架,脱离Hadoop框架也是很有借鉴意义的。

使用Hadoop的RPC框架,只需要引入Hadoop的common模块即可。引入如下hadoop-client的maven依赖就会包含common模块。

xml

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency>

然后,Hadoop的RPC框架整体由三个部分构成:服务端,客户端,协议。
Hadoop中的协议层是一个非常简单灵活的层面,只需要定义一个带版本的接口以及服务实现类即可。

java

public interface ClientNamenodeProtocol { public long versionID = 1L; public String getMetaData(String path); } //模拟NameNode,根据文件路径,返回文件所在的Block位置 public class MyNamenode implements ClientNamenodeProtocol { @Override public String getMetaData(String path) { return path + ": 3 - {BLK_1,BLK_2} ......"; } }

Hadoop中的协议是很自由的接口。内部提供了一个VersionedProtocol,但是并不强制要求继承。

在服务端,就可以根据这个协议接口和实现类,直接创建一个服务端。

java

public class RPCServer { public static void main(String[] args) throws IOException { final RPC.Builder builder = new RPC.Builder(new Configuration()); builder.setBindAddress("localhost"); builder.setPort(8888); builder.setProtocol(ClientNamenodeProtocol.class) .setInstance(new MyNamenode()); final RPC.Server server = builder.build(); server.start(); } }

启动后,就会绑定本机的8888端口,等待客户端响应。而客户端就可以直接请求协议中对应的方法,调用服务端的实现类。

java

public class RPCClient { public static void main(String[] args) throws IOException { final ClientNamenodeProtocol proxy = RPC.getProxy(ClientNamenodeProtocol.class, 1L, new InetSocketAddress("localhost", 8888), new Configuration()); final String metaData = proxy.getMetaData("/myNameNode"); System.out.println(metaData); } }

这个RPC框架在Hadoop底层应用非常广泛,例如在NameNode上,就会启动一个RPC服务端,来接收DataNode的心跳请求。在NameNodeRpcServer.java源码中,在他的构造方法中就有这样一段源码:

java

serviceRpcServer = new RPC.Builder(conf) .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()) .setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build();

5、Hadoop Streaming

Hadoop Streaming是Hadoop新推出的一个工具集。这个工具集并不是提供流式计算的功能,而是允许以命令行方式代替千篇一律的Driver代码。通过Streaming工具可以简化MapReduce任务的部署。

其实从之前的编码过程可以看到,对于一个完整的MapReduce计算任务,重要的业务逻辑都是由Mapper、Reducer、Partitioner、OutputFormat等这些组件来提供的,而启动任务的Driver其实只是相当于一个组件粘合剂,并没有实际的业务逻辑。Hadoop Streaming就允许以命令行的方式来启动一个Driver。这样可以减少提交任务的Jar包大小(因为Driver类就不需要了)。并且,通过Hadoop Streaming工具,可以扩充各个功能组件的具体实现。Linux的指令工具以及Python语言开发的组件也都可以自由组合到MapReduce计算任务当中。不过,企业一般都是习惯于用一个完整jar包的方式来提交任务,所以这个工具其实用得并不是很多。这里就只做简单的介绍。

5.1 零开发完成一个WordCount计算。

直接执行以下指令,就可以启动一个MapReduce任务。

bash

[root@hadoop01 ~]# mapred streaming \ > -input /input2 \ > -output /output2 \ > -mapper /bin/cat \ > -reducer /usr/bin/wc

这个指令就会启动一个MapReduce计算,统计HDFS上/input2目录下的文件,输出到/output2目录。其中mapper直接使用Linux的cat指令完成,而reduce直接使用Linux的wc指令完成,所以统计完成的输出结果是这样的:36 620 4228。这表示/input2下面的一个文件,行数为36、单词数620、字节数4228。

5.2 Hadoop Streaming指令介绍

Hadoop Streaming的基础指令格式是这样的:

bash

mapred streaming [genericOptions] [streamingOptions]

其中genericOptions表示一些任务相关的配置信息。主要有以下几个配置:

ParameterOptional/RequiredDescription
-conf configuration_fileOptionalSpecify an application configuration file
-D property=valueOptionalUse value for given property
-fs host:port or localOptionalSpecify a namenode
-filesOptionalSpecify comma-separated files to be copied to the Map/Reduce cluster
-libjarsOptionalSpecify comma-separated jar files to include in the classpath
-archivesOptionalSpecify comma-separated archives to be unarchived on the compute machines

例如-D mapreduce.job.reduces = 0指定reduceTask个数。-files允许访问HDFS上的文件。-archives可以引入HDFS上的一个任务jar包(就不用提交到Hadoop了。)

streamingOptions则是一组具体的执行指令。streamingOptions需要在genericOptions之后指定。可选的指令如下:

ParameterOptional/RequiredDescription
-input directoryname or filenameRequiredInput location for mapper
-output directorynameRequiredOutput location for reducer
-mapper executable or JavaClassNameOptionalMapper executable. If not specified, IdentityMapper is used as the default
-reducer executable or JavaClassNameOptionalReducer executable. If not specified, IdentityReducer is used as the default
... (其他参数详见文档表格)

其实对照Driver的实现过程,很多的配置都一目了然。像combinerpartitionermapperreducer等。与Driver的不同之处在于,这些功能组件都可以指定shell指令或者python脚本。
例如:以下指令可以引入一个Parititoner。而这个Partition类所在的jar包,即可以上传到Hadoop的安装目录下,也可以上传到HDFS中,通过-archives引入。

bash

mapred streaming \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ -D mapreduce.partition.keypartitioner.options=-k1,2 \ -D mapreduce.job.reduces=12 \ -input /input \ -output /output3 \ -mapper /bin/cat \ -reducer /bin/cat \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 6:24:39

好写作AI:当推荐算法成为你的“学术红娘”,精准配对每一段灵感

如果你也曾被写作助手推荐过&#xff1a;研究量子物理时收到“唐诗赏析模板”&#xff0c;写实证论文时被推送“抒情散文技巧”——别慌&#xff0c;这只是算法在“乱点鸳鸯谱”。而现在&#xff0c;好写作AI的个性化推荐&#xff0c;正在上演“精准相亲”大戏。深夜的文科院系…

作者头像 李华
网站建设 2026/4/18 6:31:08

UDS物联网网关是什么?有什么功能?

一、UDS车载物联网网关概述UDS&#xff08;Unified Diagnostic Services&#xff0c;统一诊断服务&#xff09;车载物联网网关是一种基于汽车行业标准协议UDS的车载智能通信设备。它作为车辆内部网络与外部物联网平台之间的“智能翻译官”&#xff0c;专门用于连接、管理和监控…

作者头像 李华
网站建设 2026/4/18 6:31:10

摆脱论文困扰! 降AI率平台 千笔 VS 知文AI 面向自考学生

在AI技术迅猛发展的今天&#xff0c;越来越多的学生开始借助AI工具辅助论文写作&#xff0c;以提高效率、优化内容。然而&#xff0c;随着学术审查标准的不断提升&#xff0c;AI生成内容的痕迹越来越容易被查重系统识别&#xff0c;导致论文面临“AI率超标”的风险。许多学生在…

作者头像 李华
网站建设 2026/4/17 13:43:14

服务监控与告警体系搭建

服务监控与告警体系搭建一、简述1. prometheus2. grafana3. Alertmanager4. Exporters二、快速部署三、常用Exporters1. node-exporter2. MySQL Exporter3. Elasticsearch Exporter4. MongoDB Exporter5. RocketMQ Exporter6. Redis Exporter7. MinIO8. Milvus四、自定义Export…

作者头像 李华
网站建设 2026/4/10 12:15:34

SeqGPT-560M企业级运维:Prometheus指标采集、Grafana看板、告警阈值设置

SeqGPT-560M企业级运维&#xff1a;Prometheus指标采集、Grafana看板、告警阈值设置 1. 引言&#xff1a;从模型到服务&#xff0c;运维监控的必然之路 你刚刚部署了SeqGPT-560M&#xff0c;这个阿里达摩院推出的零样本文本理解模型确实好用——无需训练就能完成文本分类和信…

作者头像 李华