news 2026/4/24 17:09:21

那些“卡死”的任务:Python 3.11 以下如何优雅地实现自动取消?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
那些“卡死”的任务:Python 3.11 以下如何优雅地实现自动取消?

小李今天又遇到了烦心事。

他写了一个数据处理脚本,要调用外部API获取一万个用户的信息。每个请求大概要等2秒。他不想干等着,所以用了asyncio,并发发出去50个请求。

跑了三分钟,脚本卡住了。

不是死锁,就是单纯的慢——有几个API服务不稳定,响应要半分钟。他想:“能不能设定一个超时时间,比如5秒,超过5秒就不等了,直接跳过?”

这个需求太常见了。不管是网络请求、数据库查询还是复杂的计算任务,你总不希望它无限期地卡下去。就像点外卖,等了45分钟还没到,你肯定想取消订单,换一家点。

问题在于,Python的异步超时机制,在3.11之前有一个不大不小的“坑”,如果你不注意,任务可能根本取消不掉。

一个“取消失败”的真实案例

先来看一段代码,它在Python 3.10上运行:

import asyncio async def quick_task(): # 这个任务瞬间完成 return "done" async def wrapper(): # 设置30秒超时,但实际任务1秒就完成 return await asyncio.wait_for(quick_task(), timeout=30) async def main(): task = asyncio.create_task(wrapper()) await asyncio.sleep(0) # 让任务开始执行 task.cancel() # 手动取消 try: await task except asyncio.CancelledError: print("任务被取消了") else: print("任务没有被取消!") asyncio.run(main())

在Python 3.8上运行,输出的是“任务被取消了”。但在Python 3.9或3.10上运行,输出的却是“任务没有被取消!”

什么情况?明明调用了cancel(),为什么任务没被取消?

真相藏在wait_for的源码里

要理解这个问题,得看看asyncio.wait_for这个函数到底在干什么。

wait_for的作用是:给一个任务设置一个超时时间,如果超时了就取消它。但这里有一个细节——如果任务在超时之前就已经完成了,而在完成的那一瞬间你恰好发起了取消请求,wait_for会怎么处理?

Python 3.8的做法是:不管任务完没完成,只要收到了取消信号,就抛出一个CancelledError

Python 3.9及之后的做法是:先检查一下任务是不是已经完成了。如果已经完成了,“取消”就没有意义了,直接返回任务的结果,不抛异常。

听起来3.9之后的逻辑更合理对吧?毕竟任务都做完了,还取消什么?

但在并发环境下,问题就出在这个“先检查”上。

想象一下这个时序:

  1. 任务在wait_for里等着

  2. 你调用task.cancel()

  3. wait_for收到取消信号

  4. 它问:任务完成了吗?如果回答“完成了”,它就直接返回结果,把取消信号吞掉

这个“任务完成了吗”的判断,在多线程/多任务的并发环境下,会因为时序问题出现误判。任务实际上还没有真正完成,只是处于“即将完成”的状态,wait_for就可能把它当成“已完成”,从而无视你的取消请求。

这不是bug,而是设计上的一种取舍。但这个取舍导致了一个后果:在Python 3.11以下的版本中,取消操作并不是100%可靠的

那怎么办?三个靠谱的解决方案

既然wait_for靠不住,那就自己动手。

方案一:自己封装一个“靠谱版的wait_for”

思路很简单:不依赖wait_for的取消机制,而是自己用asyncio.create_task()加一个超时的“看门狗”。

import asyncio async def cancellable_wait_for(coro, timeout): """ 一个更可靠的wait_for版本,确保取消信号不会被吞掉 """ task = asyncio.create_task(coro) try: # 等待任务完成,或者超时 return await asyncio.wait_for(task, timeout=timeout) except asyncio.TimeoutError: # 超时了,取消任务 task.cancel() try: await task except asyncio.CancelledError: # 确保取消信号被传播出去 pass raise # 重新抛出TimeoutError except asyncio.CancelledError: # 外部取消了,把取消信号传递给内部任务 task.cancel() try: await task except asyncio.CancelledError: pass raise # 重新抛出CancelledError async def my_task(): try: await asyncio.sleep(10) return "完成" except asyncio.CancelledError: print("内部任务被取消了") raise async def main(): task = asyncio.create_task(cancellable_wait_for(my_task(), timeout=5)) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("外部:确实被取消了") asyncio.run(main())

