小李今天又遇到了烦心事。
他写了一个数据处理脚本,要调用外部API获取一万个用户的信息。每个请求大概要等2秒。他不想干等着,所以用了asyncio,并发发出去50个请求。
跑了三分钟,脚本卡住了。
不是死锁,就是单纯的慢——有几个API服务不稳定,响应要半分钟。他想:“能不能设定一个超时时间,比如5秒,超过5秒就不等了,直接跳过?”
这个需求太常见了。不管是网络请求、数据库查询还是复杂的计算任务,你总不希望它无限期地卡下去。就像点外卖,等了45分钟还没到,你肯定想取消订单,换一家点。
问题在于,Python的异步超时机制,在3.11之前有一个不大不小的“坑”,如果你不注意,任务可能根本取消不掉。
一个“取消失败”的真实案例
先来看一段代码,它在Python 3.10上运行:
import asyncio async def quick_task(): # 这个任务瞬间完成 return "done" async def wrapper(): # 设置30秒超时,但实际任务1秒就完成 return await asyncio.wait_for(quick_task(), timeout=30) async def main(): task = asyncio.create_task(wrapper()) await asyncio.sleep(0) # 让任务开始执行 task.cancel() # 手动取消 try: await task except asyncio.CancelledError: print("任务被取消了") else: print("任务没有被取消!") asyncio.run(main())在Python 3.8上运行,输出的是“任务被取消了”。但在Python 3.9或3.10上运行,输出的却是“任务没有被取消!”
什么情况?明明调用了cancel(),为什么任务没被取消?
真相藏在wait_for的源码里
要理解这个问题,得看看asyncio.wait_for这个函数到底在干什么。
wait_for的作用是:给一个任务设置一个超时时间,如果超时了就取消它。但这里有一个细节——如果任务在超时之前就已经完成了,而在完成的那一瞬间你恰好发起了取消请求,wait_for会怎么处理?
Python 3.8的做法是:不管任务完没完成,只要收到了取消信号,就抛出一个CancelledError。
Python 3.9及之后的做法是:先检查一下任务是不是已经完成了。如果已经完成了,“取消”就没有意义了,直接返回任务的结果,不抛异常。
听起来3.9之后的逻辑更合理对吧?毕竟任务都做完了,还取消什么?
但在并发环境下,问题就出在这个“先检查”上。
想象一下这个时序:
任务在
wait_for里等着你调用
task.cancel()wait_for收到取消信号它问:任务完成了吗?如果回答“完成了”,它就直接返回结果,把取消信号吞掉
这个“任务完成了吗”的判断,在多线程/多任务的并发环境下,会因为时序问题出现误判。任务实际上还没有真正完成,只是处于“即将完成”的状态,wait_for就可能把它当成“已完成”,从而无视你的取消请求。
这不是bug,而是设计上的一种取舍。但这个取舍导致了一个后果:在Python 3.11以下的版本中,取消操作并不是100%可靠的。
那怎么办?三个靠谱的解决方案
既然wait_for靠不住,那就自己动手。
方案一:自己封装一个“靠谱版的wait_for”
思路很简单:不依赖wait_for的取消机制,而是自己用asyncio.create_task()加一个超时的“看门狗”。
import asyncio async def cancellable_wait_for(coro, timeout): """ 一个更可靠的wait_for版本,确保取消信号不会被吞掉 """ task = asyncio.create_task(coro) try: # 等待任务完成,或者超时 return await asyncio.wait_for(task, timeout=timeout) except asyncio.TimeoutError: # 超时了,取消任务 task.cancel() try: await task except asyncio.CancelledError: # 确保取消信号被传播出去 pass raise # 重新抛出TimeoutError except asyncio.CancelledError: # 外部取消了,把取消信号传递给内部任务 task.cancel() try: await task except asyncio.CancelledError: pass raise # 重新抛出CancelledError async def my_task(): try: await asyncio.sleep(10) return "完成" except asyncio.CancelledError: print("内部任务被取消了") raise async def main(): task = asyncio.create_task(cancellable_wait_for(my_task(), timeout=5)) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("外部:确实被取消了") asyncio.run(main())这个方案的要点是:不管什么情况,只要外部取消或者超时,都强制取消内部任务,然后把取消信号往上抛。不会被“任务已完成”这种假象迷惑。
方案二:用第三方库quattro的CancelScope
Python 3.11之后,官方推出了asyncio.timeout(),终于有了靠谱的超时机制。那3.11以下怎么办?
有一个第三方库叫quattro,它在3.11以下版本中实现了类似的功能。
# pip install quattro import asyncio from quattro import move_on_after async def main(): with move_on_after(5) as scope: # 5秒超时 result = await some_slow_operation() print(result) if scope.cancelled_caught: print("超时被取消了,但程序继续运行") asyncio.run(main())quattro的CancelScope比官方的更灵活:它是普通的上下文管理器(不需要async with),可以手动调用scope.cancel()提前取消,还可以查询是否被取消了。
如果你在项目中需要同时支持Python 3.9、3.10和3.11,quattro是个不错的选择。代码写一遍,在所有版本上行为一致。
方案三:最底层的做法——自己用Event和超时循环
如果不想引入第三方库,也嫌自己封装太麻烦,还有一个最朴素的办法:不用wait_for,自己用asyncio.Event加上循环检查。
import asyncio async def cancellable_operation(timeout): """ 在操作内部主动检查超时和取消信号 """ start = asyncio.get_event_loop().time() # 假设这是一系列的小步骤 for step in range(10): # 检查是否超时 if asyncio.get_event_loop().time() - start > timeout: raise asyncio.TimeoutError() # 检查是否被取消(通过捕获取消信号) try: await asyncio.sleep(0.5) # 模拟一个小步骤 except asyncio.CancelledError: # 做清理工作 print("收到取消信号,正在清理...") raise # 重新抛出,让上层知道被取消了 print(f"完成步骤 {step}") return "全部完成" async def main(): task = asyncio.create_task(cancellable_operation(timeout=3)) await asyncio.sleep(2) task.cancel() try: result = await task print(result) except asyncio.TimeoutError: print("超时了") except asyncio.CancelledError: print("被取消了") asyncio.run(main())这个方案的优点是:你完全掌控取消逻辑。缺点是:你得在操作内部主动插入检查点。如果你的操作是一大块无法分割的同步代码,这个方法就不太适用了。
那同步代码怎么办?线程池里怎么取消?
上面聊的都是异步代码(async/await)。但小李的脚本里,requests.get()是同步的,它不响应asyncio的取消信号。
这种情况下,task.cancel()根本没用。因为取消信号只在await的地方才能被处理,而同步代码里没有await。
解决方案是:用loop.run_in_executor把同步代码扔到线程池里,然后用一个threading.Event来手动控制停止。
import asyncio import threading import time def blocking_task(stop_event): """ 这是一个会阻塞的同步函数 它定期检查stop_event,如果被设置了就主动退出 """ print("同步任务开始") for i in range(30): if stop_event.is_set(): print("收到停止信号,主动退出") return "被停止了" print(f"工作中... {i}") time.sleep(1) # 模拟耗时操作 return "正常完成" async def main(): stop_event = threading.Event() loop = asyncio.get_running_loop() # 把同步任务扔到线程池里执行 task = loop.run_in_executor(None, blocking_task, stop_event) # 5秒后发送停止信号 await asyncio.sleep(5) stop_event.set() # 等待任务结束 result = await task print(f"结果: {result}") asyncio.run(main())关键点:同步任务本身必须主动检查stop_event,不能指望Python帮你“强行”取消。强行杀线程在Python里是不安全的,也不被推荐。
那如果用多进程呢?
如果你面对的是CPU密集型的任务,线程也帮不了你——Python的GIL锁会让多线程在计算任务上毫无优势。这时候考虑用ProcessPoolExecutor:
import asyncio from concurrent.futures import ProcessPoolExecutor import time def cpu_intensive_task(): """模拟一个费CPU的大活儿""" total = 0 for i in range(100000000): total += i # 每1000万次检查一次,但进程间通信复杂,先忽略细节 if i % 10000000 == 0: print(f"计算到 {i}") return total async def main(): loop = asyncio.get_running_loop() with ProcessPoolExecutor(max_workers=1) as pool: # 在子进程中执行 task = loop.run_in_executor(pool, cpu_intensive_task) # 等待3秒 await asyncio.sleep(3) print("3秒到了,但子进程不会自己停...") # 注意:ProcessPoolExecutor没有优雅的取消方法 # 只能shutdown(wait=False)但会有warning # 或者terminate,但不推荐多进程的取消更麻烦。进程不像线程,你不能优雅地通知它“停下来”。常见的做法是:在子进程里也放一个检查循环,通过进程间通信(比如multiprocessing.Event或队列)来传递停止信号。
总结一下,到底选哪个方案?
给你一张决策表:
| 你的场景 | 推荐方案 |
|---|---|
| 纯异步代码,Python 3.11+ | 直接用官方asyncio.timeout() |
| 纯异步代码,Python 3.10及以下 | 用quattro的CancelScope,或自己封装cancellable_wait_for |
| 同步阻塞代码(如requests) | 线程池 +threading.Event手动轮询检查 |
| CPU密集型代码 | 多进程 + 进程间通信信号,或用concurrent.futures的超时(有限支持) |
| 不想改现有代码,只想加个保护 | 用asyncio.wait_for,但要接受它可能偶尔失效 |
小李最后选择了方案一:自己封装了一个cancellable_wait_for,用了半小时写完,以后所有异步任务都走这个函数,再也没出现过“取消不掉”的情况。
他说了一句大实话:“官方的不靠谱,就自己写一个。反正常用的功能就那么几个,封装一次到处用,不亏。”