1 Problem description: find the highest temperature. We want to find the highest temperature of a given year from climate log data. Assume every record has the following format: "China2013inbeijing023isok0", where 2013 is the year, 023 is the temperature reading, and ok indicates that the record is intact (other fields are omitted to keep things simple). Our task is to find the highest temperature recorded in Beijing in 2013 from a large number of such records. Data like this is a good fit for MapReduce.
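For reference, the fixed character offsets in a sample record (as used by the parsing code in section 3.1) are:

    China2013inbeijing023isok0
    characters 5-8:   year ("2013")
    characters 18-20: temperature ("023"); some records carry a "+" at position 18,
                      shifting the temperature to characters 19-21
    characters 23-24: quality flag ("ok")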
2 Problem analysis: the problem itself is simple; such trivial data is used here only to illustrate the process of writing and debugging a MapReduce program on Hadoop. For each record we extract the year and the temperature, then find the maximum temperature. This resembles divide and conquer: each Map task is the "divide" step, and Reduce is the "combine" step.
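To make the divide/combine picture concrete, here is the data flow for three hypothetical records from 2013 (the temperatures are invented for illustration):

    map:     (2013, 23), (2013, 18), (2013, 30)
    shuffle: (2013, [23, 18, 30])
    reduce:  (2013, 30)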
3 Coding:
3.1 The Map function:
// Import the required classes
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With the new API, the Map step extends the Mapper class of the
// org.apache.hadoop.mapreduce package and overrides its map method
public class MaxtemMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the Text object to a String for processing
        String line = value.toString();
        // Characters 5-8 of the record hold the year
        String year = line.substring(5, 9);
        int airtemperature;
        String quality;
        // Some records prefix the temperature with a "+": with the sign the
        // temperature occupies characters 19-21 and the quality flag shifts one
        // position to the right; without it the temperature occupies 18-20
        if (line.charAt(18) == '+') {
            airtemperature = Integer.parseInt(line.substring(19, 22));
            quality = line.substring(24, 26);
        } else {
            airtemperature = Integer.parseInt(line.substring(18, 21));
            quality = line.substring(23, 25);
        }
        System.out.println("year:" + year + " tem:" + airtemperature);
        // Check that the record is valid before emitting it
        if (airtemperature != MISSING && quality.matches("ok")) {
            context.write(new Text(year), new IntWritable(airtemperature));
        }
    }
}
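The substring offsets above are easy to get wrong, so it is worth checking them outside Hadoop first. The following standalone sketch (a hypothetical helper class, not part of the job) prints every field for both record shapes:

public class OffsetCheck {
    public static void main(String[] args) {
        String plain  = "China2013inbeijing023isok0";   // no sign before the temperature
        String signed = "China2013inbeijing+023isok0";  // "+" at position 18
        System.out.println(plain.substring(5, 9));      // 2013 (year)
        System.out.println(plain.substring(18, 21));    // 023  (temperature)
        System.out.println(plain.substring(23, 25));    // ok   (quality flag)
        System.out.println(signed.substring(19, 22));   // 023  (temperature after the sign)
        System.out.println(signed.substring(24, 26));   // ok   (quality flag, shifted by one)
    }
}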
3.2 The Reduce function:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// The Reduce step extends the Reducer class and overrides its reduce method.
// With the new API the values arrive as an Iterable, not an Iterator: a reduce
// method declared with Iterator would not override the framework's reduce
// method and would never be called
public class MaxtemReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        // All temperatures grouped under the same year arrive here; keep the largest
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
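As a sanity check, the reduce logic can also be exercised locally. This sketch (with invented values, outside Hadoop) mimics what a single reduce call sees for the key "2013":

import java.util.Arrays;
import java.util.List;

public class ReduceCheck {
    public static void main(String[] args) {
        // All temperatures that the shuffle grouped under the key "2013"
        List<Integer> values = Arrays.asList(23, 18, 30);
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        System.out.println("2013\t" + maxValue); // prints: 2013  30
    }
}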
3.3 Running the MapReduce job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

    // The main function: configure and submit the job
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MaxTemperature");
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxtemMapper.class);
        job.setReducerClass(MaxtemReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
        // Output the details of the job
        System.out.println("name:" + job.getJobName());
        System.out.println("isSuccessful: " + (job.isSuccessful() ? "yes" : "no"));
    }
}
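To compile and run the job, something like the following should work (the jar name and the input/output paths are placeholders; adjust them for your cluster):

    $ mkdir classes
    $ javac -classpath `hadoop classpath` -d classes MaxtemMapper.java MaxtemReducer.java MaxTemperature.java
    $ jar cvf maxtemp.jar -C classes .
    $ hadoop jar maxtemp.jar MaxTemperature input/climate output/maxtemp

Note that the output directory must not exist before the job runs; Hadoop creates it and writes the result (one "year<TAB>maxTemperature" line) into the part files inside it.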