...already exists, delete it
        }
        FileOutputFormat.setOutputPath(job, output);// output path for the generated HFiles
        Connection connection = ConnectionFactory.createConnection(configuration);
        TableName tableName = TableName.valueOf(TABLE_NAME);
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()){
            HFileLoader.doBulkLoad(outputPath, TABLE_NAME);// bulk-load the generated HFiles
            return 0;
        } else {
            return 1;
        }
    }
}
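The top half of this driver was cut off above. For context, here is a minimal sketch of the setup such a driver typically performs; the class name BulkLoadDriver, the placeholder constant values, and the use of ToolRunner are assumptions on my part, not the author's original code, but the configuration keys match what BulkLoadMapper (below) reads.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical reconstruction of the truncated driver setup; all names are assumptions.
public class BulkLoadDriver extends Configured implements Tool {
    private static final String TABLE_NAME = "my_table";// placeholder values
    private static final String DATA_SEPERATOR = ",";
    private static final String COLUMN_FAMILY_1 = "cf1";
    private static final String COLUMN_FAMILY_2 = "cf2";

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args));
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        // pass the table layout to BulkLoadMapper through the job configuration
        configuration.set("hbase.table.name", TABLE_NAME);
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);

        Job job = Job.getInstance(configuration, "Bulk Loading " + TABLE_NAME);
        job.setJarByClass(BulkLoadDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(output)) {
            fs.delete(output, true);// if the output path already exists, delete it
        }
        // ...the real driver continues as in the fragment shown above
        return 0;
    }
}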
BulkLoadMapper.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;
    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();// read the job parameters
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
    }
    @Override
    public void map(LongWritable key, Text value, Context context){
        try {
            String[] values = value.toString().split(dataSeperator);
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(values[0]));
            Put put = new Put(Bytes.toBytes(values[0]));
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("month"), Bytes.toBytes(values[1]));
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("day"), Bytes.toBytes(values[2]));
            for (int i = 3; i < values.length; ++i){
                put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("hour : " + i), Bytes.toBytes(values[i]));
            }
            context.write(rowKey, put);
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}
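To make the mapper's parsing concrete, here is a made-up sample input line, assuming data.seperator is set to a comma (the values are illustrative only):
key0001,06,14,13.2,14.8,15.1
For this line, map() emits row key key0001, writes month=06 and day=14 to the first column family, and writes the remaining fields to the second family under qualifiers "hour : 3", "hour : 4", "hour : 5". Note that the qualifier index is the field's position in the line, not an hour counted from zero.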
HFileLoader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
/**
 * Created by shaobo on 15-6-9.
 */
public class HFileLoader {
    public static void doBulkLoad(String pathToHFile, String tableName){
        try {
            Configuration configuration = new Configuration();
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName);// specify the target table
            loader.doBulkLoad(new Path(pathToHFile), hTable);// load the HFiles into the table
            System.out.println("Bulk Load Completed..");
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}
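One prerequisite worth noting: both HFileOutputFormat2.configureIncrementalLoad and LoadIncrementalHFiles.doBulkLoad expect the target table to already exist, because the HFiles are partitioned along the table's current region boundaries. If it does not, create it first in the HBase shell; the table and family names below are placeholders, so use whatever TABLE_NAME, COLUMN_FAMILY_1, and COLUMN_FAMILY_2 are actually set to:
create 'my_table', 'cf1', 'cf2'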
Compile and package the program, then submit it to Hadoop:
HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar BulkLoad.jar inputpath outputpath1
For the usage of this command, see "44. HBase, MapReduce, and the CLASSPATH" in the HBase Reference Guide.
Job run output:
15/06/14 14:31:07 INFO mapreduce.HFileOutputFormat2: Looki