news 2026/4/16 9:05:27

Flink学习笔记:如何做容错

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink学习笔记:如何做容错

现在我们已经了解了 Flink 的状态如何定义和使用,那 Flink 是如何做容错的呢?今天我们一起来了解一下。

先来回答问题, Flink 是通过状态快照来做容错的,在 Flink 中状态快照分为 Checkpoint 和 Savepoint 两种。

Checkpoint

Checkpoint 是一种自动执行的快照,其目的是让 Flink 任务可以从故障中恢复。它可以是增量的,并且为快速恢复进行了优化。

如何开启 Checkpoint

Checkpoint 默认是关闭的,开启的方法很简单,只需要调用 enableCheckpointing() 方法即可。除了这个方法之外,Checkpoint 还有一些高级特性。我们来看几个比较常用的,更多的选项可以查看官方文档。

/* by 01022.hk - online tools website : 01022.hk/zh/checkkeyword.html */ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000); // 高级选项: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setExternalizedCheckpointRetention( ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().enableUnalignedCheckpoints();
  • CheckpointingMode:支持 EXACTLY_ONCE 和 AT_LEAST_ONCE 两种,精确一次有更好的数据一致性,而至少一次可以提供更低的延迟。

  • MinPauseBetweenCheckpoints:Checkpoint 之间最小间隔时间,单位是毫秒,即前一次 Checkpoint 执行完成之后必须间隔 n 毫秒之后才会开启下一次 Checkpoint。

  • CheckpointTimeout:Checkpoint 超时时间,单位为毫秒,表示 Checkpoint 必须在 n 毫秒内完成,否则就会因超时失败。

  • TolerableCheckpointFailureNumber:可容忍连续失败次数,默认是0。超过这个阈值之后,整个 Flink 作业会触发 fail over。

  • MaxConcurrentCheckpoints:Checkpoint 并发数,默认情况下是1,在同一时间只允许一个 Checkpoint 执行。这个参数不能和最小间隔时间一起使用。

  • ExternalizedCheckpointRetention:周期存储 Checkpoint 到外部存储,这样在任务失败时 Checkpoint 也不会被删除。

  • enableUnalignedCheckpoints:使用非对齐的 Checkpoint,可以减少在产生背压时 Checkpoint 的创建时间。

Checkpoint 存储

Flink 提供了两种存储类型:JobManagerCheckpointStorage 和 FileSystemCheckpointStorage。默认是 JobManagerCheckpointStorage,即将 Checkpoint 快照存储在 JobManager 的堆内存中,也可以设置 Checkpoint 目录,将快照存储在外部存储系统中。

Checkpoint 目录通过 execution.checkpointing.dir 设置项设置。其目录结构如下:

/* by 01022.hk - online tools website : 01022.hk/zh/checkkeyword.html */ /user-defined-checkpoint-dir /{job-id} | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/ ...
Checkpoint 工作原理

在前文中,我们曾经提到过 Checkpoint Coordinator,它是 JobManager 的其中一个模块。它在 Checkpoint 过程中担任着重要的角色。

现在来看下 Checkpoint 的完整流程

1、Checkpoint Coordinator 触发所有 Source 节点开始 Checkpoint,Source 收到触发命令后,会将自己的 State 进行持久化(图中三角形),并且向下游发送 barrier 事件(图中的小矩形)。当 Source 节点的 State 持久化完成之后,会数据存储的地址发送给 Checkpoint Coordinator。

2、barrier 事件随着事件流传输到下游节点,当下游节点收到所有的上游 barrier 事件后,也会将自己的 State 持久化,并继续向下传播 barrier 事件。持久化完成后,也同样将数据存储地址发送给 Checkpoint Coordinator。

3、当所有的算子都完成持久化过程后,Checkpoint Coordinator 会将一些元数据进行持久化。

至此,一次完整的 Checkpoint 流程就结束了。

Savepoint

学习完 Checkpoint 之后,我们再来了解下另一种快照——Savepoint。

Savepoint 是依据 checkpoint 机制创建的一致性镜像。通常用来做 Flink 作业的重启或更新等运维操作。Savepoint 包含稳定存储上的二进制文件(作业状态的镜像)和元数据文件两部分。

使用 Savepoint

根据官方文档的提示,在我们的程序中,最好显式调用 uid() 方法来为算子指定一个 ID,这些 ID 被用来恢复每个算子的状态。如果不指定的话,Flink 任务会自动生成算子 ID,但是生成的 ID 与程序结构有关,也就是说,如果程序的结构改变了的话,就没有办法从 Savepoint 恢复对应算子的状态了。

