DataNode本地数据存储和管理(三)
于存放数据文件
private File tmpDir;//tmp文件
private File detachDir; //此文件夹用于 copy-on-write机制,用于将复制的块文件临时存放在此文件夹中,
//(DataNodeBlockInfo.detachFile),然后将此文件移动到current位置,用于断掉硬链接。
private long reserved;//预留的容量大小
private DU dfsUsage;//会启动一个线程,用于实时更新系统容量:执行Shell.runCommand启动一个操作系统线程ProcessBuilder执行`du -sk dirPath`,然后调用DU.parseExecResult将执行结果解析出来,读出其中的关于文件夹字节的描述并赋给DU.used变量。同时FSVolume.addBlock也会对DU.used进行改变,但是这只是在DU.used上进行累加block.getNumBytes()+metaFile.length()。DU.refreshUsed是周期性的执行,并直接赋值给DU.used。
private DF usage;//usage是用于统计磁盘的使用情况和相关信息:容量capacity、使用率used、挂载位置mount。unix df返回格式如下:
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/sda2 18306828 2851076 14525808 17% /
当需要获得这些信息的时候,会调用Shell.runCommand执行`df -k dirPath`,从DF.parseExecResul中对执行结果进行解析。与DU不同的是DF只有在想要查看的时候才调用,故不需要特地开启一个线程。
FSVolume管理多种类型文件夹:current、detach、tmp,每个文件夹是一个FSDir类。
当一个block在写时,会使得其所在的文件处于活跃状态(局部性原理???),FSDataset中引入ActiveFile表示一个活跃的文件及操作这个文件的线程(存在多个线程同时要操作文件的情况),
[java]
final List threads = new ArrayList(2);
创建ActiveFile时会默认把当前线程添加进去。
FSVolume.recoverDetachedBlocks用于将detach中未恢复的文件移动到current下,将detach文件夹下的在current中已存在的文件删除。这操作是在DataNode启动时候进行的。之所以需要对detach进行恢复是因为detach文件有可能在复制文件时系统崩溃。
FSDataset实现了接口FSDatasetInterface。FSDatasetInterface是DataNode对底局存储的抽象。
FSDataset中包含了以下变量:
[java]
FSVolumeSet volumes;//FSVolume集合(Storage集)
private HashMap ongoingCreates = new HashMap();//保存最近操作过的Block集合以及其所在的文件
private int maxBlocksPerDir = 0;//每个目录所能容纳的最多块数dfs.datanode.numblocks,默认64
private HashMap volumeMap = null;//以本地块作为key,对应的DatanodeBlockInfo为value的哈希表
File getMetaFile(Block b);//要获得一个块对应的元数据文件,要得到:块文件所在的绝对路径+块文件名(找到块的DatanodeBlockInfo中的file,这个file是块文件名字)+块元数据文件的时间戳(通过Block.getGenerationStamp())。
public synchronized BlockInputStreams getTmpInputStreams(Block b,
long blkOffset, long ckoff);//用于获取由临时块b的输入流及其对应的元数据文件的输入流构成的BlockInputStreams;临时块是指位于tmp目录下的尚未移到current目录
BlockWriteStreams writeToBlock(Block b, boolean isRecovery) //用于开始向块b写入数据,获取由块文件输出流和块元数据文件输出流构成的BlockWriteStreams。
[java]
public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
//
// Make sure the block isn't a valid one - we're still creating it!
//
if (isValidBlock(b)) {//如果此块存在
if (!isRecovery) {//本次不是恢复操作
throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
}
detachBlock(b, 1);//本次是恢复操作,就分离文件。因为writeToBlock是要开始对块进行写入。
}
long blockSize = b.getNumBytes();//块大小
File f = null;
List threads = null;
synchronized (this) {
ActiveFile activeFile = ongoingCreates.get(b);//此块在活动列表中是否存在。此块近期是否被访问过
if (activeFile != null) {//存在
f = activeFile.file;//块文件
threads = activeFile.threads;//操作块的线程
if (!isRecovery) {//非恢复模式
throw new BlockAlreadyExistsException("Block " + b +
" has already been started (though not completed), and thus cannot be created.");
} else {//恢复模式
for (Thread thread:threads) {//关闭对此块进行操作的线程