news 2026/4/16 14:10:22

Flink源码阅读:如何生成JobGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成JobGraph

前文我们介绍了 Flink 的四种执行图,并且通过源码了解了 Flink 的 StreamGraph 是怎么生成的,本文我们就一起来看下 Flink 的另一种执行图——JobGraph 是如何生成的。

StreamGraph 和 JobGraph 的区别

在正式开始之前,我们再来回顾一下 StreamGraph 和 JobGraph 的区别。假设我们的任务是建造一座大楼,StreamGraph 就像是设计蓝图,它描述了每个窗户、每根水管的位置和规格,而 JobGraph 像是给到施工队的施工流程图,它描述了每个任务模块,例如先把地基浇筑好,再铺设管线等。总的来说,JobGraph 更偏向执行层面,它是由 StreamGraph 优化而来。

回到 Flink 本身,我们通过一个表格来了解两个图的区别。

StreamGraphJobGraph
生成阶段客户端,执行 execute() 时客户端,提交前由 StreamGraph 转换生成
抽象层级高层逻辑图,直接对应 API优化后的执行图,为调度做准备
核心优化主要是算子链优化
节点StreamNodeJobVertex
StreamEdgeJobEdge
提交对象提交给 JobManager
包含资源包含执行作业所需的 Jar 包、依赖库和资源文件

JobVertex

JobGraph 中的节点是 JobVertex,在 StreamGraph 转换成 JobGraph 的过程中,会将多个节点串联起来,最终生成 JobVertex。

JobVertex包含以下成员变量:

我们分别看一下这些成员变量及其作用。

1、标识符相关
// JobVertex的id,在作业执行过程中的唯一标识。监控、调度和故障恢复都会使用privatefinalJobVertexIDid;// operator id列表,按照深度优先顺序存储。operator 的管理、状态分配都会用到privatefinalList<OperatorIDPair>operatorIDs;
2、输入输出相关
// 定义所有的输入边privatefinalList<JobEdge>inputs=newArrayList<>();// 定义所有的输出数据集privatefinalMap<IntermediateDataSetID,IntermediateDataSet>results=newLinkedHashMap<>();// 输入分片源,主要用于批处理作业,定义如何将数据分成多个片privateInputSplitSource<?>inputSplitSource;
3、执行配置相关
// 并行度,即运行时拆分子任务数量,默认使用全局配置privateintparallelism=ExecutionConfig.PARALLELISM_DEFAULT;// 最大并行度privateintmaxParallelism=MAX_PARALLELISM_DEFAULT;// 存储运行时实际执行的类,使 Flink 可以灵活处理不同类型的操作符// 流任务可以是"org.apache.flink.streaming.runtime.tasks.StreamTask"// 批任务可以是"org.apache.flink.runtime.operators.BatchTask"privateStringinvokableClassName;// 自定义配置privateConfigurationconfiguration;// 是否是动态设置并发度privatebooleandynamicParallelism=false;// 是否支持优雅停止privatebooleanisStoppable=false;
4、资源管理相关
// JobVertex 最小资源需求privateResourceSpecminResources=ResourceSpec.DEFAULT;// JobVertex 推荐资源需求privateResourceSpecpreferredResources=ResourceSpec.DEFAULT;// 用于资源优化,运行不同的 JobVertex 的子任务运行在同一个 slot@NullableprivateSlotSharingGroupslotSharingGroup;// 需要严格共址的 JobVertex 组,每个 JobVertex 的第 n 个子任务运行在同一个 TaskManager@NullableprivateCoLocationGroupImplcoLocationGroup;
5、协调器
// 操作符协调器,用于处理全局协调逻辑privatefinalList<SerializedValue<OperatorCoordinator.Provider>>operatorCoordinators=newArrayList<>();
6、显示和描述信息
// JobVertex 的名称privateStringname;// 操作符名称,比如 'Flat Map' 或 'Join'privateStringoperatorName;// 操作符的描述,比如 'Hash Join' 或 'Sorted Group Reduce'privateStringoperatorDescription;// 提供比 name 更友好的描述信息privateStringoperatorPrettyName;
7、状态和行为标志
// 是否支持同一个子任务并发多次执行privatebooleansupportsConcurrentExecutionAttempts=true;// 标记并发度是否被显式设置privatebooleanparallelismConfigured=false;// 是否有阻塞型输出privatebooleananyOutputBlocking=false;
8、缓存数据集
// 存储该 JobVertex 需要消费的缓存中间数据集的 ID,可提高作业执行效率privatefinalList<IntermediateDataSetID>intermediateDataSetIdsToConsume=newArrayList<>();

