基于RAGFlow的智能客服问答系统:从架构设计到性能优化实战
背景痛点:传统客服的“三慢”顽疾
做ToB SaaS客服平台三年,最怕听到客户吐槽“你们机器人答非所问”。
传统FAQ-bot的通病可以总结成“三慢”:
- 知识更新慢:运营同学改一次Excel,研发再导库,上线至少两天,热点政策早就变了。
- 长尾响应慢:越冷门的问题越依赖关键字匹配,一旦没命中就转人工,排队10分钟起步。
- 语义理解慢:同义词、口语化表达全靠穷举,维护成本指数级上升。
去年618大促,我们峰值QPS 3.2k,人工坐席溢出率飙到47%,老板直接拍桌子:两周内必须上语义升级方案。于是把视线投向了RAG(Retrieval-Augmented Generation)。
技术选型:为什么不是BERT+FAQ、也不是Fine-tune LLM?
对比实验在内部沙箱跑了7天,结论一句话:
- BERT+FAQ:召回率92%,但更新需重训分类器,依旧“两天上线”。
- Fine-tune LLM:生成效果惊艳,可10w条领域数据才收敛,GPU账单5w+/月。
- RAGFlow:检索与生成解耦,知识库10分钟级增量更新,成本≈LLM的1/5。
RAGFlow还把Milvus、Faiss、Elasticsearch等组件做了DSL封装,对我们这种“运维人力紧张”的小团队极度友好,于是直接All-in。
架构设计:一张图看懂数据流
先上图,再拆解:
- 网关层:Nginx+Lua做灰度分流,按uid哈希到不同版本RAG链。
- 检索模块:
- 语义编码:bge-small-zh-v1.5,768维,延迟<35ms。
- 向量库:Milvus 2.3,分区按“业务线+版本”做软隔离,方便回滚。
- 精排:用向量相似度计算score1,再叠加BM25的score2,加权融合。
- 生成模块:
- 提示模板=“系统人设+检索结果+用户历史3轮”,token控制在2k以内。
- 大模型:Qwen-14B-Chat-int4,单卡A10G可跑,TPS≈8。
- 缓存层:
- Redis缓存“同一知识版本+问题指纹”的生成结果,TTL=300s,命中率38%。
- 反馈闭环:
- 用户点“解决/未解决”落Kafka,离线负采样做困难例挖掘,次日晨跑批处理更新索引。
核心实现:代码直接能跑
以下片段来自生产仓库,已脱敏,可直接粘进IDE跑通。
1. 知识库构建(含chunk、embedding、写入)
# kb_builder.py import os, json, time, logging from ragflow import DocumentSet, EmbeddingEncoder from concurrent.futures import ThreadPoolExecutor, as_completed logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def chunk_text(text: str, max_len=384, overlap=64): """按标点+长度双重切片,避免截断语义""" import re split_re = re.compile(r'[。!?;]') sentences = split_re.split(text) buf, chunks = "", [] for sent in sentences: if len(buf + sent) < max_len: buf += sent else: if buf := buf.strip(): chunks.append(buf) buf = sent if buf: chunks.append(buf) return chunks def build_kb(file_path: str, ds: DocumentSet, encoder: EmbeddingEncoder, batch=200): """多线程批量写入,IO与CPU解耦""" tasks, cnt = [], 0 with open(file_path, encoding="utf-8") as f: for line in f: data = json.loads(line) chunks = chunk_text(data["content"]) for chk in chunks: tasks.append((chk, {"source": data["id"]})) if len(tasks) >= batch: _flush(ds, encoder, tasks) tasks.clear() if tasks: _flush(ds, encoder, tasks) logging.info("kb build done, total chunks=%s", ds.count()) def _flush(ds, encoder, tasks): texts = [t[0] for t in tasks] embeddings = encoder.encode(texts) # shape=[batch,768] ds.upsert(records=[{"text": t[0], "vec": e, "meta": t[1]} for t, e in zip(tasks, embeddings)]) if __name__ == "__main__": encoder = EmbeddingEncoder("bge-small-zh") ds = DocumentSet("cs_kb_v3") # 对应Milvus collection build_kb("qa_corpus.jsonl", ds, encoder)2. 在线问答接口(含异常、日志、缓存)
# rag_service.py from flask import Flask, request, jsonify from ragflow import Retriever, Generator from redis import Redis import hashlib, time, logging app = Flask(__name__) redis_cli = Redis(decode_responses=True) ret = Retriever("cs_kb_v3") # 绑定同一张量库 gen = Generator("qwen-14b-chat") # 本地vLLM推理 def make_key(q, kv_version="v3"): """生成问题指纹""" return f"rag:{kv_version}:" + hashlib.md5(q.encode()).hexdigest() @app.route("/ask", methods=["POST"]) def ask(): st = time.time() question = request.json.get("question", "").strip() history = request.json.get("history", []) if not question: return jsonify({"code": 400, "msg": "empty question"}), 400 try: # 1. 缓存命中 key = make_key(question) if ans := redis_cli.get(key): logging.info("cache hit, key=%s", key) return jsonify({"answer": ans, "latency": time.time()-st, "source": "cache"}) # 2. 检索top5 docs = ret.retrieve(question, topk=5, score_threshold=0.55) contexts = [d["text"] for d in docs] # 3. 生成 prompt = format_prompt(question, contexts, history) answer = gen.generate(prompt, max_tokens=512, temperature=0.3) # 4. 写缓存 & 日志 redis_cli.setex(key, 300, answer) logging.info("rag ok, q=%s, latency=%.2f, docs=%s", question, time.time()-st, len(docs)) return jsonify({"answer": answer, "latency": time.time()-st, "source": "rag"}) except Exception as e: logging.exception("rag error") return jsonify({"code": 500, "msg": "internal error"}), 500 def format_prompt(q, ctxs, hist): """极简模板,token=hist+ctxs+q < 2k""" hist_str = "\n".join([f"User:{h['q']}\nAssistant:{h['a']}" for h in hist[-3:]]) ctx_str = "\n".join([f"[{i+1}] {c}" for i, c in enumerate(ctxs)]) return f"你是客服机器人,请依据以下资料回答问题:\n{ctx_str}\n历史对话:\n{hist_str}\n用户:{q}\n助理:"性能优化:把延迟压到400ms以内
上线第一版平均延迟900ms,老板一句“不如人工快”直接打回。我们做了三轮压测,最终P99<400ms,关键动作如下:
索引并行化
Milvus的index_file_size=1024MB,单次build CPU打满,把segment_row_limit降到512k,并给create_index开nprobe=32并行度,8核机器索引时间从45min缩到7min。模型量化+连续批处理
vLLM支持--quantization int4,显存占用减半;同时打开--max-num-seqs 256,把单卡吞吐从5.6提到8.2 req/s,延迟反而降了15%。预检索过滤
业务线彼此独立,先在元数据里加product_id字段,利用Milvus的partition_key做剪枝,把候选池从200w降到5w,向量相似度计算耗时从120ms降到18ms。缓存分层
除了Redis,再加一层本地Caffeine(基于Guava),TTL=30s,命中率又多12%,对超高频“密码怎么改”类问题极有效。
避坑指南:生产级血泪总结
切片粒度过细→召回冗余
早期按128字符切,结果top5里同一段出现3次,用户直呼啰嗦。把overlap调为max_len*1/6,并加“句子边界”正则后,冗余率从34%降到7%。时间字段没同步→答案过期
政策库带生效时间,第一次没把effective_date写进meta,检索到去年答案,被投诉“误导”。后在Retriever层加filter="effective_date<=today",日更脚本同步T+0。高并发下Milvus OOM
默认cache_insert_data=true,写入高峰把内存吃满。关闭该参数并调queryNode.memory=60%,同时把retriever的batch_search拆成4次并发,内存稳在70%以下。生成“幻觉”甩锅给检索
用户问“能否退款”,模型答“可以全额退”,实际需满足7天无理由。解决方式:在提示模板里加“若资料未提及,请回答‘暂无相关信息’”,并给contexts标号,让模型引用编号,减少自由发挥。日志没脱敏→泄露手机号
开发期把请求全文打印,被安全扫描揪出。统一用logging.Filter把1\d{10}替换为1*********,并关闭debug级别,合规通过。
安全考量:数据隐私与访问控制
- 向量库隔离:敏感企业数据单独建
database,Milvus支持RBAC,检索服务启动时只读授权账号。 - 传输加密:Nginx层强制TLS1.3,内网服务间用mTLS双向校验,杜绝明文嗅探。
- prompt攻击防护:在
Generator层加正则黑名单,含“忽略前面”“转为中文”等指令直接拦截。 - 审计留痕:把
user_id+question+answer_hash写进ES,保留30天,方便合规部门抽查。
开放性问题
目前我们仅支持3轮历史,如果要做多轮对话状态跟踪(DST),你会如何把“用户意图槽位”与RAG的检索结果融合?欢迎一起探讨。