8.5.5 API接口路由
文件routes.py是本项目的API路由模块,基于FastAPI构建,定义了与人脸识别相关的核心接口。通过初始化人脸检测服务、模型注册服务和模型仓库,实现了多模型管理与调用,例如“/models”接口返回可用模型列表,“/infer”接口接收图像并返回检测到的人脸边界框及特征嵌入向量,“/infer_visualize”接口生成带有人脸标记框的可视化图像并支持下载,“/download”接口用于获取生成的可视化图像。同时集成了线程池处理CPU密集型任务(如人脸检测、特征提取),并通过OpenTelemetry实现接口调用追踪,确保异步高效处理的同时便于监控和调试。
(1)下面代码的功能是初始化 FastAPI 路由器和相关服务。它定义了允许的图像文件扩展名,并初始化了人脸检测服务、模型注册服务以及模型仓库。同时,配置了线程池用于异步执行耗时操作,并初始化了 OpenTelemetry 追踪器,以便对请求进行追踪和监控。
router = APIRouter() # 允许的图像文件扩展名 ALLOWED_EXTENSIONS = (".jpg", ".jpeg", ".png") # 初始化服务 face_detector_service = FaceDetectionService() model_registry_service = ModelRegistryService() # 从模型注册服务中加载所有模型,构建模型仓库 models_repository = {name: FaceModel(meta["path"]) for name, meta in model_registry_service.get_all_models().items()} # 线程池配置 thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=min(32, os.cpu_count() * 4)) # 初始化追踪器 tracer = trace.get_tracer(__name__)(2)下面代码的功能是定义了一个获取所有可用模型名称列表的 API 接口。它使用 OpenTelemetry 追踪器记录接口的执行过程,并返回模型仓库中所有模型的名称。
@router.get("/models") async def get_models(): with tracer.start_as_current_span("get_models"): # 返回所有可用模型的名称列表 return {"available_models": list(models_repository.keys())}(3)下面代码的功能是定义了一个进行人脸嵌入推理的 API 接口。它接收模型名称、图像格式和图像文件,使用 OpenTelemetry 追踪器记录整个推理过程。它读取并解码图像文件,运行人脸检测,在线程池中并发处理检测到的人脸,并返回包含人脸嵌入向量的结果。
@router.post("/infer") async def infer(model_name: str = Form(...), image_format: str = Form(...), file: UploadFile = File(...)): with tracer.start_as_current_span("infer") as span: # 设置追踪属性 span.set_attribute("model_name", model_name) span.set_attribute("image_format", image_format) span.set_attribute("file_name", file.filename) try: with tracer.start_as_current_span("read_file"): # 读取图像文件字节数据 image_bytes = await file.read() with tracer.start_as_current_span("decode_image"): # 解码图像字节数据为OpenCV格式 image = decode_image(image_bytes, image_format) # 在线程中运行人脸检测,避免阻塞事件循环 with tracer.start_as_current_span("detect_faces"): faces = await run_in_threadpool(face_detector_service.detect_faces, image) span.set_attribute("faces_detected", len(faces)) # 记录检测到的人脸数量 # 获取指定模型 model = models_repository.get(model_name) if not model: span.set_status(Status(StatusCode.ERROR, "模型未找到")) raise HTTPException(status_code=404, detail="模型未找到") # 并发处理所有检测到的人脸 with tracer.start_as_current_span("process_faces"): tasks = [process_face(face, image, model, model_name) for face in faces] results = await asyncio.gather(*tasks) return JSONResponse({"faces": results}) except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) raise(4)下面代码的功能是定义了一个进行人脸嵌入推理并生成可视化结果的 API 接口。它接收模型名称、图像格式和图像文件,使用 OpenTelemetry 追踪器记录整个过程。它读取并解码图像文件,运行人脸检测,并行处理人脸生成可视化结果,然后在后台保存可视化图像,并返回保存路径。
@router.post("/infer_visualize") async def infer_visualize(model_name: str = Form(...), image_format: str = Form(...), file: UploadFile = File(...), background_tasks: BackgroundTasks = None): with tracer.start_as_current_span("infer_visualize") as span: # 设置追踪属性 span.set_attribute("model_name", model_name) span.set_attribute("image_format", image_format) span.set_attribute("file_name", file.filename) try: with tracer.start_as_current_span("read_file"): # 读取图像文件字节数据 image_bytes = await file.read() with tracer.start_as_current_span("decode_image"): # 解码图像字节数据为OpenCV格式 image = decode_image(image_bytes, image_format) # 在线程中运行人脸检测 with tracer.start_as_current_span("detect_faces"): faces = await run_in_threadpool(face_detector_service.detect_faces, image) span.set_attribute("faces_detected", len(faces)) # 记录检测到的人脸数量 # 获取指定模型 model = models_repository.get(model_name) if not model: span.set_status(Status(StatusCode.ERROR, "模型未找到")) raise HTTPException(status_code=404, detail="模型未找到") # 并行处理人脸并生成可视化结果 with tracer.start_as_current_span("process_and_visualize"): image_pil, results = await process_and_visualize_faces(faces, image, model, model_name) # 在后台保存图像,以更快地返回响应 with tracer.start_as_current_span("save_image"): saved_image_path = await run_in_threadpool(save_visualized_image, image_pil) # TODO: 添加S3/对象存储支持 # 可选:在后台清理旧的临时文件 # if background_tasks: # background_tasks.add_task(cleanup_old_temp_files) return JSONResponse({"message": "图像保存成功", "saved_image_path": saved_image_path}) except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) raise(5)下面代码的功能是定义了一个下载图像文件的 API 接口。它验证文件扩展名是否允许,并检查文件是否存在。如果文件存在,它返回文件响应;否则,返回错误信息。整个过程使用 OpenTelemetry 追踪器进行监控。
@router.get("/download/") async def download_image(file_path: str = Query(..., description=f"要下载的图像文件路径(允许的扩展名:{', '.join(ALLOWED_EXTENSIONS)})")): with tracer.start_as_current_span("download_image") as span: span.set_attribute("file_path", file_path) try: # 验证文件扩展名 if not any(file_path.lower().endswith(ext) for ext in ALLOWED_EXTENSIONS): span.set_status(Status(StatusCode.ERROR, "无效的文件扩展名")) raise HTTPException(status_code=400, detail="无效的文件扩展名") # 在线程中检查文件是否存在,避免阻塞 file_exists = await run_in_threadpool(os.path.isfile, file_path) if not file_exists: span.set_status(Status(StatusCode.ERROR, "文件未找到")) raise HTTPException(status_code=404, detail="文件未找到。") # 返回文件响应 return FileResponse(path=file_path, filename=os.path.basename(file_path), media_type="image/" + file_path.lower().split('.')[-1]) except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) raise(6)下面代码的功能是定义了一个辅助函数,用于在指定的线程池中运行同步函数,避免阻塞 FastAPI 的事件循环。这使得耗时的同步操作可以在后台线程中执行,从而提高应用的响应性能。
async def run_in_threadpool(func, *args, **kwargs): """在线程池中运行同步函数,避免阻塞事件循环""" return await asyncio.get_event_loop().run_in_executor( thread_pool, partial(func, *args, **kwargs) )(7)下面代码的功能是定义了一个异步函数,用于在线程中处理单个人脸。它生成唯一的人脸 ID,提取人脸区域,并在线程池中预处理人脸和获取人脸嵌入向量。整个过程使用 OpenTelemetry 追踪器进行监控,并返回人脸的嵌入向量和边界框信息。
async def process_face(face, image, model, model_name): """在线程中处理单个人脸(用于CPU密集型操作)""" with tracer.start_as_current_span("process_face") as span: face_id = str(uuid.uuid4()) # 生成唯一人脸ID span.set_attribute("face_id", face_id) x1, y1, x2, y2 = map(int, face) # 人脸边界框坐标 span.set_attribute("bbox", f"{x1},{y1},{x2},{y2}") # 记录边界框信息 # 提取人脸区域 face_region = image[y1:y2, x1:x2] # 在线程中预处理人脸 with tracer.start_as_current_span("preprocess_face"): preprocessed_face = await run_in_threadpool(preprocess_face, face_region, model_name=model_name) # 在线程中获取人脸嵌入向量 with tracer.start_as_current_span("get_embedding"): embedding = await run_in_threadpool(model.get_embedding, preprocessed_face) return {"face_id": face_id, "bbox": [x1, y1, x2, y2], "embedding": embedding.tolist()}(8)下面代码的功能是定义了一个异步函数,用于并行处理多个人脸,并生成可视化结果。它批量处理人脸,然后将 OpenCV 图像转换为 PIL 格式,并在图像上绘制人脸边界框和 ID。整个过程使用 OpenTelemetry 追踪器进行监控。
async def process_and_visualize_faces(faces, image, model, model_name): """并行处理多个人脸,然后生成可视化结果""" with tracer.start_as_current_span("process_and_visualize_faces") as span: span.set_attribute("num_faces", len(faces)) # 记录人脸数量 with tracer.start_as_current_span("process_faces_batch"): # 批量处理人脸 tasks = [process_face(face, image, model, model_name) for face in faces] results = await asyncio.gather(*tasks) # 转换图像并绘制边界框 - 在线程中处理更快 with tracer.start_as_current_span("visualize_faces"): def visualize(): with tracer.start_as_current_span("visualize_inner"): # 将OpenCV图像转换为PIL格式(BGR转RGB) image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(image_pil) # 创建绘图对象 font = ImageFont.load_default() # 加载默认字体 # 为每个人脸绘制边界框和ID for result in results: x1, y1, x2, y2 = result['bbox'] draw.rectangle([(x1, y1), (x2, y2)], outline="green", width=2) # 绘制绿色边界框 draw.text((x1, y1 - 10), result["face_id"][:8], fill="yellow", font=font) # 绘制人脸ID return image_pil # 在线程中执行可视化绘制 image_pil = await run_in_threadpool(visualize) return image_pil, results(9)下面代码的功能是定义了一个函数,用于将可视化图像保存到临时文件。它确保临时目录存在,生成临时文件名,并保存图像。整个过程使用 OpenTelemetry 追踪器进行监控,并返回保存的图像路径。
def save_visualized_image(image): """将可视化图像保存到临时文件""" with tracer.start_as_current_span("save_visualized_image") as span: temp_dir = "temp" # 确保临时目录存在 if not os.path.exists(temp_dir): os.makedirs(temp_dir) # 生成临时文件名 temp_filename = f"temp_{uuid.uuid4().hex[:8]}.jpg" saved_image_path = os.path.join(temp_dir, temp_filename) span.set_attribute("saved_image_path", saved_image_path) # 记录保存路径 # 保存图像 image.save(saved_image_path) return saved_image_path(10)下面代码的功能是定义了一个异步函数,用于清理超过指定时长的临时文件。它遍历临时目录中的文件,检查文件的最后修改时间,并删除超过最大保留时长的文件。整个过程使用 OpenTelemetry 追踪器进行监控,并记录删除的文件数量。
async def cleanup_old_temp_files(max_age_hours=1): """清理超过指定小时数的临时文件""" with tracer.start_as_current_span("cleanup_old_temp_files") as span: span.set_attribute("max_age_hours", max_age_hours) # 记录最大保留时长 temp_dir = "temp" if not os.path.exists(temp_dir): return def _cleanup(): with tracer.start_as_current_span("_cleanup_inner"): now = time.time() files_removed = 0 # 记录删除的文件数量 # 遍历临时目录中的文件 for filename in os.listdir(temp_dir): file_path = os.path.join(temp_dir, filename) if os.path.isfile(file_path): # 检查文件是否超过最大保留时长 if now - os.path.getmtime(file_path) > max_age_hours * 3600: try: os.remove(file_path) # 删除文件 files_removed += 1 except Exception as e: pass # 忽略删除错误 return files_removed # 在线程中执行清理操作 files_removed = await run_in_threadpool(_cleanup) span.set_attribute("files_removed", files_removed) # 记录删除的文件数量8.5.6 主程序
文件main.py是本项目的入口程序,基于FastAPI构建了整个Web应用的核心框架。它初始化了FastAPI应用,配置了遥测(追踪)功能、跨域资源共享(CORS)中间件和请求处理时间跟踪中间件,集成了API路由模块以提供人脸检测相关接口,并自定义了OpenAPI文档输出。同时,通过OpenTelemetry对FastAPI进行instrumentation以增强可观测性,最终使用uvicorn作为服务器运行应用,支持热重载,为整个项目提供了基础运行环境和核心服务入口。
import time import uvicorn from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.utils import get_openapi from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from requests import Request from routes import router as api_router from telemetry_utils import setup_telemetry # 初始化FastAPI应用,设置标题和版本 app = FastAPI(title="FaceTron MCP Server", version="1.0.0") # 默认启用遥测设置 tracer = setup_telemetry() # CORS(跨域资源共享)设置 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源的请求 allow_credentials=True, # 允许携带认证信息 allow_methods=["*"], # 允许所有HTTP方法 allow_headers=["*"], # 允许所有请求头 ) # 添加中间件以跟踪请求处理时间 @app.middleware("http") async def add_process_time_header(request: Request, call_next): start_time = time.time() # 记录请求开始时间 response = await call_next(request) # 处理请求并获取响应 process_time = time.time() - start_time # 计算处理时间 response.headers["X-Process-Time"] = str(process_time) # 在响应头中添加处理时间 return response # 集成API路由 app.include_router(api_router, tags=["face-detection"]) # 根路径接口 @app.get("/") def root(): return {"message": "FaceTron MCP Server running"} # 自定义OpenAPI文档输出 @app.get("/openapi.json", include_in_schema=False) def custom_openapi(): return get_openapi( title=app.title, version=app.version, routes=app.routes, ) # 为FastAPI应用添加OpenTelemetry instrumentation,增强可观测性 FastAPIInstrumentor.instrument_app(app) # 应用入口 if __name__ == "__main__": # 使用uvicorn运行应用,监听所有网络接口的8000端口,启用热重载 uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)8.5.7 MCP服务设计
到此为止,本项目的核心功能介绍完毕。本项目始终围绕MCP的核心需求,设计并实现了一系列功能,使其具备符合MCP规范的模型服务化能力,这主要体现在以下几个方面。
1. 标准化模型注册与发现
通过ModelRegistryService自动扫描并注册模型目录中的ONNX模型,生成包含模型名称、路径、输入输出信息的元数据。同时,提供/models API接口暴露所有可用模型列表,使外部系统(如MCP代理)能便捷查询和发现可调用的模型,实现MCP要求的模型注册表功能。
2. 统一模型调用接口
定义/infer和/infer_visualize等标准化接口,通过model_name参数指定模型,接收统一格式的图像输入(文件+格式参数),返回结构化的输出(人脸边界框、特征嵌入向量、可视化结果等)。这种设计符合MCP对模型调用接口标准化的要求,确保不同模型能通过一致的方式被调用,降低集成复杂度。
3. MCP兼容的元数据规范
项目通过/openapi.json接口生成符合OpenAPI v3.1.0规范的文档,其中包含模型信息、接口定义、请求响应格式等元数据,且这些元数据严格遵循MCP协议对服务描述的要求,便于MCP代理或其他遵循该协议的系统自动解析和对接。
4. 多模型协同与扩展支持
采用动态加载机制支持多模型并行运行,可根据需求灵活添加新的ONNX模型(无需修改核心代码),并通过模块化架构预留了对TensorFlow、PyTorch等其他框架模型的扩展空间,满足MCP协议中多模型协同和可扩展性的要求。
总之,本项目通过模型注册发现、标准化接口、元数据规范及扩展设计,全面实现了MCP协议要求的核心功能,使其能作为标准化的模型服务节点,无缝集成到遵循MCP协议的模型生态系统中。