Python 异步编程实战:别让事件循环卡死你的服务
一、为什么异步代码写起来简单,跑起来却像阻塞一样卡死?
Python 的asyncio是异步编程的标配,但新手最容易踩的坑就是:在async函数里调用了同步阻塞操作(比如requests.get、time.sleep、同步文件读写)。这会导致整个事件循环被卡住,所有协程都排不上队。
更隐蔽的坑是“饥饿”:即使没有显式阻塞,如果一个协程运行时间过长,它会独占事件循环,其他协程根本插不上手。
举个真实的例子:一个异步 Web 服务同时处理 100 个请求,其中 1 个请求触发了 CPU 密集计算(比如图像处理),耗时 5 秒。结果这 100 个请求全卡了 5 秒——因为 CPU 密集计算阻塞了事件循环,其他协程无法被调度。从“异步”变“阻塞”,往往只差一个忘记run_in_executor的调用。
二、asyncio 到底是怎么调度的?
asyncio 的核心是事件循环(Event Loop):它维护一个就绪队列,不断从中取出协程执行。当协程遇到 I/O 操作时,注册回调并让出控制权;当 I/O 完成时,回调将协程重新加入就绪队列。这个模型在 I/O 密集场景下效率极高,但在 CPU 密集场景下反而更差——因为事件循环是单线程的。
flowchart TB A[事件循环] --> B[就绪队列] B --> C[取出协程执行] C --> D{协程状态} D -->|I/O 等待| E[注册回调, 让出控制权] D -->|计算完成| F[返回结果] D -->|CPU 密集| G[阻塞事件循环!] E --> H[I/O 完成] H --> B F --> I[继续执行后续逻辑] G --> J[所有协程被阻塞] subgraph 正确模式 K[CPU 密集任务 → run_in_executor] L[阻塞 I/O → 替换为异步库] M[长时间协程 → 定期 await asyncio.sleep(0)] end K --> N[线程池执行, 不阻塞事件循环] L --> N M --> N2.1 调度粒度:await是关键
事件循环的调度粒度是“协程切换点”——即await表达式。两个await之间的代码是原子执行的,不会被其他协程打断。这意味着:如果一段代码中没有await,它将独占事件循环直到执行完毕。
2.2 结构化并发:别留“孤儿任务”
结构化并发(Structured Concurrency)的核心原则是:所有子任务的生命周期必须包含在父任务的生命周期内。父任务创建子任务后,必须等待所有子任务完成(或取消)后才能退出。这避免了“孤儿任务”——子任务在父任务退出后仍在运行,导致资源泄漏或状态不一致。
Python 3.11 引入的TaskGroup是结构化并发的标准实现:使用async with创建任务组,任务组退出时自动等待所有子任务完成。任何子任务抛出异常时,其他子任务会被自动取消。
2.3 取消与超时
协程的取消通过Task.cancel()实现,它向目标协程抛出CancelledError。协程可以在try/finally中捕获该异常并执行清理逻辑。超时通过asyncio.wait_for()实现,超时后自动取消目标协程。
三、代码实战:怎么写出靠谱的异步代码?
3.1 异步 HTTP 客户端:连接池 + 超时 + 重试
import asyncio import aiohttp from typing import Optional from dataclasses import dataclass import time @dataclass class HttpResponse: """HTTP 响应封装""" status: int body: str elapsed_ms: float class AsyncHttpClient: """异步 HTTP 客户端:连接池 + 超时 + 重试""" def __init__(self, max_connections: int = 100, request_timeout: float = 30.0, max_retries: int = 2): self.max_connections = max_connections self.request_timeout = request_timeout self.max_retries = max_retries self._session: Optional[aiohttp.ClientSession] = None async def _get_session(self) -> aiohttp.ClientSession: """懒初始化 ClientSession(必须在事件循环内创建)""" if self._session is None or self._session.closed: connector = aiohttp.TCPConnector( limit=self.max_connections, # 启用 DNS 缓存,减少重复解析开销 use_dns_cache=True, # 保持连接复用,减少 TCP 握手 enable_cleanup_closed=True, ) timeout = aiohttp.ClientTimeout(total=self.request_timeout) self._session = aiohttp.ClientSession( connector=connector, timeout=timeout ) return self._session async def get(self, url: str, **kwargs) -> HttpResponse: """GET 请求,带自动重试""" last_error = None for attempt in range(self.max_retries + 1): start = time.monotonic() try: session = await self._get_session() async with session.get(url, **kwargs) as resp: body = await resp.text() elapsed = (time.monotonic() - start) * 1000 return HttpResponse( status=resp.status, body=body, elapsed_ms=elapsed, ) except (aiohttp.ClientError, asyncio.TimeoutError) as e: last_error = e # 指数退避重试 if attempt < self.max_retries: wait_time = 0.5 * (2 ** attempt) await asyncio.sleep(wait_time) raise RuntimeError( f"请求失败(重试 {self.max_retries} 次): {last_error}" ) async def close(self): """关闭连接池""" if self._session and not self._session.closed: await self._session.close()3.2 结构化并发与 TaskGroup
import asyncio from typing import Any async def fetch_multiple(urls: list[str]) -> list[HttpResponse]: """并发请求多个 URL,使用 TaskGroup 实现结构化并发""" client = AsyncHttpClient() results: dict[str, HttpResponse] = {} async with asyncio.TaskGroup() as tg: async def fetch_one(url: str): try: resp = await client.get(url) results[url] = resp except Exception as e: # 记录失败,不中断其他任务 results[url] = HttpResponse( status=0, body=str(e), elapsed_ms=0 ) for url in urls: tg.create_task(fetch_one(url)) await client.close() return [results[url] for url in urls] async def fetch_with_timeout(urls: list[str], timeout_per_url: float = 5.0) -> list[HttpResponse]: """带超时的并发请求""" client = AsyncHttpClient() results: dict[str, HttpResponse] = {} async with asyncio.TaskGroup() as tg: async def fetch_one(url: str): try: # 为每个请求设置独立超时 resp = await asyncio.wait_for( client.get(url), timeout=timeout_per_url, ) results[url] = resp except asyncio.TimeoutError: results[url] = HttpResponse( status=0, body="请求超时", elapsed_ms=timeout_per_url * 1000 ) except Exception as e: results[url] = HttpResponse( status=0, body=str(e), elapsed_ms=0 ) for url in urls: tg.create_task(fetch_one(url)) await client.close() return [results[url] for url in urls]3.3 CPU 密集任务的异步化
import asyncio from concurrent.futures import ProcessPoolExecutor from functools import partial # CPU 密集函数(在进程池中执行) def cpu_intensive_task(data: bytes, threshold: float = 0.5) -> dict: """模拟 CPU 密集的图像处理任务""" import hashlib # 在实际场景中,这里可能是 NumPy 计算、PIL 图像处理等 result = hashlib.sha256(data).hexdigest() return {"hash": result, "size": len(data)} async def process_with_executor(data: bytes) -> dict: """ 将 CPU 密集任务提交到进程池执行 避免阻塞事件循环 """ loop = asyncio.get_running_loop() # 使用 ProcessPoolExecutor(而非 ThreadPoolExecutor) # 因为 Python GIL 限制,线程池无法利用多核 # 进程池可以真正并行执行 CPU 密集任务 with ProcessPoolExecutor(max_workers=4) as executor: # run_in_executor 将同步函数包装为协程 result = await loop.run_in_executor( executor, partial(cpu_intensive_task, data), ) return result async def batch_process(items: list[bytes]) -> list[dict]: """批量处理 CPU 密集任务""" loop = asyncio.get_running_loop() with ProcessPoolExecutor(max_workers=4) as executor: # 并发提交所有任务 futures = [ loop.run_in_executor( executor, partial(cpu_intensive_task, item), ) for item in items ] # 等待所有任务完成 results = await asyncio.gather(*futures) return results3.4 异步上下文管理器与信号量
import asyncio class AsyncRateLimiter: """异步速率限制器:控制并发请求数""" def __init__(self, max_concurrent: int = 10, rate_limit: float = 100.0): self.semaphore = asyncio.Semaphore(max_concurrent) self.rate_limit = rate_limit self._tokens = rate_limit self._last_refill = asyncio.get_event_loop().time() async def acquire(self): """获取一个令牌""" await self._refill_tokens() # 等待令牌可用 while self._tokens < 1: await asyncio.sleep(0.01) await self._refill_tokens() self._tokens -= 1 await self.semaphore.acquire() def release(self): """释放令牌""" self.semaphore.release() async def _refill_tokens(self): """按速率补充令牌""" now = asyncio.get_event_loop().time() elapsed = now - self._last_refill self._tokens = min( self.rate_limit, self._tokens + elapsed * self.rate_limit, ) self._last_refill = now async def __aenter__(self): await self.acquire() return self async def __aexit__(self, *args): self.release()四、架构权衡:协程、线程还是进程?
| 维度 | asyncio(协程) | threading(线程) | multiprocessing(进程) |
|---|---|---|---|
| 并发类型 | 协作式(需 await 让出) | 抢占式(OS 调度) | 独立进程 |
| GIL 影响 | 受限(单线程) | 受限(GIL) | 不受限 |
| 内存开销 | 极低(协程 ~2KB) | 中(线程 ~8MB) | 高(进程 ~30MB) |
| 适用场景 | I/O 密集 | I/O 密集 + 简单并行 | CPU 密集 |
| 调试难度 | 中(堆栈追踪复杂) | 高(竞态条件) | 低(独立进程) |
权衡一:协程与线程的选择。I/O 密集场景用协程(内存开销低、无锁问题),CPU 密集场景用进程(绕过 GIL)。混合场景用协程 +run_in_executor桥接。
权衡二:TaskGroup 与 gather。asyncio.gather是非结构化的——子任务的异常不会自动取消其他任务。TaskGroup是结构化的——任何子任务异常会取消所有其他子任务。建议新代码统一使用 TaskGroup。
权衡三:信号量与连接池。信号量控制并发数但不管理连接生命周期,连接池既控制并发又复用连接。对于 HTTP 请求,连接池(aiohttp.TCPConnector)比信号量更合适。
五、总结:异步编程的核心思路
Python 异步编程的核心思路是"I/O 让出、计算隔离、结构化并发"。I/O 操作用await让出控制权,CPU 密集任务用run_in_executor隔离到进程池,子任务用TaskGroup纳入结构化生命周期——三者协同,才能写出真正高效的异步代码。
落地步骤:第一步,将所有同步 I/O 替换为异步库(requests → aiohttp,time.sleep → asyncio.sleep);第二步,用TaskGroup替换gather,确保子任务的生命周期可控;第三步,对 CPU 密集任务使用ProcessPoolExecutor,避免阻塞事件循环。关键原则是——异步编程不是把所有函数都加上async,而是让 I/O 等待时 CPU 不闲着。