news 2026/6/9 19:33:37

Flink源码阅读:状态管理

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:状态管理

前面我们介绍了 Flink 状态的分类和应用。今天从源码层面再看一下 Flink 是如何管理状态的。

State 概述

关于 State 的详细介绍可以参考 Flink学习笔记:状态类型和应用 和 Flink学习笔记:状态后端这两篇文章,为了方面阅读,这里我们再简单介绍一下。

State 使用

State 是 Flink 做复杂逻辑所依赖的核心组件。它的分类如下

常见的是 Keyed State 和 Operator State,Keyed State 作用于 KeyedStream 上,Operator State 可以作用于所有的 Operator 上。Keyed State 使用时,需要先创建 StateDescriptor,然后再调用 getState 获取。

ValueStateDescriptor<Tuple2<Long,Long>>descriptor=newValueStateDescriptor<>("average",TypeInformation.of(newTypeHint<Tuple2<Long,Long>>(){}));ValueState<Tuple2<Long,Long>>sum=getRuntimeContext().getState(descriptor);

Opeartor State 的获取方式与 Keyed State 类似,都需要 StateDescriptor。Operator State 在定义时需要实现 CheckpointedFunction。

State 存储

State Backend 用来管理 State 存储,根据存储格式和存储类型的组合,可以分为三类:

  1. MemoryStateBackend:HashMapStateBackend 和 JobManagerCheckpointStorage 的组合,即将 State 以 Java 对象的形式存储在 JobManager 内存中。

  2. FsStateBackend:HashMapStateBackend 和 FileSystemCheckpointStorage 的组合,将 State 以 Java 对象的形式存储在远端文件系统中。

  3. RocksDBStateBackend:EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage 的组合,State 序列化后存储在 RocksDB。

创建 State Backend

创建 State Backend 的入口在 StreamTask,StreamTask 是 Flink 部署和运行在 TaskManager 的基本单元。

在 StreamTask 的 invoke 方法中,会先调用 restoreStateAndGates 方法去创建 State Backend。完整的调用链路如下图所示。

在 streamOperatorStateContext 方法中,分别调用了 keyedStatedBackend 和 operatorStateBackend 来创建两种 State Backend。

我们先来看 keyedStateBackend 的逻辑。

