设为首页 加入收藏

TOP

MapReduce TotalOrderPartitioner 全局排序(一)
2014-11-24 03:14:22 来源: 作者: 【 】 浏览:3
Tags:MapReduce TotalOrderPartitioner 全局 排序

public class HashPartitioner extends Partitioner {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}


/**
* Partitioner effecting a total order by reading split points from
* an externally generated source.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner,V>
extends Partitioner implements Configurable {
// by construction, we know if our keytype
@SuppressWarnings("unchecked") // is memcmp-able and uses the trie
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
}
}


TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。


InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:


1. RandomSampler 随机取样
2. IntervalSampler 从s个split里面按照一定间隔取样,通常适用于有序数据
3. SplitSampler 从s个split中选取前n条记录取样


paritition file可以通过TotalOrderPartitioner.setPartitionFile(conf, partitionFile)来设置,在TotalOrderPartitioner instance创建的时候会调用setConf函数,这时会读入partition file中key值,如果key是BinaryComparable(可以认为是字符串类型)的话会构建trie,时间复杂度是O(n), n是树的深度。如果是非BinaryComparable类型就构建BinarySearchNode,用二分查找,时间复杂度O(log(n)),n是reduce数


boolean natOrder =
conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
// Now that blocks of identical splitless trie nodes are
// represented reentrantly, and we develop a leaf for any trie
// node with only one split point, the only reason for a depth
// limit is to refute stack overflow or bloat in the pathological
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}


示例程序


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;


public class TotalSortMR {

public static int runTotalSortJob(String[] args) throws Exception {
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Path partitionFile = new Path(args[2]);
int reduceNumber = Integer.parseInt(args[3]);

// RandomSampler第一个参数表示key会被选中的概率,第二个参数是一个选取samples数,第三个参数是最大读取input splits数
RandomSampler sampler = new InputSampler.RandomSampler(0.1, 10000, 10);

Configuration conf = new Configuration();
// 设置partition file全路径到conf
T

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇如何用Hadoop计算平均值 下一篇Android中创建自己的Launcher

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·哈希表 - 菜鸟教程 (2025-12-24 20:18:55)
·MySQL存储引擎InnoDB (2025-12-24 20:18:53)
·索引堆及其优化 - 菜 (2025-12-24 20:18:50)
·Shell 中各种括号的 (2025-12-24 19:50:39)
·Shell 变量 - 菜鸟教 (2025-12-24 19:50:37)