news 2026/4/16 13:55:44

Flink CEP Pattern API、连续性、跳过策略、超时与迟到数据一篇讲透

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink CEP Pattern API、连续性、跳过策略、超时与迟到数据一篇讲透

1. 快速开始:依赖与基本骨架

FlinkCEP 不是 Flink binary 的默认组件(集群跑时需要把 jar 链接/分发到集群),你的项目里通常先加 Maven 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>2.2.0</version></dependency>

一个最小可运行的 CEP 骨架是:

  1. 输入流DataStream<Event>(建议 event time + watermark)
  2. 定义 Pattern(模式图)
  3. CEP.pattern(input, pattern)得到PatternStream
  4. process(...)/select(...)把匹配输出成业务结果流

另外有个容易忽略的要求:参与匹配的事件对象需要正确实现equals()/hashCode(),因为 CEP 内部会拿它们做比较和匹配。

2. Pattern 的本质:你在画一张“状态机图”

官方说“pattern sequence is a graph”,你可以把它理解为:
你用begin("start") -> next("middle") -> followedBy("end")这些 API,在画一个事件序列状态机;当事件流跑过来时,CEP 会维护大量“部分匹配”(partial match),直到完整走完图,才输出一个 match。

每个 pattern 节点必须有唯一名字(后面从Map<String, List<Event>>里取匹配结果就靠这个名字)。注意:pattern 名字不能包含:

3. 单个 Pattern 怎么写:条件与类型约束

3.1 where / or:事件是否能“进入”这个节点

  • where(...):必须满足条件才能被该节点接受
  • 多次where()连续调用是 AND
  • or(...)把条件变成 OR
start.where(SimpleCondition.of(e->e.getName().startsWith("foo"))).or(SimpleCondition.of(e->e.getId()==42));

3.2 SimpleCondition vs IterativeCondition:要不要看“历史”

  • SimpleCondition:只看当前事件本身(最快、最简单)
  • IterativeCondition:可以访问同一个 partial match 中先前已接受的事件(功能强,但要注意性能)
middle.oneOrMore().subtype(SubEvent.class).where(newIterativeCondition<SubEvent>(){@Overridepublicbooleanfilter(SubEventvalue,Context<SubEvent>ctx)throwsException{if(!value.getName().startsWith("foo"))returnfalse;doublesum=value.getPrice();for(Eventprev:ctx.getEventsForPattern("middle")){sum+=prev.getPrice();}returnsum<5.0;}});

经验建议:ctx.getEventsForPattern(...)的成本会随匹配复杂度上涨,能不用就不用,或者尽量减少遍历次数。

3.3 subtype:类型过滤

如果你的输入流类型是Event,但某个节点只接受SubEvent

pattern.subtype(SubEvent.class).where(SimpleCondition.of(se->se.getVolume()>=10.0));

4. 量词 Quantifier:一次、N 次、范围、可选、贪婪

CEP 里“单节点”默认只接收 1 条事件(singleton)。如果你要让一个节点接收多条(looping pattern),就用量词:

  • oneOrMore():至少 1 次(b+)
  • times(n):恰好 n 次
  • times(from, to):范围次数
  • timesOrMore(n):至少 n 次
  • optional():可出现 0 次
  • greedy():尽可能多地吃(当前只支持量词节点,不支持 group 贪婪)

你可以把它当成正则里的+ ? {m,n}

start.times(2,4).optional().greedy();

重要提醒:对 looping pattern(oneOrMore/times)强烈建议搭配within()until()来清理状态,不然在高吞吐长时间运行里,partial match 会持续增长,状态压力会很大。

5. Pattern 之间的连续性:next / followedBy / followedByAny

这是 CEP 最“容易写错”的点,因为写出来都能跑,但输出差别巨大。

5.1 next:严格连续(Strict Contiguity)

next("b")要求 b 必须紧挨着 a,中间不能有任何不匹配事件。

