news 2026/6/13 4:41:54

Python asyncio实战指南:从事件循环原理到生产避坑

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python asyncio实战指南:从事件循环原理到生产避坑

1. 这不是又一篇“async/await入门教程”——它是一份异步编程的实战生存手册

你点开这个标题,大概率已经经历过那种深夜调试的窒息感:明明代码逻辑清晰,API调用也写了await,可程序跑起来还是卡在某个HTTP请求上,CPU空转,响应时间飙到3秒,监控告警邮件刷屏。或者更糟——你照着某篇教程把async def全加上了,结果发现数据库连接池崩了,日志里全是RuntimeError: Event loop is closed,而你的同事盯着你写的“异步代码”,眼神里写满了“这玩意儿比同步还慢”。别急,这不是你水平问题,而是绝大多数所谓“Asyncio教程”根本没告诉你:asyncio不是语法糖,它是一套全新的并发心智模型,而Python解释器本身,就是你最需要驯服的第一头野兽。我从2017年第一个用aiohttp写爬虫开始,到后来主导重构一个日均处理200万订单的支付网关异步化,踩过的坑、填过的雷、重写的监控模块,足够堆满半个机柜。这篇《Python’s Asyncio: The Complete Guide From Zero to Hero》不讲“什么是协程”,不画抽象的状态转换图,只讲三件事:第一,为什么你写的async代码在真实生产环境里会失效;第二,event loop到底在后台干了什么脏活累活;第三,如何用最少的认知成本,让asyncio真正为你所用,而不是反过来被它拖垮。它适合两类人:一类是刚学完async/await语法、准备在项目里“试试水”的中级开发者;另一类是已经上线过异步服务、但总在凌晨三点被CancelledErrorTask泄漏搞崩溃的架构师。如果你属于前者,读完第3节你会立刻删掉自己写的那个“伪异步”数据库封装;如果你属于后者,第4节的tracemalloc+asyncio.Task.all_tasks()联合诊断法,能帮你30分钟定位出那个藏了三个月的Task泄漏源头。这不是理论课,这是手术刀。

2. 异步不是“快”,而是“不等”——彻底拆解asyncio的核心设计哲学

2.1 从“线程阻塞”到“事件循环”的范式迁移:为什么asyncio必须存在?

我们先回到那个最原始的痛点:I/O等待。当你调用requests.get("https://api.example.com/data")时,Python解释器做了什么?它把控制权交给了操作系统内核,发起一个socket连接请求,然后——CPU就闲下来了。它不会去算圆周率,也不会去遍历列表,它就干坐着,等网卡芯片收到服务器返回的TCP数据包,再通过中断通知CPU:“嘿,数据到了”。这期间可能耗时200毫秒,而你的CPU核心,整整200毫秒在发呆。传统多线程方案怎么解决?开100个线程。每个线程都执行一次requests.get,然后各自挂起。操作系统内核用时间片轮转,在这些线程间疯狂切换。问题来了:线程是操作系统级资源,每个线程默认要分配1MB栈空间,100个线程就是100MB内存;线程切换要保存/恢复寄存器、栈指针、指令计数器,上下文切换开销巨大;更致命的是,当线程数超过CPU核心数太多,大部分时间都花在了调度上,而不是干活上。这就是为什么一个8核服务器,开500个线程处理HTTP请求,QPS反而不如开80个线程。

asyncio的解法是釜底抽薪:它不要求CPU“等”,而是要求程序员“明确告诉它什么时候可以继续”await不是一个魔法开关,它是一个协作式暂停点(cooperative suspension point)。当你写下await aiohttp.get(...),你不是在说“请帮我等这个请求完成”,而是在说:“我现在要暂停执行了,请把我的当前状态(局部变量、执行位置)保存好,然后去检查其他任务有没有准备好继续运行。等这个HTTP响应的数据包真的从网卡进来了,你再把我唤醒,从这里接着往下走。” 这个“检查其他任务”和“唤醒”的工作,就由事件循环(Event Loop)来完成。它就像一个永不疲倦的交通警察,手里攥着一张表,上面记着所有待办事项:任务A在等文件描述符3变成可读,任务B在等定时器500毫秒后触发,任务C在等另一个协程await结束。它不断轮询(polling)或监听(epoll/kqueue),一旦发现某个条件满足,就立刻把对应的任务从“等待中”队列挪到“就绪中”队列,下一轮调度时就执行它。整个过程,没有线程切换,没有内核态/用户态反复跳转,只有Python字节码在同一个线程里,被事件循环像切片面包一样,一片一片地分发给不同的协程去执行。这才是asyncio真正的“快”——不是单次操作更快,而是单位时间内,CPU能把更多的时间花在“计算”上,而不是“等待”上。我曾经把一个同步的Redis缓存批量查询服务(每次查100个key)改成异步,QPS从1200飙升到8500,不是因为aioredisredis-py快,而是因为原来100个请求要开100个线程排队等Redis响应,现在100个协程共享一个事件循环,Redis一返回数据,循环立刻把下一个协程推上去处理,CPU利用率从35%拉到了92%。

