news 2026/6/10 14:58:35

Flink源码阅读:如何生成ExecutionGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成ExecutionGraph

今天我们一起来了解 Flink 最后一种执行图,ExecutionGraph 的执行过程。

基本概念

在阅读源码之前,我们先来了解一下 ExecutionGraph 中的一些基本概念。

  • ExecutionJobVertex:ExecutionJobVertex 是 ExecutionGraph 中的节点,对应的是 JobGraph 中的 JobVertex。

  • ExecutionVertex:每个 ExecutionJobVertex 都包含了一组 ExecutionVertex,ExecutionVertex 的数量就是节点对应的并行度。

  • IntermediateResult:IntermediateResult 表示节点的输出结果,与之对应的是 JobGraph 中的 IntermediateDataSet。

  • IntermediateResultPartition:IntermediateResultPartition 是每个 ExecutionVertex 的输出。

  • EdgeManager:EdgeManager 主要负责存储 ExecutionGraph 中所有之间的连接,包括其并行度。

  • Execution:Execution 可以认为是一次实际的运行尝试。每次执行时,Flink 都会将ExecutionVertex 封装成一个 Execution,并通过一个 ExecutionAttemptID 来做唯一标识。

ExecutionGraph 生成过程

了解了这些基本概念之后,我们一起来看一下 ExecutionGraph 的具体生成过程。生成 ExecutionGraph 的代码入口是 DefaultExecutionGraphBuilder.build 方法。

首先是获取一些基本信息,包括 jobInformation、jobStatusChangedListeners 等。

接下来就是创建一个 DefaultExecutionGraph 和生成执行计划。

// create a new execution graph, if none exists so farfinalDefaultExecutionGraphexecutionGraph=newDefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,executionHistorySizeLimit,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore,isDynamicGraph,executionJobVertexFactory,jobGraph.getJobStatusHooks(),markPartitionFinishedStrategy,taskDeploymentDescriptorFactory,jobStatusChangedListeners,executionPlanSchedulingContext);try{executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));}catch(Throwablet){log.warn("Cannot create plan for job",t);// give the graph an empty planexecutionGraph.setPlan(newJobPlanInfo.Plan("","","",newArrayList<>()));}

下面就是两个比较核心的方法 getVerticesSortedTopologicallyFromSources 和 attachJobGraph。

// topologically sort the job vertices and attach the graph to the existing oneList<JobVertex>sortedTopology=jobGraph.getVerticesSortedTopologicallyFromSources();executionGraph.attachJobGraph(sortedTopology,jobManagerJobMetricGroup);

这两个方法是先将 JobVertex 进行排序,然后构建 ExecutionGraph 的拓扑图。

getVerticesSortedTopologicallyFromSources
publicList<JobVertex>getVerticesSortedTopologicallyFromSources()throwsInvalidProgramException{// early out on empty listsif(this.taskVertices.isEmpty()){returnCollections.emptyList();}List<JobVertex>sorted=newArrayList<JobVertex>(this.taskVertices.size());Set<JobVertex>remaining=newLinkedHashSet<JobVertex>(this.taskVertices.values());// start by finding the vertices with no input edges// and the ones with disconnected inputs (that refer to some standalone data set){Iterator<JobVertex>iter=remaining.iterator();while(iter.hasNext()){JobVertexvertex=iter.next();if(vertex.isInputVertex()){sorted.add(vertex);iter.remove();}}}intstartNodePos=0;// traverse from the nodes that were added until we found all elementswhile(!remaining.isEmpty()){// first check if we have more candidates to start traversing from. if not, then the// graph is cyclic, which is not permittedif(startNodePos>=sorted.size()){thrownewInvalidProgramException("The job graph is cyclic.");}JobVertexcurrent=sorted.get(startNodePos++);addNodesThatHaveNoNewPredecessors(current,sorted,remaining);}returnsorted;}

这段代码是将所有的节点进行排序,先将所有的 Source 节点筛选出来,然后再将剩余节点假如列表。这样就能构建出最终的拓扑图。

attachJobGraph
@OverridepublicvoidattachJobGraph(List<JobVertex>verticesToAttach,JobManagerJobMetricGroupjobManagerJobMetricGroup)throwsJobException{assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+"vertices and {} intermediate results.",verticesToAttach.size(),tasks.size(),intermediateResults.size());attachJobVertices(verticesToAttach,jobManagerJobMetricGroup);if(!isDynamic){initializeJobVertices(verticesToAttach);}// the topology assigning should happen before notifying new vertices to failoverStrategyexecutionTopology=DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy=partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());}

attachJobGraph 方法主要包含两步逻辑,第一步是调用 attachJobVertices 方法创建 ExecutionJobVertex 实例,第二步是调用 fromExecutionGraph 创建一些其他的核心对象。

attachJobVertices

attachJobVertices 方法中就是遍历所有的 JobVertex,然后利用 JobVertex 生成 ExecutionJobVertex。

/** Attach job vertices without initializing them. */privatevoidattachJobVertices(List<JobVertex>topologicallySorted,JobManagerJobMetricGroupjobManagerJobMetricGroup)throwsJobException{for(JobVertexjobVertex:topologicallySorted){if(jobVertex.isInputVertex()&&!jobVertex.isStoppable()){this.isStoppable=false;}VertexParallelismInformationparallelismInfo=parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graphExecutionJobVertexejv=executionJobVertexFactory.createExecutionJobVertex(this,jobVertex,parallelismInfo,coordinatorStore,jobManagerJobMetricGroup);ExecutionJobVertexpreviousTask=this.tasks.putIfAbsent(jobVertex.getID(),ejv);if(previousTask!=null){thrownewJobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(),ejv,previousTask));}this.verticesInCreationOrder.add(ejv);this.numJobVerticesTotal++;}}