这个方案的要点是:不管什么情况,只要外部取消或者超时,都强制取消内部任务,然后把取消信号往上抛。不会被“任务已完成”这种假象迷惑。

方案二:用第三方库quattro的CancelScope

Python 3.11之后,官方推出了asyncio.timeout(),终于有了靠谱的超时机制。那3.11以下怎么办?

有一个第三方库叫quattro,它在3.11以下版本中实现了类似的功能。

# pip install quattro import asyncio from quattro import move_on_after async def main(): with move_on_after(5) as scope: # 5秒超时 result = await some_slow_operation() print(result) if scope.cancelled_caught: print("超时被取消了,但程序继续运行") asyncio.run(main())

quattroCancelScope比官方的更灵活:它是普通的上下文管理器(不需要async with),可以手动调用scope.cancel()提前取消,还可以查询是否被取消了。

如果你在项目中需要同时支持Python 3.9、3.10和3.11,quattro是个不错的选择。代码写一遍,在所有版本上行为一致。

方案三:最底层的做法——自己用Event和超时循环

如果不想引入第三方库,也嫌自己封装太麻烦,还有一个最朴素的办法:不用wait_for,自己用asyncio.Event加上循环检查。

import asyncio async def cancellable_operation(timeout): """ 在操作内部主动检查超时和取消信号 """ start = asyncio.get_event_loop().time() # 假设这是一系列的小步骤 for step in range(10): # 检查是否超时 if asyncio.get_event_loop().time() - start > timeout: raise asyncio.TimeoutError() # 检查是否被取消(通过捕获取消信号) try: await asyncio.sleep(0.5) # 模拟一个小步骤 except asyncio.CancelledError: # 做清理工作 print("收到取消信号,正在清理...") raise # 重新抛出,让上层知道被取消了 print(f"完成步骤 {step}") return "全部完成" async def main(): task = asyncio.create_task(cancellable_operation(timeout=3)) await asyncio.sleep(2) task.cancel() try: result = await task print(result) except asyncio.TimeoutError: print("超时了") except asyncio.CancelledError: print("被取消了") asyncio.run(main())

这个方案的优点是:你完全掌控取消逻辑。缺点是:你得在操作内部主动插入检查点。如果你的操作是一大块无法分割的同步代码,这个方法就不太适用了。

那同步代码怎么办?线程池里怎么取消?

上面聊的都是异步代码(async/await)。但小李的脚本里,requests.get()是同步的,它不响应asyncio的取消信号。

这种情况下,task.cancel()根本没用。因为取消信号只在await的地方才能被处理,而同步代码里没有await

解决方案是:用loop.run_in_executor把同步代码扔到线程池里,然后用一个threading.Event来手动控制停止。

import asyncio import threading import time def blocking_task(stop_event): """ 这是一个会阻塞的同步函数 它定期检查stop_event,如果被设置了就主动退出 """ print("同步任务开始") for i in range(30): if stop_event.is_set(): print("收到停止信号,主动退出") return "被停止了" print(f"工作中... {i}") time.sleep(1) # 模拟耗时操作 return "正常完成" async def main(): stop_event = threading.Event() loop = asyncio.get_running_loop() # 把同步任务扔到线程池里执行 task = loop.run_in_executor(None, blocking_task, stop_event) # 5秒后发送停止信号 await asyncio.sleep(5) stop_event.set() # 等待任务结束 result = await task print(f"结果: {result}") asyncio.run(main())

关键点:同步任务本身必须主动检查stop_event,不能指望Python帮你“强行”取消。强行杀线程在Python里是不安全的,也不被推荐。

那如果用多进程呢?

