news 2026/4/16 18:10:22

Flink SQL 性能调优MiniBatch、两阶段聚合、Distinct 拆分、MultiJoin 与 Delta Join 一文打通

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL 性能调优MiniBatch、两阶段聚合、Distinct 拆分、MultiJoin 与 Delta Join 一文打通

1. 为什么 Flink SQL 会慢:状态与放大效应

Flink Table/SQL 的性能瓶颈高频出现在两类算子:

1)聚合(Group Aggregation / Window TVF Aggregation)
默认逐条处理:读 state → 更新 accumulator → 写回 state。RocksDB 场景下尤其“读写上瘾”,数据倾斜时还会出现热点 key,轻松 backpressure。

2)常规 Join(Regular Join)
同样逐条处理:查对侧 state → 更新本侧 state → 产出 join 结果。多表级联 join 时,中间结果会“记录放大”,state 变大、反查更慢、checkpoint 更重,作业稳定性直线下降。

接下来我们用一组优化把这两个痛点逐个拆掉。

2. MiniBatch 聚合:把“每条一次状态读写”变成“一批一次”

2.1 核心思想

MiniBatch 聚合会先把输入缓存到算子内部 buffer,触发时再批量处理。同一个 key 在一个 batch 内可以被折叠,状态访问从“每条一次”降低到“每个 key 一次”。

收益
显著减少 state 访问次数,提高吞吐,尤其是 RocksDB StateBackend。

代价
会引入额外延迟(因为要攒一批再算),吞吐与延迟的典型 trade-off。

2.2 开启方式(Java 配置)

// instantiate table environmentTableEnvironmenttEnv=...;// access flink configurationTableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");// 开启 mini-batchconfiguration.set("table.exec.mini-batch.allow-latency","5 s");// 允许缓存 5sconfiguration.set("table.exec.mini-batch.size","5000");// 每个 task 缓存最大记录数

2.3 一个经验值怎么选

  • allow-latency:先按 1s~5s 试,目标是“吞吐明显上升但业务还能接受延迟”
  • size:按单条记录大小与并发估算内存,5000/10000 常见,越大越容易提升吞吐但越吃内存、延迟越高
  • RocksDB 场景通常更值得开(state 读写成本更高)

2.4 Window TVF Aggregation 的特殊点

Window TVF 聚合默认总是开启 MiniBatch,而且它使用托管内存(managed memory)缓存,不走 JVM 堆,GC/OOM 风险小一些。Group Aggregation 则需要你显式开启。

3. Local-Global 两阶段聚合:专治数据倾斜与热点 key

3.1 为什么“两阶段”能治倾斜

GROUP BY的 key 倾斜时(比如某个 color/day 的数据量巨大),某些聚合实例会变成热点。两阶段聚合把聚合拆成:

  • Local 聚合:上游先在本地做一次预聚合(类似 MapReduce 的 Combine)
  • Global 聚合:下游再把各个 local 的 accumulator 合并

这样网络 shuffle 的数据量减少了,state 访问也减少了,热点压力被分摊。

3.2 注意:它依赖 MiniBatch

Local-Global 的“攒一波再合并”依赖 MiniBatch 的触发节奏,所以必须先开启 MiniBatch。

3.3 开启方式(Java 配置)

TableEnvironmenttEnv=...;TableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");configuration.set("table.exec.mini-batch.allow-latency","5 s");configuration.set("table.exec.mini-batch.size","5000");// 两阶段聚合:TWO_PHASEconfiguration.set("table.optimizer.agg-phase-strategy","TWO_PHASE");

3.4 适用场景

  • SUM / COUNT / MAX / MIN / AVG 等普通聚合 + 明显倾斜
  • 需要降低 shuffle 与 RocksDB state 读写

不太适用

  • DISTINCT 聚合(下一节讲)

4. Split Distinct Aggregation:让 COUNT(DISTINCT) 也能水平扩展

4.1 为什么 DISTINCT 聚合难搞

COUNT(DISTINCT user_id)如果 user_id 很稀疏,Local-Global 并不能有效减少数据:local accumulator 里几乎还是“原始全集”,全压到 global 上,global 仍是瓶颈。

4.2 解决思路:加一个 bucket key

把 distinct 拆成两层:

第一层:按group key + bucket key聚合
bucket key 由MOD(HASH_CODE(distinct_key), BUCKET_NUM)得到,默认 BUCKET_NUM=1024,可配置。

第二层:按原 group key 再聚合,把各 bucket 的结果 SUM 起来。

等价性
同一个 distinct_key 会落在同一个 bucket,去重逻辑不变,但热点被 1024 个桶分摊,吞吐更稳定。

4.3 自动改写示例

原 SQL:

SELECTday,COUNT(DISTINCTuser_id)FROMTGROUPBYday;

开启后会被等价改写为类似:

