Customizing Hadoop Map/Reduce Input File Splitting with InputFormat (Part 2)
            // (continuation of SearchRecordReader.nextKeyValue(); the opening of the method precedes this excerpt)
            LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
        }

        if (newSize == 0) {
            // read the next buffer
            this.key = null;
            this.value = null;
            return false;
        }

        // read the next record from the same buffer
        return true;
    }


    public LongWritable getCurrentKey() {
        return this.key;
    }

    public Text getCurrentValue() {
        return this.value;
    }

    public float getProgress() {
        if (this.start == this.end) {
            return 0.0F;
        }
        return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
    }

    public synchronized void close() throws IOException {
        if (this.in != null) {
            this.in.close();
        }
    }
}
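
With SearchRecordReader complete, the custom format plugs into a job like any other InputFormat. The driver below is a minimal sketch: the class name SearchInputFormat (the FileInputFormat subclass from part 1) and the configuration key search.record.separator are assumptions made for illustration, not names confirmed by this article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SearchJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical key: one way the separator could reach the RecordReader
        conf.set("search.record.separator", "!@!");

        Job job = Job.getInstance(conf, "custom InputFormat demo");
        job.setJarByClass(SearchJobDriver.class);
        // SearchInputFormat is the custom FileInputFormat from part 1 (name assumed)
        job.setInputFormatClass(SearchInputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}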


3) Rewrite the LineReader that SearchRecordReader depends on; it can be written as an inner class of SearchRecordReader. The point that needs special care: because the file is read in buffers of a fixed size, a complete record will inevitably get cut in two across buffer boundaries, and when the separator is longer than one character, even the separator itself can be cut in half. Both cases must be stitched back together.
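
Before the full implementation, here is a minimal standalone sketch of that carry-over idea (the class name, the "||" separator, the sample data and the 4-byte buffer are all invented for illustration). The counter sepPosn records how many separator bytes have matched so far; because it survives a buffer refill, a separator cut in half across two buffers is stitched back together:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SeparatorCarryoverDemo {
    public static void main(String[] args) throws IOException {
        byte[] sep = "||".getBytes();          // two-byte record separator
        InputStream in = new ByteArrayInputStream("abc||de||fgh||".getBytes());
        byte[] buf = new byte[4];              // deliberately tiny: forces splits
        StringBuilder record = new StringBuilder();
        int sepPosn = 0;                       // separator bytes matched so far
        int n;
        while ((n = in.read(buf)) > 0) {
            for (int i = 0; i < n; i++) {
                // a partial match turned out not to be the separator:
                // put the bytes we borrowed back into the record
                if (sepPosn > 0 && buf[i] != sep[sepPosn]) {
                    for (int k = 0; k < sepPosn; k++) {
                        record.append((char) sep[k]);
                    }
                    sepPosn = 0;
                }
                if (buf[i] == sep[sepPosn]) {
                    if (++sepPosn == sep.length) {     // full separator matched
                        System.out.println("record: " + record);
                        record.setLength(0);
                        sepPosn = 0;
                    }
                } else {
                    record.append((char) buf[i]);
                }
            }
            // sepPosn deliberately survives the refill of buf here
        }
    }
}

Running it prints record: abc, record: de and record: fgh, even though the first two separators arrive split across buffer boundaries.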


public class LineReader {
    // carriage return (the hadoop default)
    //private static final byte CR = 13;
    // line feed (the hadoop default)
    //private static final byte LF = 10;

    // the file is read buffer by buffer
    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private InputStream in;
    private byte[] buffer;
    private int bufferLength = 0;
    private int bufferPosn = 0;

    // the custom record separator and its length, used by readLine() below;
    // their initialization is not part of this listing (presumably the
    // enclosing SearchRecordReader sets them, e.g. from the job configuration)
    private byte[] separator;
    private int sepLength;

    LineReader(InputStream in, int bufferSize) {
        this.bufferLength = 0;
        this.bufferPosn = 0;

        this.in = in;
        this.bufferSize = bufferSize;
        this.buffer = new byte[this.bufferSize];
    }


    public LineReader(InputStream in, Configuration conf) throws IOException {
        this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
    }

    public void close() throws IOException {
        in.close();
    }

    public int readLine(Text str, int maxLineLength) throws IOException {
        return readLine(str, maxLineLength, Integer.MAX_VALUE);
    }

    public int readLine(Text str) throws IOException {
        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
    }


    // below is the part that needs to be rewritten _start: the core code


    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
        str.clear();
        Text record = new Text();
        int txtLength = 0;
        long bytesConsumed = 0L;
        boolean newline = false;
        int sepPosn = 0;

        do {
            // the current buffer is exhausted: read the next buffer
            if (this.bufferPosn >= this.bufferLength) {
                bufferPosn = 0;
                bufferLength = in.read(buffer);

                // end of file reached: break out and move on to the next file
                if (bufferLength <= 0) {
                    break;
                }
            }

            int startPosn = this.bufferPosn;
            for (; bufferPosn < bufferLength; bufferPosn++) {
                // handle a separator that was cut in half at the tail of the previous
                // buffer (this can misfire when the separator contains many repeated characters)
                if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
                    sepPosn = 0;
                }

                // hit the first character of the record separator
                if (buffer[bufferPosn] == separator[sepPosn]) {
                    bufferPosn++;
                    int i = 0;

                    // check whether the following characters also belong to the separator
                    for (++sepPosn; sepPosn < sepLength; i++, sepPosn++) {

                        // the buffer ends exactly inside the separator: the
                        // separator itself has unluckily been cut in half
                        if (bufferPosn + i >= bufferLength) {
                            bufferPosn += i - 1;
                            break;
                        }

                        // as soon as one character differs, this is not the separator
                        if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
                            sepPosn = 0;
                            break;
                        }
                    }

                    // we really did hit the record separator
                    if (sepPosn == sepLength) {
                        bufferPosn += i;
                        newline = true;
                        sepPosn = 0;
                        break;
                    }
                }
            }

            int readLength = this.bufferPosn - startPosn;
            bytesConsumed += readLength;

            // the record separator is not put into the block
            //int appendLength = readLength - newlineLength;
            if (readLength > maxLineLength - txtLength) {
                readLength = maxLineLength - txtLength;
            }
            if (readLength > 0) {
                record.append(this.buffer, startPosn, readLength);
                txtLength += readLength;

                // strip the separator off the record
                if (newline) {
                    str.set(record.getBytes(), 0, record.getLength() - sepLength);
                }
            }

        } while (!newline && (bytesConsumed < maxBytesToConsume));

        // a final record that reaches EOF without a trailing separator is still returned
        if (!newline && record.getLength() > 0) {
            str.set(record.getBytes(), 0, record.getLength());
        }

        if (bytesConsumed > (long) Integer.MAX_VALUE) {
            throw new IOException("Too many bytes before newline: " + bytesConsumed);
        }

        return (int) bytesConsumed;
    }


    // the part that needs to be rewritten _end
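
To exercise the rewritten readLine() outside of a full job, a smoke test might look like this (a sketch only: it assumes LineReader is reachable as a top-level class and that separator/sepLength were initialized to the three-byte delimiter "!@!", which this excerpt does not show; the 5-byte buffer is picked deliberately so the separator straddles buffer refills):

import java.io.ByteArrayInputStream;
import org.apache.hadoop.io.Text;

public class ReadLineSmokeTest {
    public static void main(String[] args) throws Exception {
        // a 5-byte buffer guarantees the 3-byte separator "!@!" is cut in
        // half across refills, exercising the stitch logic above
        LineReader reader = new LineReader(
                new ByteArrayInputStream("row1!@!row2!@!row3!@!".getBytes()), 5);
        Text line = new Text();
        while (reader.readLine(line) > 0) {
            System.out.println(line);   // row1, row2, row3, separators stripped
        }
        reader.close();
    }
}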


    // below is the LineReader source from hadoop-core, for comparison _start


    public int readLine(Text str
