news 2026/4/16 10:47:43

Flink SQL Deduplication用 ROW_NUMBER 做流式去重

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL Deduplication用 ROW_NUMBER 做流式去重

1. Deduplication 是什么,为什么流式场景尤其需要

Deduplication(去重)是在一组列(去重键)上移除重复行,只保留第一条最后一条记录。典型原因是:上游 ETL 不是端到端 exactly-once,故障恢复/重试可能把同一业务事件写了多次,导致下游 SUM/COUNT 等统计被“重复行”污染,所以需要在进一步分析前先做去重。(Apache Nightlies)

从实现角度看,Flink SQL 的去重本质上就是 Top-N 的特例:N = 1,并且按处理时间或事件时间排序。(Apache Nightlies)

2. Flink SQL 去重的标准写法(优化器识别模式)

2.1 QUALIFY 写法(更简洁)

官方给出的 Deduplication 语法是:对去重键PARTITION BY,按时间属性ORDER BY,保留ROW_NUMBER() = 1。(Apache Nightlies)

SELECT[column_list]FROMtable_name QUALIFY ROW_NUMBER()OVER(PARTITIONBYcol1[,col2...]ORDERBYtime_attr[ASC|DESC])=1;

2.2 子查询 + WHERE rownum = 1(更通用,适配更多版本)

很多生产环境更习惯用这版(逻辑更直观),并且必须包含rownum = 1,否则优化器可能无法把它翻译成 Deduplicate 算子。(Apache Nightlies)

SELECT[column_list]FROM(SELECT*,ROW_NUMBER()OVER(PARTITIONBYcol1[,col2...]ORDERBYtime_attr[ASC|DESC])ASrownumFROMtable_name)WHERErownum=1;

3. 参数语义:去重键、保留第一条/最后一条、必须是“时间属性”

3.1 PARTITION BY:去重键(deduplicate key)

PARTITION BY col1[, col2...]就是你判定“重复”的那组字段(比如 order_id、message_id、(user_id, item_id) 等)。(Apache Nightlies)

3.2 ORDER BY time_attr:决定保留第一条还是最后一条

排序列必须是时间属性(processing time 或 event time)。(Apache Nightlies)

  • ORDER BY time_attr ASC:保留最早的一条(first row)
  • ORDER BY time_attr DESC:保留最新的一条(last row)(Apache Nightlies)

3.3 处理时间 vs 事件时间:结果是否“可复现”

经验上强烈建议优先用事件时间(rowtime)做去重排序:
一些调优/最佳实践文档也明确指出:按 processing time 去重,结果会随运行时机波动;按 event time 去重,结果更确定、更可复现。(Ververica 文檔)

4. 示例:按 order_id 去重,只保留第一次出现(你给的 Orders 例子)

CREATETABLEOrders(order_id STRING,userSTRING,product STRING,numBIGINT,proctimeASPROCTIME())WITH(...);-- 按 order_id 去重,保留第一次出现(ASC)SELECTorder_id,user,product,numFROM(SELECT*,ROW_NUMBER()OVER(PARTITIONBYorder_idORDERBYproctimeASC)ASrow_numFROMOrders)WHERErow_num=1;

这个模式正是 Flink 官方去重示例中强调的写法:ROW_NUMBER + 分区键 + 时间排序 +rownum = 1。(Apache Nightlies)

5. 去重的 Changelog 语义:为什么下游可能会看到 UPDATE/撤回

流式去重不是“简单过滤一次”——当更早/更晚的记录到来时(尤其事件时间乱序),去重结果可能需要被修正,因此会产生changelog(更新流)

在 Flink SQL 的 retract 机制里,更新通常会被拆成DELETE(-U)+INSERT(+U)两条事件(先撤回旧值,再发新值)。(Confluent)

工程结论:

  • 你的 sink/下游必须能消费更新流(upsert/retract),否则结果会错或写不进去。

6. Sink 选型建议:优先 Upsert 型(尤其写 Kafka / 外部库)