JobEdge

在 StreamGraph 中,StreamEdge 是连接 StreamNode 的桥梁。在 JobGraph 中,与之对应的是 JobEdge,不同点在于 JobEdge 中保存的是输入节点和输出结果。

1、连接关系成员
// 定义数据流向哪个 JobVertexprivatefinalJobVertextarget;// 定义这条边的源数据privatefinalIntermediateDataSetsource;// 输入类型的编号privatefinalinttypeNumber;// 多个输入间的键是否相关,如果为 true,相同键的数据在一个输入被分割时,在其他数据对应的记录也会发送到相同的下游节点privatefinalbooleaninterInputsKeysCorrelated;// 同一输入内相同的键是否必须发送到同一下游任务privatefinalbooleanintraInputKeyCorrelated;
2、数据分发模式
// 定义数据在并行任务期间的分发模式// 可能值:// ALL_TO_ALL:全连接,每个上游子任务连接所有下游任务// POINTWISE:点对点连接,一对一或一对多的本地连接privatefinalDistributionPatterndistributionPattern;
3、数据传输策略
// 是否为广播连接privatefinalbooleanisBroadcast;// 是否为 forward 连接,forward 连接最高效,直接转发,无需序列化网络传输privatefinalbooleanisForward;// 数据传输策略名称,用于显示privateStringshipStrategyName;
4、状态重分布映射器
// 下游状态重分布映射器,当作业扩容时,决定是否重新分配下游算子的持久化状态privateSubtaskStateMapperdownstreamSubtaskStateMapper=SubtaskStateMapper.ROUND_ROBIN;// 上游状态重分布映射器,当作业扩容时,决定是否重新分配上游算子的持久化状态privateSubtaskStateMapperupstreamSubtaskStateMapper=SubtaskStateMapper.ROUND_ROBIN;
5、描述和缓存信息
// 预处理操作的名称privateStringpreProcessingOperationName;// 操作符级别缓存的描述privateStringoperatorLevelCachingDescription;

StreamGraph 转换成 JobGraph

现在我们再来看一下 StreamGraph 是如何转换成 JobGraph 的。转换逻辑的入口是 StreamGraph.getJobGraph 方法。它只是调用了 StreamingJobGraphGenerator.createJobGraph,核心逻辑在 createJobGraph 方法中。

