news 2026/4/15 20:53:57

Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink Procedures 用 SQL 的 `CALL` 跑 Flink Job(实现、类型推断、命名参数、Catalog 集成一篇搞懂)

1. Procedures 是什么,适合做什么

Procedure 可以理解为:SQL 世界里的“存储过程”,但执行体可以启动 Flink 作业

典型用途

  • 管理类:生成测试数据、重建/维护某些资源、触发后台作业
  • 数据操作类:一键跑一个数据准备/清洗/校验 Job,并把结果以表的形式返回
  • 平台化:把一堆“运维脚本/管理逻辑”收敛到 Catalog 中,让用户统一用 SQLCALL调用

2. 实现规则:必须实现Procedure接口 + 定义call(...)

2.1 类要求

  • 实现org.apache.flink.table.procedures.Procedure
  • 类必须public、非抽象、全局可访问
  • 不能是匿名类、非 static 内部类

2.2call(...)方法要求(最关键)

接口本身不定义方法,你需要自己定义名为call的方法:

硬性规则

  • call必须public

  • 第一个参数必须是ProcedureContext

    • context.getExecutionEnvironment()拿到StreamExecutionEnvironment
  • 返回类型必须是数组int[]String[]Row[]Long[]

而且 JVM 普通重载规则都适用

  • 支持重载:call(ctx, int)/call(ctx, String)
  • 支持 varargs:call(ctx, Integer...)
  • 支持继承入参:call(ctx, Object)

如果你用 Scala

  • varargs 需要加scala.annotation.varargs
  • 建议用装箱类型(java.lang.Integer而不是Int)以支持 NULL

3. 一个最小可用的 Procedure 示例:生成序列

下面这个示例展示了:Procedure 拿到StreamExecutionEnvironment,用fromSequence跑一个小 Job,然后把结果收集为数组返回。

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.util.CloseableIterator;publicclassGenerateSequenceProcedureimplementsProcedure{publiclong[]call(ProcedureContextcontext,intn)throwsException{returngenerate(context.getExecutionEnvironment(),n);}publiclong[]call(ProcedureContextcontext,Stringn)throwsException{returngenerate(context.getExecutionEnvironment(),Integer.parseInt(n));}privatelong[]generate(StreamExecutionEnvironmentenv,intn)throwsException{long[]sequenceN=newlong[n];inti=0;try(CloseableIterator<Long>it=env.fromSequence(0,n-1).executeAndCollect()){while(it.hasNext()){sequenceN[i++]=it.next();}}returnsequenceN;}}

要点

  • call可以重载
  • context.getExecutionEnvironment()获取执行环境
  • 结果必须是数组

4. 类型推断 Type Inference:为什么你有时必须加注解

Flink Table/SQL 是强类型生态,因此 Procedure 的参数/返回值需要映射成 Flink DataType。

