otalOrderPartitioner.setPartitionFile(conf, partitionFile);
Job job = new Job(conf);
job.setJobName("Total-Sort");
job.setJarByClass(TotalSortMR.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(reduceNumber);
// partitioner class设置成TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
// 写partition file到mapreduce.totalorderpartitioner.path
InputSampler.writePartitionFile(job, sampler);
return job.waitForCompletion(true) 0 : 1;
}
/**
 * Command-line entry point: hands the raw arguments to
 * {@code runTotalSortJob} and terminates the JVM with the value it returns.
 *
 * @param args command-line arguments, forwarded unchanged to {@code runTotalSortJob}
 * @throws Exception if {@code runTotalSortJob} throws
 */
public static void main(String[] args) throws Exception {
    // Capture the job status first so the exit call reads as a separate step.
    final int exitCode = runTotalSortJob(args);
    System.exit(exitCode);
}
}
上面的例子采用 InputSampler 来创建 partition file；其实也可以用 MapReduce 作业来创建：自定义一个 InputFormat 进行取样，并将 output key 输出到同一个 reducer。
PS：Hive 0.12 实现了 parallel ORDER BY（https://issues.apache.org/jira/browse/HIVE-1402），同样基于 TotalOrderPartitioner，是一个非常靠谱的新特性。