5.2 followedBy:宽松连续(Relaxed Contiguity)

允许中间插入无关事件,语义更像“跳过不匹配直到下一个匹配”。

5.3 followedByAny:非确定性宽松(Non-deterministic Relaxed)

不仅允许插入无关事件,还会产生更多组合匹配(同一个 start 可以对应多个 middle/end),匹配数量可能爆炸式增长。

经典对比:pattern “a b”,输入a, c, b1, b2

  • next:无匹配(c 破坏连续)
  • followedBy:只匹配{a b1}
  • followedByAny:匹配{a b1}{a b2}

5.4 NOT 模式:notNext / notFollowedBy

  • notNext("x"):紧接着不能出现 x,否则丢弃该 partial match
  • notFollowedBy("x"):在两段之间任意位置不能出现 x

注意两条限制:

  • pattern sequence 如果末尾是notFollowedBy(),必须配within()
  • NOT pattern 不能跟在 optional pattern 后面

6. looping pattern 内部连续性:consecutive 与 allowCombinations

当你写oneOrMore()这种“多次”节点时,节点内部默认是 relaxed contiguity。

如果你希望“这些 repeated 事件必须紧挨着”,用consecutive()

.oneOrMore().consecutive()

如果你希望“重复节点内部也产生更多组合”(类似 followedByAny 的组合爆炸),用allowCombinations()

.oneOrMore().allowCombinations()

工程上要谨慎:allowCombinations()很容易导致匹配结果数量急剧上升,尤其在高基数 key 或热点 key 下会放大状态与 CPU。

7. within:给整个 pattern sequence 加时间窗口

within(Duration.ofSeconds(10))表示:从该 partial match 开始到完成匹配,必须在 10 秒内,否则丢弃(并且你可以捕获“超时 partial match”,后面会讲)。

一个 pattern sequence 只能有一个时间约束,如果你在不同节点上写多个,最终会取最小的那个。

8. AfterMatchSkipStrategy:控制“一个事件被复用到多少个匹配”

CEP 的默认行为是:同一条事件可以参与多个成功匹配。为了控制结果数量与业务语义,需要 skip strategy。

常用五种:

  • noSkip():全输出(最多)
  • skipToNext():输出一个 match 后,丢掉“和这个 match 共享同一起点事件”的其他 partial match(适合避免同起点产生多结果)
  • skipPastLastEvent():输出一个 match 后,丢掉“在该 match 覆盖范围内启动的所有 partial match”(最激进,结果最少)
  • skipToFirst("patternName"):跳到某节点第一次出现的位置
  • skipToLast("patternName"):跳到某节点最后一次出现的位置

设置方式:

AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.begin("start",skip).where(...).followedBy("middle").where(...).followedBy("end").where(...);

实战建议:

  • 你只想要“最典型的一条告警”,别让同一起点产生一堆结果:优先考虑skipToNext()
  • 你只想要“完全不重叠的匹配”:优先考虑skipPastLastEvent()
  • 如果你的模式里有oneOrMore(),默认noSkip()可能会让结果量很夸张,务必明确选择策略

9. 输出与处理:推荐用 PatternProcessFunction(并处理超时)

9.1 processMatch:每次完整匹配触发一次

processMatch收到的是:

Map<String, List<IN>> match

key 是 pattern 名字,value 是该节点接收的事件列表(因为 looping 节点可能接收多条)。

DataStream<Alert>result=patternStream.process(newPatternProcessFunction<Event,Alert>(){@OverridepublicvoidprocessMatch(Map<String,List<Event>>pattern,Contextctx,Collector<Alert>out){Eventstart=pattern.get("start").get(0);Eventend=pattern.get("end").get(0);out.collect(newAlert(start,end));}});

Context 里还能拿到时间信息(processing time / timestamp 等),并支持 side output。

9.2 超时 partial match:TimedOutPartialMatchHandler(用 side output 旁路)

