news 2026/6/10 16:58:12

Flink SQL INSERT 语句单表写入、多表分流、分区覆盖与 StatementSet

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL INSERT 语句单表写入、多表分流、分区覆盖与 StatementSet

1. INSERT 语句是干嘛的

INSERT用于把查询结果或字面量数据写入目标表(sink 表)。在 Flink 里,执行 INSERT 会提交一个 Flink Job(流式作业通常是长期运行)。


2. Java 里怎么跑 INSERT:单条 executeSql vs 多条 StatementSet

2.1 单条 INSERT:executeSql 立即提交作业

executeSql()执行 INSERT 会立刻提交 Job,并返回TableResult,你可以通过它拿到 JobClient 查询状态。

TableEnvironmenttEnv=TableEnvironment.create(...);// source & sinktEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");// submit job immediatelyTableResultr1=tEnv.executeSql("INSERT INTO RubberOrders "+"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");System.out.println(r1.getJobClient().get().getJobStatus());

2.2 多条 INSERT:StatementSet.addInsertSql 延迟执行,一次 execute 提交

当你要把同一个源表数据分流写入多个 sink(比如 RubberOrders、GlassOrders),用StatementSet更合适:先addInsertSql()收集多条语句,最后execute()一次性提交。

tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");StatementSetstmtSet=tEnv.createStatementSet();stmtSet.addInsertSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");stmtSet.addInsertSql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");TableResultr2=stmtSet.execute();System.out.println(r2.getJobClient().get().getJobStatus());

注意:addInsertSql()每次只能接收一条 INSERT 语句(不要把多个 INSERT 拼一条字符串)。

3. INSERT INTO / INSERT OVERWRITE:追加 vs 覆盖

3.1 总体语法(Insert from Select)

[EXECUTE]INSERT{INTO|OVERWRITE }[catalog.][db.]table_name[PARTITIONpart_spec][column_list]select_statement

3.2 INTO:追加写入(Append)

  • 不覆盖已有数据(或已有分区数据),新结果继续追加。

3.3 OVERWRITE:覆盖写入(Overwrite)

  • INSERT OVERWRITE会覆盖目标表或目标分区已有数据。
  • 常用于离线批处理、重跑分区、或者“以最后一次跑出来的结果为准”的场景。

4. 分区写入:静态分区 vs 动态分区

假设目标表是分区表:

CREATETABLEcountry_page_view(userSTRING,cntINT,dateSTRING,country STRING)PARTITIONEDBY(date,country)WITH(...);

4.1 写入静态分区(date/country 都固定)

INSERTINTOcountry_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;

4.2 半动态分区(date 固定、country 每行决定)

INSERTINTOcountry_page_viewPARTITION(date='2019-8-30')SELECTuser,cnt,countryFROMpage_view_source;

4.3 覆盖分区写入(静态/半动态都支持)

INSERTOVERWRITE country_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;INSERTOVERWRITE country_page_viewPARTITION(date='2019-8-30')SELECTuser,cnt,countryFROMpage_view_source;

5. EXECUTE 关键字:显式执行(语义等价)

Flink 允许在 INSERT 前面加EXECUTE,用于强调“我要执行这条语句”,但语义上等价于不加。

EXECUTEINSERTINTOcountry_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;

6. column_list:部分列写入(Partial Insert)怎么映射?

Flink 支持指定目标列列表,把 SELECT 的列按列表顺序写入指定列,未写到的列会被置为NULL(前提:该列可空)。

例:表T(a INT, b INT, c INT)

INSERTINTOT(c,b)SELECTx,yFROMS;

含义是:

  • x写入c
  • y写入b
  • a被置为NULL(如果a允许为 NULL)

对 connector/sink 开发者:可以通过DynamicTableSink.Context.getTargetColumns()获取用户指定的目标列,决定如何处理“部分列更新”。

7. INSERT … VALUES:直接插入字面量行

除了INSERT INTO ... SELECT ...,也可以直接写 values:

