背景痛点:用户激增时的“三座大山”
去年公司做了一场直播带货,公众号粉丝从 3 万暴涨到 30 万,客服接口当天就“三连炸”:
- 并发崩溃:微信服务器一次性推 20 条消息,Flask 单进程直接 502
- 消息丢失:回调超时 5s 微信重试,结果重复下单,用户收到两条“已发货”
- 上下文断裂:分布式 3 台节点,用户上一句在 A 机器,下一句被 B 机器当成新会话,答非所问
痛定思痛,决定把客服系统重新设计,目标只有一个:5000 QPS 下不丢消息、不断会话、不加班救火。
技术选型:自建 NLP vs 扣子
| 维度 | 自建 NLP | 扣子(Coze) |
|---|---|---|
| 机器成本 | 8 台 16C32G GPU 节点,月 1.2w | 0,平台托管 |
| 人力成本 | 2 名算法+3 名后端,持续迭代 | 1 名后端,拖拽式配置 |
| 并发能力 | 实测 2200 QPS,再高压需扩容 | 官方 1w QPS,自动弹性 |
| 运维 | 自己做熔断、限流、发版 | 平台兜底 |
一句话总结:扣子在成本、弹性、迭代速度上碾压自建,我们只需专注“桥接微信”这一件事。
架构设计:三层结构 + 会话状态保持
系统拆成三层,全部无状态,方便水平扩展:
- 微信回调处理层:Nginx+Gunicorn+Flask,只做验签、去重、入队
- 扣子对话引擎层:调用 Coze Bot API,返回 Markdown 答案
- 业务逻辑层:订单查询、物流接口、人工转接,可插拔
会话状态保持方案:
- Redis Cluster 16 分片,单分片 8G,最大 4000w key
- key 格式
wx:{openid}:ctx,value 存{"session_id":"xxx","last_turn":unix_ts} - LRU 策略
maxmemory-policy allkeys-lru,保留 7 天自然过期
代码实现:Flask+Redis 桥接示例
项目结构:
coze_wechat/ ├── app.py ├── redis_client.py └── coze_client.py1. 统一依赖
pip install flask redis requests gevent2. Redis 客户端(线程安全)
# redis_client.py import redis.sentinel class RedisClient: """基于哨兵的 Redis 客户端,支持 Cluster 读写分离""" def __init__(self): self.sentinel = redis.sentinel.Sentinel( [("sentinel1", 26379), ("sentinel2", 26379)], socket_keepalive=True, socket_keepalive_options={} ) self.r = self.sentinel.master_for("mymaster", socket_timeout=0.2) def get(self, key): return self.r.get(key) def setex(self, key, ttl, value): return self.r.setex(key, ttl, value) def exists(self, key): return self.r.exists(key) redis_cli = RedisClient()3. 扣子客户端
# coze_client.py import requests, os, json BOT_ID = os.getenv("COZE_BOT_ID") PAT = os.getenv("COZE_PAT") # 个人访问令牌 def chat(session_id, user_query): """ 调用 Coze Bot,返回 Markdown 文本 :param session_id: 微信 openid 或自定义会话 id :param user_query: 用户原文 :return: str, 机器人回答 """ url = f"https://api.coze.com/open_api/v2/chat" headers = {"Authorization": f"Bearer {PAT}"} payload = { "bot_id": BOT_ID, "user": session_id, "query": user_query, "stream": False } r = requests.post(url, json=payload, timeout=3) r.raise_for_status() msg = r.json()["messages"][-1]["content"] return msg4. Flask 桥接层(含幂等去重)
# app.py from flask import Flask, request, make_response import xml.etree.ElementTree as ET import hashlib, time, json from redis_client import redis_cli from coze_client import chat app = Flask(__name__) TOKEN = "your_wechat_token" def verify_signature(signature, timestamp, nonce): tmp = sorted([TOKEN, timestamp, nonce]) return hashlib.sha1("".join(tmp).encode()).hexdigest() == signature def parse_xml(body): root = ET.fromstring(body) return {child.tag: child.text for child in root} @app.route("/wechat", methods=["GET", "POST"]) def wechat_entry(): # 1. 验签 if request.method == "GET": echostr = request.args.get("echostr", "") return echostr if verify_signature(**request.args) else "fail" # 2. 解析微信 XML data = parse_xml(request.data) openid = data["FromUserName"] msg_id = data["MsgId"] # 3. 幂等去重 dup_key = f"dup:{msg_id}" if redis_cli.exists(dup_key): return "success" redis_cli.setex(dup_key, 3600, "1") # 4. 取上下文 session ctx_key = f"wx:{openid}:ctx" ctx = json.loads(redis_cli.get(ctx_key) or "{}") session_id = ctx.get("session_id", openid) # 5. 调用扣子 try: answer = chat(session_id, data["Content"]) except Exception as e: answer = "系统开小差,请稍后再试" app.logger.exception(e) # 6. 更新上下文 ctx["last_turn"] = int(time.time()) redis_cli.setex(ctx_key, 7*24*3600, json.dumps(ctx)) # 7. 返回微信 XML xml_tpl = """ <xml> <ToUserName><![CDATA[{to}]]></ToUserName> <FromUserName><![CDATA[{frm}]]></FromUserName> <CreateTime>{ts}</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[{content}]]></Content> </xml> """.format(to=openid, frm=data["ToUserName"], ts=int(time.time()), content=answer) response = make_response(xml_tpl) response.content_type = "application/xml" return response if __name__ == "__main__": app.run(host="0.0.0.0", port=80)关键点:
- 所有 I/O(Redis、Coze)都带超时,防止协程堆积
- 使用
msgId做幂等,Redis 1h 过期,兼顾内存与重试窗口 - 会话
session_id默认用openid,后续可扩展子账号
生产建议:冷启动、安全、监控
冷启动优化
Coze 实例 30 分钟无对话会被回收,首次请求延迟 2~3s。解决:- 每天 08:00 定时任务调一次“你好”空对话,保持实例温热
- 或者开启 Coze 的“常驻模式”(付费版功能)
安全防护
- 微信回调 IP 白名单:Nginx 层
allow 101.226.103.0/24; deny all; - 请求签名验证:代码已示例,务必开启
- 扣子 PAT 定期轮换,存 Kubernetes Secret,不落地代码
- 微信回调 IP 白名单:Nginx 层
监控指标
- 响应延迟 P99:Prometheus 埋点
histogram_observe(coze_latency_seconds) - 对话中断率:用户连续两次间隔>30min 视为中断,每日报表
- Redis 命中率、evicted_keys:低于 90% 立即扩容内存
- 响应延迟 P99:Prometheus 埋点
延伸思考:多公众号租户隔离
当 SaaS 化服务 100 个公众号时,需要数据隔离与成本复用:
- 每个租户分配独立
namespace,Redis key 加前缀 - 扣子侧用“工作空间”隔离,Bot 复用同一底座,仅知识库分离
代码示例:
def tenant_key(tenant_id, openid, suffix="ctx"): """租户隔离的 Redis key""" return f"t{tenant_id}:wx:{openid}:{suffix}" # 使用 ctx_key = tenant_key(tenant_id, openid)- 好处:批量删除
t{tid}:*即可清租户数据 - 注意:Coze 的
session_id同样拼接tenant_id,防止串号
未来可拓展:
- 按租户做限流(令牌桶)
- 按租户做灰度发布,先给 10% 公众号上新模型
踩坑两周,上线一月,这套方案扛住了 6·18 高峰。扣子把 NLP 的脏活累活揽走,我们只写 200 行胶水代码,就能让 5000 QPS 的客服系统稳稳跑在 2C4G 的小水管上——真香。