1. 主页 > 史诗殿堂 >

2,MapReduce原理及源码解读

MapReduce原理及源码解读

目录MapReduce原理及源码解读一、分片灵魂拷问:为什么要分片?1.1 对谁分片1.2 长度是否为01.3 是否可以分片1.4 分片的大小1.5 开始分片1.6 分片后读取会不会断行二、Map阶段2.1 实例化Mapper2.2 调用map()方法三、Shuffle阶段灵魂拷问:哪来的Shuffle?3.1 shuffle的概念3.2 Map端Shuffle3.2.1 分区(partition)3.2.2 写入环形缓冲区3.2.3 排序并溢写(sortAndSpill):3.2.4 合并(merge):3.3 Reduce端Shuffle3.3.1 拉取(Copy)3.3.2 排序合并(Merge Sort)3.3.3 归并分组(reduce)四、Reduce阶段4.1 执行reduce()方法4.2 输出最终结果参考文章:

一、分片

灵魂拷问:为什么要分片?

分而治之:MapReduce(MR)的核心思想就是分而治之;何时分,如何分就要从原理和源码来入手。做为码农大家都知道,不管一个程序多么复杂,在写代码和学习代码之前最重要的就是搞懂输入和输出,而MR的输入其实就是一个目录。而所谓的分而治之其实也是在把大文件分成小文件,然后一个机器处理一个小文件,最后再合并。所以MR的第一步就是对输入的文件进行分片。

1.1 对谁分片

对每个文件分片:分片是对输入目录中的每一个文件进行分片。后面的分片都是针对单个文件分片。

源码解读(对谁分片):

// 分片的源码位置

package org.apache.hadoop.mapreduce.lib.input;

abstract class FileInputFormat.java;

// 下面代码所在方法

method getSplits();

// InputStatus表示一个切片类

List splits = new ArrayList();

// 得到所有输入文件

List files = listStatus(job);

// 遍历每个文件。 根据每个文件来切片,而不是整个文件夹

for (FileStatus file : files) {

// 分片1

}

1.2 长度是否为0

文件长度:当文件长度不为0时才会进行下面的分片操作;如果文件长度为0,则会向分片列表中添加一个空的hosts文件数组和空长度的文件。也就是说,空文件也会创建一个空的分片。

源码解读(长度是否为0):

for (FileStatus file : files) {

Path path = file.getPath();

// 获取文件大小

long length = file.getLen();

if (length != 0) {

// 分片2

} else {// 如果文大小为空,默认就创建一个空的hosts文件数组和空长度的文件

//Create empty hosts array for zero length files

splits.add(makeSplit(path, 0, length, new String[0]));

}

}

1.3 是否可以分片

压缩格式:并不是所有的文件都可以分片,有一些压缩格式的文件是不可以分片的。因此只会对可以分片的文件进行分片,而不可以分片的文件即使再大也会作为一个整体来处理,相当于一个片。

源码解读(是否可以分片):

// 如果可以分片

if (isSplitable(job, path)) {

// 分片3

} else { // not splitable

splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),

blkLocations[0].getCachedHosts()));

}

// 判断一个文件是否可以切片

// FileInputFormat抽象类中默认返回true,子类TextInputFormat中实现如下

@Override

protected boolean isSplitable(JobContext context, Path file) {

final CompressionCodec codec =

new CompressionCodecFactory(context.getConfiguration()).getCodec(file);

if (null == codec) {// 如果一个文件的压缩编码为null,那么表示可以切片

return true;

}// 如果一个文件的压缩编码是SplittableCompressionCodec的子类,那么表示当前文件也可以切片

return codec instanceof SplittableCompressionCodec;

}

1.4 分片的大小

分片大小:分片太大就失去了分片的意义;如果分片很小,则管理和构建map任务的时间就会增多,效率变低。并且如果分片跨越两个数据块,那么分片的部分数据需要通过网络传输到map任务运行的节点上,效率会更低。所以分片的最佳大小应该和HDFS的分块大小一致。Hadoop2默认128M。

源码解读(分片大小):

// FormatMinSplitSize是 1, MinSplitSize如果没配置默认是 1

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

// 如果没配置,则默认是 Long类型的最大值

long maxSize = getMaxSplitSize(job);

// 块大小,Hadoop2是128M,本地模式为32M

long blockSize = file.getBlockSize();

// 分片大小计算公式。默认就是blockSize的大小

long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));

自定义分片大小:由上面的公式可知,默认的分片大小就是blockSize的大小。如果要自定义大于blockSize,比如改为200M,就把minSize改为200;小于blockSize,比如20M,就把maxSize改为20

