在大语言模型(LLM)应用开发中,如何组织复杂的多步骤任务是一个核心挑战。传统的线性流程无法满足现代智能应用的需求,而LangGraph为我们提供了一套完整的解决方案。本文将深入探讨LangGraph的五大工作流模式,从基础到高级,带你全面掌握构建复杂LLM应用的精髓。
🧩 为什么需要工作流?
随着LLM能力的提升,我们不再满足于简单的问答系统。现代应用需要:
- 多步骤任务处理
- 动态决策能力
- 并行任务执行
- 状态持久化
- 可视化调试
传统的API调用方式无法满足这些需求。LangGraph通过图结构的方式,将LLM应用组织成可编排、可调试、可部署的工作流,为复杂应用开发提供了坚实的基础。
🔍 工作流 vs 智能体:核心概念
在构建LLM应用时,我们经常面临两种选择:
工作流(Workflow)
- 预定义路径:流程由开发者事先确定
- 固定执行:每个步骤按顺序执行
- 高可控性:易于调试和维护
- 适用于:结构化、可预测的任务
智能体(Agent)
- 动态决策:由LLM决定下一步操作
- 自适应性:能根据上下文调整策略
- 高灵活性:处理未知和复杂场景
- 适用于:开放性、探索性任务
| 特性 | 工作流 | 智能体 |
|---|---|---|
| 控制流 | 程序员预定义 | 由模型动态决定 |
| 工具选择 | 固定 | 模型可动态调用 |
| 灵活性 | 中 | 高 |
| 可控性 | 强 | 相对弱 |
| 适用场景 | 结构化任务 | 开放性任务 |
LangGraph的精妙之处在于,它既能构建严格的流程,又能实现智能的决策,完美融合了两种模式的优势。
⚙️ 基础设置
1. 环境准备
pipinstalllangchain_core langchain-anthropic langgraph2. 模型初始化
fromlangchain_anthropicimportChatAnthropic# 使用Claude模型llm=ChatAnthropic(model="claude-3-5-sonnet-latest")3. 增强型LLM
大模型可以通过以下方式增强:
结构化输出
frompydanticimportBaseModelclassSearchQuery(BaseModel):search_query:strjustification:strstructured_llm=llm.with_structured_output(SearchQuery)result=structured_llm.invoke("How does Calcium CT score relate to cholesterol?")工具调用
defmultiply(a:int,b:int):returna*b llm_with_tools=llm.bind_tools([multiply])msg=llm_with_tools.invoke("What is 2 times 3?")🚀 五大工作流模式详解
模式1:提示链(Prompt Chain)
适用于任务可固定分解的场景。
特点:
- 严格顺序执行
- 每个步骤都有明确目的
- 适合需要质量控制的流程
fromlanggraph.graphimportStateGraph,ENDfromlangchain_openaiimportChatOpenAI# 初始化模型llm=ChatOpenAI(model="gpt-4o-mini",temperature=0.7)# 定义状态classState(dict):topic:strtext:str# 定义节点defgenerate(state:State):topic=state["topic"]resp=llm.invoke(f"请根据主题 '{topic}' 创作一个简短笑话。")state["text"]=resp.contentreturnstatedefimprove(state:State):joke=state["text"]resp=llm.invoke(f"请把下面这个笑话改得更有趣:\n\n{joke}")state["text"]=resp.contentreturnstatedefpolish(state:State):joke=state["text"]resp=llm.invoke(f"请把下面这个笑话进一步润色,加入一点反转:\n\n{joke}")state["text"]=resp.contentreturnstate# 构建图workflow=StateGraph(State)workflow.add_node("generate",generate)workflow.add_node("improve",improve)workflow.add_node("polish",polish)workflow.add_edge("generate","improve")workflow.add_edge("improve","polish")workflow.add_edge("polish",END)workflow.set_entry_point("generate")app=workflow.compile()模式2:并行化(Parallelization)
适用于可拆分且可并行执行的任务。
特点:
- 提高处理效率
- 适合多视角分析
- 可用于投票或选择
importasyncioimportrandomclassNodeA:asyncdefrun(self,query):print("[A] 接收到用户输入:",query)awaitasyncio.sleep(0.5)return{"query":query}classNodeB1:asyncdefrun(self,context):print("[B1] 正在从文档库检索…")awaitasyncio.sleep(random.uniform(0.5,1.5))return{"retrieval":f"检索结果:与{context['query']}相关的文档片段"}classNodeB2:asyncdefrun(self,context):print("[B2] 正在向量库中匹配…")awaitasyncio.sleep(random.uniform(0.5,1.2))return{"embedding":f"向量相似度最高的是 chunk_X(query={context['query']})"}classNodeB3:asyncdefrun(self,context):print("[B3] 正在执行计算任务…")awaitasyncio.sleep(random.uniform(0.3,1.0))return{"calc":f"计算器结果:42(来自 query={context['query']})"}classNodeC:asyncdefrun(self,*results):print("[C] 正在汇总并行任务结果…")merged={}forrinresults:merged.update(r)returnmergedasyncdefmain():A=NodeA()B1=NodeB1()B2=NodeB2()B3=NodeB3()C=NodeC()context=awaitA.run("帮我介绍一下 RAG 工作流")tasks=[B1.run(context),B2.run(context),B3.run(context)]results=awaitasyncio.gather(*tasks)final_output=awaitC.run(*results)print(final_output)if__name__=="__main__":asyncio.run(main())模式3:路由(Routing)
适用于输入类型多样,需要分类处理的场景。
特点:
- 动态选择处理路径
- 支持复杂决策逻辑
- 提高系统灵活性
fromlanggraph.graphimportStateGraph,ENDfromlangchain_openaiimportChatOpenAI llm=ChatOpenAI(model="gpt-4o-mini",temperature=0)classState(dict):question:stranswer:strdefrouter(state:State):q=state["question"]ifany(xinqforxin["计算","多少","+","-","*","/","平方"]):return"math_solver"elifany(xinqforxin["翻译","translate","译成"]):return"translate"else:return"general_answer"defmath_solver(state:State):q=state["question"]resp=llm.invoke(f"请计算这个表达式并给出答案:{q}")state["answer"]=resp.contentreturnstatedeftranslate(state:State):q=state["question"]resp=llm.invoke(f"请将下面内容翻译成中文:{q}")state["answer"]=resp.contentreturnstatedefgeneral_answer(state:State):q=state["question"]resp=llm.invoke(f"请回答这个问题:{q}")state["answer"]=resp.contentreturnstate wf=StateGraph(State)wf.add_node("math_solver",math_solver)wf.add_node("translate",translate)wf.add_node("general_answer",general_answer)wf.add_node("router",router)wf.set_entry_point("router")wf.add_conditional_edges("router",router,{"math_solver":"math_solver","translate":"translate","general_answer":"general_answer",})wf.add_edge("math_solver",END)wf.add_edge("translate",END)wf.add_edge("general_answer",END)app=wf.compile()模式4:协调器-工作器(Coordinator–Worker)
最灵活、最强大的模式。
特点:
- 动态任务拆分
- 可扩展的架构
- 适合复杂任务分解
fromlangchain_openaiimportChatOpenAIfromlanggraph.graphimportStateGraph,END llm=ChatOpenAI(model="gpt-4o-mini",temperature=0.3)classState(dict):task:strsubtasks:listresults:listfinal:strdefcoordinator(state:State):task=state["task"]resp=llm.invoke(f"请将任务拆分成 3 个子任务(列表形式):{task}")try:state["subtasks"]=eval(resp.content)except:state["subtasks"]=[resp.content]state["results"]=[]returnstatedefworker_1(state:State):sub=state["subtasks"][0]resp=llm.invoke(f"执行子任务:{sub}")state["results"].append(resp.content)returnstatedefworker_2(state:State):sub=state["subtasks"][1]resp=llm.invoke(f"执行子任务:{sub}")state["results"].append(resp.content)returnstatedefworker_3(state:State):sub=state["subtasks"][2]resp=llm.invoke(f"执行子任务:{sub}")state["results"].append(resp.content)returnstatedefaggregator(state:State):results=state["results"]resp=llm.invoke(f"请将以下 3 个子任务的结果整合成一个完整回答:{results}")state["final"]=resp.contentreturnstate wf=StateGraph(State)wf.add_node("coordinator",coordinator)wf.add_node("worker_1",worker_1)wf.add_node("worker_2",worker_2)wf.add_node("worker_3",worker_3)wf.add_node("aggregator",aggregator)wf.set_entry_point("coordinator")wf.add_edge("coordinator","worker_1")wf.add_edge("coordinator","worker_2")wf.add_edge("coordinator","worker_3")wf.add_edge("worker_1","aggregator")wf.add_edge("worker_2","aggregator")wf.add_edge("worker_3","aggregator")wf.add_edge("aggregator",END)app=wf.compile()🛠️ LangGraph核心能力
| 能力 | 用途 | 实现方式 |
|---|---|---|
| 持久化 | 多轮对话保持状态 | checkpoint机制 |
| 流式处理 | 输出逐token返回 | stream()方法 |
| 工具调用 | 智能体动态使用函数 | bind_tools() |
| 调试 | 可视化图结构 | app.get_graph().draw_mermaid() |
| 部署 | Server模式 | app.serve() |
# 可视化调试importgraphviz app.get_graph().draw_mermaid_png("workflow.png")# 流式处理forchunkinapp.stream({"task":"生成报告"}):print(chunk)🌐 典型应用场景
| 场景 | 推荐模式 | 优势 |
|---|---|---|
| 固定流程、多步骤业务 | Prompt Chain | 结构清晰,易于维护 |
| 多模型同时运行、投票 | Parallelization | 提高效率,多视角分析 |
| 决策分支(NLP Router) | Routing | 动态分类,灵活处理 |
| 复杂、未知结构、智能体系统 | Coordinator–Worker | 高度灵活,可扩展性强 |
| RAG检索问答 | Routing + Tools | 精准匹配,智能决策 |
| 多轮对话任务 | Coordinator–Worker + Memory | 保持上下文,持续交互 |
💡 实战技巧
1. 状态管理最佳实践
classState(dict):task:strsubtasks:listresults:listfinal:str# 添加元数据timestamp:strsession_id:str2. 错误处理
try:result=app.invoke({"task":"生成报告"})exceptExceptionase:# 记录错误print(f"错误:{e}")# 重试机制result=retry_with_backoff(app,{"task":"生成报告"})3. 性能优化
# 使用缓存fromlangchain.cacheimportInMemoryCache llm=ChatOpenAI(model="gpt-4o-mini").bind(cache=InMemoryCache())# 批量处理batch_results=app.batch([{"task":"生成A"},{"task":"生成B"}])✅ 总结
通过本文的学习,我们掌握了LangGraph的五大核心工作流模式:
- 提示链:适合结构化任务
- 并行化:提高处理效率
- 路由:动态决策
- 协调器-工作器:最灵活的架构
- 混合模式:结合多种模式的优势
这些模式不是孤立的,而是可以组合使用。例如:
- 用路由选择处理路径
- 用提示链处理每个路径
- 用协调器-工作器处理复杂任务
LangGraph为我们提供了一套完整的工具集,让复杂LLM应用的开发变得像搭积木一样简单。无论是企业级应用还是个人项目,都能从中受益。