Hadoop内核分析之Hadoop文件存储细节

众所周知,我们需要Hadoop来分布式存储我们的数据,提高并发和吞吐量,造就了Mapreduce框架的易用性。那对于整个这个过程来说,最开始需要我们认识到的是文件是如何存储在hadoop系统上的。

Hadoop可以分为三个部分,Client端,namenode端和datanode端。他们之间的协作做成了这个庞大的分布式文件系统。文件从客户端这个接口,进入系统,由客户端和namenode通信,使用反射机制,告知Client文件所需要存储的datanode列表,然后就可以进行传输了,当然,我们在这里屏蔽了所有hadoop错误处理的过程,即便这是hadoop的最大的优势之一。

大体的过程知道了,那么下面我们可以深入源码,来看看具体的实现。

首先在Client端,假设你写了一个mapreduce程序就是用来存储一个文件的,代码如下:

String localSrc = "/root/Desktop/examp.c";
  String dst = "/fangpei/examp.c";
  InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
  Configuration conf = new Configuration();
  URI uri = URI.create(dst);
  FileSystem fs = FileSystem.get(uri, conf);
  OutputStream out = fs.create(new Path(dst),new Progressable(){
   public void progress(){
    System.out.print(".");
   
   }
  });
IOUtils.copyBytes(in, out, 4096,true); 

FileSystem.get(uri, conf);

用来获取文件系统。其中FileSystem类是抽象类,会进入内存的cache寻找已创建的文件系统(FileSystem.cache.get(uri,conf)),如果没有则最终会调用newInstance去创建DistrubutedFileSystem这个类的对象,放入cache中并返回这个文件系统实例。

OutputStream out = fs.create(new Path(dst),new Progressable(){
   public void progress(){
    System.out.print(".");
   
   }
  });

这个create会调用DistributedFileSystem的create。这个create创建了一个DFSOutputStream输出流,在这个输出流的构造函数中会映射调用namenode方法用于在分布式系统上创建一个文件。

namenode.create(src, masked, clientName, overwrite, createParent, replication, blockSize);

然后启动streamer

streamer.start();

DataStreamer这个线程类用于真正的数据传输。里面保持了一个dataqueue,会不停的监听这个队列是否为空。

启动完数据监听守护进程后,对数据IO通道的读写操作如下:

IOUtils.copyBytes(in, out, 4096,true);

其中,copyBytes最终会调用到writechunk函数,该函数如下:

currentPacket.writeChecksum(checksum, 0, cklen);
        currentPacket.writeData(b, offset, len);
        currentPacket.numChunks++;
        bytesCurBlock += len;
............................
        enqueueCurrentPacket();

这里就是将数据包组装并且压进队列,然后DataStreamer就会执行发送队列。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/7634cbb9e28670d1526b00fc81969c93.html