MapReduce的详细过程(四)

2015-07-24 07:41:40 · 作者: · 浏览: 3
* Close this RecordWriter to future operations. * * @param context the context of the task * @throws IOException */ public abstract void close(TaskAttemptContext context ) throws IOException, InterruptedException; } write方法用来写入,close方法用来释放资源。
FileOutputFormat中没有提供getRecordWriter的实现,我们来看一下实际工作中用的比较多的FileOutputFormat最有名的子类,TextOutputFormat中是怎么实现的。

?

?

public RecordWriter
  
    getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
"\t");
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter
   
    (fileOut, keyValueSeparator); } else { Class
     codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter
    
     (new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } }
    
   
  

代码很好懂,key和value之间默认的分割符是"\t",输出流用得是FSDataOutputStream,可压缩,可不压缩。LineRecordWriter是它的内部类,每次新起一行写入新数据,key value之间用分隔符分割。至此,我就将MapReduce的过程全部讲完了,中间还有没说清楚的地方,后面我们继续晚上,再自己的经验积累一些后,应该会有更深刻的理解。

结语

终于将这篇博客写完了,本来想着是入职之前完成的,结果拖了快两个月。最初开始的时候也没想 过会写这么多,看起源码来,一环一环的,就总想再搞得明白一些。里面掺杂了一些我工作中遇到的问 题,不是很多,不过我觉得还是有一定的参考意义的。希望这篇博客能帮助到对hadoop感兴趣的你。