背景痛点:高并发下的三座“慢”山
智能客服一旦接入 App、小程序、Web 三端,流量瞬间翻十倍,典型症状有三:
- 并发请求排队:传统同步线程池模型,一条对话占一条线程,高峰期线程数飙高,GC 抖动导致 RT 99 线从 400 ms 涨到 2 s。
- 多轮状态丢失:HTTP 无状态,每次请求带
session_id去数据库捞上下文,网络闪断或发布重启时,Redis 里 key 过期,用户被迫“从头开始”。 - 第三方 NLP 抖动:意图识别服务超时 1 s 即重试,结果雪崩,线程全部卡在重试,整站 502 报错。
一句话:不改造架构,客服先“崩溃”再“答非所问”。
架构设计:从“轮询”到“事件驱动”
| 维度 | 传统轮询 | 事件驱动+微服务 | |---|---|---|---| | 连接方式 | 前端短轮询 / 长轮询 | WebSocket+MQ | | 资源占用 | 高(线程阻塞) | 低(异步回调) | | 扩容粒度 | 整站扩容 | 按服务扩容 | | 故障隔离 | 单点爆炸 | 级联熔断 |
决策依据:
- 流量波峰波谷明显,微服务可单独扩缩“对话管理”与“意图识别”。
- 事件总线(Kafka/RabbitMQ)天然削峰,失败消息回队列,可重试但不堵主链路。
- 状态机服务无 WebSocket 连接负担,只负责“状态计算”,水平扩容无状态。
核心实现
1. 对话状态机(Python 3.11)
# dialog/state_machine.py import time from enum import Enum, auto from dataclasses import dataclass from cacheout import Cache # 本地 LRU,可替换为 Redis class State(Enum): START = auto() AWAIT_NAME = auto() AWAIT_PHONE = auto() END = auto() @dataclass class Context: state: State uid: str expire_at: float data: dict class StateMachine: CACHE_TTL = 300 # 5 min _cache = Cache(maxsize=10_000, ttl=CACHE_TTL) @classmethod def get_or_create(cls, uid: str) -> Context: ctx = cls._cache.get(uid) if ctx is None or time.time() > ctx.expire_at: ctx = Context(state=State.START, uid=uid, expire_at=time.time() + cls.CACHE_TTL, data={}) return ctx @classmethod def transition(cls, ctx: Context, intent: str): if ctx.state == State.START and intent == "greeting": ctx.state = State.AWAIT_NAME elif ctx.state == State.AWAIT_NAME: ctx.data["name"] = intent ctx.state = State.AWAIT_PHONE elif ctx.state == State.AWAIT_PHONE: ctx.data["phone"] = intent ctx.state = State.END cls._cache.set(ctx.uid, ctx)超时处理:Cache 自带 TTL,过期自动淘汰;也可在transition里主动del掉过期 key。
2. 集成 NLP 服务(Java 17,Spring WebFlux)
// service/NlpService.java @Service public class NlpService { private final WebClient client = WebClient.builder() .baseUrl("http://nlp-internal") .filter(ExchangeFilterFunction.ofRequestProcessor( Retry.onlyIf(ctx -> ctx.exception() instanceof TimeoutException) .fixedDelay(3, Duration.ofMillis(200)) .toReactorRetry())) .build(); public Mono<String> predict(String text) { return client.post() .uri("/intent") .bodyValue(Map.of("q", text)) .retrieve() .bodyToMono(String.class) .timeout(Duration.ofSeconds(1)) .onErrorReturn("default"); // 兜底 } }错误重试:利用 Reactor 的retry操作符,超时/5xx 自动重试 3 次,仍失败返回默认意图,避免阻塞主流程。
性能优化
1. 上下文存储选型
| 方案 | QPS(单实例) | 延迟 P99 | 备注 |
|---|---|---|---|
| 本地 LRU | 8 w | 0.3 ms | 进程重启丢失,适合无状态副本 |
| Redis + 连接池 | 4 w | 1.2 ms | 重启不丢,需考虑热 key 漂移 |
| Redis + 本地一级缓存 | 6 w | 0.5 ms | 双读,写穿透,推荐 |
落地时采用“本地 LRU + 异步写 Redis”双保险策略:读优先本地,miss 再回 Redis;写操作丢到队列异步刷盘,既保性能又保不丢。
2. 负载测试数据
硬件:4C8G 容器 * 10 副本,JMeter 模拟 5 k 并发长连接。
优化前后对比:
- 优化前:平均 RT 680 ms,QPS 3.2 k,CPU 85%,线程 800+。
- 优化后:平均 RT 120 ms,QPS 9.1 k,CPU 55%,线程 200。
关键动作:
- 将同步 Tomcat 换成 Netty WebFlux,IO 线程与业务线程分离。
- 状态机本地缓存命中率 96%,减少 2 次 Redis RTT。
- 引入 MQ 削峰,峰值从 12 k 降到 7 k,下游 NLP 副本数减半。
避坑指南
1. 分布式会话粘滞
WebSocket 连接默认粘滞到节点 A,若 A 重启,客户端重连到 B,此时状态在 A 的本地内存即丢失。
解决:
- 状态外置:全部放 Redis,节点本身无状态。
- 连接与状态分离:用一致性哈希环做
uid -> node映射,重启后客户端仍被 LB 导回原节点(K8s 可配sessionAffinity=ClientIP),给 30 s 优雅退出窗口把内存状态刷到 Redis。
2. 第三方 API 熔断
NLP 提供方偶发 2 s 延迟,拖垮整条链路。
Hystrix/Resilience4j 模板:
CircuitBreaker cb = CircuitBreaker.ofDefaults("nlp"); Supplier<String> decorated = CircuitBreaker .decorateSupplier(cb, () -> nlpService.blockingPredict(text)); Try<String> result = Try.ofSupplier(decorated) .recover(throwable -> "default");参数建议:失败率 50 % 即打开,休眠 10 s 后半开,单节点 20 并发限制。打开后快速失败,保证客服系统不被“慢”服务拖死。
代码规范小结
- Python 侧:PEP8 命名,行宽 88(Black),关键函数必写 docstring。
- Java 侧:Google Java Style,API 层用
Optional防 NPE,日志用 SLF4J + MDC 打uid便于链路追踪。 - 统一日志格式:
[%level][%X{uid}][%thread] %msg%n,方便 ELK 聚合。
延伸思考:客服 + RPA 超自动化
当客服确认用户意图为“退货”,可一键触发 RPA 机器人:
- 客服系统发送“退货事件”到 MQ;
- RPA 监听事件,自动登录 ERP 创建退货单;
- 执行完后再发“退货完成”事件,客服机器人主动告知用户快递单号。
架构改动点:
- 新增
rpa-connector微服务,只负责把事件转译为 RPA 指令,不耦合客服主流程。 - 事件格式采用 CloudEvents,方便未来对接多个 RPA 厂商。
- 需要额外存储“流程实例”状态,推荐直接复用现有状态机框架,只需新增状态节点。
如此,客服不再只是“回答问题”,而是“直接帮用户把事办完”,体验升级,也减少人工坐席投入。
把并发、状态、抖动三座山铲平后,智能客服才能在高峰期依旧“对答如流”。文中代码与压测数据均来自真实上线环境,可直接拷贝验证。下一步,不妨把 RPA 事件接入,试试让机器人“动口又动手”。