Flink 会通过反射自动推断(Automatic Type Inference),但在以下情况经常需要你“补注解”

  • 小数精度/scale(DECIMAL)
  • 嵌套 Row 类型(ROW<…>)
  • RAW/自定义序列化对象
  • 一个call想吃多种输入类型、但希望统一输出类型(用@ProcedureHint

4.1@DataTypeHint:给参数或返回值补充类型信息

注意:call的返回值必须是T[],如果你给返回值加@DataTypeHint,其实标注的是数组元素类型 T

importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.InputGroup;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;importjava.math.BigDecimal;importjava.nio.ByteBuffer;importjava.time.Instant;publicclassOverloadedProcedureimplementsProcedure{publicLong[]call(ProcedureContextcontext,longa,longb){returnnewLong[]{a+b};}public@DataTypeHint("DECIMAL(12, 3)")BigDecimal[]call(ProcedureContextcontext,doublea,doubleb){returnnewBigDecimal[]{BigDecimal.valueOf(a+b)};}@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")publicRow[]call(ProcedureContextcontext,inti){returnnewRow[]{Row.of(String.valueOf(i),Instant.ofEpochSecond(i))};}@DataTypeHint(value="RAW",bridgedTo=ByteBuffer.class)publicByteBuffer[]call(ProcedureContextcontext,@DataTypeHint(inputGroup=InputGroup.ANY)Objecto){returnnewByteBuffer[]{MyUtils.serializeToByteBuffer(o)};}}

4.2@ProcedureHint:把“输入类型 -> 输出类型”的映射说清楚

适合场景

  • 一个call想统一处理多输入类型(例如Object...
  • 多个重载call的输出类型一致,想全局声明一次
importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.annotation.ProcedureHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;importorg.apache.flink.types.Row;@ProcedureHint(output=@DataTypeHint("ROW<s STRING, i INT>"))publicclassSumProcedureimplementsProcedure{publicRow[]call(ProcedureContextcontext,inta,intb){returnnewRow[]{Row.of("Sum",a+b)};}publicRow[]call(ProcedureContextcontext){returnnewRow[]{Row.of("Empty args",-1)};}}

更极端的用法:完全用 hint 决定类型,call只要 JVM 能调用即可。

5. 命名参数 Named Parameters:让 CALL 更可读,也能省参数

调用 procedure 时可以用“命名参数”,好处

  • 不怕参数顺序写错
  • 可选参数可省略(默认补null
  • 可读性强(适合平台化)

通过@ArgumentHint标注参数名、类型、是否可选。

参数级别标注示例

importorg.apache.flink.table.annotation.ArgumentHint;importorg.apache.flink.table.annotation.DataTypeHint;importorg.apache.flink.table.procedure.ProcedureContext;importorg.apache.flink.table.procedures.Procedure;publicclassNamedParameterProcedureimplementsProcedure{public@DataTypeHint("INT")Integer[]call(ProcedureContextcontext,@ArgumentHint(name="a",isOptional=true)Integera,@ArgumentHint(name="b")Integerb){returnnewInteger[]{a+(b==null?0:b)};}}

重要限制

  • 命名参数仅在没有重载、没有可变参数(varargs)时生效
  • @ArgumentHint已包含@DataTypeHint,在某些组合场景下不能混用(按文档要求)

6. 把 Procedure 放进 Catalog:getProcedure+listProcedures

Procedure 必须存在于 Catalog 才能被CALL

你需要在 Catalog 中实现:

  • Catalog.getProcedure(ObjectPath procedurePath):返回 procedure 实例
  • Catalog.listProcedures(String dbName):列出该库下有哪些 procedure

示例(内存 catalog 内置 procedure)

importorg.apache.flink.table.catalog.GenericInMemoryCatalog;importorg.apache.flink.table.catalog.ObjectPath;importorg.apache.flink.table.catalog.exceptions.CatalogException;importorg.apache.flink.table.catalog.exceptions.DatabaseNotExistException;importorg.apache.flink.table.catalog.exceptions.ProcedureNotExistException;importorg.apache.flink.table.procedures.Procedure;importjava.util.*;importjava.util.stream.Collectors;publicclassCatalogWithBuiltInProcedureextendsGenericInMemoryCatalog{privatestaticfinalMap<ObjectPath,Procedure>PROCEDURE_MAP=newHashMap<>();static{PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"),newGenerateSequenceProcedure());}publicCatalogWithBuiltInProcedure(Stringname){super(name);}@OverridepublicList<String>listProcedures(StringdbName)throwsDatabaseNotExistException,CatalogException{if(!databaseExists(dbName)){thrownewDatabaseNotExistException(getName(),dbName);}returnPROCEDURE_MAP.keySet().stream().filter(p->p.getDatabaseName().equals(dbName)).map(ObjectPath::getObjectName).collect(Collectors.toList());}@OverridepublicProceduregetProcedure(ObjectPathprocedurePath)throwsProcedureNotExistException,CatalogException{Procedurep=PROCEDURE_MAP.get(procedurePath);if(p!=null){returnp;}thrownewProcedureNotExistException(getName(),procedurePath);}}

7. SQL 调用:CALL catalog.db.proc(args...)

注册 Catalog 后就能调用:

TableEnvironmenttEnv=TableEnvironment.create(...);tEnv.registerCatalog("my_catalog",newCatalogWithBuiltInProcedure("my_catalog"));// 调用tEnv.executeSql("CALL my_catalog.`system`.generate_n(5)");

SQL 侧一般就是

  • CALL my_catalog.\system.generate_n(5)
  • 或者用命名参数(如果你的 procedure 支持且没有重载/varargs)

8. 实战建议:什么时候用 Procedure,什么时候别用

推荐用 Procedure

  • 平台里做“管理命令”:一键生成数据、触发离线/流式任务、数据质量检查
  • 把复杂逻辑隐藏在 Procedure 里,让用户只写CALL ...

不太推荐(或要谨慎)

  • 你只是想做查询内的行级/集合级变换:那是 UDF/PTF 的领域
  • Procedure 内部启动长周期作业时,要考虑资源、权限、隔离和可观测性(日志/指标/审计)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 10:41:23

【国产大模型进阶之路】:基于智谱Open-AutoGLM的5个关键突破点全披露

第一章&#xff1a;智谱 Open-AutoGLM沉思Open-AutoGLM 是智谱AI推出的一项面向自动化自然语言任务的创新技术&#xff0c;融合了大模型理解能力与任务自适应机制。其核心在于通过自然语言指令驱动模型自主分析任务需求、选择合适工具并生成可执行逻辑&#xff0c;实现从“人写…

作者头像 李华
网站建设 2026/4/16 11:12:25

计算机网络及TCP网络应用程序开发

学习目标 1、了解计算机网络相关信息 2、掌握Python3编码转换的方法 3、掌握TCP客户端及服务器端开发流程及应用实践 4、socket套接字之send和recv原理剖析 5、掌握网络综合案例&#xff1a;多任务版TCP服务端程序应用实践 一、计算机网络概述 1、网络的概念 网络就是将具有独立…

作者头像 李华
网站建设 2026/4/16 2:05:33

10、路由器取证与网络取证全解析

路由器取证与网络取证全解析 在当今数字化时代,网络安全至关重要。路由器取证和网络取证作为保障网络安全的重要手段,能够帮助我们发现安全事件的源头、追踪攻击者的踪迹。下面将深入探讨这两个领域的相关知识。 网络取证概述 网络取证主要是对网络流量和事件进行嗅探、记…

作者头像 李华
网站建设 2026/4/15 16:14:11

13、数字取证软件与硬件工具全解析

数字取证软件与硬件工具全解析 在数字取证领域,软件和硬件工具都发挥着至关重要的作用。本文将为你详细介绍各类数字取证工具,包括软件工具和硬件工具的特点、功能及应用场景。 数字取证软件工具 在数字取证工作中,软件工具是不可或缺的一部分,它们能帮助我们完成数据恢复…

作者头像 李华
网站建设 2026/4/16 11:20:56

21、Windows Azure Blob存储:功能与操作全解析

Windows Azure Blob存储:功能与操作全解析 在云计算时代,数据的存储和管理至关重要。Windows Azure提供了强大的Blob存储服务,它具有多种功能,能够满足不同场景下的数据存储需求。本文将详细介绍Windows Azure Blob存储的多个重要功能,包括Blob复制、块Blob、页Blob、Win…

作者头像 李华