news 2026/6/10 18:22:39

AgentScope深入学习-Pipeline与消息

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AgentScope深入学习-Pipeline与消息

协调的艺术:Pipeline 与消息系统核心解析

请关注公众号【碳硅化合物AI】

摘要

多智能体系统的核心是协调。AgentScope 通过 Pipeline 和消息系统实现了优雅的多智能体编排。本文将深入分析 MsgHub、Pipeline 模式以及消息系统的设计。你会发现,消息(Msg)不仅是数据载体,更是整个框架的统一接口;MsgHub 通过订阅机制实现了自动消息广播;Pipeline 提供了多种编排模式,让多智能体协作变得简单而强大。通过阅读本文,你会理解多智能体如何通过消息进行通信,如何通过 Pipeline 进行编排,以及这些机制背后的设计考量。

入口类与类关系

消息系统的类层次

消息系统非常简单,但设计精妙:

Pipeline 系统的类层次

Pipeline 提供了多种编排模式:

关键代码:Msg 数据结构

Msg 是框架的核心数据结构:

class Msg: """The message class in agentscope.""" def __init__( self, name: str, content: str | Sequence[ContentBlock], role: Literal["user", "assistant", "system"], metadata: dict[str, JSONSerializableObject] | None = None, timestamp: str | None = None, invocation_id: str | None = None, ) -> None: self.name = name self.content = content self.role = role self.metadata = metadata self.id = shortuuid.uuid() self.timestamp = timestamp or datetime.now().strftime(...)

这个设计非常巧妙:

  • 统一接口:所有智能体间通信都使用 Msg
  • 多模态支持:content 可以是字符串或 ContentBlock 列表
  • 元数据支持:metadata 字段可以存储结构化输出等额外信息
  • 自动标识:每个消息都有唯一的 id 和 timestamp

关键流程分析

消息创建和传递流程

消息在智能体间的传递非常直接:

多智能体对话流程

使用 MsgHub 的多智能体对话流程:

Pipeline 执行流程

顺序 Pipeline 的执行流程:

关键技术点

1. 消息作为统一数据结构的设计

Msg 类在 AgentScope 中扮演着核心角色,它是:

  • 智能体间通信的媒介:所有智能体都通过 Msg 交换信息
  • 与 LLM API 的桥梁:Formatter 将 Msg 转换为 API 所需格式
  • 记忆存储的单元:Memory 存储的是 Msg 对象列表
  • UI 显示的数据源:前端可以直接显示 Msg 对象

这种统一设计避免了数据格式转换的复杂性。无论消息来自哪里、要去哪里,都是同一个数据结构。

2. 多模态消息块系统

Msg 支持多模态内容,通过 ContentBlock 系统实现:

class TextBlock(TypedDict, total=False): type: Required[Literal["text"]] text: str class ToolUseBlock(TypedDict, total=False): type: Required[Literal["tool_use"]] id: Required[str] name: Required[str] input: Required[dict[str, object]] class ToolResultBlock(TypedDict, total=False): type: Required[Literal["tool_result"]] id: Required[str] name: Required[str] output: Required[str | List[ContentBlock]] class ImageBlock(TypedDict, total=False): type: Required[Literal["image"]] source: Required[Base64Source | URLSource] class AudioBlock(TypedDict, total=False): type: Required[Literal["audio"]] source: Required[Base64Source | URLSource] ContentBlock = ( ToolUseBlock | ToolResultBlock | TextBlock | ThinkingBlock | ImageBlock | AudioBlock | VideoBlock )

这种设计让消息可以包含:

  • 文本内容
  • 工具调用和结果
  • 图像、音频、视频等多媒体内容
  • 思考过程(thinking block)

所有内容都统一在一个消息对象中,非常灵活。

3. 消息中心(MsgHub)的设计

MsgHub 通过订阅机制实现自动消息广播。关键代码:

def _reset_subscriber(self) -> None: """Reset the subscriber for agent in `self.participant`""" if self.enable_auto_broadcast: for agent in self.participants: agent.reset_subscribers(self.name, self.participants)

当智能体在 MsgHub 中回复时,消息会自动广播给其他参与者。这个机制通过 AgentBase 的订阅系统实现:

# 在 AgentBase 中defreset_subscribers(self,hub_name:str,subscribers:list[AgentBase])->None:"""设置订阅者,当智能体回复时,消息会自动发送给订阅者"""self._subscribers[hub_name]=subscribers

这种设计让多智能体对话变得非常简单:

asyncwithMsgHub([agent1,agent2,agent3]):awaitagent1()# agent2 和 agent3 自动收到消息awaitagent2()# agent1 和 agent3 自动收到消息awaitagent3()# agent1 和 agent2 自动收到消息

4. 不同 Pipeline 模式的实现

AgentScope 提供了多种 Pipeline 模式:

顺序 Pipeline(SequentialPipeline)

async def sequential_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, ) -> Msg | list[Msg] | None: """执行智能体序列,前一个的输出作为下一个的输入""" for agent in agents: msg = await agent(msg) return msg

