Flink can save a job's state in a savepoint and later restore the job from that savepoint.

Code changes

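import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// StatefulSource / StatefulMapper are placeholders for your own stateful operators
DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id"); // ID for the mapper

// Stateless printing sink (print() returns a sink, not a DataStream)
stream.print(); // Auto-generated ID

env.execute("savepoint-example");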

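Assign a custom ID with uid() to the source and map operators. Flink uses these IDs to match the state stored in a savepoint to the operators of the job. If no ID is set, Flink generates one automatically, but generated IDs depend on the structure of the program, so after the job is changed its old state may no longer be restorable.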
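To see why explicit IDs matter, here is a toy Java model. It is not Flink's real savepoint format or ID-hashing algorithm: a "savepoint" is just a map from operator ID to state, and the auto-generated ID is simulated by the operator's position and the operator count (an assumption standing in for Flink's hash of the job graph). The operators and state values are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a "savepoint" maps operator IDs to saved state.
// This is NOT Flink's real savepoint format or ID-hashing algorithm.
public class UidDemo {

    // Hypothetical operator: a display name plus an optional user-assigned uid.
    static final class Operator {
        final String name;
        final String uid; // null means "let the system generate an ID"

        Operator(String name, String uid) {
            this.name = name;
            this.uid = uid;
        }
    }

    // Stand-in for an auto-generated ID: it depends on the shape of the job
    // (position and operator count), so any structural change alters it.
    static String idOf(Operator op, int position, int jobSize) {
        return op.uid != null ? op.uid : "auto-" + position + "-of-" + jobSize;
    }

    // "Take a savepoint": store each operator's state under its ID.
    static Map<String, String> savepoint(List<Operator> job, Map<String, String> stateByName) {
        Map<String, String> saved = new HashMap<>();
        for (int i = 0; i < job.size(); i++) {
            Operator op = job.get(i);
            saved.put(idOf(op, i, job.size()), stateByName.get(op.name));
        }
        return saved;
    }

    // "Restore": look each operator's state up by ID; null means nothing matched.
    static Map<String, String> restore(List<Operator> job, Map<String, String> saved) {
        Map<String, String> restored = new HashMap<>();
        for (int i = 0; i < job.size(); i++) {
            Operator op = job.get(i);
            restored.put(op.name, saved.get(idOf(op, i, job.size())));
        }
        return restored;
    }

    // Save with version 1 of the job, then restore into a modified version 2.
    static Map<String, String> demo() {
        List<Operator> v1 = List.of(new Operator("source", null), new Operator("mapper", "mapper-id"));
        Map<String, String> saved = savepoint(v1, Map.of("source", "offset=42", "mapper", "count=7"));
        // Version 2 adds a filter between source and mapper.
        List<Operator> v2 = List.of(new Operator("source", null),
                new Operator("filter", null), new Operator("mapper", "mapper-id"));
        return restore(v2, saved);
    }

    public static void main(String[] args) {
        Map<String, String> restored = demo();
        System.out.println("mapper -> " + restored.get("mapper")); // explicit uid: state found
        System.out.println("source -> " + restored.get("source")); // generated ID changed: null
    }
}
```

In this model the mapper, which has an explicit uid, gets its state back after the job changes, while the source, whose ID was generated from the job's shape, does not.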

Flink configuration

Configure a default savepoint directory in Flink's configuration file (flink-conf.yaml):

state.savepoints.dir: hdfs:///user/test/flink/flink-savepoints

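The directory can be on HDFS, as in this example. It is only a default: a different target directory can also be given on the command line when triggering a savepoint, and if neither is set, triggering the savepoint fails.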
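To check which default directory an installation will use, a small Java helper can read the key from the configuration file. This is a hypothetical sketch, not Flink's own configuration loader: it assumes flat `key: value` lines with `#` comments, as in the example above, and assumes the file lives at `conf/flink-conf.yaml` unless another path is passed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: reads state.savepoints.dir from flat "key: value" lines.
public class SavepointDirReader {

    static final String KEY = "state.savepoints.dir";

    // Returns the configured directory, or empty if the key is absent.
    static Optional<String> savepointDir(List<String> lines) {
        for (String line : lines) {
            String content = line.split("#", 2)[0].trim(); // drop comments and surrounding whitespace
            int sep = content.indexOf(": ");
            if (sep > 0 && content.substring(0, sep).trim().equals(KEY)) {
                return Optional.of(content.substring(sep + 2).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Assumed location; pass the real path as the first argument.
        Path conf = Paths.get(args.length > 0 ? args[0] : "conf/flink-conf.yaml");
        Optional<String> dir = savepointDir(Files.readAllLines(conf));
        System.out.println(dir.orElse("not set: pass a target directory when triggering the savepoint"));
    }
}
```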

Usage examples

Trigger a savepoint

./flink savepoint :jobId

Cancel the job and save its state

./flink cancel -s jobId
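When these commands are scripted, argument order matters. The Flink 1.3 savepoint documentation gives them as `flink savepoint :jobId [:targetDirectory]` and `flink cancel -s [:targetDirectory] :jobId`, where the target directory can be omitted if a default is configured. A hypothetical Java helper that assembles both argument lists (the `./flink` path is an assumption about where the script is run from):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles the savepoint-related CLI argument lists.
// flinkBin is assumed to point at the flink script, e.g. "./flink" in Flink's bin directory.
public class SavepointCommands {

    // flink savepoint :jobId [:targetDirectory]
    static List<String> trigger(String flinkBin, String jobId, String targetDir) {
        List<String> cmd = new ArrayList<>(List.of(flinkBin, "savepoint", jobId));
        if (targetDir != null) {
            cmd.add(targetDir); // optional when state.savepoints.dir is configured
        }
        return cmd;
    }

    // flink cancel -s [:targetDirectory] :jobId
    static List<String> cancelWithSavepoint(String flinkBin, String jobId, String targetDir) {
        List<String> cmd = new ArrayList<>(List.of(flinkBin, "cancel", "-s"));
        if (targetDir != null) {
            cmd.add(targetDir); // the target directory comes right after -s
        }
        cmd.add(jobId);
        return cmd;
    }

    public static void main(String[] args) {
        String jobId = args.length > 0 ? args[0] : "<jobId>";
        System.out.println(String.join(" ", trigger("./flink", jobId, null)));
        System.out.println(String.join(" ", cancelWithSavepoint("./flink", jobId, null)));
    }
}
```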

These commands print the path where the job's savepoint was stored. Triggering a savepoint, for example, prints output like:

Triggering savepoint for job jobId.
Waiting for response...
Savepoint completed. Path: hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test
You can resume your program from this savepoint with the run command.
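A script that needs the savepoint path can pick it out of this output. A minimal Java sketch, assuming the `Savepoint completed. Path: ` line shown in the sample above; other Flink versions, or the cancel command, may print a different format.

```java
import java.util.Optional;

// Extracts the savepoint path from CLI output like the sample above.
// Assumes the "Savepoint completed. Path: " prefix; other Flink versions may differ.
public class SavepointPathParser {

    static final String PREFIX = "Savepoint completed. Path: ";

    static Optional<String> parsePath(String cliOutput) {
        for (String line : cliOutput.split("\\R")) { // split on any line break
            String trimmed = line.trim();
            if (trimmed.startsWith(PREFIX)) {
                return Optional.of(trimmed.substring(PREFIX.length()).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "Triggering savepoint for job jobId.",
                "Waiting for response...",
                "Savepoint completed. Path: hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test",
                "You can resume your program from this savepoint with the run command.");
        // prints hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test
        System.out.println(parsePath(sample).orElse("no savepoint path found"));
    }
}
```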

Resume a job from a savepoint:

./flink run -s savepoint-path test.jar args

savepoint-path is the path printed by the savepoint or cancel command above; everything else is the same as a normal job submission.
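The resume step can also be driven from code. The hypothetical helper below builds `flink run -s <savepoint-path> <jar> [args...]` and starts it with `ProcessBuilder`, inheriting the console so Flink's output stays visible. The `./flink` path, jar name, and arguments are placeholders, as in the command above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds and launches "flink run -s <savepointPath> <jar> [args...]".
public class ResumeFromSavepoint {

    static List<String> runCommand(String flinkBin, String savepointPath, String jar, String... jobArgs) {
        List<String> cmd = new ArrayList<>(List.of(flinkBin, "run", "-s", savepointPath, jar));
        cmd.addAll(Arrays.asList(jobArgs)); // program arguments follow the jar, as in a normal submission
        return cmd;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 2) {
            System.err.println("usage: ResumeFromSavepoint <savepoint-path> <jar> [args...]");
            return;
        }
        List<String> cmd = runCommand("./flink", args[0], args[1],
                Arrays.copyOfRange(args, 2, args.length));
        Process p = new ProcessBuilder(cmd).inheritIO().start();
        System.exit(p.waitFor()); // propagate the flink CLI's exit code
    }
}
```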

That's it!

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html