在这里面,会用反射的方式实例化出Reducer。
Reducer与Mapper非常相似,我贴一下他的实现:
?
?
public class Reducer{ /** * The Contextpassed on to the {@link Reducer} implementations. */ public abstract class Context implements ReduceContext{ } /** * Called once at the start of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * This method is called once for each key. Most applications will define * their reduce class by overriding this method. The default implementation * is an identity function. */ @SuppressWarnings("unchecked") protected void reduce(KEYIN key, Iterable values,Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } /** * Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Advanced application writers can use the * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to * control how the reduce task works. */ public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator )iter).resetBackupStore(); } } } finally { cleanup(context); } } }
?
?
Reducer的输入输出的信息同样是封装在Context中。Reducer与Mapper看上去很像,但是有很 多关键的不同。比如reduce方法的参数,还有run方法的实现。
注意到,reduce方法的第二个参数,不再是一个VALUE类型,而是一个迭代器,意指key相同的 value会一次性的扔给这个reduce方法,那么到底怎样算key相同呢?我们看到reduce方法是在run方法中调用的,第一个参数与Mapper相同,也是context的currentKey,第二个不一样,是从context 获得的values,在ReduceContextImpl中,这个getValues方法,直接返回一个迭代器。
从语义上说,Reducer的reduce方法应该每次处理Key相同的那一组输入,那么到底什么样的一组 key,算是相同的key呢?这个关键的问题由构造ReduceContext的一个不起眼的参数,GroupingComparator 来解决。
GroupingComparator
我们知道,哪些Mapper的输出交给一个Reduce线程是由Partitioner决定的,但是这些输入并不 是一次性处理的,举个例子,我们在做大小写敏感wordcount的时候,假设使用的partition策略是根据单词首字母大小来指定reducer,有2个reducer的话,"an"和"car"都会交给同一个reduce线程, 但是统计每个单词个数的时候,他俩是不能混起来的,也就是一个reduce线程实际上将整个输入分成了好多组,在每一组上运行了一次reduce的过程。这个组怎么分,就是GroupingComparator做得事情。针对word count这个实例,我们应该将完全相等的两个单词作为一组,运行一次reduce的方法。
我们看一下GroupingComparator接口的定义:?
public interface RawComparatorextends Comparator { /** * Compare two objects in binary. ? * b1[s1:l1] is the first object, and b2[s2:l2] is the second object. * ? * @param b1 The first byte array. ? * @param s1 The position index in b1. The object under comparison's starting index. ? * @param l1 The length of the object in b1. ? * @param b2 The second byte array. ? * @param s2 The position index in b2. The object under comparison's starting index. ? * @param l2 The length of the object under comparison in b2. ? * @return An integer result of the comparison. */ ?public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); ?}
?
可见,它是从字节码的角度来判定是否相等的,具体比较哪一部分,可以根据我们自己的实现来控 制。
经过了Shuffle过程,所有的输入都已经按照Key排序好了。所以,在判定两个Key要不要交给同一 个Reduce方法时,只要判定相邻的两个key就可以了。这个比较的过程,实际上在我们在reduce方法 中,对value的迭代器不断的取next的时候,就悄悄发生了。
业务场景
接着前面的业务场景,还是ev