news 2026/4/25 6:08:02

(10-6-01)基于MCP实现的多智能体协同系统:应用整合模块(1)为MCP服务的路由配置

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
(10-6-01)基于MCP实现的多智能体协同系统:应用整合模块(1)为MCP服务的路由配置

10.6 应用整合模块

在本项目中,“agent_mcp/app”目录应用于实现应用程序的核心逻辑与流程管理,承载着应用的主要功能实现,包括应用的初始化、路由配置以处理各类请求,以及服务生命周期的管理等,为整个项目的应用层提供基础架构和核心业务逻辑支持,是应用运行和功能交互的核心场所。

10.6.1 为MCP服务的路由配置

文件agent_mcp/app/routes.py是本项目中负责API路由配置的核心文件,与MCP系统的前端交互和数据流转紧密关联。它定义了一系列API 端点,涵盖系统状态查询、代理与任务数据管理、图形化数据展示、节点详情获取、内存(上下文)增删改查等功能,通过调用MCP核心模块(如认证、数据库操作、工具实现等)处理请求,实现了前端与后端的数据交互,为MCP系统的 dashboard 展示、代理管理、任务管控等核心功能提供了接口支撑,是MCP系统中连接前端界面与后端业务逻辑的关键桥梁。

(1)下面代码的功能是提供系统状态查询API接口。原理是处理HTTP请求,验证请求方法,通过调用数据库操作函数获取活跃代理和任务数据,统计不同状态的任务数量,生成包含服务器运行状态、代理数量、任务数量及最后更新时间的JSON响应,为前端提供系统整体状态信息。

async def simple_status_api_route(request: Request) -> JSONResponse: # 处理CORS预请求的OPTIONS方法 if request.method == 'OPTIONS': return await handle_options(request) try: # 获取系统状态 from ..db.actions.agent_db import get_all_active_agents_from_db from ..db.actions.task_db import get_all_tasks_from_db agents = get_all_active_agents_from_db() # 从数据库获取所有活跃代理 tasks = get_all_tasks_from_db() # 从数据库获取所有任务 # 统计任务状态数量 pending_tasks = len([t for t in tasks if t.get('status') == 'pending']) # 待处理任务数 completed_tasks = len([t for t in tasks if t.get('status') == 'completed']) # 已完成任务数 return JSONResponse({ "server_running": True, # 服务器运行状态 "total_agents": len(agents), # 总代理数 "active_agents": len([a for a in agents if a.get('status') == 'active']), # 活跃代理数 "total_tasks": len(tasks), # 总任务数 "pending_tasks": pending_tasks, # 待处理任务数 "completed_tasks": completed_tasks, # 已完成任务数 "last_updated": datetime.datetime.now().isoformat() # 最后更新时间 }) except Exception as e: logger.error(f"simple_status_api_route 错误: {e}", exc_info=True) return JSONResponse({"error": f"获取简单状态失败: {str(e)}"}, status_code=500)

(2)下面代码的功能是提供节点详情查询API接口。原理是接收请求中的节点ID参数,根据节点ID解析节点类型,通过数据库查询获取对应类型节点(代理、任务、上下文、文件、管理员)的详细数据、相关操作记录和关联信息,生成JSON响应返回给前端,支持前端展示节点的详细信息。

async def node_details_api_route(request: Request) -> JSONResponse: node_id = request.query_params.get('node_id') # 从请求参数获取节点ID if not node_id: return JSONResponse({'error': '缺少 node_id 参数'}, status_code=400) # 初始化节点详情字典 details: Dict[str, Any] = {'id': node_id, 'type': 'unknown', 'data': {}, 'actions': [], 'related': {}} conn = None try: conn = get_db_connection() cursor = conn.cursor() # 从节点ID解析节点类型和实际ID parts = node_id.split('_', 1) node_type_from_id = parts[0] if len(parts) > 1 else node_id actual_id_from_node = parts[1] if len(parts) > 1 else (node_id if node_type_from_id != 'admin' else 'admin') details['type'] = node_type_from_id # 根据节点类型查询详情 if node_type_from_id == 'agent': # 代理节点 cursor.execute("SELECT * FROM agents WHERE agent_id = ?", (actual_id_from_node,)) row = cursor.fetchone() if row: details['data'] = dict(row) # 查询代理相关操作记录 cursor.execute("SELECT timestamp, action_type, task_id, details FROM agent_actions WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 10", (actual_id_from_node,)) details['actions'] = [dict(r) for r in cursor.fetchall()] # 查询代理分配的任务 cursor.execute("SELECT task_id, title, status, priority FROM tasks WHERE assigned_to = ? ORDER BY created_at DESC LIMIT 10", (actual_id_from_node,)) details['related']['assigned_tasks'] = [dict(r) for r in cursor.fetchall()] # 其他节点类型(任务、上下文、文件、管理员)的查询逻辑类似... if not details.get('data') and node_type_from_id not in ['admin']: return JSONResponse({'error': '节点数据未找到或类型不识别'}, status_code=404) except Exception as e: logger.error(f"获取节点 {node_id} 详情错误: {e}", exc_info=True) return JSONResponse({'error': f'获取节点详情失败: {str(e)}'}, status_code=500) finally: if conn: conn.close() # 确保数据库连接关闭 return JSONResponse(details)