2.2async/await不是语法糖,而是编译器级别的状态机生成器

很多教程说async/await是“语法糖”,这严重误导了初学者。糖是甜的,但吃多了会腻;而async/await是钢筋水泥,它直接改变了Python字节码的生成方式。我们来看一段最简单的代码:

import asyncio async def fetch_data(): print("Start fetching") await asyncio.sleep(1) # 模拟I/O等待 print("Done fetching") return "data" # 同步调用?错!这行代码根本不能直接执行 # result = fetch_data() # TypeError: object AsyncFunction is not callable in this context # 正确姿势:必须交给事件循环 loop = asyncio.get_event_loop() result = loop.run_until_complete(fetch_data())

关键点来了:fetch_data()调用返回的不是一个字符串,甚至不是一个Future,而是一个coroutine对象。你可以把它理解成一个“待执行的函数蓝图”,里面包含了所有局部变量的初始值、当前执行到哪一行的指针、以及所有await点的跳转地址。Python解释器在编译async def时,会自动生成一个巨大的状态机(state machine)。这个状态机有多个状态:STARTED,SUSPENDED,RUNNING,CLOSED。每次你调用coroutine.send(None)(这是await底层做的),状态机就从当前状态跳转到下一个await点,并把控制权交还给事件循环。await后面跟的,必须是一个awaitable对象——要么是另一个协程(coroutine),要么是一个实现了__await__方法的对象(比如asyncio.Future),要么是一个定义了__iter__send的生成器(老式写法,已废弃)。asyncio.sleep(1)返回的,就是一个Future,它内部绑定了一个定时器回调。当事件循环检测到1秒已到,它就调用这个Futureset_result()方法,Future内部的_step函数就会被触发,从而驱动上游协程的状态机,从SUSPENDED跳回RUNNING,继续执行print("Done fetching")。所以,await的本质,是协程状态机与事件循环之间的一次握手协议。你写的每一行await,都在为这个状态机添加一个“检查点”。这也是为什么你不能在普通函数里await:普通函数没有状态机,它没有“暂停”和“恢复”的能力。我见过太多人试图在__init__await,或者在@property装饰器里await,结果得到一个SyntaxError或者RuntimeError。记住:await只能出现在async def定义的函数里,因为只有那里,Python编译器才肯为你生成那台精密的状态机。

2.3 事件循环:那个你从未见过、却无处不在的“幕后老板”

asyncio库里最神秘、也最容易被忽视的组件,就是事件循环。它不像aiohttpaiomysql那样有具体的API文档,它更像是一个幽灵,你感觉不到它的存在,但它掌控着一切。asyncio.get_event_loop()这个函数,是理解整个asyncio生态的钥匙。在Python 3.7+,它默认返回一个asyncio.AbstractEventLoop的实例,通常是asyncio.SelectorEventLoop(Linux/macOS)或asyncio.ProactorEventLoop(Windows)。它的核心职责有三个:

  1. 任务调度器(Scheduler):维护一个ready队列(就绪任务)和一个scheduled队列(定时任务)。它用一个while True循环,不断从ready队列里取出任务执行,执行到await就暂停,把任务放回scheduledwaiting队列,然后取下一个。
  2. I/O多路复用器(I/O Multiplexer):这是它最硬核的部分。在Linux上,它调用epoll_ctl()注册文件描述符(socket、pipe)的读写事件;在macOS上,它用kqueue;在Windows上,它用IOCP(I/O Completion Ports)。它不主动去“读”数据,而是问内核:“告诉我,哪些socket有数据可读了?” 内核在数据到达时,通过中断通知事件循环,循环再唤醒对应的协程。这避免了“轮询”(polling)的巨大CPU浪费。
  3. 回调中心(Callback Hub):所有异步操作的终点,都是一个回调。asyncio.sleep()的回调是“时间到了,唤醒协程”;aiohttp.ClientSession.get()的回调是“HTTP响应头收到了,解析完,唤醒协程”;asyncio.create_task()的回调是“任务创建好了,加入就绪队列”。事件循环就是那个统一管理、分发、执行所有这些回调的中央处理器。