只要你用了within(...),就可能发生“开始了但没完成就超时”的 partial match。可以用 mixin 方式实现TimedOutPartialMatchHandler

OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>main=patternStream.process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){out.collect(createAlert(match));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventstart=match.get("start").get(0);ctx.output(timeoutTag,newTimeoutEvent(start));}});DataStream<TimeoutEvent>timeoutStream=main.getSideOutput(timeoutTag);

注意:processTimedOutMatch不能写主输出,只能用 side output。

9.3 旧 API:select / flatSelect 仍可用,但底层会转成 PatternProcessFunction

新项目建议直接用process(...),逻辑更直观,能力也更完整。

10. Event Time 下迟到数据:CEP 假设 watermark 正确

CEP 对 event time 的处理逻辑是:

  • 元素先进入 buffer,按 timestamp 排序
  • watermark 到来时,处理 timestamp < watermark 的元素
  • timestamp 小于“最后看到的 watermark”的事件,被认为是 late element,不再参与匹配

如果你不想让迟到数据悄悄丢掉,可以用sideOutputLateData

OutputTag<Event>lateTag=newOutputTag<>("late-data"){};SingleOutputStreamOperator<Alert>out=patternStream.sideOutputLateData(lateTag).select(newPatternSelectFunction<Event,Alert>(){...});DataStream<Event>late=out.getSideOutput(lateTag);

工程建议:如果业务允许一定乱序,一定要把 watermark 策略和 allowed lateness 设计好;CEP 本身是“以 watermark 为分界线”的。

11. 性能与内存:SharedBuffer cache 参数什么时候有用

CEP 内部维护 SharedBuffer 来保存 partial matches 与事件引用。官方给了三项 cache 配置:

  • pipeline.cep.sharedbuffer.cache.entry-slots
  • pipeline.cep.sharedbuffer.cache.event-slots
  • pipeline.cep.sharedbuffer.cache.statistics-interval

关键点是:这些 cache 主要在 state backend = RocksDB 时用于限制纯内存占用,超过 cache 的部分会被“换出”到 RocksDB state 里。

反过来,如果你不是 RocksDB(例如 heap hashmap state),开启 cache 反而可能拖慢性能(copy-on-write 等开销变重)。

一句话策略:

  • RocksDB:可以用 cache slots 控制内存并换取可控的吞吐
  • 非 RocksDB:谨慎开启,先压测再决定

12. 一个更完整的示例:keyBy + within + 超时 + 迟到旁路

下面示意一个常见告警:同一个 id 的事件流中,出现error -> critical,要求 10 秒内完成。

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Event>input=...;// 建议设置 watermarkDataStream<Event>keyed=input.keyBy(Event::getId);AfterMatchSkipStrategyskip=AfterMatchSkipStrategy.skipPastLastEvent();Pattern<Event,?>pattern=Pattern.<Event>begin("start",skip).next("middle").where(SimpleCondition.of(e->"error".equals(e.getName()))).followedBy("end").where(SimpleCondition.of(e->"critical".equals(e.getName()))).within(Duration.ofSeconds(10));PatternStream<Event>ps=CEP.pattern(keyed,pattern);OutputTag<Event>lateTag=newOutputTag<>("late"){};OutputTag<TimeoutEvent>timeoutTag=newOutputTag<>("timeout"){};SingleOutputStreamOperator<Alert>alerts=ps.sideOutputLateData(lateTag).process(newPatternProcessFunction<Event,Alert>()implementsTimedOutPartialMatchHandler<Event>{@OverridepublicvoidprocessMatch(Map<String,List<Event>>match,Contextctx,Collector<Alert>out){Eventerr=match.get("middle").get(0);Eventcri=match.get("end").get(0);out.collect(newAlert(err,cri));}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<Event>>match,Contextctx){Eventerr=match.get("middle").get(0);ctx.output(timeoutTag,newTimeoutEvent(err));}});DataStream<Event>late=alerts.getSideOutput(lateTag);DataStream<TimeoutEvent>timeouts=alerts.getSideOutput(timeoutTag);

