news 2026/4/25 1:03:25

Python 异步IO:asyncio深度解析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 异步IO:asyncio深度解析

Python 异步IO:asyncio深度解析

1. 异步编程概述

异步编程是一种编程范式,它允许程序在等待某个操作完成时继续执行其他任务,而不是阻塞等待。在Python中,asyncio库是实现异步编程的核心。

核心概念

  • 同步编程:代码按顺序执行,一个操作完成后才开始下一个操作
  • 异步编程:代码可以在等待某些操作(如I/O)时执行其他任务
  • 协程:轻量级的并发执行单元
  • 事件循环:管理和调度协程的执行
  • Future:表示异步操作的结果
  • Task:对协程的封装,用于并发执行

2. asyncio基础

2.1 协程定义

在Python中,协程可以通过async def关键字定义:

import asyncio async def hello(): print('Hello') await asyncio.sleep(1) print('World') # 运行协程 asyncio.run(hello())

2.2 事件循环

事件循环是asyncio的核心,它负责调度协程的执行:

import asyncio async def task1(): print('Task 1 started') await asyncio.sleep(2) print('Task 1 completed') async def task2(): print('Task 2 started') await asyncio.sleep(1) print('Task 2 completed') async def main(): # 创建任务 t1 = asyncio.create_task(task1()) t2 = asyncio.create_task(task2()) # 等待任务完成 await t1 await t2 # 运行主协程 asyncio.run(main())

2.3 await关键字

await关键字用于等待一个异步操作完成:

async def fetch_data(url): print(f'Fetching data from {url}') # 模拟网络请求 await asyncio.sleep(2) return f'Data from {url}' async def main(): # 串行执行 data1 = await fetch_data('https://api.example.com/data1') data2 = await fetch_data('https://api.example.com/data2') print(data1, data2) asyncio.run(main())

3. 并发执行

3.1 asyncio.gather

asyncio.gather用于并发执行多个协程:

async def fetch_data(url): print(f'Fetching data from {url}') await asyncio.sleep(2) return f'Data from {url}' async def main(): # 并发执行 results = await asyncio.gather( fetch_data('https://api.example.com/data1'), fetch_data('https://api.example.com/data2'), fetch_data('https://api.example.com/data3') ) print(results) asyncio.run(main())

3.2 asyncio.create_task

asyncio.create_task用于创建后台任务:

async def background_task(): while True: print('Background task running') await asyncio.sleep(1) async def main(): # 创建后台任务 task = asyncio.create_task(background_task()) # 执行其他操作 print('Main task running') await asyncio.sleep(3) # 取消后台任务 task.cancel() try: await task except asyncio.CancelledError: print('Background task cancelled') asyncio.run(main())

3.3 asyncio.wait

asyncio.wait用于等待多个协程完成:

async def task1(): await asyncio.sleep(2) return 'Task 1 result' async def task2(): await asyncio.sleep(1) return 'Task 2 result' async def main(): tasks = [task1(), task2()] done, pending = await asyncio.wait(tasks, timeout=1.5) print('Done tasks:', len(done)) print('Pending tasks:', len(pending)) for task in done: print('Result:', await task) asyncio.run(main())

4. 异步IO操作

4.1 文件IO

使用aiofiles库进行异步文件操作:

import asyncio import aiofiles async def read_file(filename): async with aiofiles.open(filename, 'r') as f: content = await f.read() return content async def write_file(filename, content): async with aiofiles.open(filename, 'w') as f: await f.write(content) async def main(): # 读取文件 content = await read_file('example.txt') print('File content:', content) # 写入文件 await write_file('output.txt', 'Hello, asyncio!') print('File written') asyncio.run(main())

4.2 网络IO

使用aiohttp库进行异步网络请求:

import asyncio import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'https://example.com') print('HTML length:', len(html)) asyncio.run(main())

4.3 数据库操作

使用asyncpg库进行异步数据库操作:

import asyncio import asyncpg async def main(): # 连接数据库 conn = await asyncpg.connect( host='localhost', port=5432, user='postgres', password='password', database='mydb' ) # 执行查询 rows = await conn.fetch('SELECT * FROM users') for row in rows: print(row) # 关闭连接 await conn.close() asyncio.run(main())

5. 高级特性

5.1 异步上下文管理器

使用async with语句创建异步上下文管理器:

