news 2026/4/18 8:28:02

FastAPI的异步开发-Asyncio

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
FastAPI的异步开发-Asyncio

Asyncio实现学习方案与实现逻辑

Asyncio具体的实现逻辑的流程:

  1. 初始化信号量sem来控制每次处理的数量
  2. 先实现单个任务方法,包括传入的参数都是单个处理的,一般单个方法中还要加入信号量
  3. 在另一方法中先通过创建外部client,防止每执行一次任务重启一次服务,再通过TaskGroup的方式用循环/批量将create_task去调用单个任务方法,并且通过tasks将任务记录
  4. 同时还要保证在TaskGroup中有try...except机制,防止因为一个任务错误而取消所有任务

🗺️ 总览:从入门到生产环境 (2周计划)

  • 阶段一:核心思维与防坑 (Day 1-3)—— 彻底搞懂“阻塞”与“非阻塞”。
  • 阶段二:控制流与现代模式 (Day 4-6)—— 掌握TaskGroup、信号量与超时。
  • 阶段三:真实世界的 I/O (Day 7-10)—— 异步文件操作 (aiofiles) 与 数据库 (asyncpg)。
  • 阶段四:工程化实践 (Day 11-14)—— 单元测试 (pytest)、调试模式与性能分析。

🟢 第一阶段:核心思维与防坑 (Day 1-3)

目标:不仅仅会写async def,更要懂得如何不卡死事件循环。这是新手最容易犯错的地方。

1. 核心知识点
  • Event Loop (事件循环):程序的“心脏”,必须保持每秒几千次的空转,绝不能被一段代码卡住。
  • Blocking (阻塞):任何占用 CPU 超过 50ms 或进行同步 IO(如requests.gettime.sleep)的操作。
  • 解决方法:使用await让出控制权;对于必须同步的库,使用asyncio.to_thread扔到线程池。
2. 代码对比:错误的阻塞 vs 正确的异步

错误示范 (卡死 Loop)

Python

import asyncio import time async def blocking_task(name): print(f"[{name}] 开始干活 (阻塞中...)") # 💥 致命错误:time.sleep 暂停了整个线程,其他任务全部无法运行! time.sleep(2) print(f"[{name}] 结束") async def main(): # 这里的 Task 2 必须等 Task 1 彻底运行完这2秒才能开始,完全串行了 await asyncio.gather(blocking_task("A"), blocking_task("B")) if __name__ == "__main__": asyncio.run(main())

正确示范 (非阻塞)

Python

import asyncio import time async def async_task(name): print(f"[{name}] 开始干活") # ✨ 正确:asyncio.sleep 挂起当前任务,Loop 去执行其他任务 await asyncio.sleep(2) print(f"[{name}] 结束") # 💡 救命招数:如果你必须用同步库(如 requests/pandas) def heavy_calculation(): time.sleep(2) # 模拟耗时计算 return "计算完成" async def main(): start = time.perf_counter() # 1. 纯异步任务 task1 = asyncio.create_task(async_task("A")) # 2. 将同步阻塞代码扔到线程池运行,不卡 Loop # (Python 3.9+ 新特性,旧版本用 loop.run_in_executor) task2 = asyncio.to_thread(heavy_calculation) await asyncio.gather(task1, task2) print(f"总耗时: {time.perf_counter() - start:.2f}s") # 应该是2秒左右,而不是4秒 if __name__ == "__main__": asyncio.run(main())

🔵 第二阶段:控制流与现代模式 (Day 4-6)

目标:掌握 Python 3.11+ 推荐的结构化并发(Structured Concurrency)。

1. 核心知识点
  • TaskGroup (Python 3.11+):比gather更安全的任务管理方式,自动处理异常取消。
  • Semaphore (信号量)最重要的限流工具,防止你把下游服务打挂。
2. 实战代码:带限流的高并发采集

Python

import asyncio import random # 限制最大并发数为 5 sem = asyncio.Semaphore(5) async def worker(task_id): # 使用 async with 自动管理 acquire/release async with sem: print(f"工号 {task_id} 正在执行... (剩余名额: {sem._value})") await asyncio.sleep(random.uniform(0.5, 1.5)) return f"结果-{task_id}" async def main(): async with asyncio.TaskGroup() as tg: tasks = [] for i in range(20): # tg.create_task 会自动将任务加入管理 # 如果其中一个任务报错,TaskGroup 会自动取消其他所有任务 tasks.append(tg.create_task(worker(i))) # 所有任务完成后,获取结果 results = [t.result() for t in tasks] print(f"成功处理 {len(results)} 个任务") if __name__ == "__main__": asyncio.run(main())

🟠 第三阶段:真实世界的 I/O (Day 7-10)

目标:告别open()和同步数据库驱动。

1. 文件操作:aiofiles

不要在async def里直接用with open(...),因为读写硬盘会阻塞 Loop。

Python

# pip install aiofiles import aiofiles import asyncio async def write_log(text): async with aiofiles.open('async_test.log', mode='a', encoding='utf-8') as f: await f.write(f"{text}\n") async def main(): # 并发写入 100 行 await asyncio.gather(*(write_log(f"Log entry {i}") for i in range(100))) if __name__ == "__main__": asyncio.run(main())
2. 数据库操作:asyncpg(PostgreSQL)