1.1倍:最常见的问题就是:一个大小为130M的文件,在分片大小为128M的集群上会分成几片?答案是1片;因为 128*1.1>130,准确来说应该是130 / 128 < 1.1 (源码的公式)。也就是说,如果剩下的文件大小在分片大小的1.1倍以内,就不会再分片了。要这个1.1倍,是为了优化性能;试想如果不这样,当还剩下130M大小的时候,就会分成一块128M,一块2M,后面还要为这个2M的块单独开一个map任务,不划算。至于为什么是1.1,这个1.1是专家们通过反复试验得出来的结果。

源码解读(1.1倍):

// 当剩余文件的大小,大于分片大小的1.1倍时,才会分片

private static final double SPLIT_SLOP = 1.1; // 10% slop

// bytesRemaining为文件剩余大小,splitSize为上面计算出的分片大小

while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {

// 分片4

}

1.5 开始分片

终于分片了:经过上面的层层条件,下面就是// 分片4中的分片代码。与HDFS的物理分块不同的是,MapReduce的分片只是逻辑上的分片,即按照偏移量分片。

// 封装一个分片信息(包含文件的路径,分片的起始偏移量,要处理的大小,分片包含的块的信息,分片中包含的块存在哪儿些机器上)

int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);

// makeSplit进行切片操作,返回值是一个切片,并且加入到切片列表中

splits.add(makeSplit(path, length - bytesRemaining, splitSize,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

// 剩余文件大小

bytesRemaining -= splitSize;

1.6 分片后读取会不会断行

不会:由于分片时是按照长度进行分片的,那就有很大可能会把一行数据分在两个片里面,所以分片的时候确实会断行。如果读取并处理断行的数据,就会导致结果不正确,那是肯定不行的。所以LineRecordReader类就充当了读取记录的角色,保证读取不断行;其中nextKeyValue()方法里是真正给Mapper中的key赋值的地方,并且调用了父类LineReader类中的readLine()方法来给value赋值。

源码解读(读取时不断行):

public class TextInputFormat extends FileInputFormat {

@Override

public RecordReader

createRecordReader(InputSplit split,

TaskAttemptContext context) {

String delimiter = context.getConfiguration().get(

"textinputformat.record.delimiter");

// 行分隔符

byte[] recordDelimiterBytes = null;

if (null != delimiter)

recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);

// 返回LineRecordReader对象

return new LineRecordReader(recordDelimiterBytes);

}

}

// 行记录读取类,提供读取片中数据的功能,并且保证不断行

public class LineRecordReader extends RecordReader {

// ......其他代码

public void initialize(InputSplit genericSplit,

TaskAttemptContext context) throws IOException {

// ......

// 如果不是第一个分片,则开始位置退到下一行记录的开始位置

// 因为为了保证读取时不断行,每个块都会向后多读一行(最后一个除外)

if (start != 0) {

start += in.readLine(new Text(), 0, maxBytesToConsume(start));

}

}

public boolean nextKeyValue() throws IOException {

// 给Mapper中输入的key赋值

key.set(pos);

// 实例化Mapper中输入的value

if (value == null) {

value = new Text();

}

// 注意是<=end,在等于end时还会执行一次,多读了一行,所以不会断行

while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {

if (pos == 0) {

newSize = skipUtfByteOrderMark();

} else {

// 给Mapper中输入的value赋值。

// readLine方法会根据是否自定义行分隔符来调用不同的方法。

newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));

pos += newSize;

}

}

}

}

二、Map阶段

2.1 实例化Mapper

各种实例化:上面费了很大的劲来编写分片TextInputFormat,和读取类LineRecordReader;而这一切都是为了把输入数据很好的传给map()方法来运算,所以首先就要实例化我们自定义的Mapper类。

源码解读(各种实例化):

package org.apache.hadoop.mapred;

class MapTask.java;

method runNewMapper();

// 通过反射来获取Mapper。在Job中设置的Mapper,也就是自己定义的继承自Mapper的类

org.apache.hadoop.mapreduce.Mapper mapper =

(org.apache.hadoop.mapreduce.Mapper)

ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

// 通过反射来得到 InputFormat。默认是TextInputFormat

org.apache.hadoop.mapreduce.InputFormat inputFormat =

(org.apache.hadoop.mapreduce.InputFormat)

ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

// 获得当前MapTask要处理的split

org.apache.hadoop.mapreduce.InputSplit split = null;

split = getSplitDetails(new Path(splitIndex.getSplitLocation()),

splitIndex.getStartOffset());

LOG.info("Processing split: " + split);

// 根据InputFormat对象创建RecordReader对象。默认是LineRecordReader

org.apache.hadoop.mapreduce.RecordReader input =

new NewTrackingRecordReader

(split, inputFormat, reporter, taskContext);

