背景痛点:流量一涨,客服就“失忆”
去年双十一,我们给电商客户上线的 AI 客服 DS 在 0 点刚过就迎来 3 倍日常流量。结果不到两分钟,监控大屏开始飘红:
- 意图识别平均耗时从 90 ms 飙到 620 ms,直接导致超时回退到人工坐席
- 多轮订单修改场景里,用户说完“帮我改地址”,机器人反问“您要改什么?”——上下文丢了
- 最惨的是库存查询接口,因 DSM 模块反复穿透 Redis,CPU 打满,QPS 掉到 120,客服系统几乎“失聪”
事后复盘,根因集中在三点:
- 单体 Rasa Server 扛不住高并发,线程池打满后 NLU pipeline 排队
- 对话状态全放内存,一台实例挂掉就“集体失忆”
- 超时重试无幂等,同一条用户消息被重复下发,库存接口被刷爆
痛定思痛,我们决定把“能扛 5 千并发、延迟 200 ms 以内、准确率 92%+”写进 PRD,重新搭一套 DS 架构。
技术选型:Rasa 还是自研?用数据说话
在 POC 阶段,我们分别用 Rasa 3.x(TensorFlow 后端)与自研轻量化 BERT 方案跑同一份 4.2 万条客服语料,结果如下:
| 指标 | Rasa 3.5 | 自研 DS |
|---|---|---|
| 意图准确率(Top-1) | 89.4 % | 93.1 % |
| 单卡 QPS (T4 GPU) | 180 | 420 |
| 平均延迟 (P99) | 310 ms | 140 ms |
| 模型体积 | 790 MB | 190 MB(蒸馏+量化) |
| 状态存储 | 内存 | Redis + Protobuf |
| 水平扩容 | 需共享 Tracker Store | 无状态,秒级扩容 |
结论很直观:Rasa 在快速原型上很香,但生产环境要扛大流量,自研更可控。于是我们把 NLU、DST、Policy 全拆成微服务,只保留 Rasa 的 Stories 格式做训练数据标注,其余重写。
核心实现一:BERT 意图分类器(Python)
下面代码基于 PyTorch+Transformers,支持批量推理,单卡 T4 可跑到 420 QPS。注释占比超 30%,方便二次开发。
# intent_classifier.py import torch, redis, json, time from transformers import BertTokenizerFast, BertForSequenceClassification from torch.cuda.amp import autocast class BertIntentEngine: """ 初始化即把模型放 GPU,支持半精度推理; 时间复杂度:O(1) 加载,O(n) 推理,n=token 长度 """ def __init__(self, model_dir: str, redis_host: str, max_len=32): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.tokenizer = BertTokenizerFast.from_pretrained(model_dir) self.model = BertForSequenceClassification.from_pretrained(model_dir) self.model.to(self.device).eval() self.redis = redis.Redis(host=redis_host, port=6379, decode_responses=True) self.max_len = max_len @torch.no_grad() def predict(self, texts: list) -> list: """ 返回 [{'label':'order_modify','score':0.94}, ...] """ t0 = time.time() encoded = self.tokenizer( texts, padding=True, truncation=True, max_length=self.max_len, return_tensors="pt" ).to(self.device) with autocast(): # 半精度提速 1.7x logits = self.model(**encoded).logits probs = torch.softmax(logits, dim=-1) scores, idxs = torch.max(probs, dim=-1) id2label = self.model.config.id2label results = [ {'label': id2label[i.item()], 'score': s.item()} for i, s in zip(idxs, scores) ] print(f"batch={len(texts)} | latency={1000*(time.time()-t0):.1f}ms") return results def cache_key(self, uid: str): # 后续 DSM 会用到 return f"ds:intent:{uid}"线上实测 batch=16 时,GPU 利用率 82%,P99 延迟 118 ms,满足 <200 ms 目标。
核心实现二:对话状态机 DSM + Redis 存储
DSM 把每轮“意图+槽位+历史”序列化成 Protobuf 写 Redis,既省内存又支持多实例共享。序列化流程如图:
关键代码片段(精简版):
# dsm.py import redis, uuid, pickle, time from dataclasses import dataclass @dataclass class DialogState: uid: str intent: str slots: dict turn: int ts: float class RedisDSM: def __init__(self, redis_host): self.r = redis.Redis(host=redis_host, port=6379, db=0) def save(self, state: DialogState, ttl=1800): key = f"ds:state:{state.uid}" self.r.setex(key, ttl, pickle.dumps(state)) def load(self, uid: str) -> DialogState: key = f"ds:state:{uid}" data = self.r.get(key) return pickle.loads(data) if data else None- 读操作 O(1),写操作O(1)
- 30 min TTL 自动清脏数据,防止 Redis 膨胀
- 多轮场景下,Policy 服务每轮更新 turn+1,前端拉取即可续聊
生产优化一:Locust 压测 5000 并发
为了验证“单实例 420 QPS”能否横向扩展到 5 k,我们用 Locust 写了一个 gRPC 压测脚本:
# locustfile.py from locust import User, task, between import grpc, intent_pb2, intent_pb2_grpc class DSUser(User): wait_time = between(0.1, 0.3) def on_start(self): channel = grpc.insecure_channel("ds-nlu.internal:50051") self.stub = intent_pb2_grpc.IntentStub(channel) @task(10) def ask_delivery(self): req = intent_pb2.Request(uid=str(uuid.uuid4()), text="我的快递什么时候到?") self.stub.Classify(req)在 K8s 集群起 20 个 Locust Pod,每秒递增加并发,结果:
- 2000 并发时平均延迟 125 ms
- 5000 并发时延迟 198 ms,CPU 70%,GPU 92%,无 5xx
- 再往上 GPU 先成为瓶颈,通过增加推理 Pod 数到 12 个,QPS 顶到 5.2 k,满足业务峰值 4.5 k 需求
生产优化二:超时重试的幂等性
客服场景经常遇到“用户重复点击”或“网络抖动重发”。我们在网关层加唯一 message_id,下游服务做幂等:
# idempotent_retry.py import redis, hashlib, json class RetryGuard: def __init__(self, redis_host): self.r = redis.Redis(host=redis_host, port=6379, db=1) def has_processed(self, msg_id: str) -> bool: """ 利用 Redis SETNX 原子检查,TTL 5 min 复杂度 O(1) """ key = f"ds:retry:{msg_id}" return self.r.set(key, 1, nx=True, ex=300) is None网关收到消息先生成message_id=md5(uid+timestamp+seq),再调下游。若has_processed=True直接返回上次结果,避免重复扣库存或重复发货。
避坑指南:三次踩坑,三次爬出
Nginx keepalive_timeout 默认 75 s,WebSocket 长连接被提前断开
→ 调到 180 s,同时 proxy_read_timeout 与 DS 心跳 30 s 对齐Redis 序列化用 pickle,版本升级后 Python 3.9→3.10 出现兼容异常
→ 统一用 protobuf + 版本号字段,支持向前兼容Gunicorn 同步 worker 数=2×CPU,结果 I/O 等待把 GPU 饿死
→ 改成 gevent worker,worker-connections=1000,GPU 利用率提升 35%
扩展思考
- 在 200 ms 延迟红线内,如何继续压榨模型精度?是否值得用动态 Early-exit BERT?
- 当业务扩充到多语言、多方言,意图标签体系如何自动对齐?
- 强化学习在线更新 Policy 时,如何防止“探索”把线上客服带歪?
下一版我们打算把蒸馏+量化再往前推一步,让 CPU 也能跑 150 QPS,把 GPU 留给更复杂的情感分析模型。踩坑还在继续,欢迎一起交流。