LangGraph 生产环境持久化学习总结
核心概念
1. 三种持久化方式对比
| 方式 | 说明 | 适用场景 | 持久化 |
|---|---|---|---|
InMemorySaver | 内存存储 | 测试环境 | ❌ 重启丢失 |
PostgresSaver | PostgreSQL 存储 | 生产环境推荐 | ✅ 持久化 |
RedisSaver | Redis 存储 | 高并发场景 | ✅ 持久化 |
2. State(状态)- 工作流中共享的数据结构
classcustomstate(AgentState):user_name:str# 用户姓名conversation_count:int=0# 对话次数3. Checkpointer(检查点)- 短期记忆
作用:保存对话历史和工作流状态
- 记录每次对话的状态
- 支持多轮对话
- 提供 Time Travel 功能(状态回溯)
4. Store(存储)- 长期记忆
作用:保存用户画像、知识库等跨会话数据
- 用户信息(姓名、偏好等)
- 知识库元数据
- 跨会话的业务数据
一、PostgreSQL Checkpointer 使用
1.1 数据库准备
-- 创建数据库(如果还没有)CREATEDATABASEdengmingfang OWNER dengmingfang;-- 赋予权限GRANTALLPRIVILEGESONDATABASEdengmingfangTOdengmingfang;1.2 代码实现
fromlanggraph.checkpoint.postgresimportPostgresSaver# 数据库连接字符串DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang"# 使用上下文管理器withPostgresSaver.from_conn_string(DB_URL)ascheckpointer:# 第一次使用必须调用 setup() 初始化数据库表checkpointer.setup()# 创建 graph,绑定 checkpointergraph=create_react_agent(model=llm,tools=[username,greet_user],state_schema=customstate,checkpointer=checkpointer)# 配置参数config={"configurable":{"thread_id":"user_123",# 会话 ID"username":"张三"}}# 多轮对话(状态自动持久化)result1=graph.invoke({"messages":[{"role":"user","content":"第一次对话"}]},config)result2=graph.invoke({"messages":[{"role":"user","content":"第二次对话"}]},config# 相同 thread_id,状态会累加)1.3 查看状态
# 查看当前状态state_snapshot=graph.get_state(config)print(f"next:{state_snapshot.next}")print(f"values:{state_snapshot.values}")# 查看历史记录forsnapshotingraph.get_state_history(config):print(f"checkpoint_id:{snapshot.config['configurable']['checkpoint_id']}")二、Time Travel(时间回溯)
2.1 获取历史状态
# 获取所有历史记录history=list(graph.get_state_history(config))# 倒数第二条记录snapshot=history[-2]2.2 回溯并修改状态
# 从历史记录恢复new_config=graph.update_state(snapshot.config,{"messages":["这是人工修改的消息"]})# 从修改后的状态继续result=graph.invoke({"messages":[{"role":"user","content":"继续执行"}]},new_config)2.3 应用场景
- 调试:回溯到某个节点重新执行
- 错误回滚:撤销错误的工具调用
- 流程优化:对比不同执行路径的结果
三、Store(长期记忆)
3.1 保存用户信息
fromlanggraph.store.postgresimportPostgresStorewithPostgresStore.from_conn_string(DB_URL)asstore:# 初始化表store.setup()# 保存用户画像store.put(namespace=("profile",),# 命名空间key="user_123",value={"name":"邓","age":26,"preferences":["技术","阅读"]})3.2 读取用户信息
# 读取用户数据user_profile=store.get(("profile",),"user_123")ifuser_profile:print(user_profile.value)# 输出:{'name': '邓茗芳', 'age': 27, 'preferences': ['技术', '阅读']}3.3 更新用户信息
# 更新(会覆盖原值)store.put(("profile",),"user_123",{**user_profile.value,"last_login":"2026-01-29","login_count":user_profile.value.get("login_count",0)+1})四、完整生产环境架构
4.1 架构图
┌─────────────────────────────────────────────────────────┐ │ LangGraph Application │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ Graph │────────▶│ Checkpointer │ │ │ │ (State) │ │ (PostgreSQL) │ │ │ └─────────────┘ └──────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ Store │ │ Vector Store │ │ │ │ (PostgreSQL)│ │ (FAISS) │ │ │ └─────────────┘ └──────────────┘ │ │ │ │ │ │ └───────────────────────┘ │ │ │ │ │ ▼ │ │ PostgreSQL 数据库 │ └─────────────────────────────────────────────────────────┘4.2 完整代码示例
fromlanggraph.checkpoint.postgresimportPostgresSaverfromlanggraph.store.postgresimportPostgresStorefromlangchain_community.vectorstoresimportFAISSfromlanggraph.prebuilt.chat_agent_executorimportcreate_react_agent DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang"withPostgresSaver.from_conn_string(DB_URL)ascheckpointer,\ PostgresStore.from_conn_string(DB_URL)asstore:# 初始化checkpointer.setup()store.setup()# 加载向量库vector_store=FAISS.load_local("/path/to/faiss_index",embeddings=embeddings,allow_dangerous_deserialization=True)# 创建 graphgraph=create_react_agent(model=llm,tools=[username,greet_user,rag_query],state_schema=customstate,checkpointer=checkpointer,store=store)# 执行result=graph.invoke({"messages":[{"role":"user","content":"查询商品"}]},{"configurable":{"thread_id":"user_123"}})五、数据库表结构
5.1 Checkpointer 自动创建的表
-- 检查点表CREATETABLEcheckpoints(thread_idTEXT,checkpoint_nsTEXT,checkpoint_idTEXT,parent_checkpoint_idTEXT,typeTEXT,checkpointJSONB,metadata JSONB,PRIMARYKEY(thread_id,checkpoint_ns,checkpoint_id));-- 写入表CREATETABLEcheckpoint_writes(thread_idTEXT,checkpoint_nsTEXT,checkpoint_idTEXT,task_idTEXT,idxINTEGER,channelTEXT,typeTEXT,valueJSONB,PRIMARYKEY(thread_id,checkpoint_ns,checkpoint_id,task_id,idx));5.2 Store 自动创建的表
-- 存储表CREATETABLEstore(namespaceTEXT,keyTEXT,valueJSONB,created_atTIMESTAMP,updated_atTIMESTAMP,PRIMARYKEY(namespace,key));六、最佳实践
6.1 配置优化
# 使用连接池importpsycopg2.pool DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang?pool_size=10&max_overflow=20"# 设置超时checkpointer=PostgresSaver.from_conn_string(DB_URL,checkpointer_options={"connection_timeout":30,"command_timeout":60})6.2 错误处理
frompsycopg2importOperationalErrortry:checkpointer=PostgresSaver.from_conn_string(DB_URL)checkpointer.setup()exceptOperationalErrorase:print(f"数据库连接失败:{e}")# 使用降级方案(如 InMemorySaver)checkpointer=InMemorySaver()6.3 数据备份
# 备份数据库pg_dump -U dengmingfang dengmingfang>backup.sql# 恢复数据库psql -U dengmingfang dengmingfang<backup.sql七、测试环境 vs 生产环境
7.1 测试环境(InMemorySaver)
fromlanggraph.checkpoint.memoryimportInMemorySaver# 简单、快速,但重启后丢失checkpointer=InMemorySaver()graph=create_react_agent(model=llm,tools=tools,checkpointer=checkpointer)7.2 生产环境(PostgresSaver)
fromlanggraph.checkpoint.postgresimportPostgresSaver# 持久化、可靠、支持回溯checkpointer=PostgresSaver.from_conn_string(DB_URL)checkpointer.setup()graph=create_react_agent(model=llm,tools=tools,checkpointer=checkpointer)八、常见问题
8.1 数据库连接失败
# 错误:psycopg2.OperationalError# 解决:检查数据库服务、用户名密码、网络连接# 验证连接importpsycopg2 conn=psycopg2.connect(DB_URL)print("连接成功")8.2 表已存在错误
# 第二次运行时 setup() 会报错# 解决:捕获异常或先检查try:checkpointer.setup()exceptExceptionase:print(f"表可能已存在:{e}")8.3 状态丢失
# 确保 thread_id 一致config={"configurable":{"thread_id":"same_id"}}# 每次都用同一个 configresult1=graph.invoke({"messages":[...]},config)result2=graph.invoke({"messages":[...]},config)# 状态会累加九、性能优化
9.1 批量操作
# 批量存储用户数据foruser_id,user_datainuser_list:store.put(("profile",),user_id,user_data)9.2 定期清理
-- 清理 30 天前的历史记录DELETEFROMcheckpointsWHEREcreated_at<NOW()-INTERVAL'30 days';9.3 索引优化
-- 为常用查询创建索引CREATEINDEXidx_checkpoints_thread_idONcheckpoints(thread_id);CREATEINDEXidx_store_namespaceONstore(namespace);十、总结
关键要点
Checkpointer:短期记忆,保存对话历史
- 生产环境用
PostgresSaver - 测试环境用
InMemorySaver - 必须调用
setup()初始化表
- 生产环境用
Store:长期记忆,保存用户画像
- 独立于 checkpointer
- 支持命名空间分类存储
put()/get()/delete()操作
Time Travel:状态回溯
get_state_history()获取历史update_state()修改状态- 支持从任意历史节点恢复
生产环境:
- PostgreSQL 作为持久化存储
- checkpointer + store 配合使用
- 定期备份数据库
- 配置连接池优化性能
运行完整示例
python /www/learning_langchain/langgraph_state_learining.py输出示例:
============================================================ PostgreSQL 持久化示例 ============================================================ ✓ 数据库表初始化完成 第一次对话: 响应: 祝贺你,张三!这是第 1 次对话 第二次对话(状态已持久化): 响应: 祝贺你,张三!这是第 2 次对话 当前状态快照: - next: () - config: {'configurable': {'thread_id': 'user_123', 'username': '张三'}} - values: {...}