背景痛点:流量激增时客服系统“卡”在哪
去年双十一,我们给电商客户做的 AI 客服在 0 点刚过 3 分钟就报警:P99 延迟飙到 4.2 s,意图识别服务大量 504,对话状态同步直接乱序,用户一句话要等十几秒才收到回复。拆日志后发现几个典型瓶颈:
- 对话状态同步延迟:WebSocket 长连接里,每个用户消息都要回写 Redis 集群,同步阻塞模型下,线程池 200 个 Slot 瞬间被占满,新消息进不来。
- 意图识别服务超时:NLU 模块是同步 HTTP 调用,超时 1 s 就重试,重试又占线程,结果线程池雪崩。
- 背压失控:网关层不做限流,MQ 生产者把 RabbitMQ 队列打满,消费者处理不过来,内存直接打满,触发 OOMKilled,Pod 重启丢会话。
一句话,同步 + 阻塞 + 无流控的三连击,让系统在流量洪峰下毫无还手之力。
技术选型:为什么放弃“一把梭”同步架构
先列个对比表,看完就懂:
| 维度 | 同步阻塞(Spring MVC 直调) | 异步事件驱动(FastAPI + RabbitMQ) |
|---|---|---|
| 线程模型 | 1 请求 1 线程,排队等结果 | 单线程事件循环,协程切换 |
| 背压处理 | 靠线程池拒绝策略,暴力 | 消费者 ack 前限流,天然背压 |
| 失败隔离 | 一个下游超时,整条链路卡死 | 队列隔离,单点故障不影响全局 |
| 弹性扩容 | 线程数固定,垂直扩容贵 | 消费者水平扩容,秒级伸缩 |
| 语言生态 | Java 组件多,但重 | Python 3.8+ 异步生态够用,开发快 |
最终拍板:FastAPI做网关层,aio-pika当 RabbitMQ 客户端,协程 + 事件驱动把线程让出来吃宵夜,CPU 利用率从 35% 飙到 75%,同配置机器直接多扛一倍 QPS。
核心实现:动态权重分配 + 熔断
1. 动态权重算法(带熔断)
需求:下游有 3 个 NLU Pod,能力相同但偶尔抽风,需要实时把流量往“健康”节点倾斜,失败率超 30% 就熔断 30 s。
代码不到 120 行,可直接贴进项目:
# weight_balancer.py import asyncio import random import time from typing import Dict, List, Optional class WeightedBalancer: def __init__(self, endpoints: List[str], threshold: float = 0.3, cooldown: int = 30): self.endpoints = endpoints self.threshold = threshold self.cooldown = cooldown self._weights: Dict[str, float] = {ep: 1.0 for ep in endpoints} self._fail: Dict[str, int] = {ep: 0 for ep in endpoints} self._succ: Dict[str, int] = {ep: 0 for ep in endpoints} self._frozen_until: Dict[str, float] = {ep: 0.0 for ep in endpoints} self._lock = asyncio.Lock() async def pick(self) -> Optional[str]: async with self._lock: now = time.time() available = [ep for ep in self.endpoints if now > self._frozen_until[ep]] if not available: return None total = sum(self._weights[ep] for ep in available) r = random.uniform(0, total) cum = 0.0 for ep in available: cum += self._weights[ep] if r <= cum: return ep return available[-1] async def report(self, endpoint: str, ok: bool): async with self._lock: if ok: self._succ[endpoint] += 1 else: self._fail[endpoint] += 1 fail_rate = self._fail[endpoint] / (self._succ[endpoint] + self._fail[endpoint] + 1) if fail_rate > self.threshold: self._frozen_until[endpoint] = time.time() + self.cooldown self._weights[endpoint] = 0.1 # 最低权重,不直接踢掉 else: # 动态权重 = 成功率 self._weights[endpoint] = self._succ[endpoint] / ( self._succ[endpoint] + self._fail[endpoint] )使用示例:
balancer = WeightedBalancer(["nlu-1:8000", "nlu-2:8000", "nlu-3:8000"]) async def call_nlu(text: str) -> str: ep = await balancer.pick() if ep is None: raise RuntimeError("all endpoints frozen") try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=1)) as s: async with s.post(f"http://{ep}/parse", json={"q": text}) as r: r.raise_for_status() data = await r.json() await balancer.report(ep, ok=True) return data["intent"] except Exception: await balancer.report(ep, ok=False) raise2. Prometheus 埋点
在 FastAPI 里加中间件,不侵入业务:
from prometheus_client import Counter, Histogram, generate_latest from starlette.responses import Response REQUEST_COUNT = Counter("chat_request_total", "total requests", ["method", "endpoint"]) REQUEST_LATENCY = Histogram("chat_request_duration_seconds", "latency") @app.middleware("http") async def prom_middleware(request: Request, call_next): start = time.time() REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path).inc() response = await call_next(request) REQUEST_LATENCY.observe(time.time() - start) return response @app.get("/metrics") def metrics(): return Response(generate_latest(), media_type="text/plain")Grafana 模板 ID 14765 直接导入,就能看到 QPS、P99、失败率三条曲线,告警规则一条就够用:P99 > 800 ms 持续 2 min 就电话喊人。
性能测试:JMeter 压测报告
测试环境:
- 4C8G K8s 节点 * 3
- FastAPI 网关副本 6 个,NLU 副本 6 个
- RabbitMQ 3 节点集群
压测脚本:200 线程、Ramp-up 60 s、循环 300 次,模拟 6 k 并发长连接。
| 指标 | 同步阻塞(优化前) | 异步事件驱动(优化后) |
|---|---|---|
| QPS | 420 | 780 |
| CPU 利用率 | 38% | 72% |
| P99 延迟 | 4.2 s | 800 ms |
| 失败率 | 12% | 0.4% |
结论:同样硬件,吞吐翻倍,延迟砍半,省下的 30% 机器直接退掉,一年云费用少花 18 W。
避坑指南:三个隐形炸弹
1. 对话上下文序列化的线程安全
FastAPI 的依赖注入默认是单例,如果把用户上下文放内存 dict,高并发下两个协程会互相覆盖,导致张冠李戴。
解决:用contextvars.ContextVar做协程隔离,或干脆把上下文扔 Redis 带 TTL,无状态才是云原生。
2. 第三方 NLP 服务幂等性
NLU 有时需要计费,重复调用 = 多扣费。在 MQ 消费者端做幂等键:message_id + user_id写 Redis SETNX,过期 5 min,重复消息直接 ack 不掉计费。
3. K8s 滚动更新时会话保持
WebSocket 长连接默认被 kube-proxy 直接断,用户看到“客服已离线”。
方案:
- 给 Pod 加
preStophook,sleep 15 s,等现有消息处理完再退出。 - 前端配合心跳,断线 3 s 内重连并带上
session_reconnect_token,网关层用 Redis 恢复现场,用户无感知。
延伸思考:边缘计算还能怎么省
目前所有 NLU 请求还是回源到中心云,跨省延迟 60 ms 起跳。如果把意图模型剪枝 + INT8 量化后下发到边缘节点(CDN POP 或运营商 MEC),就能把最耗时的推理放在离用户 10 km 以内:
- 延迟再降 30%,用户体验“秒回”。
- 中心云 CPU 不再跑推理,只负责训练与热更新,成本再省 20%。
- 边缘节点无状态,利用 K8s KubeEdge 一键下发,白天高峰弹性扩容,夜里缩到 0,按秒计费。
当然,边缘内存只有 2 G,模型体积需压到 100 MB 以内,这就得靠量化、蒸馏、稀疏化三板斧了——下一篇再聊。
整套方案上线三个月,再没因为大促加机器。开发组也不用凌晨 3 点起床扩容,把复杂留给自己,把 800 ms 的顺滑留给用户,大概这就是工程价值最好的注解。