initializeJobVertices

在 DefaultExecutionGraph.initializeJobVertices 中是遍历了刚刚排好序的 JobVertex,获取了 ExecutionJobVertex 之后调用了 ExecutionGraph.initializeJobVertex 方法。

我们直接来看 ExecutionGraph.initializeJobVertex 的逻辑。

defaultvoidinitializeJobVertex(ExecutionJobVertexejv,longcreateTimestamp)throwsJobException{initializeJobVertex(ejv,createTimestamp,VertexInputInfoComputationUtils.computeVertexInputInfos(ejv,getAllIntermediateResults()::get));}

这里先是调用了 VertexInputInfoComputationUtils.computeVertexInputInfos 方法,生成了 Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos。它表示的是每个 ExecutionVertex 消费上游 IntermediateResultPartition 的范围。

这里有两种模式,分别是 POINTWISE (点对点)和 ALL_TO_ALL(全对全)

在 POINTWISE 模式中,会按照尽量均匀分布的方式处理。

  • 例如上游并发度是4,下游并发度是2时,那么前两个 IntermediateResultPartition 就会被第一个 ExecutionVertex 消费,后两个 IntermediateResultPartition 就会被第二个 ExecutionVertex 消费。

  • 如果上游并发度是2,下游是3时,那么下游前两个 IntermediateResultPartition 会被第一个 ExecutionVertex 消费,第三个 IntermediateResultPartition 则会被第二个 ExecutionVertex 消费。