protected<K,RextendsDisposable&Closeable>RkeyedStatedBackend(TypeSerializer<K>keySerializer,StringoperatorIdentifierText,PrioritizedOperatorSubtaskStateprioritizedOperatorSubtaskStates,CloseableRegistrybackendCloseableRegistry,MetricGroupmetricGroup,doublemanagedMemoryFraction,StateObject.StateObjectSizeStatsCollectorstatsCollector,KeyedStateBackendCreator<K,R>keyedStateBackendCreator)throwsException{if(keySerializer==null){returnnull;}......finalKeyGroupRangekeyGroupRange=KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// Now restore processing is included in backend building/constructing process, so we need// to make sure// each stream constructed in restore could also be closed in case of task cancel, for// example the data// input stream opened for serDe during restore.CloseableRegistrycancelStreamRegistryForRestore=newCloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<R,KeyedStateHandle>backendRestorer=newBackendRestorerProcedure<>((stateHandles)->{KeyedStateBackendParametersImpl<K>parameters=newKeyedStateBackendParametersImpl<>(...);returnkeyedStateBackendCreator.create(...),parameters);},backendCloseableRegistry,logDescription);try{returnbackendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState(),statsCollector);}finally{if(backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)){IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}}

这里的创建过程也比较简单,先是获取 KeyGroupRange,它表示的是当前 Operator 上处理的 key 的范围。然后就是创建 StateBackend 实例,这里通过 BackendRestorerProcedure 封装统一的恢复、异常处理和资源清理逻辑。operatorStateBackend 方法的逻辑相比较来说,只是少了 KeyGroupRange 的处理,直接创建 StateBackend 实例。

创建和使用 State

创建 KeyedState

KeyedState 是通过调用 StreamingRuntimeContext.getState 方法获取的。我们先来看完整的调用流程。

在调用 getState 这些方法时,都会先调用 keyedStateStore 提供的方法,它是 Flink 提供的一个封装 keyedStateBackend 的接口。调用流程的最后,是调用 keyedStateBackend 中的 createOrUpdateInternalState 方法(这里我们以 HeapStateBackend 为例)。

public<N,SV,SEV,SextendsState,ISextendsS>IScreateOrUpdateInternalState(@NonnullTypeSerializer<N>namespaceSerializer,@NonnullStateDescriptor<S,SV>stateDesc,@NonnullStateSnapshotTransformFactory<SEV>snapshotTransformFactory,booleanallowFutureMetadataUpdates)throwsException{StateTable<K,N,SV>stateTable=tryRegisterStateTable(namespaceSerializer,stateDesc,getStateSnapshotTransformFactory(stateDesc,snapshotTransformFactory),allowFutureMetadataUpdates);@SuppressWarnings("unchecked")IScreatedState=(IS)createdKVStates.get(stateDesc.getName());if(createdState==null){StateCreateFactorystateCreateFactory=STATE_CREATE_FACTORIES.get(stateDesc.getType());if(stateCreateFactory==null){thrownewFlinkRuntimeException(stateNotSupportedMessage(stateDesc));}createdState=stateCreateFactory.createState(stateDesc,stateTable,getKeySerializer());}else{StateUpdateFactorystateUpdateFactory=STATE_UPDATE_FACTORIES.get(stateDesc.getType());if(stateUpdateFactory==null){thrownewFlinkRuntimeException(stateNotSupportedMessage(stateDesc));}createdState=stateUpdateFactory.updateState(stateDesc,stateTable,createdState);}createdKVStates.put(stateDesc.getName(),createdState);returncreatedState;}privatestaticfinalMap<StateDescriptor.Type,StateCreateFactory>STATE_CREATE_FACTORIES=Stream.of(Tuple2.of(StateDescriptor.Type.VALUE,(StateCreateFactory)HeapValueState::create),Tuple2.of(StateDescriptor.Type.LIST,(StateCreateFactory)HeapListState::create),Tuple2.of(StateDescriptor.Type.MAP,(StateCreateFactory)HeapMapState::create),Tuple2.of(StateDescriptor.Type.AGGREGATING,(StateCreateFactory)HeapAggregatingState::create),Tuple2.of(StateDescriptor.Type.REDUCING,(StateCreateFactory)HeapReducingState::create)).collect(Collectors.toMap(t->t.f0,t->t.f1));

这里首先是注册了一个 StateTable,这个是 State 中一个非常重要的成员变量,它内部是一个类似 Map 的结构,用来保存 key 和 key 的状态。

STATE_CREATE_FACTORIES 这个变量保存了不同类型的 State 和它对应的创建方法,同理 STATE_UPDATE_FACTORIES 保存的是不同 State 对应的 更新方法。

创建 OperatorState

看完了 KeyedState 的创建过程后,我们再来看下 OperatorState 的创建过程。

OperatorState 的创建方法是通过 FunctionInitializationContext 先获取到 OperatorStateStore,它与 KeyedStateStore 类似,都是对 StateBackend 的方法进行了封装。

@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));checkpointedState=context.getOperatorStateStore().getListState(descriptor);if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}

OperatorStateStore 的 getListState 方法中,直接创建出了 PartitionableListState,同时也做了一些缓存操作。