如果你把去重结果写入 Kafka,官方upsert-kafka明确支持消费 changelog:会把 INSERT/UPDATE_AFTER 写成正常消息,把 DELETE 写成 tombstone。(Apache Nightlies)

落外部存储(JDBC、KV、OLAP)时,同样建议使用Upsert 语义(主键一致),让“同一 key 的最新结果”被覆盖更新。

7. 状态与 TTL:去重会“记住 key”,不设 TTL 可能越跑越大

去重需要维护“某个 key 当前保留的是哪一条”,因此会占用 state。Flink Table/SQL 提供table.exec.state.ttl(Idle State Retention Time)用来控制 key 的状态多久没更新就清理。(Apache Nightlies)

注意:TTL 会影响正确性(清理太早可能把还会来的迟到数据当成“新 key”),一般需要结合业务延迟/乱序程度来定。

8. 进阶:Window Deduplication(每个窗口内去重)

如果你需要“每个窗口内按 key 去重”(例如 10 分钟内同一用户只保留最后一次点击),Flink 还有专门的Window Deduplication形态:同样要求ORDER BY是时间属性,且rownum = 1/<=1/<2这种固定谓词,才能让优化器识别。(Apache Nightlies)

9. 一页避坑清单(最常见 5 个坑)

  1. rownum = 1写错/写丢 → 优化器可能无法识别为 Deduplication。(Apache Nightlies)
  2. ORDER BY不是时间属性 → 不符合 Deduplication 语义要求。(Apache Nightlies)
  3. 用 processing time 去重却要求结果稳定 → 可能每次跑都不一样。(Ververica 文檔)
  4. 下游不支持更新流(append-only sink)→ 结果不正确或写入失败。(Confluent)
  5. 不设 TTL/不控状态 → key 越来越多时 state 可能持续膨胀。(Apache Nightlies)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 4:27:42

mlr3机器学习框架:为什么它成为R语言数据分析的首选工具?

mlr3机器学习框架&#xff1a;为什么它成为R语言数据分析的首选工具&#xff1f; 【免费下载链接】mlr3 mlr3: Machine Learning in R - next generation 项目地址: https://gitcode.com/gh_mirrors/ml/mlr3 mlr3是R语言中新一代的机器学习框架&#xff0c;作为经典mlr包…

作者头像 李华
网站建设 2026/4/12 3:39:51

LightRAG技术实践:从概念理解到应用部署

LightRAG技术实践&#xff1a;从概念理解到应用部署 【免费下载链接】LightRAG "LightRAG: Simple and Fast Retrieval-Augmented Generation" 项目地址: https://gitcode.com/GitHub_Trending/li/LightRAG 传统RAG系统的挑战与LightRAG的解决方案 在人工智能…

作者头像 李华
网站建设 2026/4/12 3:28:21

2025轻量AI革命:ERNIE-4.5-0.3B如何重新定义终端智能

2025轻量AI革命&#xff1a;ERNIE-4.5-0.3B如何重新定义终端智能 【免费下载链接】ERNIE-4.5-0.3B-PT 项目地址: https://ai.gitcode.com/hf_mirrors/baidu/ERNIE-4.5-0.3B-PT 导语&#xff1a;360亿参数的"口袋AI"来了 当大模型还在比拼千亿参数时&#xf…

作者头像 李华
网站建设 2026/3/24 12:42:59

Ant Design ProComponents终极指南:快速提升中后台开发效率

Ant Design ProComponents终极指南&#xff1a;快速提升中后台开发效率 【免费下载链接】pro-components &#x1f3c6; Use Ant Design like a Pro! 项目地址: https://gitcode.com/gh_mirrors/pr/pro-components Ant Design ProComponents是一套基于Ant Design的高级组…

作者头像 李华
网站建设 2026/4/8 5:35:48

69、使用Python的C API扩展Python

使用Python的C API扩展Python 1. C编码的Python扩展模块概述 C函数 initx 通常具有以下整体结构: void initx(void) {PyObject* thismod = Py_InitModule3("x", x_methods, "docstring for x");/* 可选:调用 PyModule_AddObject(thismod, "so…

作者头像 李华