news 2026/6/26 2:05:56

AI 驱动的数据库内核:当学习型查询优化器遇上代价模型

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AI 驱动的数据库内核:当学习型查询优化器遇上代价模型

AI 驱动的数据库内核:当学习型查询优化器遇上代价模型

一、优化器选错执行计划,比没有索引更可怕

生产环境一条核心报表 SQL,数据量从百万级增长到千万级后,查询耗时从 800ms 飙到 45 秒。EXPLAIN显示优化器选择了全表扫描而非索引范围扫描。原因:统计信息ndv(唯一值数量)严重失真,优化器基于错误代价估算做出了错误决策。

手动ANALYZE TABLE后恢复,但三天后问题复现。传统基于统计信息的代价模型,在高数据倾斜、高频写入场景下,统计信息滞后是结构性缺陷。AI 驱动的学习型优化器(Learned Query Optimizer)试图从历史执行反馈中学习,替代或增强传统代价估算,这正是本文要拆解的核心命题。

二、学习型查询优化器的架构与代价模型重构

2.1 传统代价模型的结构性缺陷

InnoDB 优化器的代价估算依赖mysql.innodb_table_statsmysql.innodb_index_stats,核心公式:

Cost = IO_Cost + CPU_Cost IO_Cost = 页面数 × 随机读取代价权重 CPU_Cost = 评估行数 × 条件评估代价权重

问题在于:评估行数来自统计信息采样,采样率默认仅 1/16 页,高倾斜列的ndv误差可达 10 倍以上。

2.2 学习型优化器的架构

学习型优化器的核心思路:用历史查询的实际执行数据(真实行数、真实 IO 次数、真实耗时)训练模型,预测新查询的代价。

flowchart LR A[SQL 输入] --> B[解析与 Plan 枚举] B --> C[特征提取] C --> D{代价预测模型} D -->|传统代价模型| E[统计信息估算代价] D -->|学习型代价模型| F[ML 模型预测代价] E --> G[代价融合加权] F --> G G --> H[选择最优 Plan] H --> I[执行并采集反馈] I --> J[反馈数据入训练集] J --> D

2.3 特征工程:SQL 如何变成模型输入

学习型优化器的关键在于特征提取。一条 SQL 需要编码为固定维度向量:

特征类别具体特征编码方式
表级特征行数、平均行长度、索引数量归一化数值
列级特征ndv、null_ratio、数据倾斜度归一化数值
谓词特征等值/范围/IN、选择性估计one-hot + 数值
Join 特征Join 类型、连接列 ndv 比率one-hot + 数值
Plan 特征扫描方式、Join 算法、排序方式one-hot

2.4 代价融合策略

生产环境不会直接用 ML 模型替换传统优化器,而是加权融合:

Final_Cost = α × Traditional_Cost + (1 - α) × Learned_Cost

α 初始值为 1.0(完全信任传统模型),随着反馈数据积累逐步降低。当模型置信度低于阈值时,自动回退到传统代价。

三、基于反馈学习的代价校准实现

3.1 执行反馈采集器

