MapReduce的详细过程(八)

2015-07-24 07:41:40 · 作者: · 浏览: 9
:

?

@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher
  
    eventFetcher =
new EventFetcher
   
    (reduceId, umbilical, scheduler, this, maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads boolean isLocal = localMapFiles != null; final int numFetchers = isLocal ? 1 : jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); Fetcher
    
     [] fetchers = new Fetcher[numFetchers]; if (isLocal) { fetchers[0] = new LocalFetcher
     
      (jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret(), localMapFiles); fetchers[0].start(); } else { for (int i=0; i < numFetchers; ++i) { fetchers[i] = new Fetcher
      
       (jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); fetchers[i].start(); } } // Wait for shuffle to complete successfully while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { reporter.progress(); synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } } // Stop the event-fetcher thread eventFetcher.shutDown(); // Stop the map-output fetcher threads for (Fetcher
       
         fetcher : fetchers) { fetcher.shutDown(); } // stop the scheduler scheduler.close(); copyPhase.complete(); // copy is already complete taskStatus.setPhase(TaskStatus.Phase.SORT); reduceTask.statusUpdate(umbilical); // Finish the on-going merges... RawKeyValueIterator kvIter = null; try { kvIter = merger.close(); } catch (Throwable e) { throw new ShuffleError("Error while doing final merge " , e); } // Sanity check synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } return kvIter; }
       
      
     
    
   
  

?

Shuffle的时候,会先判断是不是local run的,如果不是的话,会默认启动5个Fetcher线程拉取 map的输出,Fetcher会先找到一个主机,确定这台机器上它要拉取的map task的输出,然后使用http协议获取response的stream,交给MapOutput类型的对象去完成具体的下载任务。

当文件拉取完成,就会进入sort阶段。注意到我们拉取到数据都是局部有序的,因此,排序的过程, 实际上也就是一个Merge的过程。Copy phase结束之后,Shuffle会调用

kvIter = merger.close();

方法来得到排序完成的map的key value输出。

?

MapOutput

MapOutput有两个实现类,即OnDiskMapOutput和InMemoryMapOutput,具体哪一个被实例化,是看当前要shuffle的数据适不适合放到内存中。

OnDiskMapOutput的行为如下所示:


?

final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
getMapId());
}
disk.write(buf, 0, n);
bytesLeft -= n;
metrics.inputBytes(n);
reporter.progress();
}

InMemoryMapOutput的行为如下:

?

?

public static void readFully(InputStream in, byte buf[],
int off, int len) throws IOException {
int toRead = len;
while (toRead > 0) {
int ret = in.read(buf, off, toRead);
if (ret < 0) {
throw new IOException( "Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
}

?

代码比较简单,前者有个buffer,一边读一边写文件,后者将数据缓存在一个byte数组里,跟类名 看上去的行为完全一致。

当MapOutput拷贝方法shuffle返回时,Fetcher会调用Scheduler的copySucceed方法做一些 收尾工作,比如将已经拷贝过的host从待拷贝列表中删除。比较重要的一点是,它会调用Mapoutput的commit方法。两种Mapoutput的实现在这里的差异不大,都会调用MergeManagerImpl的closeXXXXFile 方法。

MapOutput负责的是将数据从集群中得其他机器上拉取过来,拉取到的数据怎么Merge到一起, 就是MergeManagerImp