在异步 HTTP 请求场景中,aiohttp 是 Python 生态下的主流选择。实际开发中,请求的日志记录(排查问题)和失败重试(提升稳定性)是必备能力,而 aiohttp 的中间件机制能优雅地实现这两个功能,无需侵入业务代码。本文将详细讲解如何基于 aiohttp 中间件,打造通用的异步请求日志与重试组件。
一、核心概念:aiohttp 中间件
aiohttp 中间件是一个异步函数,它介于ClientSession和实际 HTTP 请求之间,能拦截请求的发起、响应的返回以及异常的抛出。其核心作用是对请求 / 响应生命周期进行统一处理,格式如下:
python
运行
async def middleware(app, handler): async def wrapper(request): # 请求发送前的处理(如日志、参数修改) response = await handler(request) # 执行实际请求 # 响应返回后的处理(如日志、响应解析) return response return wrapper中间件的优势在于:一次定义,全局生效,所有通过ClientSession发起的请求都会经过中间件处理。
二、实现思路分析
我们需要实现两个核心功能,且需保证逻辑解耦:
- 日志中间件:记录请求的 URL、方法、状态码、耗时、请求体(可选)、响应体(可选)等关键信息;
- 重试中间件:捕获指定类型的异常(如网络超时、5xx 服务器错误),按照配置的次数和间隔重试请求;
- 执行顺序:重试中间件应包裹日志中间件(先重试,再记录最终的请求结果)。
三、完整代码实现
3.1 依赖安装
首先确保安装 aiohttp 和异步重试依赖(tenacity支持异步重试,比手动写循环更优雅):
bash
运行
pip install aiohttp tenacity python-dotenv3.2 核心代码
python
运行
import asyncio import logging import time from typing import Dict, Any, Optional import aiohttp from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, AsyncRetrying, ) # 配置日志格式 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()] ) logger = logging.getLogger("aiohttp-requests") # -------------------------- 日志中间件 -------------------------- async def logging_middleware(app: aiohttp.web.Application, handler): """ 记录请求的关键信息:URL、方法、状态码、耗时、异常等 """ async def wrapper(request: aiohttp.ClientRequest): start_time = time.time() request_id = f"req-{int(start_time * 1000)}" # 生成唯一请求ID try: # 执行实际请求 response = await handler(request) # 计算耗时 elapsed = round((time.time() - start_time) * 1000, 2) # 记录成功日志 logger.info( f"[{request_id}] SUCCESS | {request.method} {request.url} | " f"Status: {response.status} | Elapsed: {elapsed}ms" ) return response except Exception as e: # 记录失败日志 elapsed = round((time.time() - start_time) * 1000, 2) logger.error( f"[{request_id}] FAILED | {request.method} {request.url} | " f"Error: {str(e)} | Elapsed: {elapsed}ms", exc_info=True # 打印异常堆栈,便于排查 ) raise # 抛出异常,让重试中间件处理 return wrapper # -------------------------- 重试中间件 -------------------------- async def retry_middleware(app: aiohttp.web.Application, handler): """ 对指定异常的请求进行重试: - 重试次数:3次 - 等待策略:指数退避(1s, 2s, 4s) - 重试条件:网络超时、5xx错误、连接错误 """ async def wrapper(request: aiohttp.ClientRequest): # 定义重试策略 retryer = AsyncRetrying( stop=stop_after_attempt(3), # 最多重试3次 wait=wait_exponential(multiplier=1, min=1, max=4), # 指数等待 retry=retry_if_exception_type( ( aiohttp.ClientTimeoutError, # 超时异常 aiohttp.ClientConnectionError, # 连接异常 aiohttp.ServerConnectionError, # 服务器连接异常 ) ), before_sleep=lambda retry_state: logger.warning( f"Retry {retry_state.attempt_number} for {request.method} {request.url} " f"due to {retry_state.outcome.exception()}" ), # 重试前打印日志 ) try: # 执行带重试的请求 response = await retryer.__aenter__() try: return await handler(request) finally: await retryer.__aexit__(None, None, None) except Exception as e: logger.error(f"All retries failed for {request.method} {request.url}: {str(e)}") raise return wrapper # -------------------------- 扩展ClientSession -------------------------- class CustomClientSession(aiohttp.ClientSession): """ 封装带日志和重试的ClientSession,简化使用 """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 注册中间件(注意顺序:重试在外,日志在内) self._middlewares = [retry_middleware, logging_middleware] # -------------------------- 测试代码 -------------------------- async def test_requests(): """测试异步请求日志与重试功能""" # 使用自定义的Session async with CustomClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session: # 测试正常请求 try: async with session.get("https://httpbin.org/get") as resp: print(f"Normal request: {resp.status}") except Exception as e: print(f"Normal request failed: {e}") # 测试会触发重试的请求(模拟超时/5xx) try: async with session.get("https://httpbin.org/delay/10") as resp: # 超时请求 print(f"Retry request: {resp.status}") except Exception as e: print(f"Retry request failed: {e}") if __name__ == "__main__": asyncio.run(test_requests())四、代码关键说明
4.1 日志中间件
- 生成唯一请求 ID:便于关联同一次请求的日志,排查问题时可快速定位;
- 记录核心指标:请求方法、URL、状态码、耗时,失败时打印异常堆栈;
- 不吞异常:记录日志后重新抛出异常,保证重试中间件能捕获并处理。
4.2 重试中间件
- 基于
tenacity实现异步重试:相比手动写循环,tenacity支持更灵活的重试策略(指数退避、最大次数、自定义条件); - 精准重试:仅对网络超时、连接错误等临时异常重试,避免对业务异常(如 400 参数错误)无效重试;
- 重试日志:每次重试前打印日志,便于观察重试过程。
4.3 自定义 ClientSession
- 封装中间件注册逻辑:业务代码只需使用
CustomClientSession,无需重复注册中间件; - 中间件顺序:重试中间件在外,日志中间件在内,保证每次重试的请求都能被日志记录。
五、进阶优化
5.1 可配置化
将重试次数、等待时间、日志级别等配置抽离到配置文件(如.env),便于不同环境调整:
python
运行
# 从环境变量读取配置 import os from dotenv import load_dotenv load_dotenv() RETRY_MAX_ATTEMPTS = int(os.getenv("RETRY_MAX_ATTEMPTS", 3)) RETRY_WAIT_MULTIPLIER = float(os.getenv("RETRY_WAIT_MULTIPLIER", 1))5.2 支持 5xx 状态码重试
默认重试只捕获异常,可扩展为对 5xx 响应码重试:
python
运行
# 在重试中间件中新增判断逻辑 async def wrapper(request: aiohttp.ClientRequest): async def _handle_request(): response = await handler(request) if response.status >= 500: raise aiohttp.ServerErrorResponse( f"Server error: {response.status}", status=response.status ) return response # 将原handler替换为_handle_request retryer = AsyncRetrying(...) try: response = await retryer.__aenter__() try: return await _handle_request() finally: await retryer.__aexit__(None, None, None) # ...5.3 忽略指定 URL 重试
对不需要重试的 URL(如写操作接口)添加白名单:
python
运行
# 重试中间件中添加过滤逻辑 NO_RETRY_URLS = ["/api/write", "/api/submit"] if any(url in str(request.url) for url in NO_RETRY_URLS): return await handler(request) # 直接执行,不重试六、使用注意事项
- 重试幂等性:仅对幂等请求(如 GET 查询)重试,POST/PUT 等写操作需确保接口幂等,避免重复提交;
- 超时设置:单个请求的超时时间不宜过长,结合重试次数,避免整体请求耗时过久;
- 日志脱敏:若请求包含敏感信息(如 token、密码),需在日志中脱敏,避免泄露;
- 中间件顺序:重试中间件需在日志中间件外层,否则重试的请求不会被日志记录。
总结
- aiohttp 中间件能无侵入地实现请求日志和重试,核心是通过拦截
handler执行过程,添加通用逻辑; - 日志中间件重点记录请求 ID、耗时、状态码和异常,重试中间件基于
tenacity实现精准的异步重试; - 使用时需注意中间件顺序、重试幂等性和超时控制,保证功能可靠且不引入新问题。
通过上述实现,你可以快速为 aiohttp 异步请求添加企业级的日志和重试能力,提升项目的可维护性和稳定性。