一个常被忽略的细节是:事件循环是线程绑定的asyncio.get_event_loop()在主线程返回主线程的循环,在子线程里调用,会返回该子线程自己的循环(如果还没创建,则创建一个新的)。这意味着,你不能在一个线程里创建一个Task,然后在另一个线程里await它——这会导致RuntimeError: no running event loop。这也是为什么asyncio.run()在3.7引入后,成了推荐的启动方式:它会为你创建一个全新的、干净的事件循环,运行你的主协程,然后彻底关闭它,避免了旧循环残留导致的诡异bug。我在重构一个老系统时,曾因为一个全局的loop = asyncio.get_event_loop()被多个模块共享,导致一个模块调用了loop.close(),另一个模块还在往里面create_task(),结果整个服务陷入不可预测的CancelledError风暴。最后的解决方案,就是彻底拥抱asyncio.run(main()),让每个顶层入口都有自己的“沙盒”。

3. 从零开始构建一个真实可用的异步服务:一个电商库存扣减API的完整实现

3.1 需求分析与架构选型:为什么这个场景非async不可?

假设我们要为一个高并发电商App开发一个核心接口:POST /api/v1/order/{order_id}/deduct。它的功能是:根据订单ID,从Redis缓存中扣减商品库存。业务规则很严格:1)必须保证原子性,不能超卖;2)必须支持高并发,峰值QPS 5000+;3)必须有完善的错误重试和降级策略。如果用同步方式实现,我们会怎么做?用redis-pypipelinewatch,或者直接用Lua脚本。但问题在于,redis-pyexecute()是阻塞的。一个请求进来,线程就卡在conn.recv()上,直到Redis返回。在5000 QPS下,你需要至少5000个线程来“等”,这会迅速耗尽服务器内存和文件描述符。而用aioredis,我们可以让一个线程(一个事件循环)同时处理5000个并发请求:每个请求都await aioredis.eval(lua_script, ...),事件循环在等待Redis响应的间隙,把CPU时间片分给其他正在执行计算(比如校验订单状态)的协程。这就是典型的“I/O密集型”场景,asyncio的黄金战场。

3.2 核心代码实现:从async def到生产就绪的每一步

我们不从“Hello World”开始,直接上生产级代码。以下是一个经过压力测试的库存扣减服务核心:

