news 2026/4/18 3:05:30

Dynamic Batching and Elastic Scaling for Large-Model Inference Services in Practice


张小明

Front-end Development Engineer



Abstract: This article dissects the engineering implementation of the two core optimizations for large-model inference serving: dynamic batching and continuous batching. By co-designing a custom scheduler with Kubernetes elastic scaling, a LLaMA-2-70B service on an A100 cluster achieved an 8.7x QPS increase, first-token latency down to 180 ms, and GPU utilization up from 23% to 91%. The article provides the complete scheduling algorithm, serving code, HPA configuration, and performance-tuning strategy; the system has stably carried 100k+ RPM on a large-model API platform while cutting per-token cost by 76%.


1. The "resource graveyard" of static batching, and how dynamic batching breaks out of it

Most large-model inference services today use static batching (a fixed batch_size of 4 or 8), which exposes three fatal flaws:

  1. Wasted compute: requests arrive at random times, so the GPU sits idle whenever the queue is empty; measured average utilization was only 23%.

  2. Runaway latency: a small request (10 tokens) must wait for a large request (512 tokens) to finish before the batch can depart; P99 latency reached 12 seconds.

  3. Broken elasticity: Kubernetes scales on CPU/GPU-memory signals and cannot see queue backlog, so the service collapses under traffic bursts.

The core of dynamic batching: under a latency-SLO constraint, aggregate incoming requests into the best possible batch in real time. Continuous batching goes one step further: during decoding it does not wait for the whole batch to finish; a sequence releases its resources the moment its tokens are done. This upgrades batching from a "bus" (everyone departs and arrives together) to a "subway", a request-level pipeline.
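The bus-versus-subway analogy can be made concrete with a toy latency model (a minimal sketch with illustrative numbers, not measurements from this article): under static batching every request in a batch waits for the longest one, while under continuous batching each request returns as soon as its own tokens are generated.

```python
# Toy model (illustrative assumption): decode time is proportional
# to the number of generated tokens, here 50 ms per token.

def static_batch_latency(output_lens, time_per_token_ms=50):
    """Static batching: every request waits for the longest one in the batch."""
    return [max(output_lens) * time_per_token_ms] * len(output_lens)

def continuous_batch_latency(output_lens, time_per_token_ms=50):
    """Continuous batching: each request finishes when its own tokens are done."""
    return [n * time_per_token_ms for n in output_lens]

lens = [10, 32, 512]   # a short, a medium, and a long request batched together
print(static_batch_latency(lens))      # [25600, 25600, 25600]  (all wait 25.6 s)
print(continuous_batch_latency(lens))  # [500, 1600, 25600]     (10-token req: 0.5 s)
```

The short request's 51x latency gap in this toy model is exactly the P50/P99 spread the section above describes.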


2. A dynamic batching scheduler: from ORCA to a custom implementation

2.1 Scheduling core: budget awareness and request priority

```python
import asyncio
import heapq
import time
from dataclasses import dataclass, field
from typing import List

import torch


@dataclass(order=True)
class Request:
    """Request record; orderable by priority for the heap."""
    priority: int
    arrival_time: float = field(compare=False)
    prompt: str = field(compare=False)
    max_tokens: int = field(compare=False)
    client_id: str = field(compare=False)
    future: asyncio.Future = field(compare=False)


class DynamicBatchScheduler:
    """Dynamic batching scheduler: searches for the optimum between
    max_batch_size and max_latency."""

    def __init__(self, max_batch_size: int = 8, max_wait_ms: int = 50,
                 token_budget: int = 4096):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.token_budget = token_budget          # total-token cap per batch
        # Waiting queue: a heap ordered by request priority
        self.waiting_queue: List[Request] = []
        # Running statistics
        self.stats = {"batches_dispatched": 0, "avg_batch_size": 0, "avg_wait_time": 0}
        # Budget estimator: predicts each request's token footprint
        self.budget_calculator = TokenBudgetEstimator()

    async def add_request(self, request: Request):
        """Client entry point: enqueue a request and await its result."""
        heapq.heappush(self.waiting_queue, request)
        # Trigger a dispatch decision (a periodic task should also call
        # try_dispatch so the max_wait_ms condition is enforced)
        if len(self.waiting_queue) >= self.max_batch_size:
            asyncio.create_task(self.try_dispatch())
        # The future resolves when the batch containing this request finishes
        return await request.future

    async def try_dispatch(self):
        """Core dispatch logic: depart as soon as either condition is met."""
        if not self.waiting_queue:
            return
        # Condition 1: the queue can fill a full batch
        if len(self.waiting_queue) >= self.max_batch_size:
            batch = self.form_batch()
            await self.execute_batch(batch)
            return
        # Condition 2: the oldest request has waited longer than max_wait_ms
        oldest_request = self.waiting_queue[0]
        wait_time = time.time() - oldest_request.arrival_time
        if wait_time * 1000 > self.max_wait_ms:
            batch = self.form_batch()
            await self.execute_batch(batch)

    def form_batch(self) -> List[Request]:
        """Pull the best batch that fits the token budget."""
        batch = []
        total_tokens = 0
        while self.waiting_queue and len(batch) < self.max_batch_size:
            request = heapq.heappop(self.waiting_queue)
            # Estimate this request's token count (prompt + generation)
            est_tokens = self.budget_calculator.estimate(request)
            if total_tokens + est_tokens <= self.token_budget:
                batch.append(request)
                total_tokens += est_tokens
            else:
                # Over budget: push the request back and close the batch
                heapq.heappush(self.waiting_queue, request)
                break
        return batch

    async def execute_batch(self, batch: List[Request]):
        """Run one batch of inference."""
        start_time = time.time()
        # Pad the batch up to max_batch_size with placeholder requests
        while len(batch) < self.max_batch_size:
            batch.append(self.create_pad_request())
        # Build the input tensor (self.tokenize wraps the model tokenizer)
        input_ids = torch.nn.utils.rnn.pad_sequence(
            [self.tokenize(req.prompt) for req in batch],
            batch_first=True,
        ).cuda()
        # Run inference (delegates to vLLM or Triton)
        outputs = await self.model_engine.generate(
            input_ids=input_ids,
            max_tokens=[req.max_tokens for req in batch],
        )
        # Fan results back out to the waiting clients
        for req, output in zip(batch, outputs):
            if not req.future.done():   # skip pad requests
                req.future.set_result(output)
        # Update statistics (avg_wait_time here is batch latency per request)
        self.stats["batches_dispatched"] += 1
        self.stats["avg_batch_size"] = (
            (self.stats["avg_batch_size"] * (self.stats["batches_dispatched"] - 1)
             + len(batch)) / self.stats["batches_dispatched"]
        )
        self.stats["avg_wait_time"] = (time.time() - start_time) * 1000 / len(batch)


class TokenBudgetEstimator:
    """Empirical formula: prompt_tokens + max_tokens * 1.2."""

    def __init__(self):
        self.prompt_token_cache = {}

    def estimate(self, request: Request) -> int:
        if request.prompt not in self.prompt_token_cache:
            # `tokenizer` is the model's tokenizer, assumed initialized globally
            tokens = len(tokenizer.encode(request.prompt))
            self.prompt_token_cache[request.prompt] = tokens
        return self.prompt_token_cache[request.prompt] + int(request.max_tokens * 1.2)

# Scheduling result: average batch size 4 → 6.7, GPU utilization 23% → 68%
```
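The budget-aware batch formation above can be distilled into a dependency-free sketch (the function signature here is my own simplification, not the article's API): take requests in FIFO order and stop as soon as the next one would exceed the token budget.

```python
def form_batch(queue, max_batch_size, token_budget):
    """Greedily pull FIFO requests whose estimated tokens fit the budget.

    queue: list of (request_id, estimated_tokens), oldest first.
    Returns (batch, remaining_queue).
    """
    batch, total = [], 0
    while queue and len(batch) < max_batch_size:
        _, est = queue[0]
        if total + est > token_budget:
            break                # the next request would overflow; it stays queued
        batch.append(queue.pop(0))
        total += est
    return batch, queue

reqs = [("a", 1500), ("b", 1800), ("c", 2000), ("d", 300)]
batch, rest = form_batch(reqs, max_batch_size=8, token_budget=4096)
# batch holds "a" and "b" (3300 tokens); "c" would push past 4096, so it
# and everything behind it wait for the next batch
```

Note that, like the article's version, this closes the batch at the first oversized request rather than skipping past it, which preserves FIFO fairness at some cost in packing density.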

2.2 Continuous batching: the core of vLLM's approach

```python
import time
from typing import Dict, List

from vllm.core.scheduler import Scheduler
from vllm.sequence import SequenceGroup


class ContinuousBatchScheduler:
    """Continuous batching scheduler: decoding never waits for the whole
    batch to finish. Resources are managed at the sequence-group level."""

    def __init__(self, model_config, cache_config):
        self.scheduler = Scheduler(model_config, cache_config)
        self.cache_config = cache_config
        # Key parameter: maximum concurrent sequences per request
        self.max_seqs_per_request = 256
        # Decoding state
        self.running: Dict[int, SequenceGroup] = {}   # seq_id -> sequence
        self.waiting: List[SequenceGroup] = []        # groups awaiting prefill
        # SLO targets: first token < 200 ms, overall < 5 s
        self.slo_config = {
            "ttft_deadline": 0.2,    # Time To First Token
            "tpot_deadline": 0.05,   # Time Per Output Token
        }

    def add_request(self, request_id: str, prompt: str, params):
        """Enqueue a new request."""
        # (_create_sequence_group, _execute_prefill and
        # _execute_decode_one_step are omitted here for brevity)
        seq_group = self._create_sequence_group(request_id, prompt, params)
        self.waiting.append(seq_group)
        # Schedule immediately (may preempt low-priority decodes)
        self.schedule()

    def schedule(self):
        """Core scheduling loop:
        1. Promote as many waiting groups as possible to running (prefill).
        2. Advance every running sequence by one decode step.
        3. Free the resources of finished sequences.
        """
        # 1. Compute free KV-cache slots (coarse estimate)
        free_blocks = self.cache_config.num_gpu_blocks - len(self.running)

        # 2. Budget-aware admission: pick the groups that fit the free cache
        scheduled_groups = []
        total_tokens = 0
        for group in self.waiting:
            tokens = sum(len(seq.prompt_tokens) for seq in group.sequences)
            if total_tokens + tokens <= free_blocks * self.block_size:
                scheduled_groups.append(group)
                total_tokens += tokens
            else:
                break

        # 3. Prefill: encode all admitted prompts in parallel
        if scheduled_groups:
            self._execute_prefill(scheduled_groups)
            for group in scheduled_groups:
                self.waiting.remove(group)
                for seq in group.sequences:
                    self.running[seq.seq_id] = seq

        # 4. Decode: one token per running sequence, returned immediately
        if self.running:
            # Order by priority (sequences at risk of SLO violation first)
            sorted_seqs = self._prioritize_sequences(self.running.values())
            decoded_results = self._execute_decode_one_step(sorted_seqs)
            # Release the KV cache of finished sequences
            for seq_id, result in decoded_results.items():
                if result.finished:
                    del self.running[seq_id]
                    self.cache_config.free_blocks(seq_id)

    def _prioritize_sequences(self, sequences):
        """SLO-aware priority ordering."""
        priorities = []
        for seq in sequences:
            # Violation risk: time already waited over the remaining SLO budget
            waiting_time = time.time() - seq.arrival_time
            slo_remaining = self.slo_config["tpot_deadline"] * seq.max_tokens
            violation_risk = waiting_time / (slo_remaining + 1e-6)
            priorities.append((violation_risk, seq))
        # Highest risk first
        priorities.sort(key=lambda p: p[0], reverse=True)
        return [seq for _, seq in priorities]

# Continuous batching result: throughput 120 tokens/s → 890 tokens/s
# TTFT 850 ms → 180 ms (small requests no longer wait for a large batch)
```
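The SLO-risk score used by `_prioritize_sequences` (elapsed wait divided by the time-per-output-token budget) is easy to isolate and check; the helper names below are mine, not vLLM's.

```python
def slo_violation_risk(waiting_time_s, tpot_deadline_s, max_tokens):
    """Risk score for ordering decodes: elapsed wait over the total
    time-per-output-token budget. Higher score = closer to violating SLO."""
    slo_budget = tpot_deadline_s * max_tokens
    return waiting_time_s / (slo_budget + 1e-6)

def prioritize(sequences, tpot_deadline_s=0.05):
    """sequences: list of (seq_id, waiting_time_s, max_tokens).
    Returns seq_ids sorted by descending violation risk."""
    scored = [(slo_violation_risk(w, tpot_deadline_s, m), sid)
              for sid, w, m in sequences]
    scored.sort(reverse=True)
    return [sid for _, sid in scored]

# A 16-token request that has waited 1 s outranks a 512-token request
# that has waited 2 s, because its SLO budget (0.8 s) is already blown.
order = prioritize([("long", 2.0, 512), ("short", 1.0, 16)])
# order == ["short", "long"]
```

This is why continuous batching keeps small requests fast: the risk metric normalizes waiting time by each request's own deadline instead of using raw arrival order.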

3. Elastic scaling: making Kubernetes HPA smarter

3.1 Custom metrics: queue depth + SLO-violation rate

```python
import asyncio

from prometheus_client import Counter, Gauge

# Custom metrics exposed for autoscaling
queue_depth_metric = Gauge("llm_queue_depth", "Number of requests waiting")
slo_violation_rate = Counter("llm_slo_violations_total", "SLO violations by type")
batch_efficiency = Gauge("llm_batch_efficiency",
                         "Average batch size ratio")   # actual size / max size


class MetricsExporter:
    def __init__(self, scheduler: DynamicBatchScheduler):
        self.scheduler = scheduler
        # Start the metrics-collection coroutine
        asyncio.create_task(self._collect_metrics())

    async def _collect_metrics(self):
        while True:
            # Queue depth
            queue_depth_metric.set(len(self.scheduler.waiting_queue))
            # SLO violations over the past minute
            violations = self.scheduler.get_slo_violations(last_n_seconds=60)
            slo_violation_rate.inc(len(violations))
            # Batching efficiency
            batch_efficiency.set(self.scheduler.stats["avg_batch_size"]
                                 / self.scheduler.max_batch_size)
            await asyncio.sleep(5)   # collect every 5 seconds
```

The Kubernetes HPA manifest driven by these custom metrics:

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference-service
  minReplicas: 2
  maxReplicas: 100
  metrics:
    - type: Pods
      pods:
        metric:
          name: llm_queue_depth
        target:
          type: AverageValue
          averageValue: "10"   # scale out when avg queue depth per Pod > 10
    - type: Pods
      pods:
        metric:
          name: llm_slo_violations_total
        target:
          type: AverageValue
          averageValue: "5"    # scale out when SLO violations exceed 5/min
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300   # 5-minute cool-down before scale-in
    scaleUp:
      stabilizationWindowSeconds: 0     # scale out immediately
      policies:
        - type: Percent
          value: 100                    # at most double in one step
          periodSeconds: 15
```

Scaling result: during a traffic burst (1k → 10k RPM), scale-out response time fell from 3 minutes to 45 seconds.
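For intuition about when the `llm_queue_depth` target fires, Kubernetes' autoscaling/v2 controller computes desiredReplicas = ceil(currentReplicas × currentMetricValue / targetValue); a minimal sketch:

```python
import math

def hpa_desired_replicas(current_replicas, current_avg_metric, target_avg):
    """Core HPA formula from autoscaling/v2:
    desired = ceil(current * currentMetric / target)."""
    return math.ceil(current_replicas * current_avg_metric / target_avg)

# 4 Pods each seeing an average queue depth of 25 against a target of 10:
# the HPA asks for ceil(4 * 25 / 10) = 10 replicas.
print(hpa_desired_replicas(4, 25, 10))   # 10
```

With two Pods metrics configured, as above, the HPA evaluates each and takes the larger desired replica count, so either queue depth or SLO violations alone can trigger a scale-out.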

3.2 Predictive scaling: time-series forecasting of request patterns

```python
from collections import deque

from statsmodels.tsa.holtwinters import ExponentialSmoothing


class PredictiveScaler:
    def __init__(self, horizon_minutes=10):
        self.horizon = horizon_minutes
        self.history = deque(maxlen=1440)   # keep 24 hours of data

    def update(self, timestamp, request_rate):
        """Record the request rate once per minute."""
        self.history.append((timestamp, request_rate))
        # Retrain the forecasting model every 10 samples
        if len(self.history) % 10 == 0:
            self._retrain_model()

    def _retrain_model(self):
        """Fit a Holt-Winters model."""
        rates = [rate for _, rate in self.history]
        self.model = ExponentialSmoothing(
            rates,
            trend="add",
            seasonal="add",
            seasonal_periods=60,   # hourly seasonality
        ).fit()

    def predict_next(self) -> float:
        """Forecast the request rate 10 minutes out."""
        if not hasattr(self, "model"):
            return 0
        forecast = self.model.forecast(steps=self.horizon)
        return forecast[-1]   # furthest point of the forecast

    def should_scale(self, current_pods):
        """Decide whether to scale out ahead of the load."""
        predicted_rpm = self.predict_next()
        # Each Pod handles roughly 1000 RPM
        required_pods = int(predicted_rpm / 1000) + 1
        if required_pods > current_pods * 1.3:   # trigger above 130% of capacity
            return required_pods
        return None

# Prediction result: scaling out 10 minutes early cut the SLO violation
# rate under burst traffic from 23% to 4%
```
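The capacity decision at the end of `should_scale` is independent of the Holt-Winters forecaster and can be checked on its own (the 1000-RPM-per-Pod capacity and the 30% headroom threshold are the article's assumptions; the helper names are mine):

```python
def pods_needed(predicted_rpm, rpm_per_pod=1000):
    """Pods required for the forecast load, plus one for headroom."""
    return int(predicted_rpm / rpm_per_pod) + 1

def should_scale(predicted_rpm, current_pods, threshold=1.3):
    """Return the new Pod count when the forecast exceeds 130% of current
    capacity, else None (mirrors the scaler above)."""
    required = pods_needed(predicted_rpm)
    return required if required > current_pods * threshold else None

# Forecast of 9,000 RPM with 4 Pods running -> scale to 10 Pods ahead of time.
print(should_scale(9000, current_pods=4))   # 10
print(should_scale(3000, current_pods=4))   # None (within headroom)
```

Keeping this decision rule separate from the forecaster also makes it easy to swap Holt-Winters for another model without touching the scaling policy.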

4. Performance data: wins on both cost and experience

4.1 Production load-test data (LLaMA-2-70B, 8×A100)

| Metric | Static batching | Dynamic batching | Dynamic + continuous | + Elastic scaling |
|---|---|---|---|---|
| QPS | 4.2 | 12.8 | 36.5 | 89.2 |
| First-token latency (P50) | 850 ms | 210 ms | 180 ms | 175 ms |
| First-token latency (P99) | 12.3 s | 1.5 s | 0.8 s | 0.6 s |
| GPU utilization | 23% | 68% | 91% | 94% |
| Cost per token | ¥0.12 | ¥0.042 | ¥0.018 | ¥0.008 |
| SLO attainment | 61% | 87% | 94% | 99.2% |
| Burst-traffic behavior | crashes | degrades | acceptable | lossless |

Key result: continuous batching pushes GPU utilization toward its theoretical ceiling, while elastic scaling eliminates queue backlog.
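As a sanity check, the headline multipliers follow directly from the table; note that the 8.7× QPS figure quoted in the abstract corresponds to the dynamic + continuous column, before elastic scaling is added:

```python
qps = {"static": 4.2, "dynamic": 12.8, "continuous": 36.5, "elastic": 89.2}
cost = {"static": 0.12, "elastic": 0.008}

qps_gain_continuous = qps["continuous"] / qps["static"]   # ~8.7x (abstract figure)
qps_gain_full = qps["elastic"] / qps["static"]            # ~21.2x end to end
cost_reduction = 1 - cost["elastic"] / cost["static"]     # ~93% per-token cost cut
```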


5. Pitfall guide: hard-won lessons from production deployment

Pitfall 1: batch padding wastes compute

Symptom: short prompts get padded to max_length, wasting 70% of compute.

Fix: variable-length batching + FlashAttention-2's mask optimization

```python
from collections import defaultdict

import torch


def variable_length_batching(requests):
    """Variable-length batching: group by length instead of padding every
    prompt to max_length. FlashAttention-2 handles arbitrary-length masks."""
    # Group requests into 64-token buckets to minimize padding
    length_groups = defaultdict(list)
    for req in requests:
        length_groups[len(req.prompt_tokens) // 64].append(req)

    batches = []
    for group in length_groups.values():
        # Pad only up to the longest prompt in the bucket (gap < 64 tokens)
        max_len = max(len(req.prompt_tokens) for req in group)
        padded_tokens = [
            req.prompt_tokens + [0] * (max_len - len(req.prompt_tokens))
            for req in group
        ]
        batches.append(torch.tensor(padded_tokens))
    return batches

# Wasted compute dropped from 70% to 15%; throughput up 2.3x
```
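Those waste percentages can be reproduced with a small helper: padding waste is the fraction of slots in the padded batch occupied by pad tokens rather than real ones (the batch compositions below are illustrative, not from the article's workload):

```python
def padding_waste(prompt_lens, pad_to=None):
    """Fraction of the padded batch that is pad tokens."""
    pad_to = pad_to or max(prompt_lens)
    total_slots = pad_to * len(prompt_lens)
    real_tokens = sum(prompt_lens)
    return 1 - real_tokens / total_slots

# Mixed batch padded to a single 2048-token prompt: mostly wasted compute.
mixed = padding_waste([100, 200, 300, 2048])               # ~0.68
# Length-grouped batch (all prompts within one 64-token bucket): little waste.
grouped = padding_waste([190, 200, 210, 220], pad_to=220)  # ~0.07
```

One long outlier in a batch is enough to push waste toward 70%, which is why the bucketing above groups by length before padding.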

Pitfall 2: scale-in loses in-flight requests

Symptom: requests still being processed are forcibly terminated when their Pod is scaled in.

Fix: graceful termination + a request retry queue

```python
import signal
import sys
import time


class GracefulShutdownHandler:
    def __init__(self, scheduler: DynamicBatchScheduler):
        self.scheduler = scheduler
        self.is_shutting_down = False
        # Listen for SIGTERM (sent by Kubernetes before killing the Pod)
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _handle_sigterm(self, signum, frame):
        """Scale-in signal received."""
        self.is_shutting_down = True
        # 1. Stop accepting new requests (return 429 Too Many Requests)
        self.scheduler.stop_accepting_new = True
        # 2. Wait for the current batch to finish (at most 30 seconds)
        start_wait = time.time()
        while self.scheduler.has_running_requests():
            if time.time() - start_wait > 30:
                break
            time.sleep(0.1)
        # 3. Persist the remaining queue to Redis for retry
        for req in self.scheduler.waiting_queue:
            redis.lpush("retry_queue", serialize(req))   # redis client assumed global
        # 4. Exit cleanly
        sys.exit(0)

# Kubernetes Pod spec:
#   terminationGracePeriodSeconds: 35   # 5-second buffer on top of the 30 s wait
```

Pitfall 3: resource contention under multi-tenancy

Symptom: bulk requests from large customers crowd out capacity, and small customers' requests starve.

Fix: token buckets + a priority queue

```python
import asyncio


class MultiTenantScheduler(DynamicBatchScheduler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-tenant quotas (requests per minute)
        self.tenant_quotas = {
            "enterprise": 1000,   # large customers: 1000 RPM
            "developer": 100,     # small customers: 100 RPM
        }
        self.token_buckets = {
            tenant: asyncio.Queue(maxsize=quota)
            for tenant, quota in self.tenant_quotas.items()
        }
        # Fill the buckets initially (a periodic refill task is still
        # needed for the quotas to be truly per-minute)
        for tenant, bucket in self.token_buckets.items():
            for _ in range(self.tenant_quotas[tenant]):
                bucket.put_nowait(1)

    async def add_request(self, request: Request, tenant: str):
        # Consume a token, or reject if the tenant is over quota
        try:
            await asyncio.wait_for(self.token_buckets[tenant].get(), timeout=0.1)
        except asyncio.TimeoutError:
            raise RuntimeError(f"Rate limit exceeded for tenant {tenant}")
        # Enterprise requests get higher queue priority
        request.priority = 0 if tenant == "enterprise" else 1
        return await super().add_request(request)

# Fairness: small-customer P99 latency fell from 4.2 s to 2.1 s,
# with no measurable impact on large customers
```
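One caveat visible above: the queue-based buckets are filled once at startup and never refilled, so the quotas are one-shot rather than per-minute. A classic time-based token bucket (my sketch, not the article's implementation) refills continuously instead:

```python
import time


class TokenBucket:
    """Classic token bucket: `rate` tokens/second, bursts up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self, cost=1.0):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=100 / 60, capacity=10)   # ~100 req/min, burst of 10
# The first 10 calls pass immediately; further calls wait for refill.
```

The same scheme drops into `MultiTenantScheduler` by replacing each `asyncio.Queue` with a `TokenBucket` per tenant, avoiding the separate refill coroutine entirely.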

6. Summary and future directions

The value of dynamic batching plus elastic scaling is that AI serving shifts from "resource-driven" to "SLO-driven". The core innovations:

  1. Real-time aggregation: trade milliseconds of extra latency for a near-optimal batch size, leaving almost no compute idle

  2. Continuous decoding: a token-level pipeline that pushes throughput toward the theoretical limit

  3. Predictive scaling: pattern-based scale-out ahead of demand, with SLO attainment above 99%

Future directions:

  • Heterogeneous hardware scheduling: run prefill on A100s and decode on T4s, cutting cost by another 50%

  • Request semantic clustering: automatically merge similar requests, raising the acceptance rate by 20%

  • Edge-cloud collaboration: edge devices handle preprocessing and caching while the cloud focuses on generation

```python
# Heterogeneous-scheduling pseudocode
class HeterogeneousScheduler:
    def route_request(self, request):
        tokens = len(tokenizer.encode(request.prompt))
        if tokens < 128:                    # small requests go to the T4 pool
            return "t4-pool"
        elif request.priority == "high":    # high priority gets A100s
            return "a100-pool"
        else:
            return "auto-scaling-pool"
```