privateJobGraphcreateJobGraph(){// 预验证,检查 StreamGraph 配置正确性preValidate(streamGraph,userClassloader);// 【核心】链化操作符setChaining();if(jobGraph.isDynamic()){// 支持动态扩缩容场景,为动态图设置并行度setVertexParallelismsForDynamicGraphIfNecessary();}// Note that we set all the non-chainable outputs configuration here because the// "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job// vertices and partition-reusefinalMap<Integer,Map<StreamEdge,NonChainedOutput>>opIntermediateOutputs=newHashMap<>();// 设置不能链化的输出边setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs,jobVertexBuildContext);setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);// 设置物理边连接setPhysicalEdges(jobVertexBuildContext);// 设置支持并发执行的 JobVertexmarkSupportingConcurrentExecutionAttempts(jobVertexBuildContext);// 验证混合 shuffle 模式只在批处理模式下使用validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);// 设置 Slot 共享和协同定位setSlotSharingAndCoLocation(jobVertexBuildContext);// 设置托管内存比例setManagedMemoryFraction(jobVertexBuildContext);// 为 JobVertex 名称添加前缀addVertexIndexPrefixInVertexName(jobVertexBuildContext,newAtomicInteger(0));// 设置操作符描述信息setVertexDescription(jobVertexBuildContext);// Wait for the serialization of operator coordinators and stream config.// 序列化操作符协调器和流配置serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor,jobVertexBuildContext);returnjobGraph;}

可以看到,在 createJobGraph 方法中,调用了 setChaining 方法,即进行链化操作。这也是 JobGraph 最核心的优化之一。下面我们来看一下具体怎么做链化。

privatevoidsetChaining(){// we separate out the sources that run as inputs to another operator (chained inputs)// from the sources that needs to run as the main (head) operator.finalMap<Integer,OperatorChainInfo>chainEntryPoints=buildChainedInputsAndGetHeadInputs();finalCollection<OperatorChainInfo>initialEntryPoints=chainEntryPoints.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).map(Map.Entry::getValue).collect(Collectors.toList());// iterate over a copy of the values, because this map gets concurrently modifiedfor(OperatorChainInfoinfo:initialEntryPoints){createChain(info.getStartNodeId(),1,// operators start at position 1 because 0 is for chained source inputsinfo,chainEntryPoints,true,serializationExecutor,jobVertexBuildContext,null);}}

setChaining 方法中主要分为两步,第一步是处理 Source 节点,将可以链化的 Source 和不能链化的 Source 节点分开。先来看如何判断一个 Source 是否可被链化。

publicstaticbooleanisChainableSource(StreamNodestreamNode,StreamGraphstreamGraph){// 最基本的一些判空,输出边数量为1if(streamNode.getOperatorFactory()==null||!(streamNode.getOperatorFactory()instanceofSourceOperatorFactory)||streamNode.getOutEdges().size()!=1){returnfalse;}finalStreamEdgesourceOutEdge=streamNode.getOutEdges().get(0);finalStreamNodetarget=streamGraph.getStreamNode(sourceOutEdge.getTargetId());finalChainingStrategytargetChainingStrategy=Preconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy();// 链化策略必须 HEAD_WITH_SOURCES,输出边是可链化的returntargetChainingStrategy==ChainingStrategy.HEAD_WITH_SOURCES&&isChainableInput(sourceOutEdge,streamGraph,false);}privatestaticbooleanisChainableInput(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodeupStreamVertex=streamGraph.getSourceVertex(edge);StreamNodedownStreamVertex=streamGraph.getTargetVertex(edge);if(!(streamGraph.isChainingEnabled()// 上下游节点是否在同一个 slot 共享组&&upStreamVertex.isSameSlotSharingGroup(downStreamVertex)// 操作符是否可以链化,主要做并行度检查&&areOperatorsChainable(upStreamVertex,downStreamVertex,streamGraph,allowChainWithDefaultParallelism)// 分区器和交换模式是否支持链化&&arePartitionerAndExchangeModeChainable(edge.getPartitioner(),edge.getExchangeMode(),streamGraph.isDynamic()))){returnfalse;}// check that we do not have a union operation, because unions currently only work// through the network/byte-channel stack.// we check that by testing that each "type" (which means input position) is used only once// 检查是否为 Union 操作,Union 操作不能链化for(StreamEdgeinEdge:downStreamVertex.getInEdges()){if(inEdge!=edge&&inEdge.getTypeNumber()==edge.getTypeNumber()){returnfalse;}}returntrue;}

Source 的链化条件主要就是这些,我们结合一些例子来看一下。

Source(并行度=4) -> Map(并行度=4) -> Filter(并行度=4) Source -> Map 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true (都在默认组) 3. areOperatorsChainable() = true (Source可链化,Map是HEAD_WITH_SOURCES) 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner) 5. Union检查通过 结果:可链化 Map -> Filter 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true 3. areOperatorsChainable() = true (Map和Filter都是ALWAYS) 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner) 5. Union检查通过 结果:可链化 最终:Source -> Map -> Filter 三者链化到一个JobVertex中 Source(并行度=2) -> Map(并行度=4) // 并行度不匹配 Source -> Map 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true 3. areOperatorsChainable() = false (并行度不匹配) 结果:不可链化,需要网络传输 Source1 --\ Union -> Map Source2 --/ Source1 -> Union 边: 虽然满足前4个条件,但Union节点有两个输入边,typeNumber相同 Union检查失败,不可链化

得到了所有入口之后,就可以进行后续节点的链化操作了,它的逻辑在 createChain 方法中。这里主要是一个递归过程,先将节点的输出边分为可链化和不可链化两个 list,之后对可链化的边进行递归调用链化。对不可链化的边,需要创建出新的链。由于篇幅原因,这里只贴一部分核心的代码

publicstaticList<StreamEdge>createChain(finalIntegercurrentNodeId,finalintchainIndex,finalOperatorChainInfochainInfo,finalMap<Integer,OperatorChainInfo>chainEntryPoints,finalbooleancanCreateNewChain,finalExecutorserializationExecutor,finalJobVertexBuildContextjobVertexBuildContext,final@NullableConsumer<Integer>visitedStreamNodeConsumer){......// 拆分可链化边和不可链化边for(StreamEdgeoutEdge:currentNode.getOutEdges()){if(isChainable(outEdge,streamGraph)){chainableOutputs.add(outEdge);}else{nonChainableOutputs.add(outEdge);}}// 处理可链化边for(StreamEdgechainable:chainableOutputs){StreamNodetargetNode=streamGraph.getStreamNode(chainable.getTargetId());AttributetargetNodeAttribute=targetNode.getAttribute();if(isNoOutputUntilEndOfInput){if(targetNodeAttribute!=null){targetNodeAttribute.setNoOutputUntilEndOfInput(true);}}transitiveOutEdges.addAll(createChain(chainable.getTargetId(),chainIndex+1,chainInfo,chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer));// Mark upstream nodes in the same chain as outputBlockingif(targetNodeAttribute!=null&&targetNodeAttribute.isNoOutputUntilEndOfInput()){currentNodeAttribute.setNoOutputUntilEndOfInput(true);}}// 处理不可链化边for(StreamEdgenonChainable:nonChainableOutputs){transitiveOutEdges.add(nonChainable);// Used to control whether a new chain can be created, this value is true in the// full graph generation algorithm and false in the progressive generation// algorithm. In the future, this variable can be a boolean type function to adapt// to more adaptive scenarios.if(canCreateNewChain){createChain(nonChainable.getTargetId(),1,// operators start at position 1 because 0 is for chained source// inputschainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),(k)->chainInfo.newChain(nonChainable.getTargetId())),chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer);}}// 创建 JobVertexStreamConfigconfig;if(currentNodeId.equals(startNodeId)){JobVertexjobVertex=jobVertexBuildContext.getJobVertex(startNodeId);if(jobVertex==null){jobVertex=createJobVertex(chainInfo,serializationExecutor,jobVertexBuildContext);}config=newStreamConfig(jobVertex.getConfiguration());}else{config=newStreamConfig(newConfiguration());}// 判断是否为起始节点,如果不是,将对应的配置信息存到链化起始节点的 key 中if(currentNodeId.equals(startNodeId)){chainInfo.setTransitiveOutEdges(transitiveOutEdges);jobVertexBuildContext.addChainInfo(startNodeId,chainInfo);config.setChainStart();config.setChainIndex(chainIndex);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setTransitiveChainedTaskConfigs(jobVertexBuildContext.getChainedConfigs().get(startNodeId));}else{config.setChainIndex(chainIndex);StreamNodenode=streamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());jobVertexBuildContext.getOrCreateChainedConfig(startNodeId).put(currentNodeId,config);}......}

是否可链化依赖于 isChainable 方法的结果。它主要判断了下游的输入边数量是否为1,然后调用了 isChainableInput,这个方法我们刚刚已经看过了。

publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph){returnisChainable(edge,streamGraph,false);}publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodedownStreamVertex=streamGraph.getTargetVertex(edge);returndownStreamVertex.getInEdges().size()==1&&isChainableInput(edge,streamGraph,allowChainWithDefaultParallelism);}

