DataNode本地数据存储和管理(四)

2014-11-24 11:52:29 · 作者: · 浏览: 153
thread.interrupt();
}
}
ongoingCreates.remove(b);//从近期块活动列表中移除
}
FSVolume v = null;
if (!isRecovery) {//正常模式
v = volumes.getNextVolume(blockSize);//所在卷
// create temporary file to hold block in the designated volume
f = createTmpFile(v, b);//在tmp目录下创建一个临时块
volumeMap.put(b, new DatanodeBlockInfo(v));//将此块信息记录到哈希表中
} else if (f != null) {//恢复模式,且此块文件近期被处理过(或正在被处理)---->视为append操作,即在原有块的基础上进行写入
DataNode.LOG.info("Reopen already-open Block for append " + b);
// create or reuse temporary file to hold block in the designated volume
v = volumeMap.get(b).getVolume();
volumeMap.put(b, new DatanodeBlockInfo(v));
} else {//此块近期未被处理
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
v = volumeMap.get(b).getVolume();
f = createTmpFile(v, b);//在tmp目录下创建一个临时块
File blkfile = getBlockFile(b);//获取已有的块文件(已在本地存在,且最近未被访问)
File oldmeta = getMetaFile(b);//获取旧的元数据文件
File newmeta = getMetaFile(f, b);//临时块对应的元数据文件
// rename meta file to tmp directory
DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
if (!oldmeta.renameTo(newmeta)) {//将旧数据文件移动到newmeta位置
throw new IOException("Block " + b + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to tmp dir " + newmeta);
}
// rename block file to tmp directory
DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
if (!blkfile.renameTo(f)) {//将旧文件移动到tmp目录下
if (!f.delete()) {
throw new IOException("Block " + b + " reopen failed. " +
" Unable to remove file " + f);
}
if (!blkfile.renameTo(f)) {
throw new IOException("Block " + b + " reopen failed. " +
" Unable to move block file " + blkfile +
" to tmp dir " + f);
}
}
volumeMap.put(b, new DatanodeBlockInfo(v));//将此块信息记录到哈希表中
}
if (f == null) {//临时文件创建失败
DataNode.LOG.warn("Block " + b + " reopen failed " +
" Unable to locate tmp file.");
throw new IOException("Block " + b + " reopen failed " +
" Unable to locate tmp file.");
}
ongoingCreates.put(b, new ActiveFile(f, threads));//将此块添加到最近操作文件列表
}
try {
if (threads != null) {//对于恢复模式要等待操作线程的结束
for (Thread thread:threads) {
thread.join();
}
}
} catch (InterruptedException e) {
throw new IOException("Recovery waiting for thread interrupted.");
}
//
// Finally, allow a writer to the block file
// REMIND - mjc - make this a filter stream that enforces a max
// block size, so clients can't go crazy
//
File metafile = getMetaFile(f, b);
return createBlockWriteStreams( f , metafile);//返回块文件输出流和块元数据文件输出流
}
[java]
public void updateBlock(Block oldblock, Block newblock);//通过调用tryUpdateBlock将旧块更新成新块(旧块大小大于新块大小),如果有线程正在对旧块进行操作,则中断这些线程,并等待线程的结束。
tryUpdateBlock用于将旧块截断成新块(调用truncateBlock),并截断相应的元数据文件,以及更新ongoingCreates、volumeMap。
truncateBlock用于将旧块截断成新块(旧块比较大),并截断旧块的元数据文件,然后对阶段和的元数据文件的最后一个校验和字段重新计算(因为截断操作中,文件