引言
在Python开发中,IO密集型任务(如网络请求、文件读写、数据库查询)往往成为性能瓶颈。传统的多线程方案虽然能解决并发问题,却存在全局解释器锁(GIL)限制、上下文切换开销大、调试困难等缺点。自Python 3.4引入asyncio库以来,异步编程逐渐成为主流,它通过单线程事件循环(event loop)实现协作式并发,极大地提升了IO密集型应用的吞吐量。
本文将带你系统掌握asyncio的核心概念与实战技巧,从协程、任务到高性能Web请求,每个知识点都配有完整可运行的代码示例。无论你是刚接触异步编程,还是希望深化理解的开发者,这篇文章都能成为你学习路上的高效指南。
一、核心概念:从协程到事件循环
1.1 什么是协程(coroutine)
协程是可以在执行过程中暂停并恢复的函数。Python通过async def关键字定义协程函数,调用该函数不会立即执行,而是返回一个协程对象,需要交给事件循环驱动。
import asyncio async def hello(): print("Hello") await asyncio.sleep(1) # 模拟IO等待,让出控制权 print("World") # 运行协程 asyncio.run(hello())上面代码中,await asyncio.sleep(1)意味着当前协程在这里暂停,让出CPU,事件循环可以去执行其他任务。这就是协作式多任务的核心——显式地交出控制权。
1.2 可等待对象(Awaitable)
在asyncio中,能够被await的对象称为可等待对象,主要有三种:
- 协程对象:由
async def函数返回。 - Task对象:包裹协程,用于并发调度。
- Future对象:底层回调容器,通常由框架使用。
async def say(msg): await asyncio.sleep(0.5) print(msg) async def main(): # 直接await协程 await say("Hello") # 创建Task,立即加入事件循环调度 task = asyncio.create_task(say("World")) print("Task created") await task # 等待task完成 asyncio.run(main())1.3 事件循环(Event Loop)
事件循环是异步编程的引擎,它不断轮询任务队列,执行已就绪的回调或协程。一个线程通常只有一个事件循环,asyncio.run()会自动创建并运行它。
# 底层操作,一般不需要手动管理 loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.close()在Python 3.10+中,推荐始终使用高级APIasyncio.run(),它会自动处理循环的创建、关闭和异常。
二、实战示例:编写高性能异步程序
2.1 并发执行多个任务
批量下载网页是典型的IO密集型场景,使用asyncio.gather()或TaskGroup可以同时运行多个协程。
import asyncio import time import aiohttp # 若未安装,用 pip install aiohttp async def fetch_url(session, url): async with session.get(url) as resp: data = await resp.text() print(f"Fetched {url}, size: {len(data)}") return len(data) async def main(): urls = [ "https://python.org", "https://baidu.com", "https://bing.com", "https://qq.com", ] start = time.perf_counter() async with aiohttp.ClientSession() as session: tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) # 并发执行 elapsed = time.perf_counter() - start print(f"Fetched {len(urls)} URLs in {elapsed:.2f}s") print("Results:", results) asyncio.run(main())这段代码利用aiohttp异步库同时发出四个HTTP请求,总耗时近似于单个请求的最长时间,极大提升效率。
2.2 控制并发数量 — 信号量(Semaphore)
有时我们需要限制并发请求数,避免对服务器造成过大压力或触发反爬机制。asyncio.Semaphore可以轻松实现。
import asyncio import aiohttp async def fetch(session, url, sem): async with sem: # 进入上下文自动acquire,退出后release print(f"Fetching {url}") async with session.get(url) as resp: await asyncio.sleep(0.5) # 模拟额外耗时 return url, resp.status async def main(): sem = asyncio.Semaphore(2) # 最多同时2个请求 urls = [f"https://httpbin.org/delay/1?num={i}" for i in range(5)] async with aiohttp.ClientSession() as session: tasks = [fetch(session, url, sem) for url in urls] results = await asyncio.gather(*tasks) print("Results:", results) asyncio.run(main())运行时会发现,任何时候最多只有2个请求处于活跃状态。
2.3 超时控制与取消
异步操作必须考虑超时,否则可能造成任务永无响应。使用asyncio.wait_for()可以为协程设置超时时间,超时后抛出TimeoutError。
import asyncio async def long_task(): await asyncio.sleep(10) return "Done" async def main(): try: result = await asyncio.wait_for(long_task(), timeout=2) print(result) except asyncio.TimeoutError: print("任务超时!") # 取消正在运行的Task task = asyncio.create_task(long_task()) await asyncio.sleep(0.1) task.cancel() try: await task except asyncio.CancelledError: print("任务被取消") asyncio.run(main())注意:task.cancel()会引发CancelledError,通常在协程内部可以捕获该异常进行清理工作。
2.4 生产者-消费者模式
使用asyncio.Queue可以在多个协程间安全地传递数据。
import asyncio import random async def producer(queue, n): for i in range(n): await asyncio.sleep(random.random()) # 模拟生产耗时 item = f"item-{i}" await queue.put(item) print(f"Produced {item}") await queue.put(None) # 发送结束信号 async def consumer(queue, name): while True: item = await queue.get() if item is None: # 收到结束信号 queue.task_done() break await asyncio.sleep(random.random() * 0.5) # 模拟消费 print(f"Consumer {name} processed {item}") queue.task_done() async def main(): queue = asyncio.Queue() prod = asyncio.create_task(producer(queue, 5)) consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2)] await asyncio.gather(prod, *consumers) # 等待队列中的所有项都被处理 await queue.join() # 可选,确保task_done均被调用 print("所有任务完成") asyncio.run(main())生产者-消费者模式在异步爬虫、日志处理等场景中非常实用。
三、常见问题与注意事项
3.1 避免在协程中使用同步阻塞代码
time.sleep()会阻塞整个线程,导致事件循环停止。在异步代码中必须使用await asyncio.sleep()。如果不小心调用了同步阻塞函数,整个事件循环都会被卡住,并发优势荡然无存。
错误示例:
async def bad(): import time time.sleep(1) # 阻塞,其他协程无法运行正确做法:
async def good(): await asyncio.sleep(1)如果不得不调用CPU密集型或阻塞型函数,可以将其放到线程池执行,使用loop.run_in_executor()或asyncio.to_thread()(Python 3.9+)。
import asyncio import time def blocking_io(): time.sleep(2) return "result" async def main(): result = await asyncio.to_thread(blocking_io) print(result) asyncio.run(main())3.2 调试异步代码
异步程序的异常堆栈可能不太直观。启用调试模式可以帮助定位问题:
import asyncio # 设置环境变量 PYTHONASYNCIODEBUG=1 或代码中启用 asyncio.run(main(), debug=True)此外,未处理的异常可能导致任务静默失败。推荐在gather()时设置return_exceptions=True来手动检查,或者使用Task.add_done_callback()。
async def main(): tasks = [asyncio.create_task(may_fail(i)) for i in range(5)] results = await asyncio.gather(*tasks, return_exceptions=True) for r in results: if isinstance(r, Exception): print(f"任务失败: {r}")3.3 使用正确的异步库
标准库中的许多IO函数(如requests、open)是同步阻塞的。进行异步HTTP请求应使用aiohttp或httpx;操作文件可使用aiofiles;操作数据库则有aiomysql、asyncpg等。使用不当的库会破坏异步性能。
3.4 事件循环与线程安全
asyncio本身不是线程安全的,除了少数方法(如loop.call_soon_threadsafe())之外,不应在多线程中随意调用事件循环的方法。如果需要从其他线程调度任务,请使用asyncio.run_coroutine_threadsafe()。
from threading import Thread import asyncio async def coro(): print("在线程中调度") def thread_main(loop): asyncio.run_coroutine_threadsafe(coro(), loop) print("线程发送完毕") async def main(): loop = asyncio.get_running_loop() t = Thread(target=thread_main, args=(loop,)) t.start() t.join() asyncio.run(main())四、总结
本文从协程基础概念出发,逐步深入到并发控制、超时处理、生产者消费者模式等实战场景,并给出了大量可直接运行的代码。异步编程并非银弹,但对于IO密集型任务,它能以极低的资源成本实现高并发,是网络编程、分布式爬虫、微服务通信等领域的利器。
掌握asyncio需要实践,建议你亲自动手修改示例代码,感受事件循环的调度逻辑。同时,注意区分同步与异步上下文,选择正确的异步库,并善用asyncio.to_thread来处理遗留阻塞代码。
希望本文能为你打开Python异步世界的大门,编写出高性能、可维护的异步应用。如果你有任何疑问或更好的实践,欢迎在评论区交流,我们一起进步!