Flink uses savepoints to save a job's state, and a job can later be restored from a savepoint.
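// env is a StreamExecutionEnvironment; StatefulSource and StatefulMapper are placeholders for your own stateful functions
DataStream<String> stream = env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id"); // ID for the mapper

// Stateless printing sink (print() returns a DataStreamSink, so it is not assigned to stream)
stream.print(); // Auto-generated ID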
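Assign a custom ID with uid() to the source and map operators. Flink uses these IDs to match the state stored in a savepoint to the operators of the restored job. If they are omitted, Flink generates IDs from the job graph, and those IDs can change when the program is modified, so the saved state can no longer be mapped.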
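To illustrate why stable IDs matter, here is a toy model in plain Java, not Flink's real mechanism: a "savepoint" is a map from operator ID to that operator's state, and autoId() is a hypothetical stand-in for Flink's generated IDs that only shares their key property of changing when the pipeline's structure changes. After a new operator is inserted before the mapper, the explicit ID still finds the mapper's state while the auto ID does not.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a "savepoint" is a map from operator ID to that operator's state.
// Flink's real auto-generated IDs are hashes of the job graph; the position-based
// autoId() below is a hypothetical stand-in that merely shares the property of
// changing when the program's structure changes.
class UidToyModel {

    // Hypothetical auto ID derived from the operator's position in the pipeline.
    static String autoId(int position) {
        return "auto-" + position;
    }

    // Looks up the state stored under an operator ID; null means "no state found".
    static Long restore(Map<String, Long> savepoint, String operatorId) {
        return savepoint.get(operatorId);
    }

    public static void main(String[] args) {
        // Job v1: source at position 0, mapper at position 1. The mapper's state is 7.
        Map<String, Long> withUid = new HashMap<>();
        withUid.put("mapper-id", 7L);
        Map<String, Long> withAutoId = new HashMap<>();
        withAutoId.put(autoId(1), 7L);

        // Job v2 inserts a filter before the mapper, moving the mapper to position 2.
        System.out.println(restore(withUid, "mapper-id"));  // prints 7
        System.out.println(restore(withAutoId, autoId(2))); // prints null
    }
}
```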
Configure the default savepoint directory in flink-conf.yaml:
state.savepoints.dir: hdfs:///user/test/flink/flink-savepoints
The directory can be an HDFS path, as in this example.
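According to the Flink savepoint documentation, when no target directory is passed on the command line the savepoint is written to state.savepoints.dir, and triggering fails if neither is set. A deployment script might therefore check the key first. This is a minimal sketch with hypothetical class and method names; it assumes flink-conf.yaml uses flat "key: value" lines like the one above and is not a general YAML parser.

```java
import java.util.Optional;

// Hypothetical helper: reads a single key from flink-conf.yaml-style text.
// Assumes flat "key: value" lines; it is not a general YAML parser.
class SavepointDirCheck {

    static Optional<String> readKey(String confText, String key) {
        for (String rawLine : confText.split("\\R")) {
            String line = rawLine.trim();
            // Skip blank lines and comments.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Only the first ':' separates key and value, so "hdfs:///..." stays intact.
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            String k = line.substring(0, colon).trim();
            if (k.equals(key)) {
                String value = line.substring(colon + 1).trim();
                return value.isEmpty() ? Optional.empty() : Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String conf = "# savepoint settings\n"
                + "state.savepoints.dir: hdfs:///user/test/flink/flink-savepoints\n";
        // prints hdfs:///user/test/flink/flink-savepoints
        System.out.println(readKey(conf, "state.savepoints.dir").orElse("<not set>"));
    }
}
```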
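Trigger a savepoint while the job keeps running (:jobId stands for the ID of the running job):
./flink savepoint :jobId
Or trigger a savepoint and then cancel the job:
./flink cancel -s :jobId
Each command prints the savepoint path on the console. For example, flink savepoint prints: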
Triggering savepoint for job jobId.
Waiting for response...
Savepoint completed. Path: hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test
You can resume your program from this savepoint with the run command.
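When this step is scripted, the path can be extracted from the console output. A minimal sketch with hypothetical names, assuming the output contains a line of the form "Savepoint completed. Path: ..." as in the sample above; other Flink versions, and the cancel -s command, may word this line differently, so the regular expression is an assumption.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the savepoint path from "flink savepoint" output.
// Assumes the "Savepoint completed. Path: <path>" wording shown in the sample output.
class SavepointOutputParser {

    // MULTILINE makes ^ and $ match at the start and end of each line.
    private static final Pattern PATH_LINE =
            Pattern.compile("^Savepoint completed\\. Path: (\\S+)\\s*$", Pattern.MULTILINE);

    static Optional<String> savepointPath(String cliOutput) {
        Matcher m = PATH_LINE.matcher(cliOutput);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String output = String.join("\n",
                "Triggering savepoint for job jobId.",
                "Waiting for response...",
                "Savepoint completed. Path: hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test",
                "You can resume your program from this savepoint with the run command.");
        // prints hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test
        System.out.println(savepointPath(output).orElse("<no savepoint path found>"));
    }
}
```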
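Resume the job from the savepoint:
./flink run -s savepoint-path test.jar args
Here savepoint-path is the path printed by the savepoint or cancel command above; everything else is the same as a regular job submission.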
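The resume command can also be assembled programmatically, for example to pass to ProcessBuilder. The Flink 1.3 savepoint documentation additionally describes -n (--allowNonRestoredState), which lets the job start even when some savepoint state cannot be mapped to an operator, for instance after an operator was removed. This is a sketch: the helper name, the ./flink location, and the program arguments are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds "flink run -s <savepointPath> [-n] <jar> [args...]".
// The "./flink" location, jar name and program arguments are placeholders.
class ResumeCommand {

    static List<String> resume(String savepointPath, boolean allowNonRestoredState,
                               String jar, String... programArgs) {
        List<String> cmd = new ArrayList<>(List.of("./flink", "run", "-s", savepointPath));
        if (allowNonRestoredState) {
            // Skip savepoint state that cannot be mapped to any operator of the new job.
            cmd.add("-n");
        }
        cmd.add(jar);
        // Program arguments go after the jar, exactly as in a regular submission.
        cmd.addAll(Arrays.asList(programArgs));
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = resume(
                "hdfs://nameservice/user/test/flink/flink-savepoints/savepoint-test",
                false, "test.jar", "--input", "in.txt");
        System.out.println(String.join(" ", cmd));
        // To launch it: new ProcessBuilder(cmd).inheritIO().start();
    }
}
```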
That's it!
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html