背景痛点:高并发下的“三宗罪”
去年双十一,公司客服通道被瞬间流量冲垮,核心问题集中在三点:
- 并发请求排队:高峰期 QPS 飙到 2k,单体 Flask 直接 502 503,用户看到“客服不在线”就流失。
- 多轮对话断片:用户问“我订单到哪了→那能改地址吗”,系统把第二句当新会话,答非所问,满意度掉到 62%。
- 意图识别不准:原规则引擎+关键词,新活动文案一换,命中率从 85% 跌到 43%,运营天天手工补 FAQ。
这三点叠加,客服成本不降反升,逼得我们必须在三个月内自研一套“能扛流量、能聊多轮、能秒懂”的对话系统。
技术选型:Rasa 很好,但我仍选了 TensorFlow+Flask
| 维度 | Rasa(2.x) | Dialogflow | 自研 TF+Flask |
|---|---|---|---|
| 中文预训练 | 需自己接 Bert | 谷歌内置,但中文弱 | 完全可控 |
| 私有部署 | 支持,重 | 不支持 | 轻量 Docker 化 |
| 并发性能 | 单进程 200 QPS | 谷歌托管,无感 | 优化后 2k+ QPS |
| 二开需求 | Python 动作器 | Webhook 限制 | 随意魔改 |
一句话总结:Rasa 在 NLU 上很省心,可我们要把“意图模型+策略+业务接口”全部揉进一个镜像,方便 K8s 弹性;TensorFlow 的 SavedModel 热加载 + Flask 的轻量级,刚好契合“训练—发布—扩容”一条龙。
于是拍板:自研。
核心实现
1. 基于 BERT 的意图分类模型
数据准备阶段,我们把历史 18 个月 42 万条对话清洗成 127 个意图,每条样本保留 128 字,用bert-base-chinese微调。
# train_intent.py import pandas as pd, tensorflow as tf, tensorflow_hub as hub from sklearn.model_selection import train_test_split from bert import bert_tokenization data = pd.read_csv('intent.csv') # text,label x_train, x_test, y_train, y_test = train_test_split( data['text'], data['label'], test_size=0.1, random_state=42) # 1. 建立 tokenizer FullTokenizer = bert_tokenization.FullTokenizer tokenizer = FullTokenizer(vocab_file='bert_base/vocab.txt') def encode(texts, max_len=128): ids, masks = [], [] for t in texts: tokens = tokenizer.tokenize(t)[:max_len-2] tokens = ['[CLS]']+tokens+['[SEP]'] ids.append(tokenizer.convert_tokens_to_ids(tokens) + [0]*(max_len-len(tokens))) masks.append([1]*len(tokens)+[0]*(max_len-len(tokens))) return tf.constant(ids), tf.constant(masks) train_ids, train_masks = encode(x_train) test_ids, test_masks = encode(x_test) # 2. 拼装模型 input_ids = tf.keras.Input(shape=(128,), dtype=tf.int32, name='input_ids') input_masks = tf.keras.Input(shape=(128,), dtype=tf.int32, name='masks') bert_layer = hub.KerasLayer('bert_base', trainable=True) pooled = bert_layer([input_ids, input_masks])['pooled_output'] drop = tf.keras.layers.Dropout(0.3)(pooled) out = tf.keras.layers.Dense(127, activation='softmax')(drop) model = tf.keras.Model([input_ids, input_masks], out) model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(2e-5), metrics=['accuracy']) model.fit([train_ids, train_masks], tf.keras.utils.to_categorical(y_train), validation_data=([test_ids, test_masks], tf.keras.utils.to_categorical(y_test)), epochs=3, batch_size=32) model.save('intent_model/1') # SavedModel 格式,方便 tf.serving 热更训练完打包成intent_model,Flask 启动时直接tf.keras.models.load_model,省去 TensorFlow Serving 的 RPC 一跳,单机延迟降 18 ms。
2. Flask 异步优化:gevent 打满 CPU
Flask 默认同步,一请求一线程,I/O 等待时线程空转。我们采用 gevent 猴子补丁 + 协程池,把阻塞变非阻塞。
# app.py from gevent import monkey; monkey.patch_all() from flask import Flask, request, jsonify import tensorflow as tf, redis, json, time app = Flask(__name__) pool = redis.ConnectionPool(host='redis', max_connections=50) rdb = redis.Redis(connection_pool=pool) model = tf.keras.models.load_model('intent_model/1') @app.route('/chat', methods=['POST']) def chat(): uid = request.json['uid'] query = request.json['query'] # 1. 意图识别 ids, masks = encode([query]) # 复用训练时的 encode intent_id = model.predict([ids, masks]).argmax(-1)[0] # 2. 读取对话状态 key = f'dm:{uid}' state = json.loads(rdb.get(key) or '{}') # 3. 策略路由 & 回复生成(略) reply = policy_router(intent_id, state) # 4. 更新状态并回写,TTL 300s state['last_intent'] = int(intent_id) rdb.set(key, json.dumps(state, ensure_ascii=False), ex=300) return jsonify({'reply': reply, 'intent': int(intent_id)}) if __name == '__main__': from gevent.pywsgi import WSGIServer WSGIServer(('0.0.0.0', 5000), app, log=None).serve_forever()压测显示,同样 4C8G,同步模式 380 QPS 就打满,gevent 版本轻松到 1.8k,CPU 才 65%。
3. 对话状态机的 Redis 设计
多轮场景里,状态必须“可过期、可横向扩容、可序列化”。我们采用 JSON + TTL,字段极简:
last_intent上一意图slots已填槽位 dictconfirm待确认标记
序列化用json.dumps(..., ensure_ascii=False),压缩率虽不如 protobuf,但调试肉眼可读;TTL 统一 300 s,用户五分钟无交互自动销毁,内存稳定控制在 1.2 G(20 万并发)。
生产考量
1. 压力测试:Locust 脚本
# locustfile.py from locust import HttpUser, task, between class ChatUser(HttpUser): wait_time = between(0.5, 2) @task(10) def ask(self): self.client.post("/chat", json={ "uid": "u"+str(random.randint(1,1000000)), "query": "我的快递怎么还没到" })本地起 4 台 slave,单机 2 万并发,持续 10 min,99 分位延迟 120 ms,错误率 0.2%,满足上线要求。
2. 接口幂等 & 限流
- 幂等:在 header 带
X-Request-ID,Redis 记录 60 s 窗口,重复 ID 直接返回缓存结果,防用户疯狂点击。 - 限流:基于 Redis-Cell 的 CLF 漏斗,容量 100,每秒回充 50,突发流量削峰,超阈值返回
429,保护后端模型。
避坑指南
1. 模型冷启动:默认回复策略
凌晨发版时模型刚加载,置信度普遍低,我们设了“三级降级”:
- 置信度 >0.8 直接回答
- 0.4~0.8 走 FAQ 检索
- <0.4 返回“亲,转人工客服哦~”并附带工单入口
保证即使模型“懵”,用户也不被晾。
2. 对话超时:常见错误
很多团队把 TTL 设太长,导致用户第二天回来对话状态还在,机器人还问“请问订单号是多少?”——体验诡异。我们采用“短 TTL + 唤醒提示”:超时后首句先回“刚刚的会话已结束,请重新描述问题”,再清空状态。
3. 敏感词过滤:实时更新
运营每周上新活动,黑产也更新话术。我们把敏感词库放 Redis 的SET,Flask 启动时载入内存,并监听keyspace事件,一旦运营SADD新词,各实例 5 s 内重载,无需重启。
上线效果
灰度两周,核心指标:
- QPS 峰值 2.1k → 6.3k(弹性扩容 3 倍)
- 意图准确率 43% → 87%
- 平均响应 320 ms → 95 ms
- 客服人力节省 38%,双十一零宕机。
写在最后
整个项目最大感受:别迷信“开箱即用”,高并发场景里,任何一个同步阻塞点都会被流量放大成事故。把模型、策略、状态、运维全部捏进一条可回滚的 Docker 镜像,才是能睡安稳觉的底气。
开放性问题:如何平衡模型准确率与响应延迟的关系?当 batch 增大、层数加深,准确率涨了 2%,可 P99 延迟却多 60 ms,你会怎么选?欢迎留言聊聊你的实战取舍。