import time import threading import pymysql from collections import defaultdict from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple import numpy as np import logging logger = logging.getLogger(__name__) @dataclass class QueryFeedback: """单次查询执行反馈记录""" query_fingerprint: str # SQL 指纹, 归一化后的模板 estimated_rows: int # 优化器估算行数 actual_rows: int # 实际返回行数 estimated_cost: float # 优化器估算代价 execution_time_ms: float # 实际执行耗时(ms) scan_type: str # 扫描方式: full_scan / index_scan / range_scan plan_hash: str # 执行计划哈希 timestamp: float = field(default_factory=time.time) @property def estimation_error(self) -> float: """估算误差倍数, 越大说明统计信息越不准""" if self.estimated_rows == 0: return float('inf') return max(self.actual_rows / self.estimated_rows, self.estimated_rows / max(self.actual_rows, 1)) class FeedbackCollector: """执行反馈采集器, 通过 performance_schema 采集真实执行数据""" def __init__(self, mysql_config: dict, sample_rate: float = 0.1): self.mysql_config = mysql_config self.sample_rate = sample_rate # 采样率, 避免全量采集影响性能 self._feedback_buffer: List[QueryFeedback] = [] self._lock = threading.Lock() self._stop_event = threading.Event() def _get_connection(self): return pymysql.connect(**self.mysql_config) def collect_from_events_statements(self) -> List[QueryFeedback]: """从 performance_schema.events_statements_summary_by_digest 采集""" sql = """ SELECT DIGEST_TEXT, SUM_ROWS_EXAMINED, SUM_ROWS_AFFECTED, SUM_TIMER_WAIT / 1000000000 AS total_ms, COUNT_STAR, FIRST_SEEN, LAST_SEEN FROM performance_schema.events_statements_summary_by_digest WHERE DIGEST_TEXT IS NOT NULL AND SUM_ROWS_EXAMINED > 0 AND COUNT_STAR >= 5 ORDER BY SUM_TIMER_WAIT DESC LIMIT 200 """ feedbacks = [] try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql) for row in cur.fetchall(): digest_text = row[0] rows_examined = row[1] rows_affected = row[2] exec_time_ms = row[3] / max(row[4], 1) # 平均单次耗时 count_star = row[4] # 采样: 只采集部分查询 if hash(digest_text) % 100 >= self.sample_rate * 100: continue feedback = QueryFeedback( query_fingerprint=digest_text[:200], estimated_rows=0, # 需要从 EXPLAIN 补充 actual_rows=rows_examined, estimated_cost=0.0, execution_time_ms=exec_time_ms, scan_type='unknown', plan_hash='', ) feedbacks.append(feedback) except pymysql.err.OperationalError as e: logger.error(f"采集反馈失败: {e}") return feedbacks def enrich_with_explain(self, feedback: QueryFeedback) -> Optional[QueryFeedback]: """用 EXPLAIN 补充优化器估算信息""" try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(f"EXPLAIN {feedback.query_fingerprint}") explain_rows = cur.fetchall() if explain_rows: first_row = explain_rows[0] feedback.estimated_rows = first_row[9] or 0 # rows 列 feedback.scan_type = first_row[1] or 'unknown' # access_type return feedback except Exception as e: logger.warning(f"EXPLAIN 补充失败: {e}") return None def compute_correction_factor(self) -> Dict[str, float]: """计算各 SQL 指纹的代价校准因子""" with self._lock: buffer = list(self._feedback_buffer) # 按 SQL 指纹分组, 计算平均估算误差 grouped: Dict[str, List[float]] = defaultdict(list) for fb in buffer: if fb.estimation_error != float('inf'): grouped[fb.query_fingerprint].append(fb.estimation_error) correction = {} for fingerprint, errors in grouped.items(): if len(errors) >= 3: # 至少 3 次采样才计算校准因子 # 取中位数, 避免极端值干扰 correction[fingerprint] = float(np.median(errors)) return correction def start_background_collection(self, interval_seconds: int = 60): """启动后台采集线程""" def _worker(): while not self._stop_event.is_set(): try: new_feedbacks = self.collect_from_events_statements() with self._lock: self._feedback_buffer.extend(new_feedbacks) # 限制缓冲区大小, 防止内存溢出 if len(self._feedback_buffer) > 10000: self._feedback_buffer = self._feedback_buffer[-5000:] logger.info(f"采集到 {len(new_feedbacks)} 条反馈") except Exception as e: logger.error(f"后台采集异常: {e}") self._stop_event.wait(interval_seconds) thread = threading.Thread(target=_worker, daemon=True) thread.start() logger.info("反馈采集后台线程已启动") def stop(self): self._stop_event.set()

3.2 校准因子注入优化器

采集到校准因子后,通过 MySQL 8.0 的optimizer_switchengine_condition_pushdown等机制间接影响优化器决策,或通过 Proxy 层(如 ProxySQL)改写 SQL Hint:

-- 对已知估算偏差大的查询, 强制指定索引 SELECT /*+ INDEX(orders idx_create_time) */ order_id, amount FROM orders WHERE create_time BETWEEN '2025-01-01' AND '2025-01-31'; -- 或通过 session 级别调整代价权重 SET SESSION optimizer_switch = 'index_condition_pushdown=on';

更彻底的方案是在数据库代理层实现代价校准:Proxy 拦截 SQL,查询校准因子表,自动注入 Hint。

四、学习型优化器的现实边界与架构妥协

4.1 冷启动问题

模型训练需要足够的反馈数据。新上线的业务、数据量突变的场景,模型无历史数据可用。解决方案:用传统代价模型作为先验,模型置信度不足时自动回退。但回退阈值的选择本身就是一个需要调参的问题。