publicstaticJobVertexInputInfocomputeVertexInputInfoForPointwise(intsourceCount,inttargetCount,Function<Integer,Integer>numOfSubpartitionsRetriever,booleanisDynamicGraph){finalList<ExecutionVertexInputInfo>executionVertexInputInfos=newArrayList<>();if(sourceCount>=targetCount){for(intindex=0;index<targetCount;index++){intstart=index*sourceCount/targetCount;intend=(index+1)*sourceCount/targetCount;IndexRangepartitionRange=newIndexRange(start,end-1);IndexRangesubpartitionRange=computeConsumedSubpartitionRange(index,1,()->numOfSubpartitionsRetriever.apply(start),isDynamicGraph,false,false);executionVertexInputInfos.add(newExecutionVertexInputInfo(index,partitionRange,subpartitionRange));}}else{for(intpartitionNum=0;partitionNum<sourceCount;partitionNum++){intstart=(partitionNum*targetCount+sourceCount-1)/sourceCount;intend=((partitionNum+1)*targetCount+sourceCount-1)/sourceCount;intnumConsumers=end-start;IndexRangepartitionRange=newIndexRange(partitionNum,partitionNum);// Variable used in lambda expression should be final or effectively finalfinalintfinalPartitionNum=partitionNum;for(inti=start;i<end;i++){IndexRangesubpartitionRange=computeConsumedSubpartitionRange(i,numConsumers,()->numOfSubpartitionsRetriever.apply(finalPartitionNum),isDynamicGraph,false,false);executionVertexInputInfos.add(newExecutionVertexInputInfo(i,partitionRange,subpartitionRange));}}}returnnewJobVertexInputInfo(executionVertexInputInfos);}

在 ALL_TO_ALL 模式中,每个下游都会消费所有上游的数据。

publicstaticJobVertexInputInfocomputeVertexInputInfoForAllToAll(intsourceCount,inttargetCount,Function<Integer,Integer>numOfSubpartitionsRetriever,booleanisDynamicGraph,booleanisBroadcast,booleanisSingleSubpartitionContainsAllData){finalList<ExecutionVertexInputInfo>executionVertexInputInfos=newArrayList<>();IndexRangepartitionRange=newIndexRange(0,sourceCount-1);for(inti=0;i<targetCount;++i){IndexRangesubpartitionRange=computeConsumedSubpartitionRange(i,targetCount,()->numOfSubpartitionsRetriever.apply(0),isDynamicGraph,isBroadcast,isSingleSubpartitionContainsAllData);executionVertexInputInfos.add(newExecutionVertexInputInfo(i,partitionRange,subpartitionRange));}returnnewJobVertexInputInfo(executionVertexInputInfos);}

生成好了 jobVertexInputInfos 之后,我们再回到 DefaultExecutionGraph.initializeJobVertex 方法中。

@OverridepublicvoidinitializeJobVertex(ExecutionJobVertexejv,longcreateTimestamp,Map<IntermediateDataSetID,JobVertexInputInfo>jobVertexInputInfos)throwsJobException{checkNotNull(ejv);checkNotNull(jobVertexInputInfos);jobVertexInputInfos.forEach((resultId,info)->this.vertexInputInfoStore.put(ejv.getJobVertexId(),resultId,info));ejv.initialize(executionHistorySizeLimit,rpcTimeout,createTimestamp,this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),executionPlanSchedulingContext);ejv.connectToPredecessors(this.intermediateResults);for(IntermediateResultres:ejv.getProducedDataSets()){IntermediateResultpreviousDataSet=this.intermediateResults.putIfAbsent(res.getId(),res);if(previousDataSet!=null){thrownewJobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(),res,previousDataSet));}}registerExecutionVerticesAndResultPartitionsFor(ejv);// enrich network memory.SlotSharingGroupslotSharingGroup=ejv.getSlotSharingGroup();if(areJobVerticesAllInitialized(slotSharingGroup)){SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(slotSharingGroup,this::getJobVertex,shuffleMaster);}}

首先来看 ExecutionJobVertex.initialize 方法。这个方法主要是生成 IntermediateResult 和 ExecutionVertex。

protectedvoidinitialize(intexecutionHistorySizeLimit,Durationtimeout,longcreateTimestamp,SubtaskAttemptNumberStoreinitialAttemptCounts,ExecutionPlanSchedulingContextexecutionPlanSchedulingContext)throwsJobException{checkState(parallelismInfo.getParallelism()>0);checkState(!isInitialized());this.taskVertices=newExecutionVertex[parallelismInfo.getParallelism()];this.inputs=newArrayList<>(jobVertex.getInputs().size());// create the intermediate resultsthis.producedDataSets=newIntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for(inti=0;i<jobVertex.getProducedDataSets().size();i++){finalIntermediateDataSetresult=jobVertex.getProducedDataSets().get(i);this.producedDataSets[i]=newIntermediateResult(result,this,this.parallelismInfo.getParallelism(),result.getResultType(),executionPlanSchedulingContext);}// create all task verticesfor(inti=0;i<this.parallelismInfo.getParallelism();i++){ExecutionVertexvertex=createExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,executionHistorySizeLimit,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i]=vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor(IntermediateResultir:this.producedDataSets){if(ir.getNumberOfAssignedPartitions()!=this.parallelismInfo.getParallelism()){thrownewRuntimeException("The intermediate result's partitions were not correctly assigned.");}}// set up the input splits, if the vertex has anytry{@SuppressWarnings("unchecked")InputSplitSource<InputSplit>splitSource=(InputSplitSource<InputSplit>)jobVertex.getInputSplitSource();if(splitSource!=null){ThreadcurrentThread=Thread.currentThread();ClassLoaderoldContextClassLoader=currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try{inputSplits=splitSource.createInputSplits(this.parallelismInfo.getParallelism());if(inputSplits!=null){splitAssigner=splitSource.getInputSplitAssigner(inputSplits);}}finally{currentThread.setContextClassLoader(oldContextClassLoader);}}else{inputSplits=null;}}catch(Throwablet){thrownewJobException("Creating the input splits caused an error: "+t.getMessage(),t);}}

在创建 ExecutionVertex 时,会创建 IntermediateResultPartition 和 Execution,创建 Execution 时,会设置 attemptNumber,这个值默认是0,如果 ExecutionVertex 是重新调度的,那么 attemptNumber 会自增加1。

ExecutionJobVertex.connectToPredecessors 方法主要是生成 ExecutionVertex 与 IntermediateResultPartition 的关联关系。这里设置关联关系也分成了点对点和全对全两种模式处理,点对点模式需要计算 ExecutionVertex 对应的 IntermediateResultPartition index 的范围。两种模式最终都调用了 connectInternal 方法。

/** Connect all execution vertices to all partitions. */privatestaticvoidconnectInternal(List<ExecutionVertex>taskVertices,List<IntermediateResultPartition>partitions,ResultPartitionTyperesultPartitionType,EdgeManageredgeManager){checkState(!taskVertices.isEmpty());checkState(!partitions.isEmpty());ConsumedPartitionGroupconsumedPartitionGroup=createAndRegisterConsumedPartitionGroupToEdgeManager(taskVertices.size(),partitions,resultPartitionType,edgeManager);for(ExecutionVertexev:taskVertices){ev.addConsumedPartitionGroup(consumedPartitionGroup);}List<ExecutionVertexID>consumerVertices=taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());ConsumerVertexGroupconsumerVertexGroup=ConsumerVertexGroup.fromMultipleVertices(consumerVertices,resultPartitionType);for(IntermediateResultPartitionpartition:partitions){partition.addConsumers(consumerVertexGroup);}consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);}