private<S>ListState<S>getListState(ListStateDescriptor<S>stateDescriptor,OperatorStateHandle.Modemode)throwsStateMigrationException{......PartitionableListState<S>partitionableListState=(PartitionableListState<S>)registeredOperatorStates.get(name);if(null==partitionableListState){// no restored state for the state name; simply create new state holderpartitionableListState=newPartitionableListState<>(newRegisteredOperatorStateBackendMetaInfo<>(name,partitionStateSerializer,mode));registeredOperatorStates.put(name,partitionableListState);}else{......}accessedStatesByName.put(name,partitionableListState);returnpartitionableListState;}

PartitionableListState 内部有一个 ArrayList 用于保存数据。

使用 KeyedState

了解完 State 的创建之后,接下来就是 State 的使用了。我们以 HeapValueState 为例来看如何获取 State。

// HeapValueState 类publicVvalue(){finalVresult=stateTable.get(currentNamespace);if(result==null){returngetDefaultValue();}returnresult;}

在 HeapValueState 类的 value 方法中,直接调用 StateTable 的 get 方法,最终调用的是 CopyOnWriteStateMap 的 get 方法,这个方法与 HashMap 的 get 方法比较类似。

publicSget(Kkey,Nnamespace){finalinthash=computeHashForOperationAndDoIncrementalRehash(key,namespace);finalintrequiredVersion=highestRequiredSnapshotVersion;finalStateMapEntry<K,N,S>[]tab=selectActiveTable(hash);intindex=hash&(tab.length-1);for(StateMapEntry<K,N,S>e=tab[index];e!=null;e=e.next){finalKeKey=e.key;finalNeNamespace=e.namespace;if((e.hash==hash&&key.equals(eKey)&&namespace.equals(eNamespace))){// copy-on-write check for stateif(e.stateVersion<requiredVersion){// copy-on-write check for entryif(e.entryVersion<requiredVersion){e=handleChainedEntryCopyOnWrite(tab,hash&(tab.length-1),e);}e.stateVersion=stateMapVersion;e.state=getStateSerializer().copy(e.state);}returne.state;}}returnnull;}
使用 OperatorState

OperatorState 底层使用的是 PartitionableListState,前面也提到了,它的内部用了一个 ArrayList 来保存数据,对于 OperatorState 的各种操作也都是来操作这个 ArrayList。

@Overridepublicvoidclear(){internalList.clear();}@OverridepublicIterable<S>get(){returninternalList;}@Overridepublicvoidadd(Svalue){Preconditions.checkNotNull(value,"You cannot add null to a ListState.");internalList.add(value);}@Overridepublicvoidupdate(List<S>values){internalList.clear();addAll(values);}@OverridepublicvoidaddAll(List<S>values){Preconditions.checkNotNull(values,"List of values to add cannot be null.");if(!values.isEmpty()){for(Svalue:values){checkNotNull(value,"Any value to add to a list cannot be null.");add(value);}}}

总结

本文对 State 的相关代码进行了梳理。包括 StateBackend 的创建,KeyedState 和 OperatorState 的创建和使用。State 和 Checkpoint 两者需要结合使用,因此后面我们会再梳理 Checkpoint 的相关代码。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 9:57:00

光伏并网系统这玩意儿说白了就是让太阳能板发的电乖乖并入电网。今天咱们直接拆开看里面的几个硬核模块,手把手撸点仿真代码,搞明白这些电路和控制策略到底怎么玩

三相光伏并网仿真模型 Boost&#xff0b;三相逆变器 PLL锁相环 MPPT最大功率点跟踪控制(扰动观察法) dq解耦控制 电流内环电压外环的并网控制策略先来看Boost电路这哥们。光伏板的输出电压经常不够高&#xff0c;得靠它来抬电压。核心逻辑就是调节占空比让输出电压稳定在设定值…

作者头像 李华
网站建设 2026/6/10 14:31:35

ARMv8-M架构IPSR寄存器读取函数解析

这是一个用于 ARMv8-M 架构 的 C 语言内联汇编函数&#xff0c;用于读取 IPSR&#xff08;中断程序状态寄存器&#xff09; 的值。 函数功能解析&#xff1a; 1. 函数声明 static inline uint32_t getipsr(void)static inline&#xff1a;内联函数&#xff0c;编译器会将函数…

作者头像 李华
网站建设 2026/6/10 12:26:13

SpringBoot+vue养老院运营管理系统

摘 要 目前&#xff0c;我国人口逐步进入老龄化&#xff0c;老人院事业在我国将会进入发展期&#xff0c;更多的年轻人会选择把家里的老人送到养老院&#xff0c;让自己的父母能够安享晚年。随之而来的是老人的安全问题&#xff0c;儿女会担心老人们能否在养老院幸福安全地生活…

作者头像 李华
网站建设 2026/6/10 16:02:44

基于springboot生鲜农产品保鲜及溯源管理系

基于Spring Boot的生鲜农产品保鲜及溯源管理系统是一个集成了多种先进技术的信息化管理系统&#xff0c;旨在提高生鲜农产品供应链的透明度和信息化水平。以下是对该系统的详细介绍&#xff1a; 一、系统背景与意义 随着人们生活水平的提高和健康意识的增强&#xff0c;消费者对…

作者头像 李华
网站建设 2026/6/10 12:29:31

基于springboot企业支付费用管控平台

基于Spring Boot的企业支付费用管控平台是一个集成了多种支付管理和费用控制功能的综合性系统。以下是对该平台的详细介绍&#xff1a; 一、平台概述 该平台以Spring Boot框架为核心&#xff0c;结合前端技术&#xff08;如Vue.js等&#xff09;和关系型数据库&#xff08;如My…

作者头像 李华
网站建设 2026/6/9 18:50:25

C#面试题分享(一)

1.什么是C#&#xff0c;它的主要特点是什么&#xff1f; C#&#xff08;发音为C Sharp&#xff09;是一种由微软开发的现代、通用、面向对象的编程语言&#xff0c;作为.NET框架的一 部分&#xff0c;主要用于开发Windows应用、Web应用、移动应用等。其主要特点包括&#xff1a…

作者头像 李华