问题描述:给定一个中心点cpoint,和另外一堆点points,求另外一堆点points到cpoint的距离,并且按照从小到达的顺序排列(然后再自己遍历一边就求得k近邻了,这里忽略这一步)。
Map-Reduce算法:每个map对应接受points里面的一个点point,然后在map里求point和中心点cpoint(通过Configuration传入)的距离,然后把该距离作为map的输出key,而 map的输出value为该点point坐标。这样子经过shuffle&sort中间步骤后,到达reduce时key(即每个点到cpoint的距离)已经按照大小排好序,直接输出即可。
输入数据(各个点的坐标):
2 2
3 4
9 8
20 10
20 10
15 14
89 15
输出数据(第一个数据是距离指定点cpoint的坐标,第二个数据是点point自己的坐标,cpoint在main函数里面通过Configuration指定和传入)
2.828427 2.0,2.0
5.0 3.0,4.0
12.0415945 9.0,8.0
20.518284 15.0,14.0
22.36068 20.0,10.0
22.36068 20.0,10.0
90.255196 89.0,15.0
具体的Map的Input key是当前字符偏移量,Input value是每个点的2D坐标。
代码:
首先是自定义的类Point2D,为了让Map和Reducer能够接受该自定义类作为key/value,必须相应继承Interface:WritableComparable/Writable,因为map-reduce有自己的serialize和deserialize机制。
补充一点是我找了很多的材料,都说自定义的类只要继承这两个相应的接口,map和reduce就能识别并且接受它们作为key/value。但我代码试了下不行。Eclipse提示因为我用了默认TextInputFileFormat,所以value必须是Text,而不能是我自己定义的Point2D.为此我不得不实现了自定义的FileInputFormat和RecordReader,来解析我自己的数据结构Point2D。
[java]
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Point2D implements Writable{
public float x;
public float y;
public Point2D(float x, float y)
{
this.x = x;
this.y = y;
}
public Point2D()
{
this.x = 0;
this.y = 0;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
x = in.readFloat();
y = in.readFloat();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeFloat(x);
out.writeFloat(y);
}
@Override
public String toString()
{
return Float.toString(x) + "," + Float.toString(y);
}
}
自定义的InputFormat
[java]
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.*;
public class KNPointInputFormat extends FileInputFormat{
@Override
public RecordReader createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return new PointRecordReader();
}
}
注意上面的
[java]
PointRecordReader
用来解析我们自己变量的RecordReader,具体代码如下:
[java]
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org