扇出 Pipeline(FanoutPipeline)

async def fanout_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None = None, enable_gather: bool = True, ) -> list[Msg]: """将同一个输入分发给多个智能体""" if enable_gather: # 并发执行 tasks = [asyncio.create_task(agent(deepcopy(msg))) for agent in agents] return await asyncio.gather(*tasks) else: # 顺序执行 return [await agent(deepcopy(msg)) for agent in agents]

这些 Pipeline 模式让多智能体编排变得非常灵活。你可以:

  • 顺序执行:前一个智能体的输出作为下一个的输入(适合流水线场景)
  • 并发执行:多个智能体同时处理同一个输入(适合并行分析场景)
  • 动态编排:在 MsgHub 中动态添加或删除参与者

5. 消息的序列化和反序列化

Msg 支持完整的序列化:

def to_dict(self) -> dict: """Convert the message into JSON dict data.""" return { "id": self.id, "name": self.name, "role": self.role, "content": self.content, "metadata": self.metadata, "timestamp": self.timestamp, } @classmethod def from_dict(cls, json_data: dict) -> "Msg": """Load a message object from the given JSON data.""" new_obj = cls( name=json_data["name"], content=json_data["content"], role=json_data["role"], metadata=json_data.get("metadata", None), timestamp=json_data.get("timestamp", None), invocation_id=json_data.get("invocation_id", None), ) new_obj.id = json_data.get("id", new_obj.id) return new_obj

这种设计让消息可以:

  • 保存到文件或数据库
  • 通过网络传输
  • 在不同进程间共享
  • 用于调试和日志记录

总结

Pipeline 和消息系统是 AgentScope 框架中实现多智能体协调的核心:

  1. 消息系统:通过统一的 Msg 数据结构,实现了智能体间通信、API 交互、记忆存储的统一接口
  2. MsgHub:通过订阅机制实现了自动消息广播,让多智能体对话变得简单优雅
  3. Pipeline:提供了多种编排模式,支持顺序、并发、动态等多种场景

这些系统的设计都体现了 AgentScope 的核心理念:透明、模块化、易用。在下一篇文章中,我们会分析扩展机制的实现,了解如何为框架添加新功能。


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 16:03:11

19、微软 Edge 浏览器使用指南

微软 Edge 浏览器使用指南 在当今数字化时代,浏览器是我们访问互联网的重要工具。微软 Edge 浏览器以其快速的浏览速度和简洁的界面设计,成为了许多用户的选择。本文将详细介绍微软 Edge 浏览器的使用方法,帮助你更好地利用它在互联网上畅游。 1. 打开微软 Edge 浏览器 要…

作者头像 李华
网站建设 2026/6/9 21:25:43

24、Windows 10 个性化设置与硬件配置全攻略

Windows 10 个性化设置与硬件配置全攻略 1. 多显示器设置 在使用多显示器时,你可以按照以下步骤进行设置: 1. 拖动屏幕上的显示器图标,使其与实际显示器的摆放位置相匹配。 2. 点击应显示开始按钮的屏幕显示器,然后选中“将此显示器设为主显示器”复选框。 3. 根据需要…

作者头像 李华
网站建设 2026/6/5 6:49:52

25、让Windows远离故障的实用指南

让Windows远离故障的实用指南 1. 创建还原点 虽然Windows正在向更新的刷新系统转变,但老派的系统还原爱好者仍然可以创建和使用可靠的Windows还原点,将电脑恢复到状态良好的时间点。还原点就像一个时间胶囊,能保存电脑在特定时间的设置。如果这些设置后来损坏,恢复到早期的…

作者头像 李华
网站建设 2026/6/10 17:01:20

27、Windows 10 用户管理与网络连接全攻略

Windows 10 用户管理与网络连接全攻略 一、用户账户管理 1.1 删除账户 删除账户需谨慎,因为删除某人的账户会一并删除其所有文件。若选择删除,建议同时选择“保留文件”选项,该选项会将此人的所有文件存至桌面的一个文件夹中。 1.2 管理其他账户 若要管理其他账户,需先…

作者头像 李华
网站建设 2026/6/10 12:43:53

29、Windows系统音乐播放指南

Windows系统音乐播放指南 在Windows系统中,我们有多种方式来播放和管理音乐。下面将详细介绍Groove音乐应用和Windows Media Player的使用方法、特点以及相关操作步骤。 使用Groove音乐应用播放音乐 Groove音乐应用是Windows系统中一款适合现代年轻人的音乐播放工具。它仅能…

作者头像 李华
网站建设 2026/6/10 10:01:13

33、Windows 10常见问题解决指南

Windows 10常见问题解决指南 在使用Windows 10系统的过程中,我们难免会遇到各种各样的问题,如系统故障、文件丢失、应用程序出错等。本文将为你详细介绍一些常见问题的解决方法,帮助你轻松应对这些困扰。 系统重置后的操作 当你重置计算机后,需要进行以下操作: 1. 重…

作者头像 李华