前言
在LangChain中,Chains(链)是构建AI应用工作流的核心概念。早期的LangChain提供了SequentialChain等传统方式,但配置繁琐且不够灵活。LangChain表达式语言(LCEL)的诞生,正是为了解决这些问题——它提供了一种声明式的、基于管道的方法来组合链,使得构建复杂、生产级的任务链变得异常简单和直观。
LCEL的出现让LangChain真正成为了一整套AI应用框架。无论你是处理文本预处理、检索增强生成(RAG),还是构建Agent工作流,LCEL都能以统一、简洁、高效的方式串联各个组件。
本文将深入拆解Runnable接口和LCEL的八大核心组件,通过大量代码示例带你从零到一掌握LangChain工作流的构建精髓。
一、Runnable:一切可运行单元的基石
1.1 什么是Runnable?
Runnable是LangChain中可以调用、批处理、流式传输、转换和组合的工作单元。简单来说,所有能被LangChain“运行”的东西都实现了Runnable接口——语言模型、输出解析器、检索器、编译的LangGraph图等,无一例外。
Runnable接口强制要求所有LCEL组件实现一组标准方法:
方法 | 功能描述 |
invoke / ainvoke | 将单个输入转换为输出 |
batch / abatch | 批量将多个输入转换为输出 |
stream / astream | 从单个输入生成流式输出 |
1.2 为什么需要统一调用方式?
在LCEL出现之前,LangChain各组件的调用方式各不相同:
- 提示词渲染用 .format()
- 模型调用用 .generate()
- 解析器解析用 .parse()
- 工具调用用 .run()
如果你需要串联一个“提示词 → 模型 → 解析器”的流程,代码会变得像这样:
# 传统方式:各组件的调用接口五花八门 prompt_text = prompt.format(topic="猫") # 方法1:format model_out = model.generate(prompt_text) # 方法2:generate result = parser.parse(model_out) # 方法3:parse每种组件都有自己的调用方式,你需要在不同API之间来回切换,代码的可读性和可维护性都大打折扣。
Runnable统一调用方式后,一切都变得优雅了:
# 分步调用:所有组件统一使用invoke prompt_text = prompt.invoke({"topic": "猫"}) # 方法1:invoke model_out = model.invoke(prompt_text) # 方法2:invoke result = parser.invoke(model_out) # 方法3:invoke无论组件的功能多么复杂(模型/提示词/工具),调用方式完全相同。这就是统一接口带来的巨大价值。
💡技术要点:Runnable接口不仅统一了调用方式,还内置了批处理和异步优化。默认情况下,batch()方法使用线程池并行执行invoke(),而异步方法(ainvoke、abatch、astream)默认使用asyncio的线程池执行同步版本。
二、LCEL:LangChain表达式语言
2.1 LCEL是什么?
LCEL(LangChain Expression Language)是一种声明式语言,用于从现有的Runnable构建新的Runnable。我们称使用LCEL创建的Runnable为“链”(Chain),而“链”本身也是Runnable——这意味着你可以链中套链,无限组合。
LCEL的两个主要组合原语是:
- RunnableSequence:顺序执行
- RunnableParallel:并行执行
许多其他组合原语(如RunnableBranch、RunnableWithFallbacks)都可以看作是这两个原语的变体。
2.2 LCEL的核心优势
根据LangChain官方文档,LCEL具有以下关键特性:
- 自动并行化:当LCEL链条中有可以并行执行的步骤时(例如从多个检索器中获取文档),LCEL会自动执行并行化以最小化延迟。
- 流式支持:支持在生成过程中逐步输出结果。
- 异步支持:提供完整的异步API,支持高并发场景。
- 跟踪和调试:自动生成执行轨迹,便于调试。
2.3 管道符的魔法
LCEL最直观的特性就是重载了 | 运算符,你可以像搭积木一样连接各个Runnable:
# 管道式组合 chain = prompt | model | parser # 一次性调用整个链 result = chain.invoke({"topic": "猫"})一行代码就完成了“提示词格式化 → 模型调用 → 输出解析”三个步骤的串联,简洁程度令人惊叹。
三、RunnableSequence:可运行序列
3.1 核心概念
RunnableSequence按顺序“链接”多个可运行对象,其中一个对象的输出作为下一个对象的输入。这是LangChain中使用最广泛的组合方式——几乎每条链都用到了它。
3.2 基础用法
LCEL重载了 | 运算符,从两个Runnables创建RunnableSequence:
chain = runnable1 | runnable2 # 等价于 chain = RunnableSequence([runnable1, runnable2])3.3 实战示例:笑话生成器
让我们通过一个完整的示例来感受RunnableSequence的魅力:
import os from langchain.chat_models import init_chat_model from langchain.core.prompts import PromptTemplate from langchain.core.output_parsers import StrOutputParser # Step 1: 创建提示词模板 # PromptTemplate是Runnable,可以使用invoke方法 prompt_template = PromptTemplate( template="讲一个关于{topic}的笑话", input_variables=["topic"], ) # Step 2: 初始化聊天模型 # init_chat_model返回的对象也是Runnable llm = init_chat_model( model="openai/gpt-oss-20b:free", # 模型名称 model_provider="openai", # 模型提供商 base_url="https://openrouter.ai/api/v1", # API端点 api_key=os.getenv("OPENROUTER_API_KEY"), ) # Step 3: 创建输出解析器 # StrOutputParser将模型输出转换为纯文本字符串 parser = StrOutputParser() # Step 4: 使用管道符构建链 # prompt_template的输出 → llm的输入 → parser的输入 → 最终结果 chain = prompt_template | llm | parser # Step 5: 执行链 resp = chain.invoke({"topic": "人工智能"}) print(resp)代码解读:
- PromptTemplate.invoke() 接收字典,将模板中的占位符替换为实际值,输出字符串
- llm.invoke() 接收字符串,返回AIMessage对象
- parser.invoke() 接收AIMessage,提取其中的文本内容并返回
三个组件通过 | 无缝衔接,数据自动流转。
3.4 高级特性:批处理与流式
RunnableSequence 自动支持批处理和流式处理:
# 批量处理多个输入 topics = ["人工智能", "机器学习", "深度学习"] results = chain.batch([{"topic": t} for t in topics]) # 流式输出(实时逐字返回) for chunk in chain.stream({"topic": "人工智能"}): print(chunk, end="", flush=True)性能提示:RunnableSequence的batch()和abatch()方法默认使用线程池和asyncio.gather,对于I/O密集型Runnables(如LLM调用),比顺序调用invoke快得多。
四、RunnableParallel:可运行并行
4.1 为什么需要并行?
在实际AI应用中,经常需要同时执行多个独立任务。例如:
- 同时生成笑话和诗歌
- 同时查询多个知识库
- 同时调用多个API获取数据
如果串行执行这些任务,总耗时是各任务耗时之和。但如果它们是独立的,完全可以并行执行,总耗时≈最长任务的耗时。
4.2 RunnableParallel的核心机制
RunnableParallel同时运行多个可运行对象,并为每个对象提供相同的输入。它的内部实现非常巧妙:
- 同步执行:使用ThreadPoolExecutor在线程池中并发执行
- 异步执行:使用asyncio.gather并发执行
在LCEL表达式中,字典会自动转换为RunnableParallel——这是一个极其方便的语法糖。
4.3 实战示例:多任务并行处理
import os from langchain.chat_models import init_chat_model from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableParallel from langchain_core.output_parsers import StrOutputParser llm = init_chat_model( model="openai/gpt-oss-20b:free", model_provider="openai", base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"), ) # 定义第一个子链:生成笑话 joke_chain = ( PromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | StrOutputParser() ) # 定义第二个子链:生成诗歌 poem_chain = ( PromptTemplate.from_template("写一首关于{topic}的诗歌") | llm | StrOutputParser() ) # 方式1:显式使用RunnableParallel map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain) # 方式2:使用字典语法糖(推荐) # map_chain = {"joke": joke_chain, "poem": poem_chain} # 执行:两个任务并行运行 resp = map_chain.invoke({"topic": "人工智能"}) print(resp) # 输出: {'joke': '...笑话内容...', 'poem': '...诗歌内容...'}代码解读:
- joke_chain和poem_chain共享同一个输入{"topic": "人工智能"}
- 两个链同时执行,互不干扰
- 最终输出是一个字典,键为joke和poem,值为各自链的输出
4.4 字典语法的隐式转换机制
这里有一个经常让初学者困惑的点:为什么可以直接在链中写字典?
答案是LangChain的隐式转换(Coercion)机制。当你使用管道符|构建链时,RunnableSequence会检查每一个步骤:
- 如果它是一个字典(Dict),系统会自动调用RunnableParallel(dict)将其包装
- 字典中的每个Value也会被递归地转换为Runnable(例如lambda函数会被转为RunnableLambda)
因此,你可以写出如此简洁的代码:
# 这种写法... chain = {"joke": joke_chain, "poem": poem_chain} | combine_chain # ...等价于这种 chain = RunnableParallel({"joke": joke_chain, "poem": poem_chain}) | combine_chain⚡ 性能实战:如果每个任务耗时1秒,串行执行需要2秒,而并行执行只需约1秒。这就是并行带来的性能提升。
五、RunnableLambda:自定义函数转换
5.1 核心概念
RunnableLambda将Python可调用函数转换为Runnable,使得函数可以在同步或异步上下文中使用。这意味着你可以将任何自定义函数无缝集成到LCEL链中。
5.2 基础用法
from langchain_core.runnables import RunnableLambda # 方式1:构造函数 chain = { "text1": lambda x: x + " world", "text2": lambda x: x + ", how are you", } | RunnableLambda(lambda x: len(x["text1"]) + len(x["text2"])) result = chain.invoke("hello") print(result) # 输出: 295.3 装饰器语法
LangChain还提供了便捷的@chain装饰器,功能等同于RunnableLambda:
from langchain_core.runnables import chain @chain def total_len(x): return len(x["text1"]) + len(x["text2"]) chain = { "text1": lambda x: x + " world", "text2": lambda x: x + ", how are you", } | total_len result = chain.invoke("hello") print(result) # 输出: 295.4 ⚠️ 重要限制:函数必须接收单个参数
一个容易被忽视的限制是:自定义函数必须接收单个参数。如果你的函数需要多个参数,应该使用字典来包装输入:
# ❌ 错误写法:函数需要多个参数 def multiple_length(text1, text2): return len(text1) * len(text2) # ✅ 正确写法:用字典包装 def multiple_length(data): return len(data["text1"]) * len(data["text2"])5.5 流式限制
RunnableLambda默认不支持流式传输(stream)。如果你需要在自定义函数中支持流式处理,需要使用RunnableGenerator替代。
六、RunnablePassthrough:数据透传与上下文保留
6.1 核心概念
RunnablePassthrough接收输入并将其原样输出,是LCEL体系中的“无操作节点”。它的作用看起来很简单,但在实际应用中极其有用:
- 在流水线中透传输入或保留上下文
- 向输出中添加额外的键值对
6.2 基础用法:保留原始输入
from langchain_core.runnables import RunnablePassthrough, RunnableParallel # 同时输出原始输入和计算结果 chain = RunnableParallel( original=RunnablePassthrough(), # 原样透传输入 word_count=lambda x: len(x), # 计算单词数量 ) result = chain.invoke("hello world") print(result) # {'original': 'hello world', 'word_count': 11}6.3 高级用法:assign()添加键
assign()方法可以在透传输入的同时,向输出中添加额外的键值对:
from langchain_core.runnables import RunnablePassthrough chain = { "text1": lambda x: x + " world", "text2": lambda x: x + ", how are you", } | RunnablePassthrough.assign( word_count=lambda x: len(x["text1"] + x["text2"]) ) result = chain.invoke("hello") print(result) # 输出: {'text1': 'hello world', 'text2': 'hello, how are you', 'word_count': 29}6.4 实战场景:RAG中的上下文保留
在RAG(检索增强生成)应用中,RunnablePassthrough尤为有用:
# 典型的RAG链结构 rag_chain = ( { "context": retriever, # 检索相关内容 "question": RunnablePassthrough() # 透传用户问题 } | prompt_template | llm | parser )这个例子中:
- retriever从向量数据库检索相关文档
- RunnablePassthrough()将用户问题原样传递
- 两者合并后送入提示词模板,生成最终的LLM输入
七、RunnableBranch:条件分支与智能路由
7.1 核心概念
RunnableBranch使用(条件,Runnable)对列表和默认分支进行初始化。对输入进行操作时,选择第一个计算结果为True的条件,并在输入上运行相应的Runnable。如果没有条件为True,则在输入上运行默认分支。
简单来说,RunnableBranch就是链式if/elif/else在LangChain中的实现。
7.2 基础示例
from langchain_core.runnables import RunnableBranch branch = RunnableBranch( (lambda x: isinstance(x, str), lambda x: x.upper()), # 条件1: 字符串 → 转大写 (lambda x: isinstance(x, int), lambda x: x + 1), # 条件2: 整数 → 加1 (lambda x: isinstance(x, float), lambda x: x * 2), # 条件3: 浮点数 → 乘2 lambda x: "goodbye", # 默认分支 ) print(branch.invoke("hello")) # 输出: HELLO (字符串 → 大写) print(branch.invoke(5)) # 输出: 6 (整数 → 加1) print(branch.invoke(None)) # 输出: goodbye (无匹配 → 默认分支)7.3 智能客服路由实战
在实际应用中,RunnableBranch非常适合做智能路由——根据用户输入类型选择不同的处理链:
def detect_topic(input_text): """根据输入文本检测主题类型""" if "天气" in input_text: return "weather" elif "新闻" in input_text: return "news" else: return "general" branch_chain = RunnableBranch( (lambda x: detect_topic(x["input"]) == "weather", weather_chain), (lambda x: detect_topic(x["input"]) == "news", news_chain), general_chain, # 默认分支 ) result = branch_chain.invoke({"input": "今天天气怎么样?"})7.4 执行机制要点
- 顺序评估:条件按照声明顺序依次检查
- 短路执行:第一个满足条件的分支被执行后,后续条件不再检查
- 必须有默认分支:如果所有条件均不满足且未设置默认分支,系统将抛出异常
八、RunnableWithFallbacks:容错与回退
8.1 核心概念
在生产环境中,外部API(如语言模型的API)可能会经历性能下降甚至停机。在这些情况下,拥有一个回退Runnable非常有用。
RunnableWithFallbacks使得Runnable失败后可以回退到其他Runnable。回退按顺序尝试,直到某个成功或全部失败。
8.2 基础用法
import os from langchain.chat_models import init_chat_model from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableLambda llm = init_chat_model( model="openai/gpt-oss-20b:free", model_provider="openai", base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"), ) # 原始链(可能会失败) chain = PromptTemplate.from_template("hello") | llm # 添加回退机制 chain_with_fallback = chain.with_fallbacks([ RunnableLambda(lambda x: "抱歉,服务暂时不可用") ]) # 示例:提示词模板中没有需要填充的变量,会报错 # with_fallbacks会自动捕获异常并执行回退 result = chain_with_fallback.invoke("1") print(result) # 输出: 抱歉,服务暂时不可用8.3 更优雅的用法:按需回退
你可以通过with_fallbacks方法的exceptions_to_handle参数,指定哪些异常触发回退:
# 只在特定异常(如API限流)时触发回退 chain_with_fallback = chain.with_fallbacks( fallbacks=[fallback_chain], exceptions_to_handle=(RateLimitError, APIConnectionError) )8.4 实用场景:多模型备份
一个典型的使用场景是模型降级——当主力模型(如GPT-4)因限流不可用时,自动切换到备用模型(如GPT-3.5-turbo):
# 主力模型链 primary_chain = prompt | gpt4_model | parser # 备用模型链 fallback_chain = prompt | gpt35_model | parser # 构建带回退的生产级链 robust_chain = primary_chain.with_fallbacks([fallback_chain])💡 技术提示:在流式处理中,回退只会在流创建阶段的失败时触发。流已经开始后发生的错误不会触发回退机制。
总结
LCEL和Runnable系列组件构成了LangChain工作流的基石,通过本文的学习,我们可以总结出以下要点:
组件 | 一句话总结 | 典型场景 |
Runnable | 一切可运行单元的标准化接口 | 统一invoke/batch/stream调用 |
LCEL | 声明式管道语法 | 用 | 简化链式组合 |
RunnableSequence | 顺序执行流水线 | 提示词→模型→解析器 |
RunnableParallel | 并行执行多个任务 | 多任务并发、多模型对比 |
RunnableLambda | 自定义函数转换 | 数据清洗、格式转换 |
RunnablePassthrough | 数据透传与上下文保留 | RAG中的问题透传 |
RunnableBranch | 条件分支路由 | 智能客服、多意图识别 |
RunnableWithFallbacks | 容错与回退机制 | 模型降级、高可用保障 |
最佳实践建议
- 优先使用字典语法创建RunnableParallel:代码更简洁,且享受LangChain的隐式转换机制
- 合理利用批处理:使用batch()替代循环调用invoke(),性能提升显著
- 为生产系统添加回退:LLM API不可预测,with_fallbacks是保障可用性的最佳实践
- RAG场景善用RunnablePassthrough:透传用户输入,与检索结果完美融合
- 注意RunnableLambda的流式限制:如需流式输出自定义逻辑,使用RunnableGenerator
掌握这些LCEL核心组件,你就掌握了LangChain工作流构建的精髓。无论是简单的问答系统,还是复杂的Agent应用,都可以通过像搭积木一样灵活组合的方式,快速构建出生产级的AI应用。