Hadoop Writable深度复制及读取任意<key,value>序列文件(一)

2015-01-25 21:03:50 · 作者: · 浏览: 14
 public T More ...deepCopy(T source) {
50    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
51    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
52    T copiedValue = null;
53    try {
54      source.write(dataOut);
55      dataOut.flush();
56      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
57      DataInput dataInput = new DataInputStream(byteInStream);
58      copiedValue = writableClass.newInstance();
59      copiedValue.readFields(dataInput);
60    } catch (Exception e) {
61      throw new CrunchRuntimeException("Error while deep copying " + source, e);
62    }
63    return copiedValue;
64  }
package mahout.fansy.utils.read;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class ReadArbiKV {
 
	/**
	 * Shared static state used to read an arbitrary sequence file.
	 */
	public static Configuration conf = new Configuration();
	public static WritableDeepCopier wdc;
	static String trainPath = "";
	// Data file.
	static String fPath = "hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/labelindex";
	static {
		conf.set("mapred.job.tracker", "ubuntu:9001");
	}
	/**
	 * Entry point: passes {@code fPath} to {@code readFromFile}. The value
	 * returned by that call is discarded.
	 *
	 * @param args command-line arguments; not read
	 * @throws IOException if {@code readFromFile} throws it
	 */
	public static void main(String[] args) throws IOException {
		// Pass trainPath instead of fPath to read the other file.
		final String target = fPath;
		readFromFile(target);
	}
	
	/**
	 * 读取序列文件
	 * @param fPath
	 * @return
	 * @throws IOException
	 */
	public static Map
readFromFile(String fPath) throws IOException{ FileSystem fs = FileSystem.get(URI.create(fPath), conf); Path path = new Path(fPath); Map map=new HashMap(); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); @SuppressWarnings("unchecked") Class writableClassK=(Class) reader.getKeyClass(); @SuppressWarnings("unchecked") Class writableClassV=(Class) reader.getValueClass(); while (reader.next(key, value)) { // Writable k=; // 如何实现Writable的深度复制? Writable k=deepCopy(key, writableClassK); // Writable 的深度复制 Writable v=deepCopy(value,writableClassV); map.put(k, v); // System.out.println(key.toString()+", "+value.toString()); // System.exit(-1);// 只打印第一条记录 } } finally { IOUtils.closeStream(reader); } return map; } /** * Writable 的深度复制 * 引自WritableDeepCopier * @param fPath * @return * @throws IOException */ public static Writable deepCopy(Writable source,Class writableClass) { ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); DataOut