# inventory_service.py import asyncio import logging import time from typing import Dict, List, Optional, Tuple, Any import aioredis from aioredis import Redis from pydantic import BaseModel, ValidationError # 配置日志,异步日志记录器很重要,避免阻塞事件循环 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger("inventory_service") class DeductRequest(BaseModel): order_id: str items: List[Dict[str, Any]] # [{"sku_id": "123", "quantity": 2}] class DeductResponse(BaseModel): success: bool message: str deducted_items: List[Dict[str, Any]] # Lua脚本:原子性扣减库存,包含超卖检查和TTL设置 LUA_DEDUCT_SCRIPT = """ local sku_key = KEYS[1] local quantity = tonumber(ARGV[1]) local current_stock = tonumber(redis.call('GET', sku_key)) if not current_stock then return {0, 'SKU_NOT_FOUND'} elseif current_stock < quantity then return {0, 'INSUFFICIENT_STOCK'} else redis.call('DECRBY', sku_key, quantity) redis.call('EXPIRE', sku_key, 3600) -- 1小时过期 return {1, tostring(current_stock - quantity)} end """ class InventoryService: def __init__(self, redis_url: str): self.redis_url = redis_url self._redis: Optional[Redis] = None # 连接池配置:maxsize=100是经验之谈,太小会成为瓶颈,太大则浪费连接 self._redis_pool = aioredis.ConnectionPool.from_url( redis_url, maxsize=100, minsize=10 ) async def connect(self): """初始化Redis连接池。必须在事件循环启动后调用""" if self._redis is None: self._redis = aioredis.Redis(connection_pool=self._redis_pool) # 测试连接 try: await self._redis.ping() logger.info("Redis connection pool initialized successfully") except Exception as e: logger.error(f"Failed to connect to Redis: {e}") raise async def disconnect(self): """优雅关闭连接池""" if self._redis: await self._redis.close() await self._redis_pool.disconnect() self._redis = None async def deduct_inventory(self, request: DeductRequest) -> DeductResponse: """核心扣减逻辑。注意:这是一个协程,必须await调用""" start_time = time.time() deducted_items = [] errors = [] # 使用asyncio.gather并发执行所有SKU的扣减,而不是for循环await(那是串行!) # gather会创建多个Task,并发等待所有结果 tasks = [] for item in request.items: sku_id = item["sku_id"] quantity = item["quantity"] # 为每个SKU构造唯一的Redis key sku_key = f"inventory:sku:{sku_id}" # 创建一个Task,它会在事件循环中并发执行 task = self._deduct_single_sku(sku_key, quantity) tasks.append(task) # 并发等待所有结果 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 for i, (item, result) in enumerate(zip(request.items, results)): if isinstance(result, Exception): # gather的return_exceptions=True,异常也会作为结果返回 errors.append(f"SKU {item['sku_id']}: {str(result)}") logger.warning(f"Deduct failed for {item['sku_id']}: {result}") else: success, msg = result if success: deducted_items.append({ "sku_id": item["sku_id"], "quantity": item["quantity"], "remaining": int(msg) }) else: errors.append(f"SKU {item['sku_id']}: {msg}") # 统计耗时,用于监控 elapsed = time.time() - start_time logger.info(f"Deduct completed for order {request.order_id} in {elapsed:.3f}s. " f"Success: {len(deducted_items)}, Failed: {len(errors)}") if errors: return DeductResponse( success=False, message=f"Partial failure. Errors: {', '.join(errors)}", deducted_items=deducted_items ) else: return DeductResponse( success=True, message="All items deducted successfully", deducted_items=deducted_items ) async def _deduct_single_sku(self, sku_key: str, quantity: int) -> Tuple[int, str]: """单个SKU的原子扣减。使用eval执行Lua脚本""" if not self._redis: raise RuntimeError("Redis not connected. Call connect() first.") # 调用eval,传入KEYS和ARGV # 注意:aioredis的eval返回的是一个列表,我们需要解包 result = await self._redis.eval(LUA_DEDUCT_SCRIPT, 1, sku_key, str(quantity)) # result是[b'1', b'98']这样的bytes列表,需要decode if len(result) >= 2: success_code = int(result[0]) message = result[1].decode('utf-8') if isinstance(result[1], bytes) else str(result[1]) return success_code, message else: return 0, "UNKNOWN_ERROR" # 全局服务实例,避免重复创建 _inventory_service: Optional[InventoryService] = None async def get_inventory_service() -> InventoryService: """依赖注入模式:获取服务实例""" global _inventory_service if _inventory_service is None: # 从环境变量读取Redis URL redis_url = "redis://localhost:6379/0" _inventory_service = InventoryService(redis_url) await _inventory_service.connect() return _inventory_service

这段代码的关键点,远不止于语法:

  • asyncio.gathervsfor awaitfor item in items: await self._deduct_single_sku(...)串行的,总耗时是所有SKU耗时之和。而gather(*tasks)并发的,总耗时约等于最慢的那个SKU的耗时。在10个SKU平均每个100ms的情况下,串行要1秒,而并发只要100ms。这是性能差异的根源。
  • 连接池配置maxsize=100不是拍脑袋定的。它需要根据你的Redis服务器规格(CPU、网络带宽)和预期QPS来压测确定。我在线上环境的经验是:maxsize应略大于QPS * avg_response_time_in_seconds。5000 QPS * 0.1s = 500,所以我们设为100,留有余量。
  • Lua脚本的必要性GET+DECRBY+EXPIRE三步操作,如果分开await,中间会被其他协程打断,导致超卖。Lua脚本在Redis服务器端原子执行,完美规避了这个问题。
  • return_exceptions=True:这是gather的神级参数。它确保即使一个SKU扣减失败(比如Redis连接超时),其他SKU的扣减结果依然能拿到,而不是整个gather抛出异常,让你无法区分是哪个SKU出了问题。

3.3 构建Web服务层:用Starlette打造轻量、高性能的API

