041 异步编程:用asyncio提升LLM应用的并发性能
从一次线上事故说起
凌晨两点,告警电话把我从床上拽起来。监控显示我们的LLM对话服务响应时间从200ms飙到了8秒,CPU负载却只有30%。查日志发现,每次用户请求都在等上游的OpenAI接口返回——一个请求等2秒,十个请求排队等20秒。同步阻塞,典型的“一个人干活,一群人围观”场景。
这个问题的本质是:LLM调用是I/O密集型操作,网络延迟占大头。同步代码里,线程在等待网络响应时啥也不干,白白浪费CPU时间片。当时我盯着火焰图里那条长长的recvfrom调用链,心想:必须上异步了。
asyncio到底在解决什么问题
很多人把asyncio和“并发”划等号,其实它解决的是等待时的资源复用。想象你在咖啡店排队点单:同步模式是你站在柜台前等咖啡做好才让下一个人点;异步模式是你点完单拿个号去旁边坐着,服务员继续接待下一个人,咖啡好了叫号。
Python的GIL让多线程在CPU密集型任务上表现糟糕,但对I/O密集型任务,asyncio用单线程+事件循环实现了比多线程更高效的并发。原因很简单:线程切换有开销,协程切换是用户态操作,轻量得多。
从同步到异步:一个LLM调用的改造实录
先看一个典型的同步LLM调用代码:
importrequestsimporttimedefcall_llm(prompt):# 这里踩过坑:requests默认超时时间很长,生产环境一定要设timeoutresp=requests.post("https://api.openai.com/v1/chat/completions",json={"model":"gpt-3.5-turbo","messages":[{"role":"user","content":prompt}]},headers={"Authorization":"Bearer YOUR_KEY"},timeout=30)returnresp.json()# 串行调用,10个请求要等20秒+prompts=["讲个笑话","写首诗","翻译成英文",...]results=[]forpinprompts:results.append(call_llm(p))这段代码的问题:每个call_llm都在阻塞等待网络响应,10个请求依次执行,总耗时 = 单个请求耗时 × 请求数。
改成asyncio版本:
importasyncioimportaiohttp# 别用requests,它不支持异步asyncdefcall_llm_async(session,prompt):# 注意:aiohttp的session要复用,别每次请求都创建新的asyncwithsession.post("https://api.openai.com/v1/chat/completions",json={"model":"gpt-3.5-turbo","messages":[{"role":"user","content":prompt}]},headers={"Authorization":"Bearer YOUR_KEY"},timeout=aiohttp.ClientTimeout(total=30))asresp:returnawaitresp.json()asyncdefmain():# 这里踩过坑:connector参数控制连接池大小,默认20,并发高时要调大connector=aiohttp.TCPConnector(limit=50)asyncwithaiohttp.ClientSession(connector=connector)assession:prompts=["讲个笑话","写首诗","翻译成英文",...]# 创建10个协程任务,并发执行tasks=[call_llm_async(session,p)forpinprompts]results=awaitasyncio.gather(*tasks)returnresults# 运行事件循环start=time.time()results=asyncio.run(main())print(f"耗时:{time.time()-start:.2f}秒")改造后,10个请求几乎同时发出,总耗时 ≈ 最慢的那个请求的耗时(通常2-3秒)。性能提升5-10倍。
那些年我踩过的asyncio坑
坑1:在异步函数里调用同步阻塞代码
asyncdefbad_example():# 别这样写!time.sleep会阻塞整个事件循环time.sleep(5)return"done"asyncdefgood_example():# 正确做法:用asyncio.sleep让出控制权awaitasyncio.sleep(5)return"done"如果必须调用同步阻塞库(比如某些数据库驱动),用loop.run_in_executor把它扔到线程池里:
importconcurrent.futuresasyncdefcall_sync_db():loop=asyncio.get_event_loop()# 这里踩过坑:默认线程池大小是CPU核心数×5,高并发场景要自定义withconcurrent.futures.ThreadPoolExecutor(max_workers=20)aspool:result=awaitloop.run_in_executor(pool,sync_db_query)returnresult坑2:忘记处理异常导致任务静默失败
asyncio.gather默认会抛出第一个异常,导致其他任务被取消。更稳妥的做法:
asyncdefsafe_gather():tasks=[call_llm_async(session,p)forpinprompts]# return_exceptions=True让异常作为结果返回,不会中断其他任务results=awaitasyncio.gather(*tasks,return_exceptions=True)fori,rinenumerate(results):ifisinstance(r,Exception):print(f"第{i}个请求失败:{r}")results[i]=None# 用默认值替代returnresults坑3:事件循环嵌套
在Jupyter Notebook或某些框架里,事件循环可能已经运行,再调用asyncio.run()会报错。解决方案:
try:loop=asyncio.get_running_loop()exceptRuntimeError:# 没有运行中的事件循环,创建新的results=asyncio.run(main())else:# 已有事件循环,用create_taskresults=loop.run_until_complete(main())实战:构建一个带限速的LLM并发调用器
LLM API通常有速率限制(比如每分钟60次),并发太高会被限流甚至封号。我们需要一个带令牌桶的异步限速器:
importasyncioimporttimeclassRateLimiter:def__init__(self,max_calls,period=60):self.max_calls=max_calls self.period=period self.tokens=max_calls self.last_refill=time.monotonic()self._lock=asyncio.Lock()# 这里踩过坑:必须用asyncio.Lock,不是threading.Lockasyncdefacquire(self):asyncwithself._lock:now=time.monotonic()elapsed=now-self.last_refill# 按时间比例补充令牌self.tokens=min(self.max_calls,self.tokens+elapsed*(self.max_calls/self.period))self.last_refill=nowifself.tokens<1:# 令牌不足,计算需要等待的时间wait_time=(1-self.tokens)*(self.period/self.max_calls)awaitasyncio.sleep(wait_time)self.tokens=0else:self.tokens-=1# 使用示例limiter=RateLimiter(max_calls=60,period=60)# 每分钟60次asyncdefrate_limited_call(session,prompt):awaitlimiter.acquire()# 获取令牌,可能等待returnawaitcall_llm_async(session,prompt)这个限速器用asyncio.Lock保证令牌操作的原子性,用time.monotonic避免系统时间调整带来的问题。实际生产环境中,我还会加上指数退避重试逻辑。
性能调优:从10倍到50倍
基础异步改造能带来5-10倍提升,但想榨干性能,还需要几个技巧:
1. 连接复用
aiohttp.ClientSession默认会复用TCP连接,但要注意连接池大小。我一般根据API的并发限制来设置:
# 如果API允许100并发,连接池设120留余量connector=aiohttp.TCPConnector(limit=120,limit_per_host=120)2. 请求合并
对于流式输出(SSE),用aiohttp的content属性逐块读取,避免等待完整响应:
asyncdefstream_llm(session,prompt):asyncwithsession.post(url,json={"stream":True,...},headers=headers)asresp:asyncforchunkinresp.content:# 处理每个chunk,实现打字机效果yieldchunk.decode()3. 使用uvloop加速
uvloop是用Cython重写的事件循环,性能比默认的asyncio事件循环快2倍:
importuvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 然后正常使用asyncio.run()注意:uvloop在Windows上支持有限,生产环境建议在Linux上部署。
个人经验建议
别盲目全盘异步化。如果你的服务主要是CPU计算(比如图像处理、模型推理),异步带来的收益有限,反而增加代码复杂度。异步最适合I/O密集且并发高的场景。
异步代码的调试是个噩梦。
asyncio.gather里的异常堆栈经常指向事件循环内部,而不是你的业务代码。我的做法是:每个协程函数都加try/except,用logging.exception记录完整上下文,再重新抛出或返回错误码。注意内存泄漏。异步代码里容易忘记释放资源,特别是
aiohttp.ClientSession。始终用async with管理资源,或者显式调用session.close()。监控要跟上。异步服务的性能指标和同步服务不同,要关注事件循环的延迟(event loop lag)、协程队列长度、连接池使用率。我习惯在关键路径上埋点,记录每个协程的等待时间和执行时间。
从同步到异步的迁移策略:不要一次性重写所有代码。先找出I/O瓶颈最严重的模块(比如LLM调用、数据库查询),用异步重写,其他部分保持同步。用
run_in_executor作为过渡桥梁。
最后说一句:asyncio不是银弹,但对付LLM这种高延迟、高并发的I/O场景,它确实是最趁手的工具。下次你的服务再因为同步阻塞而告警,试试把requests换成aiohttp,把time.sleep换成asyncio.sleep——你会回来感谢我的。