这个方法中 ev.addConsumedPartitionGroup(consumedPartitionGroup); 负责将 ExecutionVertex 到 IntermediateResultPartition 的关联关系保存在 EdgeManager.vertexConsumedPartitions 中。

而 partition.addConsumers(consumerVertexGroup); 则负责将 IntermediateResultPartition 到 ExecutionVertex 的关系保存在 EdgeManager.partitionConsumers 中。

总结

通过本文,我们了解了 Flink 是如何将 JobGraph 转换成 ExecutionGraph 的。其中涉及到的一些核心概念名称比较类似,建议认真学习和理解透彻之后再研究其生成方法和对应关系,也可以借助前文中 ExecutionGraph 示意图辅助学习。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 16:49:37

Git下载缓慢?国内镜像加速Stable Diffusion 3.5 FP8资源获取

Git下载缓慢&#xff1f;国内镜像加速Stable Diffusion 3.5 FP8资源获取 在AI生成内容&#xff08;AIGC&#xff09;爆发的今天&#xff0c;越来越多开发者和企业希望将文本到图像模型集成进自己的产品线。然而&#xff0c;现实往往令人沮丧&#xff1a;当你兴冲冲地准备部署最…

作者头像 李华
网站建设 2026/6/10 6:15:28

PyTorch DistributedDataParallel加速Qwen-Image-Edit-2509训练

PyTorch DistributedDataParallel 加速 Qwen-Image-Edit-2509 训练 在当今视觉内容爆炸式增长的背景下&#xff0c;电商平台、数字营销和社交媒体对图像处理的需求早已从“能修图”转向“智能修图”。传统的 Photoshop 流程难以应对每天成千上万张商品图的批量编辑需求。而随着…

作者头像 李华
网站建设 2026/6/10 12:50:27

MySQL 查询数据_笔记

SELECT —— 查询数据语法 -- mysql数据库中查询数据通用的SELECT语法 SELECT column1,column2,.... FORM table_name [WHERE condition] [ORDER BY column_name[ASC|DESC]] [LIMT number]-- column1,column2,...是想要选择的列的名称&#xff0c;如果使用*表示选择所有列。 -…

作者头像 李华
网站建设 2026/6/10 12:56:27

城通网盘直链提取:如何用免费工具突破下载速度限制

ctfileGet作为一款专注于城通网盘直链提取的免费工具&#xff0c;通过智能解析技术让文件下载变得简单高效。无论你是普通用户还是开发者&#xff0c;这款开源工具都能为你带来全新的下载加速体验&#xff0c;彻底告别繁琐的等待和广告干扰。 【免费下载链接】ctfileGet 获取城…

作者头像 李华
网站建设 2026/6/10 10:20:34

终极离线思维导图:DesktopNaotu桌面版脑图完整使用指南

终极离线思维导图&#xff1a;DesktopNaotu桌面版脑图完整使用指南 【免费下载链接】DesktopNaotu 桌面版脑图 (百度脑图离线版&#xff0c;思维导图) 跨平台支持 Windows/Linux/Mac OS. (A cross-platform multilingual Mind Map Tool) 项目地址: https://gitcode.com/gh_mi…

作者头像 李华
网站建设 2026/6/10 14:40:17

FLUX.1-dev + Three.js:打造3D可视化AI生成新体验

FLUX.1-dev Three.js&#xff1a;打造3D可视化AI生成新体验 在数字内容创作的前沿&#xff0c;我们正见证一场静默却深刻的变革——从“人工绘制”到“语言驱动”的视觉生产范式迁移。想象这样一个场景&#xff1a;设计师输入一句“极光下的机械森林&#xff0c;蒸汽朋克风格”…

作者头像 李华