MapReduce的详细过程(三)

2015-07-24 07:41:40 · 作者: · 浏览: 4
inal TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper mapper = (org.apache.hadoop.mapreduce.Mapper ) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat ) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader (split, inputFormat, reporter, taskContext); ... org.apache.hadoop.mapreduce.MapContext mapContext = new MapContextImpl (job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper .Context mapperContext = new WrappedMapper ().getMapContext( mapContext); try { input.initialize(split, mapperContext); mapper.run(mapperContext); ... }finally{ closeQuietly(input); ... }

从代码中可以看出,我们在客户端提交Job之前所进行的配置以JobContext的形式传递给了MapTask, 在MapTask的runNewMapper()的方法中,它使用反射实例化出了用户指定的Mapper类和inputFormat 类,并新建了InputSplit和RecorderReader用来实例化上面提交的MapContext,MapContext以参数的形式被传递给了包装类WrappedMapper,在对input进行初始化后,以

mapper.run(mapperContext);

的形式正式调用我们用户的代码。

InputSplit

源码中对InputSplit类的描述是:

?

/** 
			*  InputSplit represents the data to be processed by an 
			*  individual {@link Mapper}. * 
			*  

Typically, it presents a byte-oriented view on the input and is the * responsibility of {@link RecordReader} of the job to process this and present * a record-oriented view.*/ public abstract class InputSplit {?

          public abstract long getLength() throws IOException, InterruptedException;?
          public abstractString[] getLocations() throws IOException, InterruptedException; } 


?

?

更易于理解的表述是,InputSplit决定了一个Map Task需要处理的输入。更进一步的,有多少个 InputSplit,就对应了多少个处理他们的Map Task。从接口上来看,InputSplit中并没有存放文件的内 容,而是指定了文件的文件的位置以及长度。

既然inputsplit与MapTask之间是一一对应的,那么我们就可以通过控制InputSplit的个数来调整 MapTask的并行性。当文件量一定时,InputSplit越小,并行性越强。inputsplit的大小并不是任意的, 虽然最大值和最小值都可以通过配置文件来指定,但是最大值是不能超过一个block大小的。

Block是什么?用户通过HDFS的接口,看到的是一个完整文件层面,在HDFS底层,文件会被切成固 定大小的Block,并冗余以达到可靠存储的目的。一般默认大小是64MB,可以调节配置指定。

InputSplit是从字节的角度来描述输入的,回头查看一下Mapper,它里面没有这种东西啊,用到的 Key,Value是从哪来的?有请RecordReader。

RecordReader


?

RecordReader

按照惯例,先上源码:


?

public abstract class RecordReader
  
    implements Closeable {
/**
* Called once at initialization.
* @param split the split that defines the range of records to read
* @param context the information about the task
* @throws IOException
* @throws InterruptedException
*/
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* Read the next key, value pair.
* @return true if a key/value pair was read
* @throws I