news 2026/4/16 18:19:28

Flink源码阅读:如何生成StreamGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成StreamGraph

Flink 中有四种执行图,分别是 StreamGraph、JobGraph、ExecutionGraph 和 Physical Graph。今天我们来看下我们编写的 Flink 程序代码是如何生成 StreamGraph 的。

在开始读代码之前,我们先来简单介绍一下四种图之间的关系和区别。

StreamGraph 是根据用户用 Stream API 编写的代码生成的图,用来表示整个程序的拓扑结构。

JobGraph 是由 StreamGraph 生成的,它在 StreamGraph 的基础上,对链化了部分算子,将其合并成为一个节点,减少数据在节点之间传输时序列化和反序列化这些消耗。

ExecutionGraph 是由 JobGraph 生成的,它的主要特点是并行,将多并发的节点拆分。

PhysicalGraph 是 ExecutionGraph 实际部署后的图,它并不是一种数据结构。

StreamExecutionEnvironment

OK,了解了 Flink 四种执行图之后,我们就正式开始源码探索了。首先从 StreamExecutionEnvironment 入手,在编写 Flink 程序时,它是必不可少的一个类。它提供了一系列方法来配置流处理程序的执行环境(如并行度、Checkpoint 配置、时间属性等)。

本文我们主要关注 StreamGraph 的生成,首先是数据流的入口,即 Source 节点。在 StreamExecutionEnvironment 中有 addSource 和 fromSource 等方法,它们用来定义从哪个数据源读取数据,然后返回一个 DataStreamSource (继承自 DataStream),得到 DataStream 之后,它会在各个算子之间流转,最终到 Sink 端输出。

我们从 addSource 方法入手,addSource 方法中主要做了三件事:

1、处理数据类型,优先使用用户执行的数据类型,也可以自动推断

2、闭包清理,使用户传入的 function 能被序列化并发布到分布式环境执行

3、创建 DataStreamSource 并返回

private<OUT>DataStreamSource<OUT>addSource(finalSourceFunction<OUT>function,finalStringsourceName,@NullablefinalTypeInformation<OUT>typeInfo,finalBoundednessboundedness){checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);TypeInformation<OUT>resolvedTypeInfo=getTypeInfo(function,sourceName,SourceFunction.class,typeInfo);booleanisParallel=functioninstanceofParallelSourceFunction;clean(function);finalStreamSource<OUT,?>sourceOperator=newStreamSource<>(function);returnnewDataStreamSource<>(this,resolvedTypeInfo,sourceOperator,isParallel,sourceName,boundedness);}

现在我们有了 DataStream 了,那如何知道后续要进行哪些转换逻辑呢?答案在 transformations 这个变量中,它保存了后续所有的转换。

protectedfinalList<Transformation<?>>transformations=newArrayList<>();

Transformation

我们来看 Transformation 是如何生成和描述 DataStream 的转换流程的。以最常见的 map 方法为例。

public<R>SingleOutputStreamOperator<R>map(MapFunction<T,R>mapper,TypeInformation<R>outputType){returntransform("Map",outputType,newStreamMap<>(clean(mapper)));}

它调用了 transform 方法,transform 又调用了 doTransform 方法。

