Flink StateBackend:Checkpoint Snapshot

Checkpoint Snapshot

由 ExecutionGraph.CheckpointCoordinator 来触发,StreamInputProcessor 中做完 Barrier 对齐之后,会调用 Operator 的 snapshotState 来做 Checkpoint,主要代码:
AbstractStreamOperator.snapshotState(...) {
// Take snapshot for TimeService + Raw State
snapshotState(snapshotContext);

// Take snapshot for operatorState
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)

// Take snapshot for keyedState
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)
}

CheckpointStreamFactory

提供了对底层存储的封装,上层的 StateBackend 和 RawState(由用户管理) 在 Checkpoint 时会将自身维护的所有 State 序列化后写入由 CheckpointStreamFactory 初始化的 CheckpointStateOutputStream。
public interface CheckpointStreamFactory {
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;
}
根据存储介质不同目前主要由两种实现
Flink StateBackend:Checkpoint Snapshot
MemCheckpointStreamFactory:初始化基于内存的 StateOutputStream
FsCheckpointStreamFactory:初始化基于分布式文件系统的 StateOutputStream

CheckpointStateOutputStream

Flink StateBackend:Checkpoint Snapshot

现有的几种实现

Flink StateBackend:Checkpoint Snapshot
MemoryCheckpointOutputStream:数据写内存,基本就是封装了内存中的一个 byte[]
FsCheckpointStateOutputStream:数据小于阈值时写内存,否则写文件FileBasedStateOutputStream:写文件,主要是用来在本地写 State 数据,用来做 LocalRestart
DuplicatingCheckpointOutputStream:封装了两个 StateOutputStream,数据会同时写入两个 Stream 中。比如 FsCheckpointStateOutputStream + FileBasedStateOutputStream,达到在本地 + 远端同时写两份 State 数据的效果。

closeAndGetHandle

  • 关闭 OutputStream,返回包含写入数据的 StateHandle,JM 会将 StateHandle 持久化,后续恢复 State 时会通过 StateHandle 来检索 State 数据
  • 对于 MemCheckpointStreamFactory
    • 返回 ByteStreamStateHandle,封装了内存中包含写入的 State 数据的 byte[]
  • 对于 FsCheckpointStreamFactory
    • 数据小于阈值,返回 ByteStreamStateHandle
    • 大于阈值,返回 FileStateHandle,包含存储 State 数据的文件信息

State、StateBackend、CheckpointStreamFactory、Memory/FS 之间关系

Flink StateBackend:Checkpoint Snapshot

Snapshot in DefaultOperatorStateBackend

DefaultOperatorStateBackend
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

// 将 StateMetaInfo 序列化后写入 localOut
// StateMetaInfo:name,serializers 等
writeStateMetaInfoSnapshots(operatorStateMetaInfoSnapshots, localOut);
writeStateMetaInfoSnapshots(broadcastStateMetaInfoSnapshots, localOut);

// 将 State 数据序列化后写入 localOut
writeOperatorState(registeredOperatorStates, localOut);
writeOperatorState(registeredBroadcastStates, localOut);

// 最后将 State offset 等信息封装到 OperatorStateHandle 交由 JM 来持久化
备注
  1. MetaInfo + State 数据一个文件
  1. SnapshotResult: OperatorStateHandle,包含了 StateMetaInfo,State 在 Stream 中的 offset 等信息,交由 JM 来持久化,Checkpoint 恢复时会用到

Snapshot in RocksDBKeyedStateBackend

全量备份

全量备份的实现逻辑在 RocksFullSnapshotStrategy.doSnapshot,主要工作
  1. 从 CheckpointStreamFactory 获取 StateOutputStream
  1. 写 StateMetaInfo 到 StateOutputStream:state name, colmnFamily 等信息
  1. RocksDB scan 拿到所有的 k-v,序列化后写入 StateOutputStream
备注
  1. MetaInfo + k-v 只有一个文件
  1. SnapshotResult 包含了 KeyGroupsStateHandle,功能同 OperatorStateHandle 类似

增量备份

增量备份的实现逻辑在 RocksIncrementalSnapshotStrategy.doSnapshot,主要工作:
  1. 从 CheckpointStreamFactory 获取 StateOutputStream
  1. 写 StateMetaInfo 到 StateOutputStream, state name, colmnFamily 等信息
  1. 拿到上一个成功的 snapshot,算出增量的 sst,将增量的 sst upload 到 HDFS
备注
  1. MetaInfo 一个文件,每个 sst 一个独立文件
  1. SnapshotResult 结果包含了 IncrementalRemoteKeyedStateHandle or IncrementalLocalKeyedStateHandle,包含了 checkpoint id、sst 文件等信息

Snapshot in HeapKeyedStateBackend

实现细节在 HeapSnapshotStrategy.snapshot 中,总体以下几件事情
  1. 打开 StateOutputStream
  1. 遍历 StateTable 将 StateMetainfo 写入 StateOutputStream
  1. 遍历 StateTable 将 State 数据写入 StateOutputStream
  1. close StateOutputStream,返回包含 State 信息的 StateHandle
    1. 根据 State 存储介质不同,返回的 StateHandle 也不同,ByteStreamStateHandle or FileStateHandle
版权声明:农夫的狗 发表于 2021-01-13 13:04:12。
转载请注明:Flink StateBackend:Checkpoint Snapshot | 404导航

暂无评论

暂无评论...