news 2026/6/15 19:38:11

Python 爬虫异步架构实战:基于 aiohttp+Redis 队列实现分布式任务分发与消费

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 爬虫异步架构实战:基于 aiohttp+Redis 队列实现分布式任务分发与消费

前言

单机同步爬虫受限于串行请求逻辑,面对海量目标 URL 采集场景时,请求排队阻塞、CPU 与网络资源利用率低下,单进程单日爬取量级难以突破万级;多进程改造又存在任务重复分配、任务状态无记录、节点宕机丢失未完成任务等缺陷。依托 aiohttp 协程高并发特性搭配 Redis 消息队列构建分布式爬虫调度架构,能够实现任务统一集中下发、多爬虫节点并行消费、失败任务回滚重入队列,从架构层面拆分调度中心与采集工作节点,突破单机性能上限。本文围绕 Redis 任务队列数据结构选型、生产者任务入库逻辑、aiohttp 异步消费者协程池管控、异常任务回写重试、多节点分布式防重复抓取等核心模块展开落地,区分普通待爬队列、失败死信队列、去重缓存集合三类 Redis 存储单元,附带完整可运行工程代码、架构参数选型对照表,完成从任务生产、分发、消费、异常回收全链路工程化落地。

本文所需依赖官方文档链接:

  1. aiohttp:异步 HTTP 请求核心库,实现高并发页面采集
  2. redis-py:Redis 客户端,实现任务队列读写与去重管控
  3. asyncio:Python 内置协程调度,管理异步任务池
  4. ujson:高性能序列化库,任务信息快速序列化存储

一、分布式异步爬虫架构分层与存储设计

1.1 三层架构分工明细

整套架构划分为任务生产者层、Redis 中间存储层、异步消费者层,三层解耦可独立部署扩容:

表格

架构分层核心职责部署特性
生产者层批量生成待爬 URL、组装任务附加参数、任务入队、初始化去重集合独立部署调度服务,可定时从文件 / 数据库批量生成任务
Redis 中间层存储待爬任务队列、失败死信任务、已爬取 URL 去重集合、爬虫运行配置参数独立 Redis 服务,支持多节点爬虫共享数据源
消费者层aiohttp 协程批量拉取任务、页面数据采集、数据入库、异常任务回写死信队列多台服务器横向新增节点即可提升整体爬虫吞吐

1.2 Redis 多数据结构任务存储规划

为实现任务生命周期全管控,拆分三类存储容器,规避任务重复入队、重复爬取:

  1. List 结构:spider:task:wait待爬任务主队列,左侧入队、右侧阻塞弹出,天然 FIFO 先进先出;
  2. List 结构:spider:task:dead死信失败队列,超过重试阈值的异常任务存入,等待人工二次下发;
  3. Set 结构:spider:url:done已完成 URL 去重集合,存入采集成功的 URL,入队前先校验集合,杜绝重复任务生成。

1.3 核心配置常量预定义

python

运行

# Redis连接配置 REDIS_HOST = "127.0.0.1" REDIS_PORT = 6379 REDIS_DB = 0 # 队列键名 QUEUE_WAIT = "spider:task:wait" QUEUE_DEAD = "spider:task:dead" SET_DONE = "spider:url:done" # 爬虫配置 MAX_CONCURRENT = 20 # 单实例最大协程并发数 MAX_RETRY = 3 # 单任务最大重试次数

二、环境依赖安装与基础客户端初始化

bash

运行

pip install aiohttp redis ujson

2.1 同步 Redis 生产者客户端、异步 Redis 消费者客户端区分

生产者批量写入任务使用同步 redis 客户端,消费者异步环境采用 aioredis 实现非阻塞队列读取,避免同步 Redis 阻塞协程运行:

python

运行

import redis import ujson # 同步生产者Redis sync_redis = redis.Redis(host=REDIS_HOST,port=REDIS_PORT,db=REDIS_DB,decode_responses=True)

三、任务生产者模块:批量生成任务与防重复入队实现

3.1 生产者底层原理

生产者读取目标 URL 清单,每条任务封装字典结构(url、retry_count、extra_params),入队前校验 URL 是否存在于已爬集合,不存在则序列化后 LPUSH 推入待爬队列,实现源头去重,避免无效任务占用爬虫资源。

3.2 生产者完整代码实现

python

运行