psycopg2快很多,且完全异步。

Python

# pip install asyncpg import asyncpg import asyncio async def run(): conn = await asyncpg.connect(user='user', password='pwd', database='test', host='127.0.0.1') # 插入数据 await conn.execute(''' INSERT INTO users(name, dob) VALUES($1, $2) ''', 'Bob', '1984-03-01') # 查询数据 values = await conn.fetch('''SELECT * FROM users WHERE name = $1''', 'Bob') print(values) await conn.close() if __name__ == "__main__": asyncio.run(run())

🟣 第四阶段:工程化实践 (Debug & Test) (Day 11-14)

目标:写出健壮的异步代码。

1. 开启上帝视角:Debug 模式

当你发现程序变慢却不知道卡在哪里时,开启 Debug 模式。

Python

# 启动时开启 debug # 它会告诉你:“某个协程阻塞了 Loop 超过 0.1秒!” 并打印出是哪一行代码。 asyncio.run(main(), debug=True)
2. 单元测试:pytest-asyncio

传统的unittest无法测试async函数。

Python

# pip install pytest pytest-asyncio import pytest import asyncio # 业务代码 async def add(a, b): await asyncio.sleep(0.01) return a + b # 测试代码 @pytest.mark.asyncio async def test_add(): result = await add(2, 3) assert result == 5

🎓 毕业挑战题目 (完善版)

为了验证学习成果,请尝试完成以下工具:

题目:异步端口扫描器 (Port Scanner)

需求:

  1. 输入:一个 IP 地址 (如192.168.1.1) 和端口范围 (1-1024)。
  2. 核心逻辑
    • 使用asyncio.open_connection(ip, port)尝试连接。
    • 设置wait_for超时时间为 0.5 秒(扫描要快,超时的视为关闭)。
  1. 并发要求
    • 同时扫描 1000 个端口(需要用到Semaphore限制并发,否则系统报错 "Too many open files")。
  1. 输出:实时打印开放的端口号,最后统计总耗时。

提示

  • 如果asyncio.open_connection没有抛出异常,说明端口是通的,记得writer.close()
  • 使用try...except捕获TimeoutErrorConnectionRefusedError

实现代码:

import asyncio sem = asyncio.Semaphore(500) async def scan_port(ip, port): async with sem: try: reader, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=0.5) print(f"Port{port} is open") writer.close() await writer.wait_closed() except ConnectionRefusedError: print(f"{ip}:{port} 未开启") except TimeoutError: print(f"{ip}:{port} 超时") async def main(): ip = "192.168.2.217" ports = [i for i in range(8000, 15000)] async with asyncio.TaskGroup() as tg: tasks = [] for port in ports: tasks.append(tg.create_task(scan_port(ip, port))) # 运行主协程并统计耗时 asyncio.run(main())
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/27 18:41:01

基于.net6的一款开源的低代码、权限、工作流、动态接口平台-动态接口篇

概述动态接口允许用户在运行时创建和修改API端点,而无需重新部署应用程序。这对于需要频繁更改API结构的应用程序特别有用。通过动态接口,开发人员可以根据业务需求快速调整API,提升开发效率和响应速度。功能特点动态创建和修改API端点&#…

作者头像 李华
网站建设 2026/4/16 17:50:36

KOReader电子书阅读器全面解析:从新手到高手的进阶之路

KOReader电子书阅读器全面解析:从新手到高手的进阶之路 【免费下载链接】koreader An ebook reader application supporting PDF, DjVu, EPUB, FB2 and many more formats, running on Cervantes, Kindle, Kobo, PocketBook and Android devices 项目地址: https:…

作者头像 李华
网站建设 2026/4/16 11:54:38

git 怎么把main分支里的项目改到master,同时删除main分支

# 1. 切换到 main 分支 git checkout main# 2. 重命名 main 到 master git branch -m main master# 3. 推送到远程(强制覆盖) git push origin -u master --force# 4. 删除远程的 main 分支 git push origin --delete main# 5. 更新本地追踪分支 git fet…

作者头像 李华
网站建设 2026/4/14 15:45:19

Android010 MMC SD卡 驱动初始化通讯相关

一、官方标准原图如下:二、源码流程分析如下:三、方法解释(待补充):2.1 SD卡插拔检测1. sdhci_irq:中断信号2. mmc_gpio_cd_irqt:判断插拔3. mmc_detect_change4. mmc_rescan5. mmc_rescan_try_…

作者头像 李华
网站建设 2026/4/18 5:42:16

纳米柱阵列超颖表面构建模块的严格分析

摘要 利用先进的制造技术,人们成功实现了具有高数值孔径的可见波长的超透镜。通常使用空间变化的纳米结构作为模块来构建超透镜。在这个例子中分析了用于组成偏振不敏感超透镜的纳米柱状结构。利用傅立叶模态方法(FMM,也称为RCWA)…

作者头像 李华