(3)下面代码的功能是提供创建代理的API接口。原理是处理POST请求,验证管理员身份,解析请求中的代理ID、能力等参数,调用代理创建工具实现函数创建代理,解析工具返回结果,生成包含创建状态和代理令牌的JSON响应,支持通过前端界面创建新代理。

async def create_agent_dashboard_api_route(request: Request) -> JSONResponse: """用于创建代理的仪表盘API端点,内部调用管理员工具。""" if request.method != 'POST': return JSONResponse({"error": "方法不允许"}, status_code=405) try: data = await get_sanitized_json_body(request) # 获取并验证请求体JSON数据 admin_auth_token = data.get("token") # 管理员认证令牌 agent_id = data.get("agent_id") # 要创建的代理ID capabilities = data.get("capabilities", []) # 代理能力(可选) working_directory = data.get("working_directory") # 工作目录(可选) # 验证管理员身份 if not verify_token(admin_auth_token, "admin"): return JSONResponse({"message": "未授权: API调用的管理员令牌无效"}, status_code=401) if not agent_id: # 检查代理ID是否存在 return JSONResponse({"message": "代理ID为必需项"}, status_code=400) # 准备工具函数的参数 tool_args = { "token": admin_auth_token, # 工具实现会再次验证此令牌 "agent_id": agent_id, "capabilities": capabilities, "working_directory": working_directory } # 调用已重构的工具实现函数 result_list: List[mcp_types.TextContent] = await create_agent_tool_impl(tool_args) # 处理工具返回结果,构建JSON响应 if result_list and result_list[0].text.startswith(f"代理 '{agent_id}' 创建成功。"): # 从结果中提取代理令牌(用于仪表盘显示) agent_token_from_result = None for line in result_list[0].text.split('\n'): if line.startswith("令牌: "): agent_token_from_result = line.split("令牌: ", 1)[1] break return JSONResponse({ "message": f"代理 '{agent_id}' 通过仪表盘API创建成功。", "agent_token": agent_token_from_result # 可能为None(如果解析失败) }) else: # 返回工具返回的错误信息 error_message = result_list[0].text if result_list else "创建代理时发生未知错误。" status_code = 400 # 默认错误状态码 if "未授权" in error_message: status_code = 401 if "已存在" in error_message: status_code = 409 # 冲突 return JSONResponse({"message": error_message}, status_code=status_code) except ValueError as e_val: # 来自get_sanitized_json_body的错误 return JSONResponse({"message": str(e_val)}, status_code=400) except Exception as e: logger.error(f"create_agent_dashboard_api_route 错误: {e}", exc_info=True) return JSONResponse({"message": f"通过仪表盘API创建代理时出错: {str(e)}"}, status_code=500)

(4)下面代码的功能是提供全量数据查询API接口。原理是处理GET 请求,连接数据库,查询所有代理(含令牌)、任务、上下文条目、最近操作记录和文件元数据,整合全局文件映射和管理员令牌等信息,生成包含所有关键数据的JSON响应,支持前端缓存数据以减少请求次数。