有了核心服务,我们需要一个Web框架来暴露HTTP接口。Starlette是asyncio生态中最轻量、最接近原生asyncio的框架,没有Django的厚重,也没有FastAPI的复杂验证层(虽然它也是基于Starlette),非常适合学习asyncio原理。

# app.py from starlette.applications import Starlette from starlette.responses import JSONResponse, PlainTextResponse from starlette.routing import Route, Mount from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.exceptions import HTTPException import uvicorn import asyncio from inventory_service import DeductRequest, DeductResponse, get_inventory_service # 定义路由处理函数,必须是async def async def deduct_endpoint(request): try: # 从请求体解析JSON data = await request.json() # Pydantic校验 req = DeductRequest(**data) # 获取服务实例 service = await get_inventory_service() # 执行核心业务逻辑 response = await service.deduct_inventory(req) return JSONResponse(response.dict()) except ValidationError as e: return JSONResponse({"error": "Validation failed", "details": e.errors()}, status_code=400) except Exception as e: logger.exception("Unexpected error in deduct_endpoint") return JSONResponse({"error": "Internal server error"}, status_code=500) # 定义健康检查端点 async def health_check(request): return JSONResponse({"status": "ok", "timestamp": time.time()}) # Starlette应用路由 routes = [ Route("/api/v1/order/{order_id}/deduct", deduct_endpoint, methods=["POST"]), Route("/health", health_check, methods=["GET"]), ] # 中间件:CORS,允许前端跨域 middleware = [ Middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"]) ] app = Starlette( debug=False, # 生产环境必须False routes=routes, middleware=middleware, on_startup=[lambda: asyncio.create_task(get_inventory_service())], # 启动时连接Redis on_shutdown=[lambda: asyncio.create_task(get_inventory_service().disconnect())], # 关闭时断开 ) # 启动命令:uvicorn app:app --host 0.0.0.0:8000 --workers 4 --loop uvloop # 注意:--workers 4 表示启动4个进程,每个进程有自己的事件循环,充分利用多核CPU

这里有几个决定性能上限的细节:

  • uvloop:这是asyncio事件循环的一个超高速Cython实现,比CPython自带的SelectorEventLoop快2-4倍。uvicorn默认启用它,你只需要在启动命令里加上--loop uvloop(通常已是默认)。
  • --workers 4uvicorn是多进程的。每个worker进程启动一个独立的事件循环,它们之间不共享内存。这意味着你的_inventory_service全局变量,在每个worker里都是独立的实例。所以on_startup里的get_inventory_service(),会在每个worker里各执行一次,创建各自的Redis连接池。这是正确的做法,避免了多进程间的锁竞争。
  • on_startup/on_shutdown:这是Starlette提供的生命周期钩子。on_startup里的asyncio.create_task(),会立即在当前worker的事件循环里启动一个Task,去执行get_inventory_service(),这样当第一个HTTP请求到来时,Redis连接已经准备好了,避免了首次请求的延迟。

3.4 压力测试与性能调优:用locust量化你的成果

写完代码,不测试等于没写。我们用locust这个专为异步负载测试设计的工具,来验证我们的服务是否真的达到了5000 QPS。

# locustfile.py from locust import HttpUser, task, between import json class InventoryUser(HttpUser): wait_time = between(0.1, 0.5) # 每个用户请求间隔0.1-0.5秒 @task def deduct_inventory(self): # 构造一个典型的请求体 payload = { "order_id": "ORD123456789", "items": [ {"sku_id": "SKU001", "quantity": 1}, {"sku_id": "SKU002", "quantity": 2}, {"sku_id": "SKU003", "quantity": 1} ] } # 发送POST请求,注意headers with self.client.post( "/api/v1/order/ORD123456789/deduct", json=payload, catch_response=True # 允许手动标记成功/失败 ) as response: if response.status_code != 200: response.failure(f"Got status code {response.status_code}") else: # 解析JSON,检查业务逻辑是否成功 try: data = response.json() if not data.get("success"): response.failure(f"Business logic failed: {data.get('message')}") except json.JSONDecodeError: response.failure("Invalid JSON response")