4.2 数据分布漂移

训练数据来自历史分布,当业务模式变化(如大促期间数据倾斜剧变),模型预测精度急剧下降。需要设置反馈数据的衰减窗口(如只使用最近 7 天数据训练),但这又与数据量需求矛盾。

4.3 推理延迟

ML 模型推理需要时间,即使轻量级模型(如 XGBoost)单次推理也在微秒级。对于执行时间本就小于 1ms 的简单查询,优化器开销占比过高。生产方案必须设置"短路"机制:简单查询(单表、无 Join、索引命中)跳过模型推理。

4.4 可解释性缺失

优化器选错 Plan 时,DBA 需要知道原因。传统代价模型可以逐步推演,ML 模型是黑盒。生产环境必须保留完整的代价计算日志,包括传统代价、学习代价、融合权重、最终决策依据。

4.5 禁用场景

  • 数据量小于 10 万行的小表:优化器本身很少选错
  • 高频简单点查(QPS > 10 万):推理开销不可接受
  • 强合规场景:审计要求优化器决策可追溯,黑盒模型无法满足

五、总结

AI 驱动的学习型优化器并非替代传统代价模型,而是在统计信息失真场景下的增强手段。其核心架构是"传统代价 + 学习代价"的融合策略,通过执行反馈采集、校准因子计算、代价加权融合三个环节形成闭环。生产落地的关键挑战不在模型精度,而在冷启动、数据漂移、推理延迟和可解释性四个工程问题。务实的路径是:先用反馈数据量化统计信息偏差,再逐步引入模型校准,始终保持传统优化器作为兜底。任何没有回退机制的 AI 优化器,都是生产事故的定时炸弹。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/26 2:03:16

Dioxus 表单处理:从输入、校验到文件上传,一条链路讲透

前言 前面几篇把 rsx!、Signal、组件、路由、桌面和 Server Functions 都铺了一遍。真到表单这里,Dioxus 才开始有点“干活”的味道。 因为 Demo 和项目之间,差的往往不是“再写一个组件”,而是这些具体问题: 用户输到一半&…

作者头像 李华
网站建设 2026/6/26 2:02:59

Go 网络编程实战:TCP 长连接服务的设计、粘包处理与连接池管理

Go 网络编程实战:TCP 长连接服务的设计、粘包处理与连接池管理一、TCP 长连接服务的工程挑战 在微服务架构中,服务间高频通信场景(如消息推送、实时数据同步、RPC 调用)通常采用 TCP 长连接,避免频繁握手的开销。但 TC…

作者头像 李华
网站建设 2026/6/26 2:01:32

模型量化实战:从 INT8 PTQ 到 GPTQ 的精度保持与推理加速全解析

模型量化实战:从 INT8 PTQ 到 GPTQ 的精度保持与推理加速全解析一、显存墙下的生死抉择:7B 模型在 16GB 显卡上的部署困局 LLaMA-2-7B 的 FP16 权重占 14GB 显存,加上 KV Cache 和运行时开销,至少需要 24GB 显存。但线上推理集群大…

作者头像 李华
网站建设 2026/6/26 2:00:49

FanControl终极指南:5分钟搞定Windows风扇控制与汉化设置

FanControl终极指南:5分钟搞定Windows风扇控制与汉化设置 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending…

作者头像 李华
网站建设 2026/6/26 1:59:02

AI 代码审查工作流:从 Prompt 工程到自动化 Pipeline 的工程实践

AI 代码审查工作流:从 Prompt 工程到自动化 Pipeline 的工程实践一、代码审查的瓶颈:当人工 Review 成为交付效率的隐形天花板 在一个 20 人的前端团队中,日均产生约 30 个 Merge Request,每个 MR 平均涉及 200 行变更。按照行业推…

作者头像 李华
网站建设 2026/6/26 1:57:22

大模型推理加速:从 KV Cache 到连续批处理的工程优化全景

大模型推理加速:从 KV Cache 到连续批处理的工程优化全景一、当推理延迟遇上商业现实——大模型服务的性能瓶颈链 大模型推理的性能问题不是一个单纯的"慢"字可以概括的,它是一个由多个环节串联的瓶颈链,每个环节的优化策略截然不同…

作者头像 李华