SELECTday,SUM(cnt)FROM(SELECTday,COUNT(DISTINCTuser_id)AScntFROMTGROUPBYday,MOD(HASH_CODE(user_id),1024))GROUPBYday;

4.4 开启方式

tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled","true");

可调 bucket 数:

  • table.optimizer.distinct-agg.split.bucket-num

4.5 限制点

目前不支持包含用户自定义 AggregateFunction 的聚合(distinct 拆分无法保证通用等价)。

5. DISTINCT 多维 UV:用 FILTER 替代 CASE WHEN,省 state、提性能

很多人写多维 UV 喜欢这样:

COUNT(DISTINCTCASEWHENflagIN(...)THENuser_idELSENULLEND)

更推荐用标准 SQL 的 FILTER:

SELECTday,COUNT(DISTINCTuser_id)AStotal_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('android','iphone'))ASapp_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('wap','other'))ASweb_uvFROMTGROUPBYday;

关键收益
优化器能识别“同一 distinct_key(user_id)+ 不同 filter 条件”,从而复用状态:原本可能要 3 份 distinct state,现在可以共享一份,state 大小与访问次数都下降,特别适合高基数 UV 指标。

6. MiniBatch Regular Join:减少 state IO + 抑制冗余输出

6.1 常规 Join 的痛点

逐条 join 会导致:

  • 频繁查对侧 state(RocksDB 更慢)
  • 级联 join 时记录放大严重,中间结果爆炸

6.2 MiniBatch Join 做了两件事

1)buffer 内折叠记录:join 前先把同 key 的变更合并,减少参与 join 的数据量
2)尽量抑制冗余结果:buffer 处理时尽可能不输出多余的中间变更

6.3 开启方式(SQL 例子)

SET'table.exec.mini-batch.enabled'='true';SET'table.exec.mini-batch.allow-latency'='5S';SET'table.exec.mini-batch.size'='5000';SELECTa.idASa_id,a.a_content,b.idASb_id,b.b_contentFROMaLEFTJOINbONa.id=b.id;

说明
Regular Join 的 MiniBatch 默认是关闭的,需要显式开启(同聚合一样的三项参数)。

6.4 适用建议

  • Join 两侧是 Upsert / CDC 场景、同 key 变更频繁,buffer 折叠收益巨大
  • 级联 join 的作业,先上 mini-batch join 往往能立刻看到 state 与吞吐改善

7. 多表 Regular Join:MultiJoin(Flink 2.1)把“中间 state”直接砍掉

当你的 SQL 是多表非时态 regular join,最常见的故障模式是:state 越跑越大,checkpoint 越来越慢,作业越来越不稳。

Flink 2.1 引入 MultiJoin Operator,核心目标是:

零中间 state(zero intermediate state)
不再为 join 链上的每个二元 join 存中间结果状态,而是把多个流同时在一个算子里 join,显著减少 state。

7.1 什么时候启用最划算

如果满足两点,就非常值得试:

  • 作业有多个 join,且 join 条件共享至少一个公共 join key(能按同一 key 分区)
  • 你观察到:中间 join 的 state 比输入表 state 还大(典型记录放大链路)

7.2 开启方式

SET'table.optimizer.multi-join.enabled'='true';

7.3 支持与限制要点(很重要)

  • 当前处于实验状态(可能有优化与 breaking change)

  • 目前支持 streaming INNER / LEFT joins

  • RIGHT join 计划支持(但你上线前要以实际版本为准)

  • 分区要求:至少有一条可以把多表一起 partition 的 key

    • 支持:A JOIN B ON A.key = B.key JOIN C ON A.key = C.key
    • 支持:A JOIN B ON A.key = B.key JOIN C ON B.key = C.key(传递性)
    • 不支持:A.key1=B.key1 且 B.key2=C.key2(没有统一 key,会拆成多个 MultiJoin)

7.4 经验结论

  • 记录放大越明显,MultiJoin 越能“越跑越稳”
  • 如果 join 链反而让 state 变小(较少见),二元 join 可能更快,但 MultiJoin 通常 still 更省总 state

8. Delta Join:用“外部索引 + 双向查表”替代“大 state”,稳定性直接起飞

8.1 为什么 Delta Join 能把 state 压下去

传统 regular join 必须把两侧历史数据都存进 Flink state,以确保对侧迟到时还能匹配。

Delta join 的思路是:
不在 Flink state 里囤全量数据,而是借助外部存储系统的索引能力(例如 Apache Fluss 提供索引信息),直接对外部系统做高效索引查询来完成匹配。这样 Flink state 与外部存储之间不会重复存储同一份数据。

效果
state 大幅缩小,checkpoint 压力下降,作业长期运行更稳。

8.2 默认策略

Delta join 默认开启;当满足条件时 regular join 会自动被优化为 delta join。

如需关闭:

SET'table.optimizer.delta-join.strategy'='NONE';