[EXECUTE]INSERT{INTO|OVERWRITE } table_nameVALUES(val1,val2,...),(val1,val2,...);

示例:

CREATETABLEstudents(name STRING,ageINT,gpaDECIMAL(3,2))WITH(...);INSERTINTOstudentsVALUES('fred flintstone',35,1.28),('barney rubble',32,2.32);

8. 一条 SQL 写多个表:EXECUTE STATEMENT SET

如果你在 SQL 层就想“一次提交多条 insert”,可以用:

EXECUTESTATEMENTSETBEGINinsert_statement;insert_statement;END;

其中insert_statement可以是INSERT ... SELECTINSERT ... VALUES

9. 生产实践建议(你放到博客结尾很加分)

  1. 多 sink 分流优先用 StatementSet:一次提交、共享规划,写法更稳。
  2. OVERWRITE 慎用:尤其是流式任务,确认 connector 对覆盖语义的支持与目标表期望行为。
  3. 分区写入要区分静态/动态:静态分区适合重跑;动态分区适合实时按维度落地。
  4. 部分列写入会把其它列写成 NULL:对非空列/主键列要提前约束,否则容易写入失败或产生脏数据。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 12:31:54

Dolby Atmos Lite:轻量级全景声音效模拟工具,多设备音效增强方案

Dolby Atmos Lite是一款专注于音效增强的轻量级工具,旨在通过算法模拟杜比全景声的沉浸式音频体验。该软件以其极小的体积和广泛的设备兼容性,为用户提供了简单的音效优化解决方案,特别适合希望在普通设备上获得更好音频体验的用户。 获取地…

作者头像 李华
网站建设 2026/6/10 12:26:18

python第一阶段第10章

1. 整体介绍1.1 数据来源2. 效果一: 折线图-----2020印美日新冠累计确诊人数2.1 json数据格式2.1.1 什么时json2.1.2 json有什么用2.1.3 json格式数据转化import json # 准备列表 ,列表内每一个元素都是字典,将其转换为json data [{"na…

作者头像 李华
网站建设 2026/6/9 19:16:05

LobeChat能否集成雾凇形成条件?气象奇观预测与摄影时机推荐

LobeChat能否集成雾凇形成条件?气象奇观预测与摄影时机推荐 在吉林市的寒冬清晨,松花江畔的树枝上挂满晶莹剔透的冰晶——这就是被誉为“冬天童话”的雾凇奇观。每年吸引无数摄影师驱车数百公里守候一夜,只为捕捉那一瞬的美景。但问题也随之而…

作者头像 李华
网站建设 2026/6/10 14:14:37

【Embedded Development】嵌入式相关编程技巧

一、简介 此篇文章专用于记录以及汇总嵌入式高级编程技巧。当然这里面就不会去再单独讲解一次合理使用一些关键词(比如const、static、volital)去优化程序代码的情况了。 如有错误欢迎在评论区指出,或者有其他的小技巧,也欢迎在评…

作者头像 李华
网站建设 2026/6/9 20:38:42

8、Linux 用户管理与软件管理全解析

Linux 用户管理与软件管理全解析 1. PAM 认证管理 1.1 PAM 模块概述 PAM(可插拔认证模块)允许系统根据所需的认证类型添加或替换认证模块。管理员只需更改 PAM 配置文件即可实现这一点。PAM 模块位于 /lib/security 目录,更多信息和模块列表可查看 PAM 官网 。 1.2 …

作者头像 李华
网站建设 2026/6/10 15:43:50

16、深入了解Linux存储与设备管理

深入了解Linux存储与设备管理 RAID示例 在一个简单的RAID配置里,有三个RAID设备,利用两块硬盘上对应的分区来设置 /boot 、 /root 和 /home 分区。由于系统只能从RAID 1设备启动,而不能从RAID 5启动,所以 /boot 分区被配置为RAID 1设备,其他分区则采用更常用的R…

作者头像 李华