Python asyncio: A Complete Guide to Asynchronous Programming, from async/await to Concurrency in Practice | 2026 Beginner Tutorial
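Python's asyncio module is the core infrastructure for asynchronous programming in modern Python. From asyncio's debut in Python 3.4, through the async/await syntax added in 3.5, to TaskGroup in 3.11, asynchronous programming has become the go-to approach for high-concurrency, I/O-bound workloads in Python.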

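This article builds up the full picture of Python asyncio from scratch, covering:
- Coroutines and the basic async/await syntax
- Core asyncio APIs: run/gather/create_task/wait
- Async HTTP requests in practice (aiohttp)
- Async file and database operations
- TaskGroup (Python 3.11+)
- Concurrency control and rate limiting with semaphores
- Exception handling and task cancellation
- Sync vs. async performance comparison
- Common pitfalls and best practices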
1. Why Asynchronous Programming?
1.1 The Problem with Synchronous Code
Traditional synchronous code blocks the entire thread while it waits on I/O:
```python
import requests
import time

def fetch_urls(urls):
    results = []
    for url in urls:
        response = requests.get(url)  # each request blocks until it completes
        results.append(response.text)
    return results

# 10 URLs, about 1 second each -> roughly 10 seconds in total
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
start = time.time()
fetch_urls(urls)
print(f"Sync elapsed: {time.time() - start:.2f}s")  # about 10s
```
1.2 The Advantage of Async Code
Async code can run other tasks while it waits on I/O:
```python
import aiohttp
import asyncio
import time

async def fetch_urls_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def fetch_one(session, url):
    async with session.get(url) as response:
        return await response.text()

# 10 URLs run concurrently -> roughly 1 second in total
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
start = time.time()
asyncio.run(fetch_urls_async(urls))
print(f"Async elapsed: {time.time() - start:.2f}s")  # about 1s
```
The core value of asynchronous programming:
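- High concurrency: thousands of I/O tasks can be in flight at once
- Resource efficiency: concurrency on a single thread, without the overhead of multiple threads
- Concise code: async/await reads much like ordinary sequential code
- Best fit: I/O-bound work such as network requests, database queries, and file reads/writes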
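The timings above depend on httpbin.org being reachable. As an offline sketch (an illustration added here, not one of the original examples), the block below swaps each HTTP request for asyncio.sleep(), which suspends a coroutine much like a network wait does. The helpers fake_request, run_sequentially, run_concurrently and timed are hypothetical names introduced only for this demo.

```python
import asyncio
import time


async def fake_request(i: int, delay: float) -> int:
    # asyncio.sleep() stands in for waiting on the network (no real request is made)
    await asyncio.sleep(delay)
    return i


async def run_sequentially(n: int, delay: float) -> list[int]:
    # Each await finishes before the next "request" starts
    return [await fake_request(i, delay) for i in range(n)]


async def run_concurrently(n: int, delay: float) -> list[int]:
    # gather() schedules every coroutine up front, so the waits overlap
    return list(await asyncio.gather(*(fake_request(i, delay) for i in range(n))))


def timed(coro) -> tuple[list[int], float]:
    # Run one coroutine in a fresh event loop and measure wall-clock time
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, seq_time = timed(run_sequentially(5, 0.2))
    _, con_time = timed(run_concurrently(5, 0.2))
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With 5 simulated requests of 0.2 s each, the sequential version should take about 1 s and the concurrent one about 0.2 s, mirroring the 10 s vs. 1 s comparison above; exact numbers vary by machine.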
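2. Coroutine Basics: async/await Syntax
2.1 What Is a Coroutine?
A coroutine is a function whose execution can be suspended and resumed. A function defined with async def is a coroutine function; calling it returns a coroutine object: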
```python
import asyncio

async def my_coroutine():
    print("Started")
    await asyncio.sleep(1)  # simulate an I/O operation by pausing for 1 second
    print("Finished")
    return "result"

# Run the coroutine
result = asyncio.run(my_coroutine())
print(result)  # result
```
Key concepts:
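| Concept | Description |
|---|---|
| async def | Defines a coroutine function; calling it returns a coroutine object |
| await | Suspends the coroutine until an awaitable completes |
| asyncio.run() | Entry point for running a coroutine: creates an event loop, runs the coroutine, then closes the loop |
| Coroutine object | The object returned by calling a coroutine function; it must be awaited or scheduled before it runs |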
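To make the "coroutine object" row concrete, here is a small sketch (demo and the events list are hypothetical names) showing that calling a coroutine function only builds the object; its body runs once the event loop drives it, here via asyncio.run().

```python
import asyncio
import inspect


async def my_coroutine(events: list[str]) -> str:
    events.append("body started")
    await asyncio.sleep(0)
    return "result"


def demo() -> tuple[bool, list[str], str, list[str]]:
    events: list[str] = []
    coro = my_coroutine(events)        # only creates a coroutine object
    is_coro = inspect.iscoroutine(coro)
    before_run = list(events)          # the body has not run yet, so this is []
    result = asyncio.run(coro)         # asyncio.run() drives the coroutine to completion
    return is_coro, before_run, result, events


if __name__ == "__main__":
    print(demo())  # (True, [], 'result', ['body started'])
```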
2.2 Awaitables
await works on awaitable objects, which come in three main kinds:
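```python
import asyncio

# 1. Coroutine
async def coro():
    await asyncio.sleep(1)
    return "coroutine result"

async def main():
    # await is only valid inside a coroutine
    result = await coro()

    # 2. Task: a coroutine wrapped and scheduled on the event loop
    task = asyncio.create_task(coro())
    result = await task

    # 3. Future: a low-level object representing the eventual result of an async operation
    future = asyncio.get_running_loop().create_future()
    future.set_result("future result")
    result = await future
    print(result)  # future result

asyncio.run(main())
```
2.3 Coroutine vs. Function vs. Task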
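```python
import asyncio

async def say_hello():
    await asyncio.sleep(1)
    return "Hello"

# ❌ Calling a coroutine function does not run it
coro = say_hello()  # returns a coroutine object; the body does not execute
# coro must be awaited or scheduled; close it here to avoid a "never awaited" warning
coro.close()

# ✅ Option 1: await the coroutine
async def main1():
    result = await say_hello()  # await it directly
    print(result)

# ✅ Option 2: create a Task
async def main2():
    task = asyncio.create_task(say_hello())  # schedule it on the event loop
    result = await task                      # wait for it to finish
    print(result)

# ✅ Option 3: asyncio.run() as the entry point
asyncio.run(main1())
```
3. asyncio Core APIs in Detail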
3.1 asyncio.run(): The Program Entry Point
asyncio.run() is the standard entry point for running an async program (Python 3.7+):
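```python
import asyncio

async def main():
    print("Hello asyncio")
    await asyncio.sleep(1)
    print("Done")

# Creates an event loop, runs the coroutine, then closes the loop
asyncio.run(main())

# Note: asyncio.run() is meant to be the program's main entry point and is usually
# called once. It cannot be called while another event loop is already running in
# the same thread (for example, from inside a coroutine).
```
3.2 asyncio.gather(): Running Coroutines Concurrently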
gather is the most commonly used tool for running coroutines concurrently:
```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return f"{name} result"

async def main():
    # Run 3 tasks concurrently
    results = await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 1),
    )
    print(results)  # ['A result', 'B result', 'C result'] (in argument order)

asyncio.run(main())
```
gather parameters in detail:
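```python
async def failing_task():
    raise ValueError("task failed")

async def main():
    # return_exceptions=True: exceptions come back as results; other tasks are unaffected
    results = await asyncio.gather(
        task("A", 1),    # task() as defined in the previous example
        failing_task(),  # this one raises
        task("C", 1),
        return_exceptions=True,
    )
    print(results)  # ['A result', ValueError('task failed'), 'C result']

# return_exceptions=False (default): the first exception propagates immediately out of
# `await asyncio.gather(...)`, but the other awaitables are NOT cancelled and keep running
```
3.3 asyncio.create_task(): Creating Background Tasks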
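create_task wraps a coroutine in a Task and schedules it on the event loop right away; the task starts running as soon as the current coroutine yields control: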
```python
import asyncio

async def background_task():
    while True:
        print("Background task running...")
        await asyncio.sleep(1)

async def main():
    # Create the background task; it starts once main() yields control
    task = asyncio.create_task(background_task())

    # The main task does other work
    await asyncio.sleep(3)
    print("Main task done")

    # Cancel the background task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Background task cancelled")

asyncio.run(main())
```
3.4 asyncio.wait(): Finer-Grained Waiting
wait offers finer-grained control:
```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.create_task(task(f"T{i}", i)) for i in range(5)]

    # FIRST_COMPLETED: return as soon as any task finishes
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    print(f"Done: {done}")
    print(f"Pending: {pending}")

    # Cancel the remaining tasks
    for t in pending:
        t.cancel()

asyncio.run(main())
```
return_when options:
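| Option | Description |
|---|---|
| ALL_COMPLETED | Return when all tasks have finished (default) |
| FIRST_COMPLETED | Return as soon as any task finishes |
| FIRST_EXCEPTION | Return as soon as any task raises an exception (same as ALL_COMPLETED if none raises) |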
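A sketch of FIRST_EXCEPTION, assuming three hypothetical tasks (ok and boom are illustration-only coroutines) where one fails partway through: asyncio.wait() returns as soon as the failure happens, hands back the done/pending sets instead of raising, and leaves cancelling the stragglers to the caller.

```python
import asyncio


async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"


async def boom(delay: float) -> str:
    await asyncio.sleep(delay)
    raise ValueError("boom")


async def first_exception_demo() -> tuple[int, int, list[str]]:
    tasks = [
        asyncio.create_task(ok(0.05)),   # finishes first
        asyncio.create_task(boom(0.1)),  # fails second
        asyncio.create_task(ok(1.0)),    # still running when boom() fails
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    # wait() does not raise; inspect each finished task for its exception
    errors = [repr(t.exception()) for t in done if t.exception() is not None]

    # Cancel the stragglers and let the cancellation complete
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), errors


if __name__ == "__main__":
    print(asyncio.run(first_exception_demo()))  # (2, 1, ["ValueError('boom')"])
```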
3.5 asyncio.sleep(): Async Waiting
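```python
# (inside an async def function)
# Async wait: suspends this coroutine without blocking the thread
await asyncio.sleep(1)  # 1 second

# Sleeping for 0 seconds yields to the event loop so other tasks can run
await asyncio.sleep(0)  # switches immediately, similar to yield
```
4. In Practice: Async HTTP Requests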
4.1 Concurrent Requests with aiohttp
```python
import aiohttp
import asyncio

async def fetch_url(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            if response.status == 200:
                return await response.text()
            else:
                return f"Error: {response.status}"
    except asyncio.TimeoutError:
        return "Timeout"
    except Exception as e:
        return f"Exception: {e}"

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Example
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/headers",
]

results = asyncio.run(fetch_all(urls))
for r in results:
    print(r[:100])  # show only the first 100 characters of each response
```
4.2 POST Requests and JSON Handling
```python
import aiohttp
import asyncio

async def post_json():
    async with aiohttp.ClientSession() as session:
        payload = {"name": "test", "value": 123}

        # json= serializes the payload and sets Content-Type: application/json
        async with session.post("https://httpbin.org/post", json=payload) as response:
            data = await response.json()
            print(data["json"])  # {'name': 'test', 'value': 123}

asyncio.run(post_json())
```
4.3 Connection Pooling and Session Reuse
```python
import aiohttp
import asyncio

# Reuse one session across the program (recommended)
class HttpClient:
    _session = None

    @classmethod
    async def get_session(cls):
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30),
                connector=aiohttp.TCPConnector(limit=100),  # max connections in the pool
            )
        return cls._session

    @classmethod
    async def close(cls):
        if cls._session:
            await cls._session.close()

async def fetch(url):
    session = await HttpClient.get_session()
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]
    results = await asyncio.gather(*[fetch(url) for url in urls])
    await HttpClient.close()
    print(f"Fetched {len(results)} responses")

asyncio.run(main())
```
5. In Practice: Async File Operations
5.1 Reading and Writing Files Asynchronously with aiofiles
```python
import aiofiles
import asyncio

async def write_file(path, content):
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(content)

async def read_file(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
        return content

async def main():
    await write_file("test.txt", "Hello aiofiles!")
    content = await read_file("test.txt")
    print(content)

asyncio.run(main())
```
5.2 Processing Files in Batches
```python
import aiofiles
import asyncio
import os

async def process_file(filepath):
    async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
        content = await f.read()
    # Process the content: here, count its lines
    # (splitlines() avoids counting an extra empty "line" after a trailing newline)
    lines = content.splitlines()
    return filepath, len(lines)

async def process_directory(directory):
    files = [f for f in os.listdir(directory) if f.endswith('.txt')]
    tasks = [process_file(os.path.join(directory, f)) for f in files]
    results = await asyncio.gather(*tasks)

    for filepath, count in results:
        print(f"{filepath}: {count} lines")

asyncio.run(process_directory("./data"))
```
6. TaskGroup (Python 3.11+)
Python 3.11 introduced TaskGroup, a safer way to run a group of tasks together:
6.1 TaskGroup Basics
```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task("A", 1))
        t2 = tg.create_task(task("B", 2))
        t3 = tg.create_task(task("C", 1))

    # When the async with block exits, all tasks in the group have finished
    print(f"Results: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(main())
```
6.2 TaskGroup's Advantage in Exception Handling
```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("task failed")

async def normal_task():
    await asyncio.sleep(1)
    return "finished normally"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(normal_task())
            tg.create_task(failing_task())
            tg.create_task(normal_task())
    except ExceptionGroup as eg:
        # The ExceptionGroup holds the exceptions raised by the group's tasks;
        # here just the ValueError, since the other tasks were cancelled
        print(f"Caught exception group: {eg}")
        for exc in eg.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())
```
TaskGroup vs. gather:
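| Feature | TaskGroup | gather |
|---|---|---|
| Exception handling | Collects task exceptions into an ExceptionGroup | By default the first exception propagates immediately; use return_exceptions=True to receive exceptions as results |
| Task cancellation | When any task fails, the remaining tasks are cancelled automatically | The remaining tasks keep running; cancel or await them manually |
| Style | Structured concurrency, safer | Flexible, but needs more manual control |
| Python version | 3.11+ | 3.4+ |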
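The "Task cancellation" row is easy to check. In this sketch (slow_sibling, fail_fast and gather_demo are hypothetical names), gather() raises the first error immediately while the sibling task is still running, which is why the remaining tasks have to be cancelled or awaited by hand.

```python
import asyncio


async def slow_sibling(delay: float) -> str:
    await asyncio.sleep(delay)
    return "sibling done"


async def fail_fast(delay: float) -> None:
    await asyncio.sleep(delay)
    raise ValueError("fail")


async def gather_demo() -> tuple[str, bool, str]:
    sibling = asyncio.create_task(slow_sibling(0.2))
    caught = ""
    try:
        # Default return_exceptions=False: the first error propagates right away
        await asyncio.gather(sibling, fail_fast(0.05))
    except ValueError as exc:
        caught = str(exc)
    still_running = not sibling.done()  # gather() did not cancel the sibling
    sibling_result = await sibling      # so it has to be awaited (or cancelled) by hand
    return caught, still_running, sibling_result


if __name__ == "__main__":
    print(asyncio.run(gather_demo()))  # ('fail', True, 'sibling done')
```

Inside an asyncio.TaskGroup, the same failure would instead cancel the sibling before the ExceptionGroup is raised.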
7. Concurrency Control: Semaphores and Rate Limiting
7.1 asyncio.Semaphore: Limiting Concurrency
```python
import aiohttp
import asyncio

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # acquire the semaphore; at most N coroutines hold it at once
        async with session.get(url) as response:
            return await response.text()

async def fetch_all_limited(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        return results

# At most 5 concurrent requests
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(20)]
asyncio.run(fetch_all_limited(urls, max_concurrent=5))
```
7.2 A Custom Rate Limiter
```python
import aiohttp
import asyncio
import time

class RateLimiter:
    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.last_time = 0.0
        # Serialize callers so concurrent acquire() calls stay evenly spaced
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            current = time.monotonic()
            wait_time = self.last_time + self.interval - current
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_time = time.monotonic()

async def fetch_rate_limited(urls, rate=2):
    limiter = RateLimiter(rate)
    results = []

    async with aiohttp.ClientSession() as session:
        for url in urls:
            await limiter.acquire()  # throttle the request rate
            async with session.get(url) as response:
                results.append(await response.text())

    return results

# At most 2 requests per second
urls = [f"https://httpbin.org/get?id={i}" for i in range(6)]
asyncio.run(fetch_rate_limited(urls, rate=2))
```
8. Async Database Operations
8.1 Async SQLite (aiosqlite)
```python
import aiosqlite
import asyncio

async def init_db():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """)
        await db.commit()

async def insert_user(name, email):
    async with aiosqlite.connect("app.db") as db:
        await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            (name, email)
        )
        await db.commit()

async def get_all_users():
    async with aiosqlite.connect("app.db") as db:
        async with db.execute("SELECT * FROM users") as cursor:
            rows = await cursor.fetchall()
            return rows

async def main():
    await init_db()
    # email is UNIQUE, so running main() a second time raises sqlite3.IntegrityError
    await insert_user("Alice", "alice@example.com")
    await insert_user("Bob", "bob@example.com")
    users = await get_all_users()
    print(users)

asyncio.run(main())
```
8.2 Async PostgreSQL (asyncpg)
```python
import asyncpg
import asyncio

async def query_postgres():
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        user="postgres",
        password="password",
        database="mydb"
    )

    # Run a query
    rows = await conn.fetch("SELECT * FROM users LIMIT 10")
    for row in rows:
        print(row)

    # Run an insert ($1, $2 are PostgreSQL-style positional placeholders)
    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Charlie", "charlie@example.com"
    )

    await conn.close()

asyncio.run(query_postgres())
```
9. Task Cancellation and Timeouts
9.1 Cancelling Tasks
```python
import asyncio

async def long_running_task():
    try:
        print("Task started")
        for i in range(10):
            await asyncio.sleep(1)
            print(f"Progress: {i+1}/10")
        print("Task finished")
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Perform cleanup here
        raise  # re-raise so the caller knows the task was cancelled

async def main():
    task = asyncio.create_task(long_running_task())

    await asyncio.sleep(3)
    task.cancel()  # request cancellation

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task cancelled")

asyncio.run(main())
```
9.2 Handling Timeouts
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        # asyncio.wait_for cancels the operation if it exceeds the timeout
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```
9.3 Using asyncio.timeout (Python 3.11+)
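```python
import asyncio

async def main():
    try:
        async with asyncio.timeout(3.0):
            await slow_operation()  # slow_operation() from 9.2; cancelled at the deadline
    except TimeoutError:
        # asyncio.timeout() turns the cancellation into a TimeoutError
        print("Operation timed out")

asyncio.run(main())
```
10. Sync vs. Async Performance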
10.1 Network Requests Compared
```python
import asyncio
import aiohttp
import requests
import time

# Sync version
def sync_fetch(urls):
    start = time.time()
    results = []
    for url in urls:
        results.append(requests.get(url).text)
    elapsed = time.time() - start
    return results, elapsed

# Async version
async def fetch_text(session, url):
    async with session.get(url) as response:  # releases the connection when done
        return await response.text()

async def async_fetch(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_text(session, url) for url in urls))
    elapsed = time.time() - start
    return results, elapsed

# Benchmark
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]

# Sync: about 10 seconds
_, sync_time = sync_fetch(urls)
print(f"Sync elapsed: {sync_time:.2f}s")

# Async: about 1 second
_, async_time = asyncio.run(async_fetch(urls))
print(f"Async elapsed: {async_time:.2f}s")

# Roughly a 10x speedup here; actual numbers depend on the network and server
print(f"Speedup: {sync_time/async_time:.1f}x")
```
10.2 Choosing the Right Approach
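| Scenario | Recommended approach | Reason |
|---|---|---|
| Network requests | asyncio | Mostly waiting; async lets requests overlap |
| Database queries | asyncio | I/O waits; async allows concurrent queries |
| File reads/writes | asyncio (e.g. aiofiles) | Disk I/O; files can be processed concurrently (aiofiles delegates to a thread pool under the hood) |
| CPU-bound computation | multiprocessing | Needs parallelism across cores; async offers no benefit |
| Mixed workloads | asyncio + executors | Async for I/O, a process pool for CPU-heavy work (a thread pool suits blocking I/O calls) |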
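Why does async give no benefit for CPU-bound work? A minimal sketch, assuming a pure-Python loop as the stand-in for "CPU-heavy" code (cpu_work, record and cpu_blocks_loop_demo are hypothetical names): a coroutine that computes without awaiting holds the event loop, so a task scheduled before the computation only runs after it finishes.

```python
import asyncio


def cpu_work(n: int) -> int:
    # Pure-Python CPU-bound loop: it never awaits, so it never yields to the event loop
    total = 0
    for i in range(n):
        total += i * i
    return total


async def record(order: list[str], label: str) -> None:
    order.append(label)


async def cpu_blocks_loop_demo() -> tuple[list[str], int]:
    order: list[str] = []
    ticker = asyncio.create_task(record(order, "tick"))  # scheduled, not yet running
    order.append("cpu start")
    result = cpu_work(200_000)   # holds the event loop until it returns
    order.append("cpu end")
    await ticker                 # only now does the loop get to run the other task
    return order, result


if __name__ == "__main__":
    order, _ = asyncio.run(cpu_blocks_loop_demo())
    print(order)  # ['cpu start', 'cpu end', 'tick']
```

The ticker task had to wait for the whole computation. Offloading cpu_work via loop.run_in_executor() with a concurrent.futures.ProcessPoolExecutor would keep the loop free while it runs.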
11. Common Pitfalls and Best Practices
❌ Pitfall 1: Calling a coroutine from a synchronous function
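```python
import asyncio

# ❌ Wrong: calling a coroutine function does not execute it
async def my_coro():
    return "result"

def wrong_call():
    result = my_coro()  # a coroutine object, not the result
    print(result)       # <coroutine object my_coro at ...>

# ✅ Right
def correct_call():
    result = asyncio.run(my_coro())
    print(result)  # result
```
❌ Pitfall 2: Blocking the event loop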
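```python
import asyncio
import time

# ❌ Wrong: a blocking call inside a coroutine
async def bad_example():
    time.sleep(5)  # blocks the entire event loop!
    return "done"

# ✅ Right: use the async counterpart
async def good_example():
    await asyncio.sleep(5)  # does not block; other tasks can run
    return "done"

# ✅ Right: run blocking code in a worker thread (asyncio.to_thread, Python 3.9+)
def blocking_function():  # placeholder for any blocking call
    time.sleep(5)
    return "done"

async def run_blocking():
    result = await asyncio.to_thread(blocking_function)
    return result
```
❌ Pitfall 3: Forgetting to close the ClientSession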
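```python
import aiohttp

# ❌ Wrong: an unclosed session leaks resources
async def bad():
    session = aiohttp.ClientSession()
    await session.get("https://example.com")
    # never closed!

# ✅ Right: use async with
async def good():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            await response.text()
    # the session is closed automatically here
```
❌ Pitfall 4: Unbounded concurrency exhausting resources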
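```python
# ❌ Wrong: unbounded concurrency
async def bad():
    tasks = [fetch(url) for url in urls]  # possibly thousands
    await asyncio.gather(*tasks)  # may exhaust memory or trigger server rate limits

# ✅ Right: cap it with a semaphore (fetch_with_limit() as defined in 7.1)
async def good():
    semaphore = asyncio.Semaphore(100)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        await asyncio.gather(*tasks)
```
✅ Best Practices Checklist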
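| Principle | Details |
|---|---|
| Use async with | Guarantees resources are released (sessions, files, locks) |
| Limit concurrency | Use a Semaphore to avoid exhausting resources |
| Avoid blocking calls | Wrap blocking functions with asyncio.to_thread |
| Handle timeouts | Use wait_for or asyncio.timeout |
| Cancel tasks correctly | Catch CancelledError, clean up, then re-raise it |
| Prefer TaskGroup | On Python 3.11+, TaskGroup is the safer choice |
| Reuse ClientSession | Avoid creating a new session for every request |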
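For the "Limit concurrency" row, here is a sketch that measures how many workers actually hold a Semaphore at the same time. asyncio.sleep(0.01) stands in for a real request, and limited_worker/run_limited are hypothetical names. The peak never exceeds the limit, even with far more tasks than slots.

```python
import asyncio


async def limited_worker(sem: asyncio.Semaphore, state: dict[str, int]) -> None:
    async with sem:                       # at most `limit` workers get past this line at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)         # stands in for a real request
        state["active"] -= 1


async def run_limited(n_tasks: int, limit: int) -> int:
    sem = asyncio.Semaphore(limit)        # created inside the running loop
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(run_limited(50, 5)))  # 5
```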
12. Mixing Async and Sync Code
12.1 asyncio.to_thread: Running Blocking Functions from a Coroutine
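```python
import asyncio
import time

def blocking_io():
    time.sleep(2)  # a blocking operation
    return "blocking function result"

async def main():
    # Run the blocking function in a worker thread while another coroutine runs
    # concurrently; the event loop itself is never blocked (about 2s in total)
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1),
    )
    print(result)

asyncio.run(main())
```
12.2 loop.run_in_executor()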
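```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    return sum(range(n))

async def main():
    loop = asyncio.get_running_loop()

    # Send CPU-bound work to a process pool: in standard CPython the GIL keeps
    # threads from running pure-Python code in parallel
    with ProcessPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(
            executor, cpu_intensive_task, 10_000_000
        )
    print(f"Result: {result}")

if __name__ == "__main__":  # required when worker processes are started with spawn
    asyncio.run(main())
```
13. Summary: An asyncio Learning Path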
| Stage | Content | Key APIs |
|---|---|---|
| Getting started | async/await syntax, coroutines | async def, await, asyncio.run() |
| Basics | Concurrent execution, task management | gather, create_task, sleep |
| Practice | HTTP requests, file operations | aiohttp, aiofiles, aiosqlite |
| Intermediate | Concurrency control, exception handling | Semaphore, wait_for, cancel |
| Advanced | TaskGroup, mixing in sync code | TaskGroup, to_thread, run_in_executor |
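As a small exercise for the intermediate-stage APIs in the table, the sketch below (worker and timeout_demo are hypothetical names) combines wait_for with a coroutine that cleans up on CancelledError and re-raises it, the pattern from section 9.1. It checks that the cleanup has already run by the time wait_for raises its timeout.

```python
import asyncio


async def worker(log: list[str]) -> str:
    try:
        await asyncio.sleep(10)   # deliberately longer than the timeout below
        return "done"
    except asyncio.CancelledError:
        log.append("cleanup")     # release resources here
        raise                     # re-raise so the cancellation propagates


async def timeout_demo() -> tuple[str, list[str]]:
    log: list[str] = []
    try:
        outcome = await asyncio.wait_for(worker(log), timeout=0.05)
    except asyncio.TimeoutError:
        # wait_for cancelled worker() and let its cleanup finish before raising
        outcome = "timeout"
    return outcome, log


if __name__ == "__main__":
    print(asyncio.run(timeout_demo()))  # ('timeout', ['cleanup'])
```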
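Mastering asyncio is key to high-concurrency programming in Python. From web crawlers to API services, and from data processing to real-time communication, async programming can substantially improve throughput for I/O-bound work. Start with simple concurrent requests, work up to more complex scenarios, and make asyncio a regular part of your everyday toolbox.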
https://971918.xyz/posts/python-guide/python-asyncio-guide/