protected<R>SingleOutputStreamOperator<R>doTransform(StringoperatorName,TypeInformation<R>outTypeInfo,StreamOperatorFactory<R>operatorFactory){// read the output type of the input Transform to coax out errors about MissingTypeInfotransformation.getOutputType();OneInputTransformation<T,R>resultTransform=newOneInputTransformation<>(this.transformation,operatorName,operatorFactory,outTypeInfo,environment.getParallelism(),false);@SuppressWarnings({"unchecked","rawtypes"})SingleOutputStreamOperator<R>returnStream=newSingleOutputStreamOperator(environment,resultTransform);getExecutionEnvironment().addOperator(resultTransform);returnreturnStream;}

在 doTransform 方法中,就是创建 Transformation 和 SingleOutputStreamOperator(DataStream 的一个子类),然后调用 addOperator 方法将 transform 存到 StreamExecutionEnviroment 中的 transformations 变量中。

每个 Transformation 都有 id、name、parallelism 和 slotSharingGroup 等信息。其子类也记录有输入信息,如 OneInputTransformation 和 TwoInputTransformation。

StreamOperator

我们在调用 map 方法时,会传入一个自定义的处理函数,它也会保存在 Transformation 中。在 Flink 中定义了 StreamOperator 方法来抽象这类处理函数。在 map 方法中,它将我们传入的函数转成了 StreamMap,它继承了 AbstractUdfStreamOperator,同时实现了 OneInputStreamOperator 接口。

StreamOperator 定义了对算子生命周期管理的函数。

voidopen()throwsException;voidfinish()throwsException;voidclose()throwsException;OperatorSnapshotFuturessnapshotState(longcheckpointId,longtimestamp,CheckpointOptionscheckpointOptions,CheckpointStreamFactorystorageLocation)throwsException;voidinitializeState(StreamTaskStateInitializerstreamTaskStateManager)throwsException;

OneInputStreamOperator 是 StreamOperator 的子接口。在其基础上增加了对具体元素的处理,主要是对 key 的提取。

defaultvoidsetKeyContextElement(StreamRecord<IN>record)throwsException{setKeyContextElement1(record);}

AbstractUdfStreamOperator 则是提供了对自定义函数生命周期管理的实现。

@Overridepublicvoidopen()throwsException{super.open();FunctionUtils.openFunction(userFunction,DefaultOpenContext.INSTANCE);}@Overridepublicvoidfinish()throwsException{super.finish();if(userFunctioninstanceofSinkFunction){((SinkFunction<?>)userFunction).finish();}}@Overridepublicvoidclose()throwsException{super.close();FunctionUtils.closeFunction(userFunction);}

到这里,我们就知道了 Flink 中 DataStream 是如何转换的。处理逻辑保存在 Transformation 中。下面我们来看一组 Transformation 是如何生成 StreamGraph 的。

StreamGraph

生成 StreamGraph 的入口在org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#generateStreamGraph

在 generate 方法中,会遍历所有 Transformation 并调用 transform 方法。在调用节点的 transform 方法之前,会先确保它的输入节点都已经转换成功。

目前定义了以下 Transformation:

static{@SuppressWarnings("rawtypes")Map<Class<?extendsTransformation>,TransformationTranslator<?,?extendsTransformation>>tmp=newHashMap<>();tmp.put(OneInputTransformation.class,newOneInputTransformationTranslator<>());tmp.put(TwoInputTransformation.class,newTwoInputTransformationTranslator<>());tmp.put(MultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(KeyedMultipleInputTransformation.class,newMultiInputTransformationTranslator<>());tmp.put(SourceTransformation.class,newSourceTransformationTranslator<>());tmp.put(SinkTransformation.class,newSinkTransformationTranslator<>());tmp.put(GlobalCommitterTransform.class,newGlobalCommitterTransformationTranslator<>());tmp.put(LegacySinkTransformation.class,newLegacySinkTransformationTranslator<>());tmp.put(LegacySourceTransformation.class,newLegacySourceTransformationTranslator<>());tmp.put(UnionTransformation.class,newUnionTransformationTranslator<>());tmp.put(StubTransformation.class,newStubTransformationTranslator<>());tmp.put(PartitionTransformation.class,newPartitionTransformationTranslator<>());tmp.put(SideOutputTransformation.class,newSideOutputTransformationTranslator<>());tmp.put(ReduceTransformation.class,newReduceTransformationTranslator<>());tmp.put(TimestampsAndWatermarksTransformation.class,newTimestampsAndWatermarksTransformationTranslator<>());tmp.put(BroadcastStateTransformation.class,newBroadcastStateTransformationTranslator<>());tmp.put(KeyedBroadcastStateTransformation.class,newKeyedBroadcastStateTransformationTranslator<>());tmp.put(CacheTransformation.class,newCacheTransformationTranslator<>());translatorMap=Collections.unmodifiableMap(tmp);}

Flink 会根据不同的 Transformation 类调用其 translateInternal 方法。在 translateInternal 方法中就会去添加节点和边。

streamGraph.addOperator(transformationId,slotSharingGroup,transformation.getCoLocationGroupKey(),operatorFactory,inputType,transformation.getOutputType(),transformation.getName());for(IntegerinputId:context.getStreamNodeIds(parentTransformations.get(0))){streamGraph.addEdge(inputId,transformationId,0);}

在 addOperator 方法中,它通过调用 addNode 来创建 StreamNode。

protectedStreamNodeaddNode(IntegervertexID,@NullableStringslotSharingGroup,@NullableStringcoLocationGroup,Class<?extendsTaskInvokable>vertexClass,@NullableStreamOperatorFactory<?>operatorFactory,StringoperatorName){if(streamNodes.containsKey(vertexID)){thrownewRuntimeException("Duplicate vertexID "+vertexID);}StreamNodevertex=newStreamNode(vertexID,slotSharingGroup,coLocationGroup,operatorFactory,operatorName,vertexClass);streamNodes.put(vertexID,vertex);isEmpty=false;returnvertex;}

在 addEdgeInternal 方法中,对于 sideOutput 和 partition 这类虚拟节点,会先解析出原始节点,再建立实际的边。

privatevoidaddEdgeInternal(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,List<String>outputNames,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){if(virtualSideOutputNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualSideOutputNodes.get(virtualId).f0;if(outputTag==null){outputTag=virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,null,outputTag,exchangeMode,intermediateDataSetId);}elseif(virtualPartitionNodes.containsKey(upStreamVertexID)){intvirtualId=upStreamVertexID;upStreamVertexID=virtualPartitionNodes.get(virtualId).f0;if(partitioner==null){partitioner=virtualPartitionNodes.get(virtualId).f1;}exchangeMode=virtualPartitionNodes.get(virtualId).f2;addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputNames,outputTag,exchangeMode,intermediateDataSetId);}else{createActualEdge(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputTag,exchangeMode,intermediateDataSetId);}}

最后根据两个物理节点创建 StreamEdge 进行连接。

privatevoidcreateActualEdge(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitioner<?>partitioner,OutputTagoutputTag,StreamExchangeModeexchangeMode,IntermediateDataSetIDintermediateDataSetId){StreamNodeupstreamNode=getStreamNode(upStreamVertexID);StreamNodedownstreamNode=getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if(partitioner==null&&upstreamNode.getParallelism()==downstreamNode.getParallelism()){partitioner=dynamic?newForwardForUnspecifiedPartitioner<>():newForwardPartitioner<>();}elseif(partitioner==null){partitioner=newRebalancePartitioner<Object>();}if(partitionerinstanceofForwardPartitioner){if(upstreamNode.getParallelism()!=downstreamNode.getParallelism()){if(partitionerinstanceofForwardForConsecutiveHashPartitioner){partitioner=((ForwardForConsecutiveHashPartitioner<?>)partitioner).getHashPartitioner();}else{thrownewUnsupportedOperationException("Forward partitioning does not allow "+"change of parallelism. Upstream operation: "+upstreamNode+" parallelism: "+upstreamNode.getParallelism()+", downstream operation: "+downstreamNode+" parallelism: "+downstreamNode.getParallelism()+" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}}if(exchangeMode==null){exchangeMode=StreamExchangeMode.UNDEFINED;}/** * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link * StreamEdge}. */intuniqueId=getStreamEdges(upstreamNode.getId(),downstreamNode.getId()).size();StreamEdgeedge=newStreamEdge(upstreamNode,downstreamNode,typeNumber,partitioner,outputTag,exchangeMode,uniqueId,intermediateDataSetId);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}

通过 StreamNode 和 StreamEdge,就可以得到所有的节点和边,也就是我们的 StreamGraph 就创建完成了。

总结

本文先介绍了 Flink 的四种执行图以及它们之间的关系。接着又通过源码探索了 StreamGraph 的生成逻辑,Flink 将处理 逻辑保存在 Transformation 中,又由 Transformation 生成了 StreamGraph。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:04:39

当AI遇上电影工业:一个让“抽卡式“视频生成彻底翻篇的工程化实践

从剧本到成片,我们如何用关键帧驱动理念重构AI视频生产流程 写在前面的话 说实话,当我第一次看到市面上那些"输入一句话就能生成视频"的AI工具时,内心是激动的。但当我真正尝试用它们做点正经事——比如制作一个30秒的品牌宣传片——我就发现了问题:生成的视频就像开…

作者头像 李华
网站建设 2026/4/15 14:15:24

当AI遇上A股:一个让机器读懂财经新闻的量化框架

"如果你能让机器理解新闻,它就能预测股价。"这听起来像科幻小说,但CSMD项目正在把这个想法变成现实。 一、缘起:股市预测为什么这么难? 周五下午三点,你盯着电脑屏幕上跳动的K线图,心里盘算着下周一该不该加仓。突然,手机推送了一条新闻:"某龙头企业获得重大技…

作者头像 李华
网站建设 2026/4/16 15:15:03

紫薯矮砧密植:水肥一体化系统的铺设要点纪实

红薯地里&#xff0c;老刘的紫薯长势喜人&#xff0c;薯块饱满。"这套水肥系统让我的紫薯产量提高四成&#xff0c;"他指着地里的滴灌设备说&#xff0c;"不仅省水省肥&#xff0c;紫薯品质还特别好。"认识紫薯矮砧密植紫薯矮砧密植&#xff0c;简单来说就…

作者头像 李华
网站建设 2026/4/16 13:44:51

拦截器注册InterceptorRegistry 实现讲解

1.核心概念InterceptorRegistry 是 Spring MVC 提供的拦截器注册器&#xff0c;用于配置拦截器的拦截规则。2.主要方法addInterceptor(): 添加拦截器 addPathPatterns(): 指定要拦截的路径 excludePathPatterns(): 指定要排除的路径 路径匹配规则 /api/**: 匹配 /api/ 下的所有…

作者头像 李华
网站建设 2026/4/16 13:44:08

软件缺少vbschs.dll文件 无法启动运行的情况 下载修复

在使用电脑系统时经常会出现丢失找不到某些文件的情况&#xff0c;由于很多常用软件都是采用 Microsoft Visual Studio 编写的&#xff0c;所以这类软件的运行需要依赖微软Visual C运行库&#xff0c;比如像 QQ、迅雷、Adobe 软件等等&#xff0c;如果没有安装VC运行库或者安装…

作者头像 李华
网站建设 2026/4/16 13:44:20

sglang 大模型推理框架支持的EAGLE 1,2,3

文章目录EAGLE 系列模型的演进与核心机制关键参数与训练逻辑思考参考来源&#xff1a;https://docs.sglang.com.cn/backend/speculative_decoding.html https://github.com/SafeAILab/EAGLE EAGLE3 https://arxiv.org/pdf/2503.01840 EAGLE 系列模型的演进与核心机制 EAGLE 基…

作者头像 李华