背景痛点:规则引擎为何扛不住“十万个为什么”
传统客服系统普遍采用“正则+关键词+决策树”三板斧,在固定 FAQ 场景下表现尚可,一旦遇到长尾问题立刻露馅:
- 意图泛化能力弱:用户把“我订单卡住了”换成“物流不动弹”,规则库就匹配不到。
- 多轮对话维护困难:状态变量散落在数百个 if-else 里,稍一改动就牵一发动全身。
- 知识更新慢:新活动规则上线,需要业务、产品、开发、测试四面会审,平均 3~5 天才能发版。
结果同样是 1000 并发,传统方案 CPU 飙到 90%,平均响应 2.3 s,用户直接暴走。
技术选型:RAG 与 Fine-tuning 并非二选一
| 维度 | RAG(检索增强生成) | Fine-tuning(微调) |
|---|---|---|
| 数据新鲜度 | 小时级更新 | 需重新训练+回滚 |
| 长尾知识 | 依赖检索质量 | 可内化到参数 |
| 幻觉风险 | 高(检索不到就编) | 中(见过就能答) |
| 成本 | 低(只训 Embedding) | 高(GPU+数据标注) |
最终采用“混合架构”:
- 高频标准问 → Fine-tuned 小模型(1.3 B)兜底,P99 延迟 < 200 ms。
- 长尾/活动问 → RAG 分支,实时拉取企业 Wiki,召回 Top5 后重排序。
- 两路结果做 Ensemble,置信度>0.85 直接返回答案,否则转人工。
核心实现:LangChain 状态机 + 异步缓存
1. 对话状态机设计
状态节点仅保留“业务语义”,不存储大段文本,减少序列化压力。
from enum import Enum, auto from typing import Dict, Optional class State(Enum): INIT = auto() # 刚接入 AWAIT_ORDER = auto() # 待提供订单号 AWAIT_ADDR = auto() # 待提供地址 ANSWER = auto() # 已给出答案 ESCALATE = auto() # 转人工 TRANSITIONS = { State.INIT: {"provide_order": State.AWAIT_ORDER, "greet": State.ANSWER}, State.AWAIT_ORDER: {"provide_order": State.ANSWER, "missing": State.ESCALATE}, State.AWAIT_ADDR: {"provide_addr": State.ANSWER, "missing": State.ESCALATE}, State.ANSWER: {"thanks": State.INIT, "new_question": State.INIT}, State.ESCALATE: {} # 终点 }2. 上下文缓存:Redis + LRU 双保险
- key 格式
cs:{session_id},value 使用 MessagePack 压缩。 - 本地维护 10 k 容量的 LRU 字典,命中失败再回 Redis,降低 35% 网络 IO。
import aioredis, msgpack, time from cachetools import LRU local_cache = LRU(maxsize=10_000) async def get_context(sid: str) -> list: # 本地 LRU 命中 if sid in local_cache: return local_cache[sid] # 回源 Redis redis = aioredis.from_url("redis://cluster") raw = await redis.get(f"cs:{sid}") if raw: data = msgpack.unpackb(raw) local_cache[sid] = data return data return [] # 无历史 async def set_context(sid: str, messages: list, ttl: int = 1800): packed = msgpack.packb(messages) local_cache[sid] = messages redis = aioredis.from_url("redis://cluster") await redis.set(f"cs:{sid}", packed, ex=ttl)3. 异步对话入口(含异常兜底)
import asyncio, uuid, time from langchain.llms import AsyncOpenAI from langchain.chains import ConversationalRetrievalChain llm_chat = AsyncOpenAI(model="ft:gpt-3.5-turbo", max_tokens=512) retriever = build_rag_retriever() # 自定义 ES+Embedding async def chat(request: dict) -> dict: """ 入口函数,返回 JSON 给前端。 1. 生成/复用 session_id 2. 加载上下文 → 构造 StateMachine 3. 路由到 FT 或 RAG 4. 更新上下文并回写 """ sid = request.get("session_id") or uuid.uuid4().hex try: hist = await get_context(sid) state = hist[-1].get("state", State.INIT) if hist else State.INIT user_utt = request["text"] # 简单意图分诊 if quick_intent(user_utt) == "long_tail": chain = ConversationalRetrievalChain.from_llm(llm_chat, retriever) answer = await chain.acall({"question": user_utt, "chat_history": hist}) bot_utt = answer["answer"] else: # 走微调模型 prompt = concat_prompt(hist, user_utt) bot_utt = await llm_chat.agenerate([prompt]) # 状态转移 new_state = TRANSITIONS[state].get(parse_action(user_utt), State.ESCALATE) hist.append({"role": "user", "content": user_utt, "state": state}) hist.append({"role": "bot", "content": bot_utt, "state": new_state}) await set_context(sid, hist) return {"session_id": sid, "answer": bot_utt, "state": new_state.name} except Exception as e: # 限流、超时、幻觉等统一降级 return {"session_id": sid, "answer": "系统繁忙,正在为您转接人工客服...", "state": State.ESCALATE.name}生产考量:压测、敏感词与脱敏
1. 压测数据(AWS c7g.xlarge,4 vCPU,8 G)
| 并发 | 平均延迟 | P99 延迟 | CPU | 备注 |
|---|---|---|---|---|
| 50 | 180 ms | 320 ms | 42% | 全走微调 |
| 200 | 260 ms | 510 ms | 78% | 30% 走 RAG |
| 300 | 430 ms | 1.2 s | 95% | 触发限流 |
目标 200 TPS 时,单实例可抗,留 30% 缓冲;超过 300 TPS 时 HPA 自动扩容。
2. 敏感词过滤
- 采用开源 DFA+拼音+拆字 三表合一,2 ms 内完成 8 k 词库扫描。
- 命中后不做直接拒绝,而是把敏感片段替换为
*,继续送模型,防止“误杀”导致投诉。
3. 数据脱敏
正则抽取手机号、身份证、银行卡,统一替换为掩码,再写日志:
import re mask_rules = { r"1[3-9]\d{9}": "mobile", r"\d{6}(19|20)\d{8}": "idcard" } def desensitize(text: str) -> str: for pattern, label in mask_rules.items(): text = re.sub(pattern, lambda m: f"[{label}]", text) return text避坑指南:会话 ID 与限流降级
1. 会话 ID 碰撞
- 前端生成 v4 UUID 仍可能重复,服务端再加 Snowflake worker-id 作为后缀,双保险。
- 写入 Redis 前用
SETNX EX做分布式锁,失败则回退到新建会话,避免串话。
2. 大模型 API 限流
- 提前压测拿到官方限流阈值(RPM/TPM),本地令牌桶预限流,留 10% 余量。
- 触发限流后,自动把请求写入 Kafka,延迟 5 s 后重试;重试 3 次仍失败则返回静态兜底答案。
互动环节:用户突然切换话题怎么办?
示例代码里状态机只在同一会话里顺序推进,如果用户正说到“ AWAIT_ADDR ”,突然问“你们周年庆几号开始”,状态仍卡在旧节点,模型可能给出驴唇不对马嘴的答案。
开放问题:你会如何改动状态机或 prompt,让智能体既能“记得旧话题”,又能“优雅切换到新话题”?欢迎在评论区贴出你的 PR 链接。
把以上模块拼装完,一个可灰度、可回滚、可扩缩的 LLM 客服智能体就上线了。先小流量 A/B,看转化率、转人工率、差评率三项指标,全部稳住后再全量。祝你部署顺利,日志常看,告警常关。