The Output of Reduce
When the ReduceContext is constructed, two output-related parameters are passed in: one of type RecordWriter and one of type OutputCommitter. Trace how those two objects get built, though, and you will find that the boss behind both of them is OutputFormat. Does that name look familiar? It should: we mentioned it in passing when covering Map's output but never expanded on it. Functionally, it is the counterpart of InputFormat.
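To make that relationship concrete, here is a rough sketch, not Hadoop's actual task code, of how the reduce task might obtain both objects from the job's configured OutputFormat before building the ReduceContext (the class and method names below are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch only: roughly how the two output-related objects handed to the
// ReduceContext constructor both originate from the job's OutputFormat.
class OutputWiringSketch {
  @SuppressWarnings("unchecked")
  static <K, V> void wireOutput(TaskAttemptContext taskContext) throws Exception {
    Configuration conf = taskContext.getConfiguration();
    // The job configuration names an OutputFormat class; instantiate it.
    OutputFormat<K, V> outputFormat = (OutputFormat<K, V>)
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
    // Both the writer and the committer come from that same OutputFormat.
    RecordWriter<K, V> writer = outputFormat.getRecordWriter(taskContext);
    OutputCommitter committer = outputFormat.getOutputCommitter(taskContext);
    // ... writer and committer are then passed to the ReduceContext ...
  }
}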
OutputFormat
As usual, let's start by looking at its interface.
public abstract class OutputFormat<K, V> {

  /**
   * Get the {@link RecordWriter} for the given task.
   *
   * @param context the information about the current task.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context
                                                     ) throws IOException, InterruptedException;

  /**
   * Check for validity of the output-specification for the job.
   *
   * This is to validate the output specification for the job when it is
   * a job is submitted.  Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.
   *
   * @param context information about the job
   * @throws IOException when output should not be attempted
   */
  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, InterruptedException;

  /**
   * Get the output committer for this output format. This is responsible
   * for ensuring the output is committed correctly.
   * @param context the task context
   * @return an output committer
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context
                                                     ) throws IOException, InterruptedException;
}
According to the comments in the source, OutputFormat does two main things: it validates that the job's output specification is sane (for example, checking whether the target path already exists), and it supplies a RecordWriter implementation that writes the final output. Its three abstract methods return the RecordWriter, return the OutputCommitter, and validate the output specification, respectively.
You might wonder why the output side has to be this complicated. Each reducer produces one file, so why not just point it at a directory and write straight into it? Why does it need a RecordWriter and a committer? Recall the premise Hadoop was designed around: reliable output on unreliable machines. Its designers assume that any task may well fail and often has to be attempted more than once, so instead of writing directly to the final location, a task should first write to a temporary directory and only promote the result to the official output directory once it is known to have succeeded. That promotion is exactly the committer's job, which leaves the RecordWriter free to focus purely on writing, as sketched below.
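Here is a simplified sketch of the order in which the framework drives these calls around a single successful task attempt (aborts, retries, and error handling omitted; names are illustrative). With the default FileOutputCommitter, the temporary area is a _temporary subdirectory under the output path, and committing mostly amounts to renaming files into place:

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Sketch only: the simplified call sequence the framework makes around
// one successful task attempt.
class CommitProtocolSketch {
  static <K, V> void runOneAttempt(OutputCommitter committer,
                                   RecordWriter<K, V> writer,
                                   JobContext jobContext,
                                   TaskAttemptContext taskContext) throws Exception {
    committer.setupJob(jobContext);        // e.g. create the temporary area
    committer.setupTask(taskContext);      // per-attempt setup
    // ... the RecordWriter writes every record into the temporary area ...
    writer.close(taskContext);
    if (committer.needsTaskCommit(taskContext)) {
      committer.commitTask(taskContext);   // promote this attempt's output
    }
    committer.commitJob(jobContext);       // finalize: cleanup, success marker, etc.
  }
}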
We could, of course, write an OutputFormat from scratch, but the more common approach is to extend a canonical implementation: FileOutputFormat.
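As a preview, a minimal subclass might look something like the sketch below, essentially a stripped-down TextOutputFormat. The class name and file extension are made up; the point is that by writing to getDefaultWorkFile, the subclass inherits checkOutputSpecs and the whole commit machinery for free:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: a bare-bones FileOutputFormat subclass, roughly what
// TextOutputFormat does minus compression and configurable separators.
public class SimpleLineOutputFormat<K, V> extends FileOutputFormat<K, V> {
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // getDefaultWorkFile resolves to the committer's temporary work
    // directory, so the commit logic described above still applies.
    Path file = getDefaultWorkFile(context, ".txt");
    final FSDataOutputStream out =
        file.getFileSystem(context.getConfiguration()).create(file, false);
    return new RecordWriter<K, V>() {
      @Override
      public void write(K key, V value) throws IOException {
        out.writeBytes(key + "\t" + value + "\n");   // one key-value pair per line
      }
      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();
      }
    };
  }
}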
FileOutputFormat
Here is its implementation of checkOutputSpecs (this particular snippet comes from the older mapred API; the new-API version in mapreduce.lib.output performs essentially the same checks):
public void checkOutputSpecs(FileSystem ignored, JobConf job)
    throws FileAlreadyExistsException,
           InvalidJobConfException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null && job.getNumReduceTasks() != 0) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  if (outDir != null) {
    FileSystem fs = outDir.getFileSystem(job);
    // normalize the output directory
    outDir = fs.makeQualified(outDir);
    setOutputPath(job, outDir);
    // get delegation token for the outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] {outDir}, job);
    // check its existence
    if (fs.exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir +
                                           " already exists");
    }
  }
}
When I first started running Hadoop test jobs, I kept hitting FileAlreadyExistsException, and the reason was always that I had forgotten to delete the previous run's results. Only now do I see that this is exactly where the exception is thrown. Hadoop behaves this way to keep you from carelessly overwriting data you generated earlier, which I think is a reasonable default.
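If you just want your test driver to get past this check, the usual remedy is to delete the stale output directory before submitting the job. A small sketch, with a made-up path, looks like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: remove a previous run's output so checkOutputSpecs passes.
public class CleanOutputDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path outDir = new Path("/tmp/wordcount-output");   // hypothetical path
    FileSystem fs = outDir.getFileSystem(conf);
    if (fs.exists(outDir)) {
      fs.delete(outDir, true);   // recursive delete of the old results
    }
  }
}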
FileOutputFormat also provides some handy helper methods, such as this one:
public synchronized static String getUniqueFile(TaskAttemptContext c