async def all_data_api_route(request: Request) -> JSONResponse: """一次调用获取所有数据,用于前端缓存""" if request.method == 'OPTIONS': return await handle_options(request) conn = None try: conn = get_db_connection() cursor = conn.cursor() # 获取所有代理及其令牌 cursor.execute("SELECT * FROM agents ORDER BY created_at DESC") agents_data = [] for row in cursor.fetchall(): agent_dict = dict(row) agent_id = agent_dict['agent_id'] # 从活跃代理中查找该代理的令牌 agent_token = None for token, data in g.active_agents.items(): if data.get("agent_id") == agent_id and data.get("status") != "terminated": agent_token = token break agent_dict['auth_token'] = agent_token agents_data.append(agent_dict) # 添加管理员作为特殊代理 agents_data.insert(0, { 'agent_id': 'Admin', 'status': 'system', 'auth_token': g.admin_token, 'created_at': 'N/A', 'current_task': 'N/A' }) # 获取所有任务 cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC") tasks_data = [dict(row) for row in cursor.fetchall()] # 获取所有上下文条目 cursor.execute("SELECT * FROM project_context ORDER BY last_updated DESC") context_data = [dict(row) for row in cursor.fetchall()] # 获取最近的代理操作(最近100条) cursor.execute(""" SELECT * FROM agent_actions ORDER BY timestamp DESC LIMIT 100 """) actions_data = [dict(row) for row in cursor.fetchall()] # 获取文件元数据 cursor.execute("SELECT * FROM file_metadata") file_metadata = [dict(row) for row in cursor.fetchall()] # 构建响应数据 response_data = { "agents": agents_data, "tasks": tasks_data, "context": context_data, "actions": actions_data, "file_metadata": file_metadata, "file_map": g.file_map, "admin_token": g.admin_token, "timestamp": datetime.datetime.now().isoformat() } return JSONResponse( response_data, headers={ # 设置CORS头 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'GET, OPTIONS', 'Access-Control-Allow-Headers': 'Content-Type' } ) except Exception as e: logger.error(f"获取所有数据错误: {e}", exc_info=True) return JSONResponse({"error": f"获取所有数据失败: {str(e)}"}, status_code=500) finally: if conn: conn.close() # 确保数据库连接关闭
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/24 16:05:00

(10-5)基于MCP实现的多智能体协同系统:框架核心功能模块

10.5 框架核心功能模块“agent_mcp/core”目录是本项目的核心功能模块,实现了项目关键基础功能的实现与管理。负责处理代理身份验证,确保系统访问安全;进行项目配置管理,集中管控各类配置参数;还涉及全局层面的相关操…

作者头像 李华
网站建设 2026/4/23 17:41:22

猫抓浏览器扩展:一键解锁网页视频下载新境界

还在为心仪的视频无法保存而苦恼?猫抓浏览器扩展正是你需要的终极解决方案!这款智能工具能精准捕捉网页中的各类媒体资源,无论是普通MP4还是复杂的M3U8流媒体,都能轻松实现网页视频下载需求。🎬 【免费下载链接】cat-c…

作者头像 李华
网站建设 2026/4/21 15:51:34

NCM文件解密全攻略:解锁网易云音乐加密音频

还在为网易云音乐下载的NCM格式文件无法在第三方播放器使用而烦恼吗?ncmdump工具能够快速处理NCM加密格式,让你轻松实现音频文件的跨平台播放。本文将为您详细介绍这款专业解密工具的使用方法和技巧。 【免费下载链接】ncmdump 项目地址: https://git…

作者头像 李华
网站建设 2026/4/18 15:01:28

哔哩下载姬:专业级B站视频下载解决方案

哔哩下载姬:专业级B站视频下载解决方案 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等)。 项目…

作者头像 李华
网站建设 2026/4/24 9:34:57

Iwara视频下载工具:3分钟实现高效批量下载

Iwara视频下载工具:3分钟实现高效批量下载 【免费下载链接】IwaraDownloadTool Iwara 下载工具 | Iwara Downloader 项目地址: https://gitcode.com/gh_mirrors/iw/IwaraDownloadTool 还在为Iwara视频下载效率低下而苦恼吗?每次都需要手动保存单个…

作者头像 李华
网站建设 2026/4/23 15:44:30

Android应用国际化实战:从零到一的完整解决方案

Android应用国际化实战:从零到一的完整解决方案 【免费下载链接】WeChatLuckyMoney :money_with_wings: WeChats lucky money helper (微信抢红包插件) by Zhongyi Tong. An Android app that helps you snatch red packets in WeChat groups. 项目地址: https:/…

作者头像 李华