你可以按业务需要,把next/followedBy调整为更严格或更宽松的连续性,并且用 skip strategy 控制输出爆炸。

13. 迁移提示:老版本 savepoint

如果你需要从 Flink <= 1.5 的 savepoint 恢复,官方策略是:先迁移到 1.6–1.12,重新打 savepoint,再用 Flink >= 1.13 恢复(因为 1.13 起不再兼容 <=1.5 的 savepoint)。

结尾:写 CEP 最容易翻车的 3 件事

  1. 连续性没想清楚:followedByAny直接把输出量放大好几倍
  2. looping pattern 不加within()/until():状态长期累积
  3. event time watermark 设计不当:迟到数据悄悄被 CEP 当 late 丢掉(建议 sideOutputLateData 兜底)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 10:42:02

PingFangSC字体跨平台适配完全指南

PingFangSC字体跨平台适配完全指南 【免费下载链接】PingFangSC PingFangSC字体包文件、苹果平方字体文件&#xff0c;包含ttf和woff2格式 项目地址: https://gitcode.com/gh_mirrors/pi/PingFangSC 你是不是也遇到过这样的困扰&#xff1f;&#x1f605; 在Mac上设计的…

作者头像 李华
网站建设 2026/4/14 20:09:20

跨平台文件拖放神器DropPoint:重新定义高效文件传输

跨平台文件拖放神器DropPoint&#xff1a;重新定义高效文件传输 【免费下载链接】DropPoint Make drag-and-drop easier using DropPoint. Drag content without having to open side-by-side windows 项目地址: https://gitcode.com/gh_mirrors/dr/DropPoint 为什么传统…

作者头像 李华
网站建设 2026/4/16 12:44:34

Yuzu版本管理实战技巧:从入门到精通的高效指南

Yuzu版本管理实战技巧&#xff1a;从入门到精通的高效指南 【免费下载链接】yuzu-downloads 项目地址: https://gitcode.com/GitHub_Trending/yu/yuzu-downloads 还在为Yuzu模拟器版本选择而头疼&#xff1f;想要在不同版本间灵活切换却不知从何入手&#xff1f;作为你…

作者头像 李华
网站建设 2026/4/16 10:42:18

ResNet18优化实战:模型蒸馏轻量化方案

ResNet18优化实战&#xff1a;模型蒸馏轻量化方案 1. 背景与挑战&#xff1a;通用物体识别中的效率瓶颈 在当前AI应用广泛落地的背景下&#xff0c;通用物体识别已成为智能监控、内容审核、辅助驾驶等场景的核心能力。基于ImageNet预训练的ResNet-18因其结构简洁、精度稳定&a…

作者头像 李华
网站建设 2026/4/16 12:51:49

快速理解ARM架构流水线:认知型入门解析

深入浅出ARM流水线&#xff1a;从ARM7到Cortex-M的并行演进之路你有没有想过&#xff0c;为什么一块小小的MCU芯片&#xff0c;能在微秒级响应中断、实时处理传感器数据&#xff1f;背后真正的“引擎”是什么&#xff1f;答案就藏在CPU最底层的微架构设计中——指令流水线&…

作者头像 李华
网站建设 2026/4/15 20:24:44

Yuzu模拟器性能优化实战技巧:从入门到精通的完整指南

Yuzu模拟器性能优化实战技巧&#xff1a;从入门到精通的完整指南 【免费下载链接】yuzu-downloads 项目地址: https://gitcode.com/GitHub_Trending/yu/yuzu-downloads 还在为Yuzu模拟器卡顿、闪退问题而烦恼&#xff1f;作为你的专属技术顾问&#xff0c;我将为你揭秘…

作者头像 李华