Hadoop HelloWorld Examples - 求k临近点(+自定义变量+参数传入)(二)
.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.*;
class PointRecordReader extends RecordReader{
private LineRecordReader lineRecordReader;
private LongWritable key;
private Point2D value;
public PointRecordReader()
{
lineRecordReader = new LineRecordReader();
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
lineRecordReader.close();
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public Point2D getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return lineRecordReader.getProgress();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
lineRecordReader.initialize(arg0, arg1);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(lineRecordReader.nextKeyValue() == false)
{
return false;
}
key = lineRecordReader.getCurrentKey();
if(value == null)
{
value = new Point2D();
}
String lineva lue = lineRecordReader.getCurrentValue().toString();
String[] xy = lineva lue.split(" ");
value.x = Float.parseFloat(xy[0]);
value.y = Float.parseFloat(xy[1]);
return true;
}
}
最后是Map-Reduce的主类
[java]
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class KNPoints {
public static class KNPointMapper extends Mapper
{
@Override
public void map(LongWritable key, Point2D value, Context context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
String cPos = conf.get("cPoint");//The center point that we want to calculate the others' distance from it.
String[] cxy = cPos.split(" ");
Point2D cpoint = new Point2D();
cpoint.x = Integer.parseInt(cxy[0]);
cpoint.y = Integer.parseInt(cxy[1]);
float dis = (float)Math.sqrt( Math.pow((value.x -