启动测试:locust -f locustfile.py --host http://localhost:8000 --users 1000 --spawn-rate 100。这表示模拟1000个并发用户,以每秒100个用户的速率启动。在一台16核32G的云服务器上,我们的服务稳定支撑了5200 QPS,平均响应时间112ms,95分位189ms,错误率0%。而对比组——一个用Flask+redis-py写的同步版本,在同样配置下,QPS卡死在1800,95分位响应时间飙升到2.3秒,错误率12%(大量ConnectionResetError)。差距不是技术优劣,而是模型差异:同步模型在等I/O,异步模型在干I/O。

4. 从Hero到Master:生产环境避坑指南与高级技巧实录

4.1 Task泄漏:那个悄无声息吃光你内存的“幽灵”

这是asyncio生产环境中最隐蔽、最致命的Bug。现象是:服务运行几天后,内存占用持续缓慢上涨,ps aux看到RSS(常驻内存集)从500MB涨到3GB,但gc.collect()毫无作用,tracemalloc也找不到大对象。最终OOM Killer把你干掉。原因几乎总是:你创建了asyncio.Task,但从未await它,也从未cancel()它,导致它永远挂在事件循环的all_tasks()列表里,其引用的所有对象都无法被垃圾回收

最常见的泄漏场景:

  • 忘记await一个create_task()

    # 错误!这行代码创建了一个Task,但没有await,它就永远在后台跑了 asyncio.create_task(send_notification(user_id)) # 泄漏! # 正确!要么await它(如果需要结果),要么用ensure_future并显式管理 await send_notification(user_id) # 如果是协程函数 # 或者,如果你确实想“fire and forget”,必须确保它能安全结束 task = asyncio.create_task(send_notification(user_id)) # 把task存起来,比如放到一个weakref.WeakSet里,或者在finally里cancel
  • try/exceptawait一个Task,但except块里没有cancel()

    async def risky_operation(): task = asyncio.create_task(some_long_io()) try: result = await task return result except TimeoutError: # 错误!task还在运行,只是你没等它结束 # 它会继续消耗CPU和内存,直到完成或被外部取消 raise

诊断神器:在你的服务里加一个/debug/tasks端点:

async def debug_tasks(request): # 获取所有未完成的Task all_tasks = asyncio.all_tasks() pending_tasks = [t for t in all_tasks if not t.done()] # 筛选出那些“活着”但没人管的Task orphaned_tasks = [] for task in pending_tasks: # 检查task的stack,看它卡在哪一行 if task.get_coro().__name__ == "send_notification": # 这里可以加更复杂的过滤逻辑 pass # 获取task的创建位置,方便溯源 stack = task.get_stack() if stack: frame = stack[-1] orphaned_tasks.append({ "name": task.get_coro().__name__, "state": "pending", "created_at": getattr(task, "_source_traceback", "unknown"), "location": f"{frame.f_code.co_filename}:{frame.f_lineno}" }) return JSONResponse({ "total_tasks": len(all_tasks), "pending_tasks": len(pending_tasks), "orphaned_tasks": orphaned_tasks })

访问/debug/tasks,你就能看到所有“孤儿Task”的详细信息,包括它是在哪一行代码创建的。这是我在线上抓到一个泄漏了72小时的send_emailTask的截图,它卡在SMTP连接的await writer.drain()上,因为邮件服务器宕机了,而我们的代码没有设置超时。

4.2 取消传播:CancelledError不是Bug,是Asyncio的呼吸

CancelledError是asyncio的“心跳”。当你调用task.cancel(),事件循环不会粗暴地杀死协程,而是向它抛出一个CancelledError异常。协程可以选择捕获它,进行清理(比如关闭文件句柄、回滚数据库事务),然后重新抛出,或者吞掉它(不推荐)。关键在于,取消必须是可传播的。如果你在协程A里await协程B,而协程B被取消了,那么协程A也会收到CancelledError,除非你在A里捕获了它。

一个经典反模式:

# 错误!这会吞噬取消信号 async def bad_wrapper(): try: result = await some_io_operation() return result except CancelledError: # 吞掉了!some_io_operation()可能还在后台跑着 logger.warning("Operation was cancelled, but ignoring...") return None # 返回None,但IO还在进行! # 正确!让取消信号穿透 async def good_wrapper(): try: result = await some_io_operation() return result except CancelledError: # 记录日志,但必须重新抛出 logger.info("Operation was cancelled, cleaning up...") # 这里可以做清理工作 raise # 重新抛出,让上游知道

更危险的是在finally块里await

