AI Agent 编排实战:从工具调用到多 Agent 协作的任务拆解与执行框架
一、单次对话搞不定复杂任务——Agent 编排的工程化刚需
让 AI 一次性完成"分析竞品定价策略、生成报告、发送邮件"这种多步骤任务,结果:要么步骤遗漏、要么上下文丢失、要么中间步骤出错后无法恢复。更常见的情况是,AI 自信地给出了一个看似完整但逻辑断裂的答案。
问题根源:大语言模型是单轮预测引擎,不是任务执行器。它擅长生成文本,不擅长规划步骤、调用工具、处理异常、维护状态。Agent 编排的本质,就是把 LLM 的生成能力包装成一个可控的任务执行框架。
Agent 编排的核心挑战:
- 任务拆解:复杂任务如何分解为可执行的原子步骤
- 工具调用:如何让 LLM 安全、可靠地调用外部 API
- 状态管理:多步骤执行过程中如何维护上下文和中间结果
- 异常恢复:某步骤失败时如何重试、回退或降级
- 多 Agent 协作:不同角色的 Agent 如何分工与通信
二、AI Agent 编排的架构与执行机制
Agent 编排的核心模式:ReAct 循环(Reasoning + Acting)——思考下一步做什么,执行动作,观察结果,再思考。
graph TB subgraph "Agent 执行循环" A[接收任务] --> B[推理: 分析当前状态] B --> C[决策: 选择下一步动作] C --> D{动作类型?} D -->|工具调用| E[执行工具] D -->|子任务委派| F[委派给子 Agent] D -->|直接回答| G[生成最终结果] E --> H[观察: 获取执行结果] F --> H H --> I{任务完成?} I -->|否| B I -->|是| G end subgraph "工具注册表" J[搜索工具] K[数据库查询] L[文件读写] M[API 调用] end subgraph "状态管理" N[对话历史] O[中间结果] P[执行日志] end E --> J & K & L & M B --> N & O & P编排模式对比:
| 模式 | 适用场景 | 复杂度 | 可控性 |
|---|---|---|---|
| 单 Agent + 工具 | 简单任务,步骤 < 5 | 低 | 高 |
| 多 Agent 串行 | 有明确依赖的流水线任务 | 中 | 高 |
| 多 Agent 并行 | 无依赖的子任务可并行 | 中 | 中 |
| 层级 Agent | 复杂任务需分解-委派-汇总 | 高 | 中低 |
三、生产级 Agent 编排框架实现
3.1 工具定义与注册
from dataclasses import dataclass, field from typing import Callable, Any import json @dataclass class ToolDefinition: """工具定义:描述工具的输入输出规范,供 LLM 理解和调用""" name: str description: str # 工具功能描述,LLM 据此判断何时调用 parameters: dict # JSON Schema 格式的参数定义 required: list[str] # 必填参数列表 executor: Callable # 实际执行函数 timeout_seconds: int = 30 # 执行超时时间 max_retries: int = 2 # 最大重试次数 class ToolRegistry: """工具注册表:集中管理所有可用工具""" def __init__(self): self._tools: dict[str, ToolDefinition] = {} def register(self, tool: ToolDefinition): """注册工具""" self._tools[tool.name] = tool def get(self, name: str) -> ToolDefinition | None: return self._tools.get(name) def get_all_schemas(self) -> list[dict]: """获取所有工具的 JSON Schema 描述,供 LLM 函数调用使用""" return [ { "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.parameters, "required": tool.required, }, } for tool in self._tools.values() ] async def execute(self, name: str, arguments: dict) -> Any: """执行工具调用,带超时和重试""" tool = self._tools.get(name) if not tool: raise ValueError(f"未知工具: {name}") # 参数校验:检查必填参数是否存在 for param in tool.required: if param not in arguments: raise ValueError(f"工具 {name} 缺少必填参数: {param}") last_error = None for attempt in range(tool.max_retries + 1): try: import asyncio result = await asyncio.wait_for( tool.executor(**arguments), timeout=tool.timeout_seconds, ) return result except asyncio.TimeoutError: last_error = f"工具 {name} 执行超时 ({tool.timeout_seconds}s)" except Exception as e: last_error = f"工具 {name} 执行失败: {str(e)}" raise RuntimeError(f"工具 {name} 重试耗尽: {last_error}")3.2 ReAct Agent 核心
import asyncio from enum import Enum from dataclasses import dataclass, field class ActionKind(Enum): TOOL_CALL = "tool_call" FINAL_ANSWER = "final_answer" @dataclass class AgentAction: """Agent 的动作决策""" kind: ActionKind tool_name: str | None = None tool_args: dict | None = None answer: str | None = None reasoning: str = "" # 推理过程,用于可解释性 @dataclass class AgentState: """Agent 执行状态:维护对话历史和中间结果""" task: str messages: list[dict] = field(default_factory=list) tool_results: list[dict] = field(default_factory=list) step_count: int = 0 max_steps: int = 15 # 防止无限循环 class ReActAgent: """ReAct 模式 Agent:推理-行动-观察循环""" def __init__(self, llm_client, tool_registry: ToolRegistry): self.llm = llm_client self.tools = tool_registry async def run(self, task: str) -> str: """执行任务,返回最终结果""" state = AgentState(task=task) # 初始化系统提示词 state.messages.append({ "role": "system", "content": self._build_system_prompt(), }) state.messages.append({ "role": "user", "content": task, }) while state.step_count < state.max_steps: state.step_count += 1 # 推理:让 LLM 决定下一步动作 action = await self._decide_action(state) if action.kind == ActionKind.FINAL_ANSWER: return action.answer or "" if action.kind == ActionKind.TOOL_CALL: # 行动:执行工具调用 result = await self._execute_tool(action) # 观察:将工具结果加入上下文 state.tool_results.append({ "step": state.step_count, "tool": action.tool_name, "args": action.tool_args, "result": result, }) state.messages.append({ "role": "assistant", "content": f"调用工具 {action.tool_name},参数: {json.dumps(action.tool_args, ensure_ascii=False)}", }) state.messages.append({ "role": "user", "content": f"工具返回结果: {json.dumps(result, ensure_ascii=False)}", }) return "任务执行超时:达到最大步骤数限制" async def _decide_action(self, state: AgentState) -> AgentAction: """让 LLM 决定下一步动作""" response = await self.llm.chat( messages=state.messages, tools=self.tools.get_all_schemas(), tool_choice="auto", ) # 检查是否有工具调用 if response.tool_calls: call = response.tool_calls[0] return AgentAction( kind=ActionKind.TOOL_CALL, tool_name=call.function.name, tool_args=json.loads(call.function.arguments), reasoning=response.content or "", ) # 无工具调用,视为最终回答 return AgentAction( kind=ActionKind.FINAL_ANSWER, answer=response.content or "", reasoning="", ) async def _execute_tool(self, action: AgentAction) -> Any: """执行工具调用,捕获异常""" try: return await self.tools.execute(action.tool_name, action.tool_args or {}) except Exception as e: # 工具执行失败,返回错误信息而非抛异常 return {"error": str(e)} def _build_system_prompt(self) -> str: return """你是一个任务执行 Agent。根据用户任务,逐步推理并调用工具完成。 规则: 1. 每次只调用一个工具 2. 根据工具返回结果决定下一步 3. 如果工具返回错误,分析原因并尝试其他方案 4. 任务完成后给出最终答案 5. 不要编造工具返回结果"""3.3 多 Agent 协作:层级编排
@dataclass class SubTask: """子任务定义""" name: str description: str agent_role: str # 执行该任务的 Agent 角色 dependencies: list[str] = field(default_factory=list) # 依赖的子任务 input_from: dict[str, str] = field(default_factory=dict) # 从上游任务获取输入 class MultiAgentOrchestrator: """多 Agent 编排器:支持串行、并行和层级任务编排""" def __init__(self, agents: dict[str, ReActAgent]): self.agents = agents async def execute_pipeline( self, tasks: list[SubTask], initial_context: dict, ) -> dict: """执行任务流水线,自动处理依赖关系""" completed: dict[str, Any] = {} results: dict[str, Any] = {} # 拓扑排序:按依赖关系确定执行顺序 execution_order = self._topological_sort(tasks) for task_name in execution_order: task = next(t for t in tasks if t.name == task_name) agent = self.agents.get(task.agent_role) if not agent: raise ValueError(f"未注册的 Agent 角色: {task.agent_role}") # 构建子任务输入:从上游结果中提取 task_input = self._build_task_input(task, initial_context, results) # 执行子任务 result = await agent.run(task_input) results[task.name] = result completed[task.name] = True return results def _topological_sort(self, tasks: list[SubTask]) -> list[str]: """拓扑排序:确保被依赖的任务先执行""" in_degree: dict[str, int] = {t.name: 0 for t in tasks} graph: dict[str, list[str]] = {t.name: [] for t in tasks} for task in tasks: for dep in task.dependencies: graph[dep].append(task.name) in_degree[task.name] += 1 queue = [name for name, deg in in_degree.items() if deg == 0] order = [] while queue: current = queue.pop(0) order.append(current) for neighbor in graph[current]: in_degree[neighbor] -= 1 if in_degree[neighbor] == 0: queue.append(neighbor) if len(order) != len(tasks): raise ValueError("任务依赖关系存在环,无法排序") return order def _build_task_input( self, task: SubTask, initial_context: dict, completed_results: dict, ) -> str: """构建子任务输入,从上游结果中提取所需数据""" parts = [f"任务: {task.description}"] for param_name, source_path in task.input_from.items(): # source_path 格式: "task_name.field_name" source_task, field = source_path.split(".", 1) if source_task in completed_results: parts.append(f"输入参数 {param_name}: {completed_results[source_task]}") if initial_context: parts.append(f"初始上下文: {json.dumps(initial_context, ensure_ascii=False)}") return "\n".join(parts)四、Agent 编排的架构权衡与边界
ReAct 循环的可靠性问题:
- 无限循环:Agent 可能反复调用同一工具而不收敛。必须设置
max_steps硬限制 - 幻觉工具调用:LLM 可能生成不存在的工具名或参数。需要在执行前校验
- 上下文窗口溢出:多轮工具调用后,对话历史超出模型上下文窗口。需要摘要压缩
多 Agent 协作的通信开销:
- 串行编排延迟叠加:3 个子任务各需 10 秒,总耗时 30 秒
- 并行编排需要无依赖前提:有依赖关系的子任务无法并行
- Agent 间信息传递有损:子 Agent 的输出被压缩为文本传给下游,结构化信息可能丢失
工具调用的安全边界:
- LLM 生成的参数可能包含注入攻击(如 SQL 注入、命令注入)
- 工具执行必须有权限隔离:只允许访问必要的资源
- 敏感操作(删除、支付)需要人工确认环节
禁用场景:
- 确定性任务:规则明确、逻辑固定的任务,用传统代码实现更可靠
- 实时性要求高的任务:LLM 推理延迟 1-5 秒,不适合毫秒级响应
- 安全敏感任务:金融交易、医疗诊断等场景,Agent 的不确定性不可接受
五、总结
AI Agent 编排的核心框架:ReAct 循环(推理-行动-观察)解决单 Agent 的任务执行问题,层级编排(分解-委派-汇总)解决多 Agent 的协作问题。工具注册表提供标准化的工具定义和执行接口,AgentState 维护多步骤执行的状态和上下文。ReAct 循环必须设置最大步骤数防止无限循环,工具调用需要参数校验和权限隔离。多 Agent 编排通过拓扑排序自动处理任务依赖,但串行编排的延迟叠加和 Agent 间信息传递的有损性是架构瓶颈。Agent 编排适用于步骤不确定、需要工具调用的复杂任务,确定性和实时性要求高的场景应使用传统代码实现。