前文我们介绍了 Flink 的四种执行图,并且通过源码了解了 Flink 的 StreamGraph 是怎么生成的,本文我们就一起来看下 Flink 的另一种执行图——JobGraph 是如何生成的。
StreamGraph 和 JobGraph 的区别
在正式开始之前,我们再来回顾一下 StreamGraph 和 JobGraph 的区别。假设我们的任务是建造一座大楼,StreamGraph 就像是设计蓝图,它描述了每个窗户、每根水管的位置和规格,而 JobGraph 像是给到施工队的施工流程图,它描述了每个任务模块,例如先把地基浇筑好,再铺设管线等。总的来说,JobGraph 更偏向执行层面,它是由 StreamGraph 优化而来。
回到 Flink 本身,我们通过一个表格来了解两个图的区别。
| StreamGraph | JobGraph | |
|---|---|---|
| 生成阶段 | 客户端,执行 execute() 时 | 客户端,提交前由 StreamGraph 转换生成 |
| 抽象层级 | 高层逻辑图,直接对应 API | 优化后的执行图,为调度做准备 |
| 核心优化 | 无 | 主要是算子链优化 |
| 节点 | StreamNode | JobVertex |
| 边 | StreamEdge | JobEdge |
| 提交对象 | 无 | 提交给 JobManager |
| 包含资源 | 无 | 包含执行作业所需的 Jar 包、依赖库和资源文件 |
JobVertex
JobGraph 中的节点是 JobVertex,在 StreamGraph 转换成 JobGraph 的过程中,会将多个节点串联起来,最终生成 JobVertex。
JobVertex包含以下成员变量:
我们分别看一下这些成员变量及其作用。
1、标识符相关
// JobVertex的id,在作业执行过程中的唯一标识。监控、调度和故障恢复都会使用privatefinalJobVertexIDid;// operator id列表,按照深度优先顺序存储。operator 的管理、状态分配都会用到privatefinalList<OperatorIDPair>operatorIDs;2、输入输出相关
// 定义所有的输入边privatefinalList<JobEdge>inputs=newArrayList<>();// 定义所有的输出数据集privatefinalMap<IntermediateDataSetID,IntermediateDataSet>results=newLinkedHashMap<>();// 输入分片源,主要用于批处理作业,定义如何将数据分成多个片privateInputSplitSource<?>inputSplitSource;3、执行配置相关
// 并行度,即运行时拆分子任务数量,默认使用全局配置privateintparallelism=ExecutionConfig.PARALLELISM_DEFAULT;// 最大并行度privateintmaxParallelism=MAX_PARALLELISM_DEFAULT;// 存储运行时实际执行的类,使 Flink 可以灵活处理不同类型的操作符// 流任务可以是"org.apache.flink.streaming.runtime.tasks.StreamTask"// 批任务可以是"org.apache.flink.runtime.operators.BatchTask"privateStringinvokableClassName;// 自定义配置privateConfigurationconfiguration;// 是否是动态设置并发度privatebooleandynamicParallelism=false;// 是否支持优雅停止privatebooleanisStoppable=false;4、资源管理相关
// JobVertex 最小资源需求privateResourceSpecminResources=ResourceSpec.DEFAULT;// JobVertex 推荐资源需求privateResourceSpecpreferredResources=ResourceSpec.DEFAULT;// 用于资源优化,运行不同的 JobVertex 的子任务运行在同一个 slot@NullableprivateSlotSharingGroupslotSharingGroup;// 需要严格共址的 JobVertex 组,每个 JobVertex 的第 n 个子任务运行在同一个 TaskManager@NullableprivateCoLocationGroupImplcoLocationGroup;5、协调器
// 操作符协调器,用于处理全局协调逻辑privatefinalList<SerializedValue<OperatorCoordinator.Provider>>operatorCoordinators=newArrayList<>();6、显示和描述信息
// JobVertex 的名称privateStringname;// 操作符名称,比如 'Flat Map' 或 'Join'privateStringoperatorName;// 操作符的描述,比如 'Hash Join' 或 'Sorted Group Reduce'privateStringoperatorDescription;// 提供比 name 更友好的描述信息privateStringoperatorPrettyName;7、状态和行为标志
// 是否支持同一个子任务并发多次执行privatebooleansupportsConcurrentExecutionAttempts=true;// 标记并发度是否被显式设置privatebooleanparallelismConfigured=false;// 是否有阻塞型输出privatebooleananyOutputBlocking=false;8、缓存数据集
// 存储该 JobVertex 需要消费的缓存中间数据集的 ID,可提高作业执行效率privatefinalList<IntermediateDataSetID>intermediateDataSetIdsToConsume=newArrayList<>();JobEdge
在 StreamGraph 中,StreamEdge 是连接 StreamNode 的桥梁。在 JobGraph 中,与之对应的是 JobEdge,不同点在于 JobEdge 中保存的是输入节点和输出结果。
1、连接关系成员
// 定义数据流向哪个 JobVertexprivatefinalJobVertextarget;// 定义这条边的源数据privatefinalIntermediateDataSetsource;// 输入类型的编号privatefinalinttypeNumber;// 多个输入间的键是否相关,如果为 true,相同键的数据在一个输入被分割时,在其他数据对应的记录也会发送到相同的下游节点privatefinalbooleaninterInputsKeysCorrelated;// 同一输入内相同的键是否必须发送到同一下游任务privatefinalbooleanintraInputKeyCorrelated;2、数据分发模式
// 定义数据在并行任务期间的分发模式// 可能值:// ALL_TO_ALL:全连接,每个上游子任务连接所有下游任务// POINTWISE:点对点连接,一对一或一对多的本地连接privatefinalDistributionPatterndistributionPattern;3、数据传输策略
// 是否为广播连接privatefinalbooleanisBroadcast;// 是否为 forward 连接,forward 连接最高效,直接转发,无需序列化网络传输privatefinalbooleanisForward;// 数据传输策略名称,用于显示privateStringshipStrategyName;4、状态重分布映射器
// 下游状态重分布映射器,当作业扩容时,决定是否重新分配下游算子的持久化状态privateSubtaskStateMapperdownstreamSubtaskStateMapper=SubtaskStateMapper.ROUND_ROBIN;// 上游状态重分布映射器,当作业扩容时,决定是否重新分配上游算子的持久化状态privateSubtaskStateMapperupstreamSubtaskStateMapper=SubtaskStateMapper.ROUND_ROBIN;5、描述和缓存信息
// 预处理操作的名称privateStringpreProcessingOperationName;// 操作符级别缓存的描述privateStringoperatorLevelCachingDescription;StreamGraph 转换成 JobGraph
现在我们再来看一下 StreamGraph 是如何转换成 JobGraph 的。转换逻辑的入口是 StreamGraph.getJobGraph 方法。它只是调用了 StreamingJobGraphGenerator.createJobGraph,核心逻辑在 createJobGraph 方法中。
privateJobGraphcreateJobGraph(){// 预验证,检查 StreamGraph 配置正确性preValidate(streamGraph,userClassloader);// 【核心】链化操作符setChaining();if(jobGraph.isDynamic()){// 支持动态扩缩容场景,为动态图设置并行度setVertexParallelismsForDynamicGraphIfNecessary();}// Note that we set all the non-chainable outputs configuration here because the// "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job// vertices and partition-reusefinalMap<Integer,Map<StreamEdge,NonChainedOutput>>opIntermediateOutputs=newHashMap<>();// 设置不能链化的输出边setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs,jobVertexBuildContext);setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);// 设置物理边连接setPhysicalEdges(jobVertexBuildContext);// 设置支持并发执行的 JobVertexmarkSupportingConcurrentExecutionAttempts(jobVertexBuildContext);// 验证混合 shuffle 模式只在批处理模式下使用validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);// 设置 Slot 共享和协同定位setSlotSharingAndCoLocation(jobVertexBuildContext);// 设置托管内存比例setManagedMemoryFraction(jobVertexBuildContext);// 为 JobVertex 名称添加前缀addVertexIndexPrefixInVertexName(jobVertexBuildContext,newAtomicInteger(0));// 设置操作符描述信息setVertexDescription(jobVertexBuildContext);// Wait for the serialization of operator coordinators and stream config.// 序列化操作符协调器和流配置serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor,jobVertexBuildContext);returnjobGraph;}可以看到,在 createJobGraph 方法中,调用了 setChaining 方法,即进行链化操作。这也是 JobGraph 最核心的优化之一。下面我们来看一下具体怎么做链化。
privatevoidsetChaining(){// we separate out the sources that run as inputs to another operator (chained inputs)// from the sources that needs to run as the main (head) operator.finalMap<Integer,OperatorChainInfo>chainEntryPoints=buildChainedInputsAndGetHeadInputs();finalCollection<OperatorChainInfo>initialEntryPoints=chainEntryPoints.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).map(Map.Entry::getValue).collect(Collectors.toList());// iterate over a copy of the values, because this map gets concurrently modifiedfor(OperatorChainInfoinfo:initialEntryPoints){createChain(info.getStartNodeId(),1,// operators start at position 1 because 0 is for chained source inputsinfo,chainEntryPoints,true,serializationExecutor,jobVertexBuildContext,null);}}setChaining 方法中主要分为两步,第一步是处理 Source 节点,将可以链化的 Source 和不能链化的 Source 节点分开。先来看如何判断一个 Source 是否可被链化。
publicstaticbooleanisChainableSource(StreamNodestreamNode,StreamGraphstreamGraph){// 最基本的一些判空,输出边数量为1if(streamNode.getOperatorFactory()==null||!(streamNode.getOperatorFactory()instanceofSourceOperatorFactory)||streamNode.getOutEdges().size()!=1){returnfalse;}finalStreamEdgesourceOutEdge=streamNode.getOutEdges().get(0);finalStreamNodetarget=streamGraph.getStreamNode(sourceOutEdge.getTargetId());finalChainingStrategytargetChainingStrategy=Preconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy();// 链化策略必须 HEAD_WITH_SOURCES,输出边是可链化的returntargetChainingStrategy==ChainingStrategy.HEAD_WITH_SOURCES&&isChainableInput(sourceOutEdge,streamGraph,false);}privatestaticbooleanisChainableInput(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodeupStreamVertex=streamGraph.getSourceVertex(edge);StreamNodedownStreamVertex=streamGraph.getTargetVertex(edge);if(!(streamGraph.isChainingEnabled()// 上下游节点是否在同一个 slot 共享组&&upStreamVertex.isSameSlotSharingGroup(downStreamVertex)// 操作符是否可以链化,主要做并行度检查&&areOperatorsChainable(upStreamVertex,downStreamVertex,streamGraph,allowChainWithDefaultParallelism)// 分区器和交换模式是否支持链化&&arePartitionerAndExchangeModeChainable(edge.getPartitioner(),edge.getExchangeMode(),streamGraph.isDynamic()))){returnfalse;}// check that we do not have a union operation, because unions currently only work// through the network/byte-channel stack.// we check that by testing that each "type" (which means input position) is used only once// 检查是否为 Union 操作,Union 操作不能链化for(StreamEdgeinEdge:downStreamVertex.getInEdges()){if(inEdge!=edge&&inEdge.getTypeNumber()==edge.getTypeNumber()){returnfalse;}}returntrue;}Source 的链化条件主要就是这些,我们结合一些例子来看一下。
Source(并行度=4) -> Map(并行度=4) -> Filter(并行度=4) Source -> Map 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true (都在默认组) 3. areOperatorsChainable() = true (Source可链化,Map是HEAD_WITH_SOURCES) 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner) 5. Union检查通过 结果:可链化 Map -> Filter 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true 3. areOperatorsChainable() = true (Map和Filter都是ALWAYS) 4. arePartitionerAndExchangeModeChainable() = true (ForwardPartitioner) 5. Union检查通过 结果:可链化 最终:Source -> Map -> Filter 三者链化到一个JobVertex中 Source(并行度=2) -> Map(并行度=4) // 并行度不匹配 Source -> Map 边: 1. isChainingEnabled() = true 2. isSameSlotSharingGroup() = true 3. areOperatorsChainable() = false (并行度不匹配) 结果:不可链化,需要网络传输 Source1 --\ Union -> Map Source2 --/ Source1 -> Union 边: 虽然满足前4个条件,但Union节点有两个输入边,typeNumber相同 Union检查失败,不可链化得到了所有入口之后,就可以进行后续节点的链化操作了,它的逻辑在 createChain 方法中。这里主要是一个递归过程,先将节点的输出边分为可链化和不可链化两个 list,之后对可链化的边进行递归调用链化。对不可链化的边,需要创建出新的链。由于篇幅原因,这里只贴一部分核心的代码
publicstaticList<StreamEdge>createChain(finalIntegercurrentNodeId,finalintchainIndex,finalOperatorChainInfochainInfo,finalMap<Integer,OperatorChainInfo>chainEntryPoints,finalbooleancanCreateNewChain,finalExecutorserializationExecutor,finalJobVertexBuildContextjobVertexBuildContext,final@NullableConsumer<Integer>visitedStreamNodeConsumer){......// 拆分可链化边和不可链化边for(StreamEdgeoutEdge:currentNode.getOutEdges()){if(isChainable(outEdge,streamGraph)){chainableOutputs.add(outEdge);}else{nonChainableOutputs.add(outEdge);}}// 处理可链化边for(StreamEdgechainable:chainableOutputs){StreamNodetargetNode=streamGraph.getStreamNode(chainable.getTargetId());AttributetargetNodeAttribute=targetNode.getAttribute();if(isNoOutputUntilEndOfInput){if(targetNodeAttribute!=null){targetNodeAttribute.setNoOutputUntilEndOfInput(true);}}transitiveOutEdges.addAll(createChain(chainable.getTargetId(),chainIndex+1,chainInfo,chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer));// Mark upstream nodes in the same chain as outputBlockingif(targetNodeAttribute!=null&&targetNodeAttribute.isNoOutputUntilEndOfInput()){currentNodeAttribute.setNoOutputUntilEndOfInput(true);}}// 处理不可链化边for(StreamEdgenonChainable:nonChainableOutputs){transitiveOutEdges.add(nonChainable);// Used to control whether a new chain can be created, this value is true in the// full graph generation algorithm and false in the progressive generation// algorithm. In the future, this variable can be a boolean type function to adapt// to more adaptive scenarios.if(canCreateNewChain){createChain(nonChainable.getTargetId(),1,// operators start at position 1 because 0 is for chained source// inputschainEntryPoints.computeIfAbsent(nonChainable.getTargetId(),(k)->chainInfo.newChain(nonChainable.getTargetId())),chainEntryPoints,canCreateNewChain,serializationExecutor,jobVertexBuildContext,visitedStreamNodeConsumer);}}// 创建 JobVertexStreamConfigconfig;if(currentNodeId.equals(startNodeId)){JobVertexjobVertex=jobVertexBuildContext.getJobVertex(startNodeId);if(jobVertex==null){jobVertex=createJobVertex(chainInfo,serializationExecutor,jobVertexBuildContext);}config=newStreamConfig(jobVertex.getConfiguration());}else{config=newStreamConfig(newConfiguration());}// 判断是否为起始节点,如果不是,将对应的配置信息存到链化起始节点的 key 中if(currentNodeId.equals(startNodeId)){chainInfo.setTransitiveOutEdges(transitiveOutEdges);jobVertexBuildContext.addChainInfo(startNodeId,chainInfo);config.setChainStart();config.setChainIndex(chainIndex);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setTransitiveChainedTaskConfigs(jobVertexBuildContext.getChainedConfigs().get(startNodeId));}else{config.setChainIndex(chainIndex);StreamNodenode=streamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());jobVertexBuildContext.getOrCreateChainedConfig(startNodeId).put(currentNodeId,config);}......}是否可链化依赖于 isChainable 方法的结果。它主要判断了下游的输入边数量是否为1,然后调用了 isChainableInput,这个方法我们刚刚已经看过了。
publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph){returnisChainable(edge,streamGraph,false);}publicstaticbooleanisChainable(StreamEdgeedge,StreamGraphstreamGraph,booleanallowChainWithDefaultParallelism){StreamNodedownStreamVertex=streamGraph.getTargetVertex(edge);returndownStreamVertex.getInEdges().size()==1&&isChainableInput(edge,streamGraph,allowChainWithDefaultParallelism);}总结
本文我们主要介绍了生成 JobGraph 的相关代码。首先了解了 JobGraph 中的节点和边对应的类,以及它们和 StreamGraph 中的类的映射关系。然后又看了生成 JobGraph 的核心代码,其中重点学习了链化相关的代码。
最后补充一个生成 JobGraph 的调用链路,感兴趣的同学可以看下。
clusterClient.submitJob()→MiniCluster.submitJob()→Dispatcher.submitJob()→JobMasterServiceLeadershipRunnerFactory→DefaultJobMasterServiceFactory→JobMaster→DefaultSchedulerFactory.createInstance()→StreamGraph.getJobGraph()