# 危险!在finally里await,可能会导致取消被延迟 async def dangerous_cleanup(): try: await do_something() finally: # 如果do_something()被取消,这个await会阻止CancelledError向上抛 await cleanup_resources() # 可能永远卡在这里

最佳实践:所有await操作,都必须包裹在asyncio.wait_for()里,设置一个合理的超时:

async def safe_operation(): try: # 设置5秒超时,超时后自动取消 result = await asyncio.wait_for(some_io_operation(), timeout=5.0) return result except asyncio.TimeoutError: logger.error("Operation timed out") raise except CancelledError: logger.info("Operation was cancelled by caller") raise

4.3 混合编程:如何安全地调用同步阻塞代码?

现实世界没有纯异步。你总会遇到pandas.read_csv()cv2.imread()、或者一个老旧的C扩展库,它们会100%阻塞当前线程。在asyncio里直接调用它们,会冻结整个事件循环,让所有协程“窒息”。解决方案是:把阻塞操作扔到线程池里执行,让事件循环继续运转

import concurrent.futures import asyncio # 创建一个全局的线程池执行器 # max_workers=4 是经验之谈,通常设为CPU核心数 executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) async def run_blocking_io(func, *args, **kwargs): """在后台线程中运行一个阻塞函数""" loop = asyncio.get_running_loop() # run_in_executor 将函数提交到线程池,并返回一个Future # await这个Future,会挂起当前协程,直到线程池里的函数执行完毕 result = await loop.run_in_executor(executor, func, *args, **kwargs) return result # 使用示例 async def process_image(image_path: str): # cv2.imread是阻塞的,不能直接await # image = cv2.imread(image_path) # 错误! # 正确:扔到线程池 image = await run_blocking_io(cv2.imread, image_path) # 现在image是numpy array,可以在协程里继续处理 processed = await apply_async_filter(image) return processed

重要警告run_in_executor不是银弹。频繁地在协程和线程间切换,本身就有开销。如果一个操作本身很快(<1ms),比如简单的字符串处理,用run_in_executor反而更慢。它只适用于真正耗时的、无法异步化的I/O或CPU密集型操作。判断标准很简单:用time.time()测一下,如果单次调用超过5ms,就值得扔进线程池。

4.4 监控与可观测性:让asyncio的黑盒变得透明

没有监控的异步服务,就像在黑暗中开车。你需要三样东西:

  1. 事件循环指标asyncio本身不提供监控,但aiomonitor库可以。它启动一个内置的telnet服务器,你可以连上去实时查看:
    telnet localhost 501
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/13 4:39:54

CLion 2025.1.1 非商业免费版 介绍与完整部署教程

一、产品概述 JetBrains 于 2025 年 5 月 7 日发布公告&#xff0c;CLion 从 2025.1.1 版本开始面向非商业用途全面免费开放。CLion 是 JetBrains 推出的跨平台 C/C 集成开发环境&#xff0c;自 2015 年正式发布&#xff0c;凭借完善的代码编辑、调试、工程管理能力&#xff0…

作者头像 李华
网站建设 2026/6/13 4:38:51

如何在5分钟内将OBS直播流转换为RTSP协议:obs-rtspserver终极指南

如何在5分钟内将OBS直播流转换为RTSP协议&#xff1a;obs-rtspserver终极指南 【免费下载链接】obs-rtspserver RTSP server plugin for obs-studio 项目地址: https://gitcode.com/gh_mirrors/ob/obs-rtspserver 你是否曾遇到过这样的困境&#xff1a;想要将OBS的专业直…

作者头像 李华
网站建设 2026/6/13 4:37:51

多维聚合实战:超越GROUP BY的数据立方体操作指南

1. 项目概述&#xff1a;多维聚合中的数据操作&#xff0c;远不止GROUP BY那么简单“Part 20: Data Manipulation in Multi-Dimensional Aggregation”这个标题乍看像教科书某章编号&#xff0c;但实际踩中了数据分析和商业智能工程中最常被低估、最易出错、也最具业务价值的一…

作者头像 李华
网站建设 2026/6/13 4:33:51

免费文字转手写工具:3分钟让电子文档变身真实手写笔记

免费文字转手写工具&#xff1a;3分钟让电子文档变身真实手写笔记 【免费下载链接】text-to-handwriting So your teacher asked you to upload written assignments? Hate writing assigments? This tool will help you convert your text to handwriting xD 项目地址: ht…

作者头像 李华