有了这个前提条件之后,我们就可以使用命令来操作 Savepoint 了。

// 触发 savepoint $ bin/flink savepoint :jobId [:targetDirectory] // 触发 savepoint, 指定 type,默认是 canonical $ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory] // 触发 savepoint,客户端拿到 trigger id 后立即返回 $ bin/flink savepoint :jobId [:targetDirectory] -detached // 使用 savepoint 停止作业 $ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId // 从 savepoint 恢复 $ bin/flink run -s :savepointPath [:runArgs] // 删除 savepoint $ bin/flink savepoint -d :savepointPath

在 触发 savepoint 时,我们可以指定格式,两种格式的区别是:

  • canonical(标准格式):在任何存储都保持统一格式,重在保证兼容性。

  • native(原生格式):标准格式创建和恢复都很慢,原生格式是以特定的状态后端的格式生成,可以更快的创建和恢复。

Checkpoint 与 Savepoint 区别

这是面试最常见的问题之一,有了 checkpoint,为什么还需要 savepoint?或者说两者之间有什么区别?

从概念上来讲,Checkpoint 类似数据库的恢复日志,而 Savepoint 类似数据库的备份。Checkpoint 主要用于作业故障的恢复,它的管理和删除也都是 Flink 内部处理,用户不需要过多关注。Savepoint 主要用于有计划的手动运维,例如升级 Flink 版本。它的创建、删除操作都需要用户手动执行。

下面是官方文档给出的 Checkpoint 和 Savepoint 支持的操作。✓表示完全支持,x表示不支持,!表示目前有效,但没有正式保证支持,使用时存在一定风险。

操作标准 Savepoint原生 Savepoint对齐 Checkpoint非对齐 Checkpoint
更换状态后端xxx
State Processor API (写)xxx
State Processor API (读)!!x
自包含和可移动xx
Schema 变更!!!
任意 job 升级x
非任意 job 升级
Flink 小版本升级x
Flink bug/patch 版本升级
扩缩容

总结

本文我们介绍了 Flink 是如何做容错的,分别介绍了 Checkpoint 和 Savepoint,以及它们之间的区别。本文多次提到了 Checkpoint 和 Savepoint 依赖的稳定存储,我会在下一篇文章进行详细的介绍。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 14:21:02

vue基于springboot的社区健身服务_yob3w0op_

目录 具体实现截图项目介绍论文大纲核心代码部分展示项目运行指导结论源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作 具体实现截图 本系统(程序源码数据库调试部署讲解)同时还支持java、ThinkPHP、Node.js、Spring…

作者头像 李华
网站建设 2026/4/16 9:01:25

题解:AT_abc436_f

题面 Starry Landscape Photo 问题描述 在 AtCoder 行星上看到的夜空中,有 NNN 颗星星,这些星星从东到西排成一条直线。从东方数起的第 iii 颗星(1≤i≤N1 \le i \le N1≤i≤N)是这些星星中第 BiB _ iBi​ 亮的。 Takahashi 决…

作者头像 李华
网站建设 2026/4/10 22:20:19

每天一个网络知识:什么是堆叠?

在企业网络、数据中心或学校机房中,我们常常会看到多个交换机整齐排列在机柜里。随着网络规模增加,设备数量越来越多,如何让这些交换机更高效地协同工作、简化管理、提高可靠性? 其中一个非常重要的技术就是 “堆叠(S…

作者头像 李华
网站建设 2026/4/16 1:42:56

Django WiFi文件分享

项目介绍 在日常工作和生活中,我们经常需要在电脑和手机之间传输文件。传统的传输方式要么需要数据线连接,要么需要借助第三方应用,操作繁琐且不够高效。今天,我将介绍一个基于Django开发的WiFi文件分享应用,它可以让你通过电脑选择本地文件夹,生成访问二维码,然后通过…

作者头像 李华
网站建设 2026/4/16 1:40:56

《高压电气连接器必备指南》

高压电气连接器对于工作电压超过 60V 的电路以及汽车和工业应用中的关键组件至关重要。它们促进大电流的传输——特别是在电动汽车中——连接电池组、电机控制器和充电器等关键部件。高压电气连接器中使用的材料以下是在高压连接器开发和使用中常用的关键材料:导电材…

作者头像 李华
网站建设 2026/4/14 5:43:12

springboot月度员工绩效考核管理系统(11488)

有需要的同学,源代码和配套文档领取,加文章最下方的名片哦 一、项目演示 项目演示视频 二、资料介绍 完整源代码(前后端源代码SQL脚本)配套文档(LWPPT开题报告)远程调试控屏包运行 三、技术介绍 Java…

作者头像 李华