class TaskProducer: @staticmethod def generate_task(url_list:list,extra:dict=None): """批量生成任务并入队""" extra_info = extra if extra else {} insert_count = 0 for url in url_list: # 去重校验:已爬URL直接跳过 if sync_redis.sismember(SET_DONE,url): continue # 组装任务体,初始化重试次数0 task_body = { "url":url, "retry_count":0, "extra":extra_info } # ujson序列化 task_str = ujson.dumps(task_body) sync_redis.rpush(QUEUE_WAIT,task_str) insert_count +=1 print(f"成功入库任务数:{insert_count}") # 生产者测试调用 if __name__ == "__main__": test_urls = [ "https://httpbin.org/get?page=1", "https://httpbin.org/get?page=2", "https://httpbin.org/get?page=3" ] TaskProducer.generate_task(test_urls,extra={"source":"test_task","cate":"demo"})

3.3 生产者优化细节

选用 ujson 替换原生 json 序列化,大批量任务生成时序列化速度提升 40% 以上;使用 Set 天然去重特性,相比数据库查询去重耗时大幅缩减。

四、aiohttp 异步消费者协程池实现

4.1 消费者核心运行逻辑

  1. 单个协程从 Redis 阻塞式 RPOP 拉取任务,无任务时阻塞等待,减少无效轮询;
  2. aiohttp 发起异步请求,页面解析提取目标数据;
  3. 请求成功:URL 写入已完成 Set 集合,数据落地存储;
  4. 请求失败:重试计数 + 1,未达上限重新推入待爬队列,超出上限存入死信队列。

4.2 异步爬虫消费者完整代码

python

运行