// 初始化。用来打开文件,并且调整文件的头指针

input.initialize(split, mapperContext);

// MapTask中调用Mapper的run()方法

mapper.run(mapperContext);

2.2 调用map()方法

每行数据调用一次:从上面的代码中我们知道,MapTask中会调用Mapper类的run()方法;而run()方法会在while循环中调用map()方法,由退出条件可知,是每一行数据调用一次map()方法。

源码解读(怎么调用map()方法):

public void run(Context context) throws IOException, InterruptedException {

// 在所有map执行之前初始化,也可以根据业务需要来重写此方法

setup(context);

try {

// context.nextKeyValue()其实就是LineRecordReader中的nextKeyValue()方法;

// 在run方法中遍历所有的key,每行数据都执行一次自定义map方法;

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

} finally {

// 父类Mapper中的setup()和cleanup()方法中什么都没做;

// 只执行一次,可以根据业务需要来重写此方法;

cleanup(context);

}

}

三、Shuffle阶段

灵魂拷问:哪来的Shuffle?

理论与实现:看过源码的都知道,其实源码中根本就没有什么shuffle;shuffle只是一个过程,确切的来说是连贯Map阶段和reduce阶段的一个理论过程,而它的实现主要在MapTask和ReduceTask类中。shuffle阶段可以说是MapReduce中最核心的一个阶段。

3.1 shuffle的概念

作用:shuffle这个单词的本意是洗牌、打乱的意思,而在这里则是:将map端的无规则输出按照指定的规则“打乱”成具有一定规则的数据,以便reduce端接收和处理。

流程:shuffle的范围是map输出后到reduce输入前。它的流程主要包括Map端shuffle和reduce端shuffle。

MapReduce大致流程:

3.2 Map端Shuffle

作用:Map端的shuffle过程是对Map的结果进行分区、排序、溢写、合并分区,最后写入磁盘;最终会得到一个分区有序的文件,即先按分区排序,再按key排序。

Map端shuffle大致流程:

3.2.1 分区(partition)

概念:对于map的每一个输出的键值对,都会根据key来生成partition再一起写入环形缓冲区。每一个reduceTask会处理一个partition(第0个reduceTask处理partition为0的分区,以此类推)。

如何分区:默认情况下,分区号是key的hash值对numReduceTask数量取模的结果。也可以自定义分区。

源码解读(如何分区):

// 当设置的reduceTask数大于实际分区数时,可以正常执行,多出的分区为空文件;

// 当设置的reduceTask数小于实际分区数时,会报错。

job.setNumReduceTasks(4);

// 如果设置的 numReduceTasks大于 1,而又没有设置自定义的 PartitionerClass

// 则会调用系统默认的 HashPartitioner实现类来计算分区。

job.setPartitionerClass(WordCountPartitioner.class);

// 自定义分区

public class WordCountPartitioner extends Partitioner {

private static HashMap map = new HashMap<>();

static {

map.put("0734", 0);

map.put("0561", 1);

map.put("0428", 2);

}

// 当 Mapper的输出要写入环形缓冲区时,会调用此方法来计算当前的分区号

@Override

public int getPartition(Text text, IntWritable intWritable, int numPartitions) {

String strText = text.toString();

return map.getOrDefault(strText.substring(0, 4), 3);

}

}

// MapTask.java$NewOutputCollector

public void write(K key, V value) throws IOException, InterruptedException {

// 把 K,V以及分区号写入环形缓冲区

collector.collect(key, value,

partitioner.getPartition(key, value, partitions));

}

3.2.2 写入环形缓冲区

概念:环形缓冲区是在内存中的一个字节数组kvbuffer。kvbuffer不仅存放map输出的,还在另一端存放了的索引(元数据) kvmeta,每个kvmeta包括value的起始位置、key的起始位置、partition值、value的长度,占用4个int长度。上图中的bufindex和kvindex分别表示kvbuffer和kvmeta的指针。环形缓冲区的默认大小是100M,当写入数据大小超过80%(80M)就会触发Spill,溢写到磁盘。

源码解读(Spill):

// SpillThread线程在MapTask$MapOutputBuffer类中初始化,在init()方法中启动。

// 它会一直监视环形缓冲区,当大小超过80%的时候,就会调用sortAndSpill()方法。

protected class SpillThread extends Thread {

@Override

public void run() {

// ....

// run方法中调用排序并溢写方法

while (true) {

// ....

sortAndSpill();

}

//....

}

}

3.2.3 排序并溢写(sortAndSpill):

排序:触发溢写后,会先排序,再溢写。排序是根据partition和key的升序排序,移动的只是索引数据,排序的结果是将kvmeta中到的数据按照partition聚合在一起,同一个partition内再根据key排序。