总结

本文我们主要介绍了生成 JobGraph 的相关代码。首先了解了 JobGraph 中的节点和边对应的类,以及它们和 StreamGraph 中的类的映射关系。然后又看了生成 JobGraph 的核心代码,其中重点学习了链化相关的代码。

最后补充一个生成 JobGraph 的调用链路,感兴趣的同学可以看下。

clusterClient.submitJob()MiniCluster.submitJob()Dispatcher.submitJob()JobMasterServiceLeadershipRunnerFactoryDefaultJobMasterServiceFactoryJobMasterDefaultSchedulerFactory.createInstance()StreamGraph.getJobGraph()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:04:39

当AI遇上电影工业:一个让“抽卡式“视频生成彻底翻篇的工程化实践

从剧本到成片,我们如何用关键帧驱动理念重构AI视频生产流程 写在前面的话 说实话,当我第一次看到市面上那些"输入一句话就能生成视频"的AI工具时,内心是激动的。但当我真正尝试用它们做点正经事——比如制作一个30秒的品牌宣传片——我就发现了问题:生成的视频就像开…

作者头像 李华
网站建设 2026/4/15 14:15:24

当AI遇上A股:一个让机器读懂财经新闻的量化框架

"如果你能让机器理解新闻,它就能预测股价。"这听起来像科幻小说,但CSMD项目正在把这个想法变成现实。 一、缘起:股市预测为什么这么难? 周五下午三点,你盯着电脑屏幕上跳动的K线图,心里盘算着下周一该不该加仓。突然,手机推送了一条新闻:"某龙头企业获得重大技…