import asyncio class AsyncContextManager: async def __aenter__(self): print('Entering context') await asyncio.sleep(1) return self async def __aexit__(self, exc_type, exc_val, exc_tb): print('Exiting context') await asyncio.sleep(1) async def main(): async with AsyncContextManager() as cm: print('Inside context') await asyncio.sleep(2) asyncio.run(main())

5.2 异步迭代器

使用async for语句创建异步迭代器:

import asyncio class AsyncIterator: def __init__(self, start, end): self.start = start self.end = end def __aiter__(self): self.current = self.start return self async def __anext__(self): if self.current >= self.end: raise StopAsyncIteration value = self.current self.current += 1 await asyncio.sleep(0.5) return value async def main(): async for num in AsyncIterator(1, 5): print(num) asyncio.run(main())

5.3 任务组

Python 3.11+引入了任务组,用于更安全地管理并发任务:

import asyncio async def task(id, duration): print(f'Task {id} started') await asyncio.sleep(duration) print(f'Task {id} completed') return f'Task {id} result' async def main(): async with asyncio.TaskGroup() as tg: # 创建任务 task1 = tg.create_task(task(1, 2)) task2 = tg.create_task(task(2, 1)) task3 = tg.create_task(task(3, 3)) # 任务组退出时,所有任务已完成 print('All tasks completed') print('Task 1 result:', task1.result()) print('Task 2 result:', task2.result()) print('Task 3 result:', task3.result()) asyncio.run(main())

6. 性能优化

6.1 避免阻塞操作

在异步代码中避免使用阻塞操作,如同步IO:

# 错误示例 async def bad_example(): # 阻塞操作 time.sleep(1) # 这会阻塞整个事件循环 print('Done') # 正确示例 async def good_example(): # 异步操作 await asyncio.sleep(1) # 这会释放事件循环 print('Done')

6.2 批量操作

对于多个IO操作,使用并发执行:

async def fetch_all(urls): async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] results = await asyncio.gather(*tasks) return results

6.3 超时处理

为异步操作设置超时:

async def fetch_with_timeout(url, timeout=5): try: async with aiohttp.ClientSession() as session: async with asyncio.timeout(timeout): async with session.get(url) as response: return await response.text() except asyncio.TimeoutError: return 'Request timed out'

7. 常见陷阱

7.1 忘记await

忘记使用await会导致协程不会执行:

async def foo(): print('Foo') await asyncio.sleep(1) print('Bar') async def main(): foo() # 错误:忘记await,协程不会执行 await foo() # 正确:使用await asyncio.run(main())

7.2 阻塞事件循环

在协程中使用阻塞操作会阻塞整个事件循环:

async def blocking_operation(): # 错误:使用阻塞操作 time.sleep(1) # 这会阻塞事件循环 return 'Done' async def main(): # 正确:使用线程池执行阻塞操作 loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: time.sleep(1)) return result

7.3 任务泄漏

创建的任务如果不等待或取消,会导致任务泄漏:

async def background_task(): while True: await asyncio.sleep(1) print('Background task') async def main(): # 错误:创建任务但不管理 asyncio.create_task(background_task()) await asyncio.sleep(5) # 任务会继续运行,导致泄漏 async def main_fixed(): # 正确:管理任务生命周期 task = asyncio.create_task(background_task()) await asyncio.sleep(5) task.cancel() try: await task except asyncio.CancelledError: pass

8. 代码示例

8.1 异步Web服务器

使用aiohttp创建异步Web服务器:

from aiohttp import web async def handle(request): name = request.match_info.get('name', 'World') # 模拟异步操作 await asyncio.sleep(0.5) return web.Response(text=f'Hello, {name}!') async def main(): app = web.Application() app.add_routes([ web.get('/', handle), web.get('/{name}', handle) ]) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, 'localhost', 8080) await site.start() print('Server started on http://localhost:8080') # 保持运行 await asyncio.Future() # 无限等待 if __name__ == '__main__': asyncio.run(main())

8.2 异步爬虫

使用aiohttp创建异步爬虫:

import asyncio import aiohttp from bs4 import BeautifulSoup async def fetch_url(session, url): try: async with session.get(url) as response: return await response.text() except Exception as e: print(f'Error fetching {url}: {e}') return '' async def parse_page(html): soup = BeautifulSoup(html, 'html.parser') links = [] for a in soup.find_all('a', href=True): links.append(a['href']) return links async def crawl(start_url, max_depth=2): visited = set() queue = [(start_url, 0)] async with aiohttp.ClientSession() as session: while queue: url, depth = queue.pop(0) if url in visited or depth >= max_depth: continue visited.add(url) print(f'Crawling {url} (depth: {depth})') html = await fetch_url(session, url) if not html: continue links = await parse_page(html) for link in links: if link.startswith('http'): queue.append((link, depth + 1)) async def main(): await crawl('https://example.com') asyncio.run(main())