8.3 可调缓存参数(调优入口)

  • table.exec.delta-join.cache-enabled
  • table.exec.delta-join.left.cache-size
  • table.exec.delta-join.right.cache-size

8.4 支持与限制(上线前务必自查)

支持

  • INSERT-only source 表
  • 没有 DELETE 的 CDC source 表
  • delta join 前的 projection / filter
  • 算子内部缓存

限制:出现以下任一情况就不能优化成 delta join

  • 表的 index key 必须在 join 等值条件中
  • 目前只支持 INNER JOIN
  • 下游必须能处理重复变更(例如 UPSERT sink 且没有 upsertMaterialize 时可能不行)
  • CDC 场景:join key 必须是主键的一部分
  • CDC 场景:所有 filter 必须应用在 upsert key 上
  • filter / projection 中不允许非确定性函数

9. 一套“按症下药”的调优落地清单

9.1 你看到 backpressure + RocksDB state 读写很重

优先做

  • 开 MiniBatch(聚合/Join 都考虑)
  • 聚合倾斜明显就上 TWO_PHASE(Local-Global)

9.2 你有 COUNT DISTINCT 且 group key 热点

优先做

  • 开 distinct-agg split(bucket 拆分)
  • UV 多维统计用 FILTER 替换 CASE WHEN,争取共享 state

9.3 你有多表级联 join,state 越跑越大

优先做

  • 开 mini-batch join(先抑制记录放大)
  • Flink 2.1 且满足公共 key 分区条件:尝试 MultiJoin
  • 如果 source 外部系统具备索引能力且满足限制:让 regular join 自动转 delta join(或检查为何没转)

10. 结语:把“默认策略”切到“更适合生产负载的策略”

Flink SQL 的默认执行策略是通用稳妥型,但生产负载往往更偏“状态密集 + 倾斜 + 多表 join + CDC 变更频繁”。你这份调优组合拳的核心路线很清晰:

  • MiniBatch:用吞吐换少量延迟,换来 state IO 大幅下降
  • Local-Global:治倾斜、减 shuffle
  • Distinct 拆分 + FILTER:让 UV 指标也能扩展、还能省 state
  • MiniBatch Join:减少中间结果与冗余输出
  • MultiJoin:多表 join 直接砍中间 state
  • Delta Join:把大 state 变成外部索引查表,长期稳定运行
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:34:25

创客匠人:智能体激活 IP 情绪价值 —— 从 “知识输出” 到 “情绪共鸣” 的知识变现新范式

一、网易云的 “情绪魔法” 启示 ——IP 变现的终极竞争力是 “让人感觉被懂” 2025 年网易云年度听歌报告刷屏时,没人深究 “听了多少首歌” 的冰冷数据,却对 “需要一点安静”“反复确认自己” 的情绪解读念念不忘。这背后藏着一个被多数 IP 忽视的真…

作者头像 李华
网站建设 2026/4/16 16:04:33

从新手到专家:C# Lambda函数的7个必知应用场景

第一章:C# Lambda函数的概述与核心概念Lambda函数是C#中一种简洁、高效的匿名函数表达方式,允许开发者以内联形式定义可作为委托传递的代码块。它极大地提升了代码的可读性和编写效率,尤其在LINQ查询和集合操作中被广泛使用。语法结构与基本用…

作者头像 李华
网站建设 2026/4/16 16:08:47

C# using别名你真的懂吗:如何安全操控不安全类型提升效率

第一章:C# using别名的本质与作用在C#语言中,using关键字不仅用于资源管理,还支持为命名空间或类型创建别名。这种别名机制本质上是编译器层面的符号映射,能够在不改变原始类型的前提下,为复杂或冲突的类型名称提供简洁…

作者头像 李华
网站建设 2026/4/16 16:27:25

Unity引擎实时渲染画面+HeyGem后期配音合成

Unity引擎实时渲染画面 HeyGem后期配音合成 在企业培训视频制作的日常中,你是否遇到过这样的场景:同一套课件内容,需要为不同地区、不同语言的员工分别录制讲解视频?传统方式下,这意味着重复搭建场景、反复调整灯光动…

作者头像 李华
网站建设 2026/4/16 9:04:04

飞书妙记转写文字+TTS生成音频+HeyGem合成

飞书妙记转写文字 TTS生成音频 HeyGem 合成数字人视频:构建高效 AIGC 视频生产线 在企业内容生产日益高频、个性化的今天,一个常见的痛点浮现出来:如何快速将一场会议、一次培训或一段讲稿,变成多个版本的专业级播报视频&#x…

作者头像 李华
网站建设 2026/4/16 9:03:32

汽车客运站大变样!护照阅读器成出行新“神器”

在保障车站安全方面,护照阅读器更是 “功不可没”。它能与公安系统联网,实时比对旅客身份信息和重点人员数据库。一旦发现可疑人员,比如失信被执行人、在逃人员,系统立即发出警报,车站工作人员和安保人员可及时采取措施…

作者头像 李华