如果你面对的是CPU密集型的任务,线程也帮不了你——Python的GIL锁会让多线程在计算任务上毫无优势。这时候考虑用ProcessPoolExecutor

import asyncio from concurrent.futures import ProcessPoolExecutor import time def cpu_intensive_task(): """模拟一个费CPU的大活儿""" total = 0 for i in range(100000000): total += i # 每1000万次检查一次,但进程间通信复杂,先忽略细节 if i % 10000000 == 0: print(f"计算到 {i}") return total async def main(): loop = asyncio.get_running_loop() with ProcessPoolExecutor(max_workers=1) as pool: # 在子进程中执行 task = loop.run_in_executor(pool, cpu_intensive_task) # 等待3秒 await asyncio.sleep(3) print("3秒到了,但子进程不会自己停...") # 注意:ProcessPoolExecutor没有优雅的取消方法 # 只能shutdown(wait=False)但会有warning # 或者terminate,但不推荐

多进程的取消更麻烦。进程不像线程,你不能优雅地通知它“停下来”。常见的做法是:在子进程里也放一个检查循环,通过进程间通信(比如multiprocessing.Event或队列)来传递停止信号。

总结一下,到底选哪个方案?

给你一张决策表:

你的场景推荐方案
纯异步代码,Python 3.11+直接用官方asyncio.timeout()
纯异步代码,Python 3.10及以下quattroCancelScope,或自己封装cancellable_wait_for
同步阻塞代码(如requests)线程池 +threading.Event手动轮询检查
CPU密集型代码多进程 + 进程间通信信号,或用concurrent.futures的超时(有限支持)
不想改现有代码,只想加个保护asyncio.wait_for,但要接受它可能偶尔失效

小李最后选择了方案一:自己封装了一个cancellable_wait_for,用了半小时写完,以后所有异步任务都走这个函数,再也没出现过“取消不掉”的情况。

他说了一句大实话:“官方的不靠谱,就自己写一个。反正常用的功能就那么几个,封装一次到处用,不亏。”

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/24 17:09:19

大型集团管控数智化:一场关乎企业生死存亡的系统工程(PPT)

本文约9000字,深度拆解大型集团企业管控数智化解决方案的核心逻辑、架构设计与落地要点,适合集团CIO、CFO、信息化主管、数字化转型负责人及相关咨询从业者深度阅读。写在前面:集团管控的本质是什么? 做了这么多年企业数字化咨询&…

作者头像 李华
网站建设 2026/4/24 17:07:27

PyTorch实战:从零构建CNN模型实现MNIST分类

1. 项目概述在计算机视觉领域,卷积神经网络(CNN)已经成为图像识别任务的事实标准。作为一名长期使用PyTorch框架的开发者,我想分享如何从零开始构建一个完整的CNN模型。不同于直接调用预训练模型,自己搭建网络能让你真正理解每个卷积层、池化…

作者头像 李华
网站建设 2026/4/24 17:05:56

RWKV-7 (1.5B World)企业应用案例:本地化多语言技术支持坐席辅助系统

RWKV-7 (1.5B World)企业应用案例:本地化多语言技术支持坐席辅助系统 1. 项目背景与价值 在全球化企业技术支持场景中,多语言服务能力与响应效率是两大核心痛点。传统解决方案面临以下挑战: 语言壁垒:跨国团队需要配备多语种人…

作者头像 李华
网站建设 2026/4/24 17:03:36

水面漂浮垃圾2400张8类VOC+YOLO格式

水面漂浮垃圾2400张8类VOCYOLO格式数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2400 标注数量(xml文件个数):2400 标注数量(t…

作者头像 李华
网站建设 2026/4/24 16:58:06

OBS多平台直播高效解决方案:obs-multi-rtmp插件智能配置指南

OBS多平台直播高效解决方案:obs-multi-rtmp插件智能配置指南 【免费下载链接】obs-multi-rtmp OBS複数サイト同時配信プラグイン 项目地址: https://gitcode.com/gh_mirrors/ob/obs-multi-rtmp 你是否正在为多平台直播的繁琐操作而烦恼?每次直播都…

作者头像 李华