溢写:Spill线程根据排序后的kvmeta文件,将一个个partition输出到文件,在这次溢写过程中,会将环形缓冲区中已计算的数据(80M)写入到一个文件spill.out,所以引入了索引文件spill.index,它记录了partition在spill.out中的位置。

3.2.4 合并(merge):

概念:如果Map的数据很大,那么就会触发多次Spill,spill.out和spill.index文件也会很多。所以最后就要把这些文件合并,方便Reduce读取。

合并过程:合并过程中,首先会根据spill.index文件,将spill.out文件中的partition使用归并排序分别写入到相应的segment中,然后再把所有的segment写入到一个file.out文件中,并用file.out.index来记录partition的索引。由于合并时可能有相同的key,所以如果设置了combine,那么在写入文件之前还会调用自定义的combine方法。

3.3 Reduce端Shuffle

3.3.1 拉取(Copy)

前期工作:Reduce任务会通过HTTP向各个Map任务拉取它所需的partition数据。当Map任务成功完成之后会通知 TaskTracker状态已跟新,TaskTracker进而通知JobTracker(都是通过心跳机制实现),所以JobTracker中记录了Map输出和TaskTracker的映射关系。

何时拉取:Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会立即从此输出对应的TaskTracker上复制相应的partition数据到本地,而不是等到所有Map任务结束。

3.3.2 排序合并(Merge Sort)

合并:copy过来的数据会先放入内存缓冲区中(大小是 JVM的heap size的70%),如果缓冲区放得下就直接把数据写入内存,即内存到内存merge。如果缓冲区中的Map数据达到一定大小(缓冲区的66%)的时候,就会开启内存merge,并将merge后的数据写入磁盘,即内存到磁盘merge。当属于该Reduce任务的map输出全部拉取完成,则会在reduce任务的磁盘上生成多个文件(如果所有map输出的大小没有超过缓冲区大小,则数据只存在于内存中),这时开始最后的合并操作,即磁盘到磁盘merge。如果设置了combine,合并时也会执行。

排序:由于map输出的数据已经是有序的,所以reduce在合并时的排序是归并排序,并且reduce端的copy和sort是同时进行的,最终会得到一个整体有序的数据。

3.3.3 归并分组(reduce)

归并分组(reduce):当reduce任务执行完拉取和排序合并后,就会对相同的key进行分组。默认情况下是根据key对象中重写的compareTo()方法来分组,如果设置了GroupingComparator,则会调用它的compare()方法来分组。reduce会把compareTo(或compare)方法计算返回为 0 的key分为一组,最终会得到一个组>,其中组的key是这一组的第一个数据的key,Iterable则是相同key的value迭代器。最后再对每一个组调用Reducer的reduce()方法。

源码解读(分组):

// org.apache.hadoop.mapreduce.Reducer中的run()方法

while (context.nextKey()) {

// 调用自定义 reduce方法

reduce(context.getCurrentKey(), context.getValues(), context);

// .....

}

// org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法

public boolean nextKey() throws IOException,InterruptedException {

// 如果当前key与下一个key相同,则继续往下走;

// 这一步就是把相同的key放到一组, 他们的value放到一个迭代器中;当下一个key不同时再调用reduce方法

while (hasMore && nextKeyIsSame) {

nextKeyValue();

}

if (hasMore) {

if (inputKeyCounter != null) {

// 计数器

inputKeyCounter.increment(1);

}

// 当nextKeyIsSame为false时,会再调用一次nextKeyValue(),而它的返回值必为true;

return nextKeyValue();

} else {

return false;

}

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

if (hasMore) {

nextKey = input.getKey();

// 在执行reduce方法之前调用ReduceContext中定义的GroupComparator

// 如果key的compareTo方法返回0则 nextKeyIsSame为true,也就会分到一组

nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,

currentRawKey.getLength(),

nextKey.getData(),

nextKey.getPosition(),

nextKey.getLength() - nextKey.getPosition()

) == 0;

} else {

nextKeyIsSame = false;

}

inputValueCounter.increment(1);

return true;

}

四、Reduce阶段

4.1 执行reduce()方法

归并:上面的Shuffle阶段已经将数据分组成了>格式的数据,所以对于相同的key只会调用一次reduce()方法。

注意事项:在reduce()方法中,一定要重新创建key对象,不要直接使用参数中的key。

4.2 输出最终结果

完结:整个MapReduce的输出和输入有点类似。输出是实例化TextOutputFormat和LineRecordWrite对象。并由LineRecordWrite判断是不是NullWriteable,最后输出到文件

参考文章:

https://blog.csdn.net/u014374284/article/details/49205885

https://blog.csdn.net/asn_forever/article/details/81233547