当装饰器遇上 async:如何写出同时兼容同步与异步的 Python 装饰器
以统一打点 SDK 为例,从 Flask 到 FastAPI,一器两用
一、问题从何而来?
一个常见但容易被低估的工程场景:你的团队维护一套统一监控打点 SDK,需要在函数执行前后记录耗时、埋入 trace_id、上报异常。早期服务跑在Flask(同步 WSGI),后来新项目迁移到FastAPI(异步 ASGI)。
于是问题来了——
# 同步环境:Flask@monitordeffetch_user(user_id):returndb.query(User).get(user_id)# 异步环境:FastAPI@monitorasyncdeffetch_user(user_id):returnawaitdb.fetch_user(user_id)同一个@monitor,能同时正确地装饰这两类函数吗?
直觉上可以,实际上会翻车。
普通装饰器拿到 async 函数后,返回值会变成一个协程对象而非执行结果,导致下游调用链断裂。反过来,用asyncio.iscoroutinefunction手动分支虽然可行,但写法冗余、容易遗漏。下面,我们从原理出发,逐步构建一个健壮的双模装饰器。
二、先看清对手:同步与异步的本质差异
在动手写代码之前,有必要把两种调用模型捋清楚。
| 维度 | 同步函数 | 异步函数 |
|---|---|---|
| 关键字 | def | async def |
| 调用方式 | result = func() | result = await func() |
| 返回值 | 直接结果 | 协程对象(coroutine) |
| 阻塞行为 | 直接阻塞线程 | 让出控制权给事件循环 |
| 检测方式 | asyncio.iscoroutinefunction(func) | 同左,返回True |
关键矛盾:装饰器本质是wrapper(*args, **kwargs),而wrapper本身的同步/异步属性决定了它能否正确调度被装饰函数。
如果wrapper是同步的:
- ✅ 装饰同步函数 → 正常
- ❌ 装饰异步函数 → 返回 coroutine,不会真正执行
如果wrapper是异步的:
- ❌ 装饰同步函数 → 调用方不会
await,报错 - ✅ 装饰异步函数 → 正常
所以核心思路是:运行时判断被装饰函数的类型,动态选择 wrapper 的同步/异步版本。
三、第一版:判断分流——最朴素的双模方案
先给出一个直觉式的实现,虽然能跑,但有明显的"代码味"问题:
importtimeimportasyncioimportfunctoolsimportlogging logger=logging.getLogger(__name__)defmonitor_v1(func):"""第一版:if/else 分流"""ifasyncio.iscoroutinefunction(func):@functools.wraps(func)asyncdefasync_wrapper(*args,**kwargs):start=time.perf_counter()try:result=awaitfunc(*args,**kwargs)elapsed=time.perf_counter()-start logger.info(f"[ASYNC]{func.__name__}耗时{elapsed:.4f}s")returnresultexceptExceptionase:elapsed=time.perf_counter()-start logger.error(f"[ASYNC]{func.__name__}异常{e},耗时{elapsed:.4f}s")raisereturnasync_wrapperelse:@functools.wraps(func)defsync_wrapper(*args,**kwargs):start=time.perf_counter()try:result=func(*args,**kwargs)elapsed=time.perf_counter()-start logger.info(f"[SYNC]{func.__name__}耗时{elapsed:.4f}s")returnresultexceptExceptionase:elapsed=time.perf_counter()-start logger.error(f"[SYNC]{func.__name__}异常{e},耗时{elapsed:.4f}s")raisereturnsync_wrapper能用,但痛点明显:
- 同步/异步的逻辑几乎完全重复,只是
await的有无 - 一旦打点逻辑变复杂(加 trace_id、上报 metric),两处都要改
- 违反 DRY 原则
四、第二版:利用inspect与asyncio优雅合并
核心改进思路:把重复的"计时 + 异常捕获"逻辑抽成独立函数,wrapper 只负责调度。
importtimeimportasyncioimportfunctoolsimportinspectimportloggingfromtypingimportCallable,TypeVar,ParamSpec logger=logging.getLogger(__name__)P=ParamSpec('P')T=TypeVar('T')defmonitor(func:Callable[P,T])->Callable[P,T]:""" 双模装饰器:同时支持同步函数和异步函数。 设计原则: 1. 运行时判断被装饰函数类型 2. 通用逻辑提取,避免代码重复 3. 完整保留原函数签名(functools.wraps) """def_log_execution(func_name:str,elapsed:float,error:Exception=None):"""统一的打点逻辑,同步/异步共用"""iferror:logger.error(f"⛔{func_name}异常:{error},耗时{elapsed:.4f}s")else:logger.info(f"✅{func_name}执行完成,耗时{elapsed:.4f}s")ifasyncio.iscoroutinefunction(func):@functools.wraps(func)asyncdefasync_wrapper(*args,**kwargs):start=time.perf_counter()try:result=awaitfunc(*args,**kwargs)returnresultexceptExceptionase:_log_execution(func.__name__,time.perf_counter()-start,e)raiseelse:_log_execution(func.__name__,time.perf_counter()-start)returnasync_wrapperelse:@functools.wraps(func)defsync_wrapper(*args,**kwargs):start=time.perf_counter()try:result=func(*args,**kwargs)returnresultexceptExceptionase:_log_execution(func.__name__,time.perf_counter()-start,e)raiseelse:_log_execution(func.__name__,time.perf_counter()-start)returnsync_wrapper比第一版好一些,但async_wrapper和sync_wrapper里仍然有try/except/else的重复结构。
五、第三版:终极方案——带参数的双模装饰器工厂
实际 SDK 场景中,装饰器往往需要参数(如上报的 service 名、采样率等)。我们一并解决:
""" monitor_sdk.py — 统一打点 SDK 核心模块 同时兼容 Flask(同步 WSGI)和 FastAPI(异步 ASGI) """importtimeimportasyncioimportfunctoolsimportinspectimportloggingimportuuidfromtypingimportCallable,Any,Optional,TypeVar,ParamSpec logger=logging.getLogger("monitor_sdk")P=ParamSpec('P')T=TypeVar('T')classExecutionContext:"""执行上下文:封装一次调用的全部打点信息"""def__init__(self,func_name:str,service:str):self.func_name=func_name self.service=service self.trace_id=str(uuid.uuid4())[:8]self.start_time:float=0self.elapsed:float=0self.success:bool=Trueself.error:Optional[Exception]=Nonedefstart(self):self.start_time=time.perf_counter()deffinish(self,error:Exception=None):self.elapsed=time.perf_counter()-self.start_timeiferror:self.success=Falseself.error=errordefreport(self):"""统一上报逻辑(示例:写日志;实际可替换为 Prometheus、StatsD 等)"""status="✅ 成功"ifself.successelsef"⛔ 失败({type(self.error).__name__})"logger.info(f"[{self.service}]{self.func_name}| "f"trace_id={self.trace_id}| "f"status={status}| "f"elapsed={self.elapsed:.4f}s")defmonitor(_func:Optional[Callable]=None,*,service:str="default",log_args:bool=False,):""" 双模打点装饰器 —— 同时支持同步与异步函数。 参数: _func: 被装饰的函数(支持 @monitor 和 @monitor(service="xx") 两种语法) service: 服务标识,用于区分不同微服务的打点 log_args: 是否记录函数入参(调试时开启) 使用方式: @monitor # 最简用法 @monitor(service="user-service") # 带参数 @monitor(service="pay", log_args=True) # 记录入参 """defdecorator(func:Callable[P,T])->Callable[P,T]:is_async=asyncio.iscoroutinefunction(func)mode="async"ifis_asyncelse"sync"def_build_context(*args,**kwargs)->ExecutionContext:"""构建执行上下文"""ctx=ExecutionContext(func.__name__,service)iflog_args:logger.debug(f"[{service}]{func.__name__}调用参数: "f"args={args}, kwargs={kwargs}")returnctxifis_async:@functools.wraps(func)asyncdefasync_wrapper(*args:P.args,**kwargs:P.kwargs)->T:ctx=_build_context(*args,**kwargs)ctx.start()try:result=awaitfunc(*args,**kwargs)returnresultexceptExceptionase:ctx.finish(error=e)raiseelse:ctx.finish()finally:ctx.report()returnasync_wrapperelse:@functools.wraps(func)defsync_wrapper(*args:P.args,**kwargs:P.kwargs)->T:ctx=_build_context(*args,**kwargs)ctx.start()try:result=func(*args,**kwargs)returnresultexceptExceptionase:ctx.finish(error=e)raiseelse:ctx.finish()finally:ctx.report()returnsync_wrapper# 支持 @monitor 和 @monitor() 两种调用方式if_funcisnotNone:returndecorator(_func)returndecorator六、完整实战:Flask + FastAPI 统一接入
6.1 Flask 同步服务
# app_flask.pyfromflaskimportFlask,jsonifyfrommonitor_sdkimportmonitor app=Flask(__name__)@app.route("/user/<int:user_id>")@monitor(service="user-service",log_args=True)defget_user(user_id:int):"""模拟同步数据库查询"""importtime time.sleep(0.1)# 模拟 I/Oreturnjsonify({"user_id":user_id,"name":"张三"})@app.route("/order/<int:order_id>")@monitor(service="order-service")defget_order(order_id:int):"""模拟同步逻辑异常"""iforder_id<0:raiseValueError("订单ID不能为负数")returnjsonify({"order_id":order_id,"status":"shipped"})if__name__=="__main__":app.run(port=5000)6.2 FastAPI 异步服务
# app_fastapi.pyimportasynciofromfastapiimportFastAPI,HTTPExceptionfrommonitor_sdkimportmonitor app=FastAPI()@app.get("/user/{user_id}")@monitor(service="user-service",log_args=True)asyncdefget_user(user_id:int):"""模拟异步数据库查询"""awaitasyncio.sleep(0.1)# 模拟异步 I/Oreturn{"user_id":user_id,"name":"李四"}@app.get("/order/{order_id}")@monitor(service="order-service")asyncdefget_order(order_id:int):"""模拟异步逻辑异常"""iforder_id<0:raiseValueError("订单ID不能为负数")awaitasyncio.sleep(0.05)return{"order_id":order_id,"status":"processing"}# 同一个项目里也可以有同步端点@app.get("/health")@monitor(service="infra")defhealth_check():"""同步健康检查——同一个装饰器,零改动"""return{"status":"ok"}注意:health_check是同步函数,但 FastAPI 允许同步端点(会在线程池中执行),我们的@monitor自动识别并适配。
七、关键陷阱与避坑指南
❌ 陷阱一:忘记functools.wraps
# 错误示范defmonitor(func):asyncdefwrapper(*args,**kwargs):# 没有 @functools.wraps...returnwrapper后果:func.__name__变成wrapper,路由注册失败(Flask/FastAPI 依赖函数名区分路由),调试时日志全是wrapper。
❌ 陷阱二:在同步 wrapper 中await
# 编译错误defsync_wrapper(*args,**kwargs):result=awaitfunc(*args,**kwargs)# SyntaxError!这就是为什么必须在定义时就确定 wrapper 的同步/异步类型,而不是运行时混用。
❌ 陷阱三:inspect.iscoroutinefunctionvsasyncio.iscoroutinefunction
两者对大部分情况一致,但存在细微差异:
importinspectimportasyncioasyncdeff():passprint(inspect.iscoroutinefunction(f))# Trueprint(asyncio.iscoroutinefunction(f))# True推荐使用asyncio.iscoroutinefunction,因为它是asyncio模块的"官方"检测方式,与事件循环的语义更一致。注意:对于functools.partial包装过的函数,两者都可能返回False,需要额外处理。
❌ 陷阱四:装饰器叠加顺序
# 错误顺序:@app.get 在外层时,monitor 接收的是路由返回值,不是函数@app.get("/user/{user_id}")@monitor(service="user-service")asyncdefget_user(user_id:int):...实际上面的顺序是正确的——@monitor先执行,装饰函数后传给@app.get。关键是@monitor不能出现在框架装饰器之上。
✅ 避坑小结
| 陷阱 | 解决方案 |
|---|---|
| 函数签名丢失 | 始终使用@functools.wraps(func) |
| 同步/异步混用 | 定义时分支,不运行时判断 |
| partial 函数检测失败 | 用inspect.unwrap()递归解包 |
| 装饰器顺序 | 框架路由装饰器在最外层 |
| 异常不应被吞 | try/finally中report,异常原样raise |
八、进阶:让装饰器支持类方法和静态方法
实际 SDK 中,装饰器可能装饰类方法。需要额外注意self参数的处理:
classUserService:@monitor(service="user-service")asyncdefget_user(self,user_id:int):awaitasyncio.sleep(0.1)return{"user_id":user_id}上述第三版代码已经兼容——因为*args会自然捕获self,无需额外处理。唯一需要注意的是log_args=True时,日志中会包含self对象,可能产生大量无用输出。可以在_build_context中过滤:
def_build_context(*args,**kwargs)->ExecutionContext:ctx=ExecutionContext(func.__name__,service)iflog_args:# 过滤掉 self/cls 参数filtered_args=[aforainargsifnot(hasattr(a,'__class__')andhasattr(a,'__dict__'))]logger.debug(f"调用参数: args={filtered_args}, kwargs={kwargs}")returnctx九、性能思考:分支判断的开销
有人担心asyncio.iscoroutinefunction的判断有运行时开销。实际上这个判断发生在装饰时(decorator 应用时),而非每次调用时:
@monitor(service="svc")asyncdeffoo():pass# ↑ 此时 asyncio.iscoroutinefunction(foo) 已经执行完毕# ↓ 后续每次调用 foo() 走的都是固定的 async_wrapper,零分支开销这正是装饰器模式的优势:一次判断,永久绑定。比每次调用时if inspect.iscoroutine(result)的方案高效得多。
十、总结
回顾整个设计思路:
- 问题本质:同步/异步函数的调用协议不同,wrapper 必须匹配
- 核心策略:装饰时(而非运行时)判断函数类型,选择对应 wrapper
- 代码复用:通过
ExecutionContext类抽取公共逻辑,避免双倍维护 - 工程适配:支持
@monitor和@monitor()两种语法,兼容类方法、静态方法 - 避坑要点:
functools.wraps、装饰器顺序、异常不吞、partial 兼容
一句话总结:不要试图写一个"既能 await 又能不 await"的 wrapper——而是在装饰时就决定好它的命运。这既是 Python 类型系统的约束,也是它的优雅之处。
最后分享一个实际项目中的经验:统一打点 SDK 的价值不仅在于监控本身,更在于它强制团队建立一致的可观测性习惯。当同一个@monitor能无缝跑在 Flask、FastAPI、甚至命令行脚本中时,你获得的不只是一行装饰器代码,而是一套贯穿整个技术栈的标准化实践。
如果你在实际接入中遇到functools.partial兼容性、与contextvars联动传递 trace_id、或装饰器与pydantic验证冲突等问题,欢迎在评论区交流——这些话题每一个都值得单独展开。📌