为什么需要多线程做健康检查?
假设你有 50 个服务节点要检查。
单线程:
50 个 × 每个 0.8 秒 = 40 秒多线程(10 并发):
50 个 ÷ 10 线程 ≈ 4 秒差了 10 倍。健康检查本身就是 IO 密集型,多线程几乎零成本提速。
核心方案:ThreadPoolExecutor
Python 3.2+ 内置,几行代码搞定,推荐度 ⭐⭐⭐⭐⭐。
importrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor,as_completedfromdataclassesimportdataclassfromtypingimportList@dataclassclassNodeResult:name:str# 节点名称url:str# 完整URLstatus:str# 状态码或错误信息elapsed:float# 耗时(秒)alive:bool# 是否存活classHealthChecker:"""多线程健康检查器"""def__init__(self,nodes:List[dict],health_path:str="/health",timeout:int=5):""" nodes: [{"name": "节点A", "host": "api.example.com", "port": 8080}, ...] health_path: 健康检查接口路径 timeout: 单次请求超时(秒) """self.nodes=nodes self.health_path=health_path self.timeout=timeout self.results:List[NodeResult]=[]def_check_one(self,node:dict)->NodeResult:"""检查单个节点"""url=f"http://{node['host']}:{node['port']}{self.health_path}"t1=time.time()try:resp=requests.get(url,timeout=self.timeout)elapsed=time.time()-t1returnNodeResult(name=node['name'],url=url,status=str(resp.status_code),elapsed=elapsed,alive=resp.status_code==200)exceptrequests.exceptions.Timeout:returnNodeResult(name=node['name'],url=url,status="Timeout",elapsed=time.time()-t1,alive=False)exceptrequests.exceptions.ConnectionError:returnNodeResult(name=node['name'],url=url,status="Down",elapsed=time.time()-t1,alive=False)exceptExceptionase:returnNodeResult(name=node['name'],url=url,status=str(e),elapsed=time.time()-t1,alive=False)defrun(self,max_workers:int=10)->List[NodeResult]:"""并发检查所有节点"""self.results=[]withThreadPoolExecutor(max_workers=max_workers)asexecutor:futures={executor.submit(self._check_one,node):nodefornodeinself.nodes}forfutureinas_completed(futures):result=future.result()self.results.append(result)mark="✅"ifresult.aliveelse"❌"print(f"{mark}{result.name:10s}|{result.elapsed:.3f}s |{result.status}")# 按耗时排序self.results.sort(key=lambdax:x.elapsed)returnself.resultsdefsummary(self):"""打印统计摘要"""alive=[rforrinself.resultsifr.alive]dead=[rforrinself.resultsifnotr.alive]print("\n"+"="*50)print(f"📊 健康检查报告 | 总计{len(self.nodes)}节点 | 存活{len(alive)}| 异常{len(dead)}")print("="*50)ifself.results:fastest=self.results[0]print(f"\n🏆 最快节点:{fastest.name}({fastest.elapsed:.3f}s)")print("\n📋 完整排名:")fori,rinenumerate(self.results,1):mark="✅"ifr.aliveelse"❌"print(f"{i}.{mark}{r.name:10s}|{r.elapsed:.3f}s |{r.status}")# ========== 使用示例(全是假数据)==========if__name__=="__main__":# 模拟 8 个服务节点NODES=[{"name":"用户服务-A","host":"user-svc-a.internal","port":8080},{"name":"用户服务-B","host":"user-svc-b.internal","port":8080},{"name":"订单服务","host":"order-svc.internal","port":8081},{"name":"支付网关","host":"payment-gw.internal","port":8082},{"name":"通知服务","host":"notify-svc.internal","port":8083},{"name":"文件服务","host":"file-svc.internal","port":8084},{"name":"日志服务","host":"log-svc.internal","port":8085},{"name":"配置中心","host":"config-center.internal","port":8086},]checker=HealthChecker(nodes=NODES,health_path="/api/health",timeout=3)checker.run(max_workers=8)checker.summary()输出示例:
✅ 用户服务-A | 0.089s | 200 ❌ 订单服务 | 3.001s | Timeout ✅ 支付网关 | 0.112s | 200 ✅ 通知服务 | 0.067s | 200 ✅ 文件服务 | 0.145s | 200 ✅ 日志服务 | 0.098s | 200 ✅ 配置中心 | 0.134s | 200 ✅ 用户服务-B | 0.076s | 200 ================================================== 📊 健康检查报告 | 总计 8 节点 | 存活 7 | 异常 1 ================================================== 🏆 最快节点: 通知服务 (0.067s) 📋 完整排名: 1. ✅ 通知服务 | 0.067s | 200 2. ✅ 用户服务-B | 0.076s | 200 3. ✅ 用户服务-A | 0.089s | 200 4. ✅ 日志服务 | 0.098s | 200 5. ✅ 支付网关 | 0.112s | 200 6. ✅ 配置中心 | 0.134s | 200 7. ✅ 文件服务 | 0.145s | 200 8. ❌ 订单服务 | 3.001s | Timeout关键配置说明
| 参数 | 推荐值 | 说明 |
|---|---|---|
max_workers | 10~20 | 并发线程数,超过 50 收益递减 |
timeout | 3~5 秒 | 单次请求超时,防止卡死 |
health_path | /health或/api/health | 各服务自己定义的健康检查接口 |
常见坑
1. Session 不能跨线程共享
# ❌ 错误:多线程共享同一个 sessionsession=requests.Session()defcheck(url):returnsession.get(url)# 线程不安全# ✅ 正确:每个请求独立创建连接defcheck(url):returnrequests.get(url,timeout=5)# 每个线程自己管理连接2. max_workers 不是越大越好
workers = 100 # ❌ 大概率更慢,还可能触发对方限流 workers = 10 # ✅ 大部分场景够用3. 忘了处理异常
不捕获异常的话,一个节点超时会导致整个检查任务中断。
try:resp=requests.get(url,timeout=5)exceptExceptionase:returnNodeResult(...,status=str(e),alive=False)# ✅ 保证不中断选型决策
节点数 < 20? → 单线程 for 循环就够了 节点数 20~200? → ThreadPoolExecutor(本方案)✅ 节点数 > 200 或已在用异步框架? → asyncio + aiohttp 需要定时循环检查 + 告警? → 加上 schedule 库,每分钟跑一次一句话总结
多线程健康检查的核心就三件事:控并发、设超时、收结果。
ThreadPoolExecutor+as_completed解决了 90% 的场景,剩下 10% 才需要上 asyncio。