MapReduce的详细过程(九)

2015-07-24 07:41:40 · 作者: · 浏览: 10
l考虑的事情了。

?

MergeManagerImpl

MergeManagerImpl几乎handle了所有与merge相关的实现。他有两个(其实是三个)内部类, InMemeryMerger和OnDiskMerger,分别对应了前面的两种不同的MapOutput。

我们先看一下这两个Merger共同的父类MergeThread,比较容易理解它做得事情。

?

abstract class MergeThread
  
    extends Thread {
private LinkedList
   
    > pendingToBeMerged; public synchronized void close() throws InterruptedException { closed = true; waitForMerge(); interrupt(); } public void startMerge(Set
    
      inputs) { if (!closed) { numPending.incrementAndGet(); List
     
       toMergeInputs = new ArrayList
      
       (); Iterator
       
         iter=inputs.iterator(); for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) { toMergeInputs.add(iter.next()); iter.remove(); } LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + " segments, while ignoring " + inputs.size() + " segments"); synchronized(pendingToBeMerged) { pendingToBeMerged.addLast(toMergeInputs); pendingToBeMerged.notifyAll(); } } } public synchronized void waitForMerge() throws InterruptedException { while (numPending.get() > 0) { wait(); } } public void run() { while (true) { List
        
          inputs = null; try { // Wait for notification to start the merge... synchronized (pendingToBeMerged) { while(pendingToBeMerged.size() <= 0) { pendingToBeMerged.wait(); } // Pickup the inputs to merge. inputs = pendingToBeMerged.removeFirst(); } // Merge merge(inputs); } catch (InterruptedException ie) { numPending.set(0); return; } catch(Throwable t) { numPending.set(0); reporter.reportException(t); return; } finally { synchronized (this) { numPending.decrementAndGet(); notifyAll(); } } } } public abstract void merge(List
         
           inputs) throws IOException; }
         
        
       
      
     
    
   
  

?

容易看出,MergeThread有一个LinkedList,用于存放MapOutput得到的输出,startMerge方 法会将这些输出添加到List中,run方法会不断的从中取出Mapoutput的输出,并调用merge方法, Close的时候会等待所有的merge过程结束。startMerge方法正是在MergeManagerImpl的 closeXXXXMergedFile调用的。

这样整个过程就清晰一些了,Shuffle时调用Fetcher来下载Map的输出,Fetcher根据数据量的大小,判断是实例化InMemoryMapOutput还是OnDiskMapOutput,实例化出的MapOutput拉取完 毕后,Fetcher通过一个Shuffle的scheduler调用Mapoutput的commit方法,commit方法中调用到 MergeManagerImpl的closeXXXXMergedFile方法,这个方法又调用到MergeThread实现类中得 startMerge方法,下载到得数据最终就被传递到了MergeThread的实现类了。

剩下的问题,就是怎么Merge了。

Merge

整个Merge的过程比较复杂,牵扯到得代码也比较多,我按照我的理解,在逻辑的层面简单叙述一下这个过程。

从整体上讲,Merge的过程是先Merge掉InMemory的,InMemory的结果也会加入到onDisk的待 Merge队列中,最后补上一记finalMerge,合并起InMemory剩余的与onDisk剩余的。每种Merger的Merge操作最终都是交给一个叫Merger的工具类的静态方法实现的。

除了参数的不同,实际merge的过程是类似的。Merge就是将小文件合并成大文件,对于初始有序 的数据,为了减少比较次数,每次应该合并数据最少的两组,也就是霍夫曼树的思想。从源码看,貌似 是自己用ArrayList实现了一个。

InMemory的Merge行为是,先将InMemoryMapOutput中的buffer结构化成一组segment, segment含有需要merge的数据,最重要的,它含有这些数据的长度信息,这个信息会再霍夫曼树式的 merge用到。接下来它会new出一个path对象用来存放merge的结果,一个Writer来写入,然后就会调用Merger工具类的相应merge方法进行实际的merge。在实际写入文件的时候,会判断有没有指定 Combiner,也就是会不会对相同key的输出进行进一步的合并。InMemoryMerger的最终结果会写入到 文件,并将这个文件的信息注册到onDiskMerger中,以便后续的合并。

onDiskMerger的行为与InMemoryMerger的行为基本一致,只是在调用Merger的时候给定了不 同的参数。

finalMerge的行为是,先判断有没有inMemory的output,有的话构造出segment,合并,最终 的结果是一个文件,添加到onDisk得output队里中,然后合并onDisk的ouput,比较特别的,finalMerge 是有返回值的,最终合并的结果输出是RawKeyValueIterator类型,代表这一个reduce所接收到的所 有输入。

?

MergeManagerImpl的close方法

在shuffle的run方法中,在copyPhase结束之后,调用了MergeManagerImpl的close方法,该 方法的实现如下:

public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
List
  
   > memory =
new ArrayList
   
    >(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); memory.addAll(inMemoryMapOutputs); inMemoryMapOutputs.clear(); List
    
      disk = new ArrayList
     
      (onDiskMapOutputs); onDiskMapOutputs.clear(); return finalMerge(jobConf, rfs, memory, disk); }
     
    
   
  

可见,在inMemoryMerger和onDiskMerger的close之后,会最终返回finalMerge的结果。这个 RawKeyValueIterator会最为一个参数传递给reduce过程。Shuffle过程到此就算彻底结束了。

?

Reduce的输入

Shuffle过程结束之后,reduce阶段获得了RawKeyValueIterator类型的输入,ReduceTask的 run方法会调用runNewReducer方法,该方法的签名如下:

?

private 
  
   
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,