学习 Flink(六):检查点和保存点

更新至 Flink 1.8.0

检查点

检查点(Checkpoint)机制是 Flink 实现错误容忍机制的核心。通过持续以异步的方式保存轻量级的镜像,当错误(机器、网络或者软件原因)发生时,系统重启操作并重置操作到最新保存成功的检查点。

Flink 镜像参考论文:Lightweight Asynchronous Snapshots for Distributed Dataflows

Flink 检查点需要两个前提:

  • 持久化数据源支持重放(Replay)数据,如 Kafka 和 HDFS;
  • 用于持久化状态的存储,如 HDFS 和 RocksDB;

默认情况下,检查点是禁用的,需要手动启用检查点,如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new FsStateBackend(path)); // ①

env.enableCheckpointing(1000); // ②

/* ********
 * 高级选项 *
 * ****** */

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // ③  
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // ④  

① 设置状态存储后端,支持内存、文件系统和 RocksDB;

② 启用检查点并每秒保存一次,单位:毫秒;

③ 设置保存点模式为恰好一次;

④ 取消时保留检查点。

保存点

保存点通过 Flink 检查点机制保存了任务运行过程中状态的镜像。通常用于停止并恢复、分发和任务更新。

触发保存点:

bin/flink savepoint <任务 ID> [<保存点目录>] -yid <YARN 应用 ID>  

取消时触发保存点:

bin/flink cancel -s [<保存点目录>] <任务 ID> -yid <YARN 应用 ID>  

从保存点恢复运行:

bin/flink run -s <保存点目录> -c <主类> -m <JM 地址> -p <并发数> app.jar  

从保留的检查点恢复与从保存点恢复是一样的。

如果不指定保存点目录,默认为 state.savepoints.dir 配置,可以通过编辑 conf/flink-conf.yml 文件修改:

state.savepoints.dir: hdfs://RM/user/flink/savepoints  

检查点 VS 保存点

检查点是自动完成的,保存点是手动完成的。

检查点是轻量级的,保存点是重量级的。

检查点支持 RocksDB,保存点仅支持文件系统。

参考