作者头像 李华
网站建设 2026/3/30 6:24:44

紫薯矮砧密植:水肥一体化系统的铺设要点纪实

红薯地里&#xff0c;老刘的紫薯长势喜人&#xff0c;薯块饱满。"这套水肥系统让我的紫薯产量提高四成&#xff0c;"他指着地里的滴灌设备说&#xff0c;"不仅省水省肥&#xff0c;紫薯品质还特别好。"认识紫薯矮砧密植紫薯矮砧密植&#xff0c;简单来说就…

作者头像 李华
网站建设 2026/4/16 13:44:51

拦截器注册InterceptorRegistry 实现讲解

1.核心概念InterceptorRegistry 是 Spring MVC 提供的拦截器注册器&#xff0c;用于配置拦截器的拦截规则。2.主要方法addInterceptor(): 添加拦截器 addPathPatterns(): 指定要拦截的路径 excludePathPatterns(): 指定要排除的路径 路径匹配规则 /api/**: 匹配 /api/ 下的所有…

作者头像 李华
网站建设 2026/4/16 13:44:08

软件缺少vbschs.dll文件 无法启动运行的情况 下载修复

在使用电脑系统时经常会出现丢失找不到某些文件的情况&#xff0c;由于很多常用软件都是采用 Microsoft Visual Studio 编写的&#xff0c;所以这类软件的运行需要依赖微软Visual C运行库&#xff0c;比如像 QQ、迅雷、Adobe 软件等等&#xff0c;如果没有安装VC运行库或者安装…

作者头像 李华
网站建设 2026/4/16 13:44:20

sglang 大模型推理框架支持的EAGLE 1,2,3

文章目录EAGLE 系列模型的演进与核心机制关键参数与训练逻辑思考参考来源&#xff1a;https://docs.sglang.com.cn/backend/speculative_decoding.html https://github.com/SafeAILab/EAGLE EAGLE3 https://arxiv.org/pdf/2503.01840 EAGLE 系列模型的演进与核心机制 EAGLE 基…

作者头像 李华