Spark内核解析 (12)

RDD的持久化由Spark的Storage模块负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和Executor端的Storage模块构成了主从式的架构,即Driver端的BlockManager为Master,Executor端的BlockManager为Slave。

Storage模块在逻辑上以Block为基本存储单位,RDD的每个Partition经过处理后位移对应一个Block(BlockId的格式为rdd_RDD-ID_PARTITION-ID)。Driver端的Master负责整个Spark应用程序的Block的元数据信息的管理和维护,而Executor端的Slave需要将Block的更新等状态上报到Master,同时接受Master的命令,例如新增或删除一个RDD。

img

在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK等7中不同的存储级别,而存储级别是以下5个变量的组合:

class StorageLevel private(

private var _useDisk: Boolean, //磁盘

private var _useMemory: Boolean, //这里其实是指堆内内存

private var _useOffHeap: Boolean, //堆外内存

private var _deserialized: Boolean, //是否为非序列化

private var _replication: Int = 1 //副本个数

)

Spark中7中存储级别如下:

img

通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的Partition(同时也就是Block)的存储方式:

(1)存储位置:磁盘/堆内内存/堆外内存。如MEMORY_AND_DISK是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。

(2)存储形式:Block缓存到存储内存后,是否为非序列化的形式。如MEMORY_ONLY是非序列化方式存储,OFF_HEAP是序列化方式存储。

(3)副本数量:大于1时需要远程冗余备份到其他节点。如DISK_ONLY_2需要远程备份1个副本。

2、RDD的缓存过程

RDD在缓存到存储内存之前,Partition中的数据一般以迭代器(Iterator)的数据结构来访问,这是Scala语言中一种遍历数据集合的方法。通过Iterator可以获取分区中每一条序列化或者非序列化的数据项(Record),这些Record的对象实例在逻辑上占用了JVM堆内内存的other部分的空间,同一Partition的不同Record的存储空间并不连续。

RDD在缓存到存储内存之后,Partition被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为“展开”(Unroll)。

Block有序列化和非序列化两种存储格式,具体以哪种方式取决于该RDD的存储级别。非序列化的Block以一种DeserializedMemoryEntry的数据结构定义,用一个数组存储所有的对象实例,序列化的Block则以SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个Executor的Storage模块用一个链式Map结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的Block对象的实例,对这个LinkedHashMap新增和删除间接记录了内存的申请和释放。

因为不能保证存储空间可以一次容纳Iterator中的所有数据,当前的计算任务在Unroll时要向MemoryManager申请足够的Unroll空间来临时占位,空间不足则Unroll失败,空间足够时可以继续进行。

对于序列化的Partition,其所需的Unroll空间可以直接累加计算,一次申请。

对于非序列化的Partition则要在便利Record的过程中一次申请,即每读取一条Record,采样估算其所需的Unroll空间并进行申请,空间不足时可以中断,释放已占用的Unroll空间。

如果最终Unroll成功,当前Partition所占用的Unroll空间被转换为正常的缓存RDD的存储空间,如下图所示。

img

在静态内存管理时,Spark在存储内存中专门划分了一块Unroll空间,其大小是固定的,统一内存管理时则没有对Unroll空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

3、淘汰与落盘

由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的Block需要缓存单数剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。

存储内存的淘汰规则为:

被淘汰的旧Block要与新的Block的MemoryMode相同,即同属于堆外或堆内内存;

新旧Block不能属于同一个RDD,避免循环淘汰;

旧Block所属RDD不能处于被读状态,避免引发一致性问题;

遍历LinkedHashMap中Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新Block所需的空间。其中LRU是LinkedHashMap的特性。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxwxz.html