import asyncio import aiohttp import ujson import aioredis class AsyncTaskConsumer: def __init__(self): self.session = None self.redis_conn = None self.headers = { "User-Agent":"Mozilla/5.0 Windows Chrome/120.0.0.0 Safari/537.36" } async def init_resource(self): """初始化异步会话与异步Redis连接""" self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT)) self.redis_conn = await aioredis.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}",decode_responses=True) async def close_resource(self): """关闭连接释放资源""" await self.session.close() await self.redis_conn.close() async def crawl_single_task(self,task_info:dict): """单任务采集逻辑""" url = task_info["url"] try: async with self.session.get(url,headers=self.headers,timeout=aiohttp.ClientTimeout(total=8)) as resp: content = await resp.text() # 模拟数据入库,生产环境替换mysql/mongo写入逻辑 print(f"采集成功:{url},页面长度:{len(content)}") # 成功标记URL为已完成 await self.redis_conn.sadd(SET_DONE,url) return True except Exception as e: print(f"采集失败{url},异常:{str(e)}") task_info["retry_count"] +=1 # 未超限重入待爬队列,否则死信 if task_info["retry_count"] < MAX_RETRY: await self.redis_conn.rpush(QUEUE_WAIT,ujson.dumps(task_info)) else: await self.redis_conn.rpush(QUEUE_DEAD,ujson.dumps(task_info)) return False async def consumer_worker(self): """单个消费协程循环拉取任务""" while True: # blpop阻塞弹出,等待超时3秒 task_raw = await self.redis_conn.blpop(QUEUE_WAIT,timeout=3) if not task_raw: continue task_str = task_raw[1] task_dict = ujson.loads(task_str) await self.crawl_single_task(task_dict) async def run_consumer_pool(self,worker_num=MAX_CONCURRENT): """启动协程消费池""" await self.init_resource() task_list = [asyncio.create_task(self.consumer_worker()) for _ in range(worker_num)] await asyncio.gather(*task_list) # 消费者启动入口 async def main(): consumer = AsyncTaskConsumer() await consumer.run_consumer_pool() if __name__ == "__main__": asyncio.run(main())

4.3 协程消费关键原理拆解

  1. blpop阻塞队列:队列无任务时协程休眠,不循环空跑占用 CPU,是消息队列经典消费方案;
  2. TCPConnector (limit) 管控并发:限制单实例最大同时 TCP 连接,避免瞬间海量请求触发目标站点风控;
  3. 失败分级处理:动态变更任务重试字段,实现阶梯式重试,杜绝无效死循环重试。

五、多节点分布式拓展与防重复消费方案

5.1 多节点横向扩容规则

新服务器仅需部署同一份消费者代码,配置指向同一个远程 Redis 地址即可接入集群,生产者与所有消费者共用一套任务队列,新增节点自动分摊剩余任务,无需修改生产者代码。

5.2 分布式重复消费规避方案

表格

方案实现逻辑适用场景
URL 前置去重生产者入队前 Set 校验 URL常规资讯、商品列表爬虫
任务消费临时锁Redis Set 标记正在消费任务,完成后删除风控严苛、耗时较长的详情页爬虫

临时锁补充代码片段:

python

运行

# 消费开始上锁 lock_key = f"lock:{task_dict['url']}" if await self.redis_conn.sismember("spider:lock:doing",lock_key): continue await self.redis_conn.sadd("spider:lock:doing",lock_key) # 采集完成解锁 await self.redis_conn.srem("spider:lock:doing",lock_key)

六、死信队列运维与任务二次补发工具

6.1 死信任务补发函数

定期人工核查死信队列异常 URL,修复反爬、接口故障问题后批量重新下发至待爬队列:

python

运行

def resend_dead_task(): while True: task = sync_redis.lpop(QUEUE_DEAD) if not task: break sync_redis.rpush(QUEUE_WAIT,task) print("死信任务重新下发完成")

6.2 死信分类统计

通过 redis 批量取出死信任务,解析异常来源,针对性优化 UA 池、代理配置,从源头减少失败任务生成。

七、架构性能调优参数对照表

表格

调优项参数配置建议优化效果
单实例协程数15~30,依据目标站点反爬强度调整反爬宽松站点上调,风控站点下调并发
请求超时时间5~10 秒网络差的代理爬虫放宽至 12 秒
Redis 持久化RDB+AOF 双持久开启服务器宕机任务不丢失
任务批量生成单次入队 500~2000 条减少 Redis 频繁 IO 损耗

八、常见架构故障与解决方案

表格

异常现象故障诱因优化方案
多节点同一 URL 重复爬取无消费临时锁,任务弹出后节点宕机任务丢失新增消费锁定机制,任务完成再移除锁定
Redis 队列数据暴涨消费者消费速度低于生产者生产速度新增多台消费节点、提升单实例并发数
大量任务进入死信队列IP 封禁、UA 失效接入前文异地多区代理池,动态切换代理发起请求

九、总结

aiohttp+Redis 队列分布式异步爬虫架构依托生产消费分离设计,从根源解决单机爬虫并发瓶颈,依靠 Redis 实现任务统一调度、去重管控、失败任务回收三大核心能力。生产者专注任务生成、源头去重,消费者依托协程实现高并发采集,多服务器横向部署即可线性提升采集吞吐量。在实际项目落地中,可搭配代理池、异常重试自愈机制、结构化数据入库模块,形成一套可 7×24 小时稳定运行的工业化分布式爬虫体系,适配海量站点全量数据批量采集场景。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/8 19:23:58

5分钟快速上手:免费SVG网络拓扑图工具终极指南

5分钟快速上手&#xff1a;免费SVG网络拓扑图工具终极指南 【免费下载链接】easy-topo vuesvgelement-ui 快捷画出网络拓扑图 项目地址: https://gitcode.com/gh_mirrors/ea/easy-topo 如果你正在寻找一款简单易用的免费网络拓扑图工具&#xff0c;那么Easy-Topo绝对是你…

作者头像 李华
网站建设 2026/6/8 12:40:35

终极Fontmin字体优化指南:如何实现高效Web字体压缩与转换

终极Fontmin字体优化指南&#xff1a;如何实现高效Web字体压缩与转换 【免费下载链接】fontmin Minify font seamlessly 项目地址: https://gitcode.com/gh_mirrors/fo/fontmin Fontmin是一个基于Node.js的字体处理工具&#xff0c;专注于无缝压缩字体文件并生成各种Web…

作者头像 李华
网站建设 2026/6/10 7:06:36

2026年OpenClaw/Hermes Agent配置Token Plan保姆级全攻略

2026年OpenClaw/Hermes Agent配置Token Plan保姆级全攻略。OpenClaw是开源的个人AI助手&#xff0c;Hermes Agent则是一个能自我进化的AI智能体框架。阿里云提供计算巢、轻量服务器及无影云电脑三种部署OpenClaw 与 Hermes Agent的方案、百炼Token Plan兼容主流 AI 工具&#x…

作者头像 李华
网站建设 2026/6/10 9:15:32

重磅汇总!2026AI写作辅助软件大盘点(覆盖 99% 论文写作需求)

本文精选13 款2026 年实测 AI 论文工具&#xff0c;按全流程全能型、垂直领域专精型、润色降重专家、文献管理助手四大类别排序&#xff0c;覆盖从选题到定稿全链路&#xff0c;适配本科 / 硕博 / 期刊全场景&#xff0c;附选型速查表与避坑指南&#xff0c;帮你快速找到最佳拍…

作者头像 李华
网站建设 2026/6/10 5:53:53

如何突破四足机器人开发瓶颈?Unitree Go2 ROS2 SDK深度实践指南

如何突破四足机器人开发瓶颈&#xff1f;Unitree Go2 ROS2 SDK深度实践指南 【免费下载链接】go2_ros2_sdk Unofficial ROS2 SDK support for Unitree GO2 AIR/PRO/EDU 项目地址: https://gitcode.com/gh_mirrors/go/go2_ros2_sdk 想要在四足机器人开发中实现从基础控制…

作者头像 李华