8.3 异步数据库操作

使用asyncpg进行异步数据库操作:

import asyncio import asyncpg async def setup_database(): # 连接数据库 conn = await asyncpg.connect( host='localhost', port=5432, user='postgres', password='password', database='mydb' ) # 创建表 await conn.execute(''' CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(100), email VARCHAR(100) UNIQUE ) ''') # 插入数据 await conn.execute( 'INSERT INTO users (name, email) VALUES ($1, $2) ON CONFLICT DO NOTHING', 'Alice', 'alice@example.com' ) await conn.execute( 'INSERT INTO users (name, email) VALUES ($1, $2) ON CONFLICT DO NOTHING', 'Bob', 'bob@example.com' ) # 查询数据 rows = await conn.fetch('SELECT * FROM users') print('Users:') for row in rows: print(f'ID: {row["id"]}, Name: {row["name"]}, Email: {row["email"]}') # 关闭连接 await conn.close() asyncio.run(setup_database())

9. 工具与生态

9.1 常用库

  • aiohttp:异步HTTP客户端/服务器
  • aiofiles:异步文件操作
  • asyncpg:异步PostgreSQL客户端
  • motor:异步MongoDB客户端
  • redis-py:支持异步的Redis客户端
  • httpx:现代HTTP客户端,支持同步和异步

9.2 开发工具

  • asyncio-debug:调试异步代码
  • aiomonitor:监控异步应用
  • tqdm:支持异步的进度条

9.3 框架

  • FastAPI:现代异步Web框架
  • Sanic:高性能异步Web框架
  • Tornado:异步Web框架
  • Quart:Flask的异步版本

10. 结论

asyncio为Python带来了强大的异步编程能力,使得我们可以编写高效的I/O密集型应用。通过合理使用asyncio的各种特性,我们可以:

  • 提高应用的并发性能
  • 减少资源消耗
  • 编写更清晰、更可维护的代码
  • 充分利用现代硬件的多核性能

最佳实践

  1. 使用异步库:尽量使用支持异步的库,如aiohttp、aiofiles等
  2. 避免阻塞操作:在异步代码中避免使用阻塞操作
  3. 合理使用并发:使用asyncio.gather等工具并发执行任务
  4. 管理任务生命周期:确保任务被正确等待或取消
  5. 设置超时:为异步操作设置合理的超时
  6. 使用任务组:在Python 3.11+中使用TaskGroup管理任务
  7. 监控和调试:使用适当的工具监控和调试异步代码

未来发展

  • 更广泛的异步支持:越来越多的库将支持异步操作
  • 更简洁的语法:Python可能会引入更简洁的异步语法
  • 更好的工具支持:更多专门用于异步编程的工具和框架
  • 更深入的集成:异步编程将与Python生态系统更深入集成

通过掌握asyncio,我们可以构建高性能、可扩展的Python应用,特别是在处理大量I/O操作的场景中,如Web服务器、爬虫、数据处理等。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/25 1:01:20

从可解释AI到ChatGPT:聊聊模型‘透明度’如何影响你的实际项目选型

从可解释AI到ChatGPT:模型透明度如何重塑你的技术决策框架 在金融风控系统中,一个拒绝贷款申请的决策可能引发客户投诉甚至法律纠纷;在医疗AI辅助诊断时,医生需要理解模型为何标记某个病灶为恶性肿瘤;而当ChatGPT生成的…

作者头像 李华
网站建设 2026/4/25 0:54:18

抖音批量下载终极指南:免费开源工具快速上手

抖音批量下载终极指南:免费开源工具快速上手 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. 抖音批…

作者头像 李华
网站建设 2026/4/25 0:54:12

10人SolidWorks团队如何通过云主机实现“设计-仿真-制造”一体化

在制造业数字化转型加速的背景下,许多三维(SolidWorks、UG、CATIA等)设计团队正面临效率提升与成本控制的双重挑战。传统本地化工作站模式因硬件资源分散、协作效率低、数据孤岛等问题,逐渐难以满足复杂产品设计、多学科仿真及敏捷…

作者头像 李华