
Python asyncio Asynchronous Programming: The Complete Guide from async/await to Real-World Concurrency | 2026 Beginner Tutorial

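Python's asyncio module is the core infrastructure for asynchronous programming in modern Python. asyncio arrived in Python 3.4, the async/await syntax in 3.5, and TaskGroup in 3.11. Today, asynchronous programming is the usual choice for high-concurrency, I/O-bound Python workloads.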

Figure: Overview diagram of Python asyncio asynchronous programming

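This article builds up the complete asyncio toolkit from scratch, covering:

  • Coroutines and the basic async/await syntax
  • Core asyncio APIs: run / gather / create_task / wait
  • Asynchronous HTTP requests in practice (aiohttp)
  • Asynchronous file and database operations
  • TaskGroup (Python 3.11+)
  • Concurrency control and rate limiting with semaphores
  • Exception handling and task cancellation
  • Synchronous vs. asynchronous performance
  • Common pitfalls and best practices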

1. Why Asynchronous Programming?

1.1 The Problem with Synchronous Code

Traditional synchronous code blocks the whole thread on every I/O operation:

```python
import time

import requests

def fetch_urls(urls):
    results = []
    for url in urls:
        response = requests.get(url, timeout=10)  # each request blocks until it completes
        results.append(response.text)
    return results

# 10 URLs at about 1 s each → about 10 s in total
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
start = time.time()
fetch_urls(urls)
print(f"Sync elapsed: {time.time() - start:.2f}s")  # about 10 s
```

1.2 The Advantage of Asynchronous Code

Asynchronous code can run other tasks while it waits for I/O:

```python
import asyncio
import time

import aiohttp

async def fetch_urls_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

async def fetch_one(session, url):
    async with session.get(url) as response:
        return await response.text()

# 10 URLs run concurrently → about 1 s in total
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
start = time.time()
asyncio.run(fetch_urls_async(urls))
print(f"Async elapsed: {time.time() - start:.2f}s")  # about 1 s
```

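The core value of asynchronous programming

  • High concurrency: keep thousands of I/O operations in flight at the same time
  • Resource efficiency: concurrency on a single thread, without the overhead of many OS threads
  • Readable code: async/await reads almost like ordinary sequential code
  • Best fit: I/O-bound work such as network requests, database queries, and file access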

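2. Coroutine Basics: async/await Syntax

2.1 What Is a Coroutine?

A coroutine is a function that can suspend its execution and resume later. A function defined with async def is a coroutine function; calling it returns a coroutine object:

```python
import asyncio

async def my_coroutine():
    print("Started")
    await asyncio.sleep(1)  # simulate an I/O operation by pausing for 1 s
    print("Finished")
    return "result"

# Run the coroutine
result = asyncio.run(my_coroutine())
print(result)  # result
```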

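Key concepts

| Concept | Description |
| --- | --- |
| async def | Defines a coroutine function; calling it returns a coroutine object |
| await | Suspends the coroutine until an awaitable completes |
| asyncio.run() | Entry point: creates an event loop, runs the coroutine, then closes the loop |
| Coroutine object | The object returned by calling a coroutine function; it runs only once it is awaited or scheduled |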
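To make the last row of the table concrete, here is a small standard-library sketch showing that calling a coroutine function executes nothing until an event loop drives the coroutine. greet() and the calls list are illustrative names, not part of any API.

```python
import asyncio
import inspect

calls: list[str] = []

async def greet(name: str) -> str:
    calls.append(name)  # only runs once the coroutine is actually executed
    await asyncio.sleep(0)
    return f"hello {name}"

coro = greet("asyncio")  # creates a coroutine object; the body has not started
print(inspect.iscoroutinefunction(greet))  # True
print(inspect.iscoroutine(coro))  # True
calls_before_run = list(calls)
print(calls_before_run)  # [] (nothing has run yet)

result = asyncio.run(coro)  # the event loop drives the coroutine to completion
print(result, calls)  # hello asyncio ['asyncio']
```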

2.2 Awaitables

await accepts any awaitable object. There are three main kinds:

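```python
import asyncio

# 1. Coroutine
async def coro():
    await asyncio.sleep(1)
    return "coroutine result"

async def main():  # await is only valid inside a coroutine
    result = await coro()
    # 2. Task: a coroutine wrapped and scheduled on the event loop
    task = asyncio.create_task(coro())
    result = await task
    # 3. Future: a low-level object representing the eventual result of an async operation
    future = asyncio.get_running_loop().create_future()
    future.set_result("future result")
    result = await future
    print(result)  # future result

asyncio.run(main())
```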
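In practice a Future is usually resolved by someone other than the coroutine awaiting it. A minimal sketch, assuming a timer callback plays that "someone else" (in real code it might be a protocol callback or another task); wait_for_external_result() is an illustrative name:

```python
import asyncio

async def wait_for_external_result() -> str:
    loop = asyncio.get_running_loop()
    # Create the Future through the running loop rather than calling asyncio.Future() directly
    future = loop.create_future()
    # Schedule a plain callback that resolves the Future about 50 ms from now
    loop.call_later(0.05, future.set_result, "value set by a callback")
    # await suspends this coroutine until set_result() has been called
    return await future

value = asyncio.run(wait_for_external_result())
print(value)  # value set by a callback
```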

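2.3 Coroutine vs. Function vs. Task

```python
import asyncio

async def say_hello():
    await asyncio.sleep(1)
    return "Hello"

# ❌ Calling a coroutine function does not run it
coro = say_hello()  # returns a coroutine object; the body has not executed
# coro must be awaited or scheduled; close it here so Python does not warn "never awaited"
coro.close()

# ✅ Option 1: await the coroutine
async def main1():
    result = await say_hello()  # await directly
    print(result)

# ✅ Option 2: create a Task
async def main2():
    task = asyncio.create_task(say_hello())  # scheduled; starts at main2's next await
    result = await task  # wait for it to finish
    print(result)

# ✅ Option 3: asyncio.run() drives the top-level coroutine
asyncio.run(main1())
```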
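"Scheduled" does not mean "already running": a Task created with create_task() only starts when the current coroutine yields control at an await. This sketch records the order of events to make that visible; worker() and the log list are illustrative names.

```python
import asyncio

async def worker(log: list[str]) -> str:
    log.append("worker: started")
    await asyncio.sleep(0)
    log.append("worker: finished")
    return "Hello"

async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    log.append("main: task created")  # worker has not run yet
    await asyncio.sleep(0)  # first yield: the loop now runs worker up to its own await
    log.append("main: resumed")
    result = await task
    log.append(f"main: got {result}")
    return log

log = asyncio.run(main())
print(log)
# ['main: task created', 'worker: started', 'main: resumed', 'worker: finished', 'main: got Hello']
```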

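3. asyncio Core APIs in Depth

3.1 asyncio.run(): The Program Entry Point

asyncio.run() is the standard entry point for running asynchronous code (Python 3.7+):

```python
import asyncio

async def main():
    print("Hello asyncio")
    await asyncio.sleep(1)
    print("Done")

# Creates an event loop, runs the coroutine, then closes the loop
asyncio.run(main())
# Note: asyncio.run() cannot be called while another event loop is running in the
# same thread (for example, from inside a coroutine). It is meant to be the program's
# main entry point and ideally called only once.
```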
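The restriction is about nesting rather than the number of calls. A short sketch (answer() and nested_call() are illustrative names): two sequential asyncio.run() calls succeed, while calling it from inside a running coroutine raises RuntimeError.

```python
import asyncio

async def answer() -> int:
    await asyncio.sleep(0)
    return 42

# Sequential calls are fine: each one creates a fresh event loop and closes it afterwards
first = asyncio.run(answer())
second = asyncio.run(answer())

async def nested_call() -> str:
    coro = answer()
    try:
        asyncio.run(coro)  # not allowed: this thread already has a running loop
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a "never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(nested_call())
print(first, second)  # 42 42
print(message)  # explains that asyncio.run() cannot be called from a running event loop
```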

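3.2 asyncio.gather(): Running Multiple Coroutines Concurrently

gather is the most commonly used tool for concurrent execution:

```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return f"{name} result"

async def main():
    # Run 3 coroutines concurrently; total time ≈ the longest delay (2 s)
    results = await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 1),
    )
    print(results)  # ['A result', 'B result', 'C result'] (always in argument order)

asyncio.run(main())
```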

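gather parameters in detail

```python
import asyncio

async def failing_task():  # task() is the coroutine from the previous example
    await asyncio.sleep(0.5)
    raise ValueError("failed")
async def main():
    # return_exceptions=True: exceptions come back as results; other tasks are unaffected
    results = await asyncio.gather(
        task("A", 1),
        failing_task(),  # this one raises
        task("C", 1),
        return_exceptions=True,
    )
    print(results)  # ['A result', ValueError('failed'), 'C result']
    # return_exceptions=False (default): the first exception propagates to the awaiting caller,
    # but the other awaitables are NOT cancelled and keep running
asyncio.run(main())
```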
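The default behavior is easy to misread as "gather stops everything". This sketch, with illustrative ok() and boom() coroutines, checks that the sibling task is still running after gather() has raised, and that it later completes normally:

```python
import asyncio

async def ok(name: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(name)
    return name

async def boom() -> None:
    await asyncio.sleep(0.05)
    raise ValueError("boom")

async def main() -> tuple[str, bool, list[str]]:
    finished: list[str] = []
    slow = asyncio.create_task(ok("slow", 0.2, finished))
    error = ""
    try:
        await asyncio.gather(boom(), slow)  # return_exceptions=False (default)
    except ValueError as exc:
        error = str(exc)
    still_running = not slow.done()  # gather() raised, but did not cancel the sibling
    await slow  # the sibling still runs to completion
    return error, still_running, finished

error, still_running, finished = asyncio.run(main())
print(error, still_running, finished)  # boom True ['slow']
```

If the siblings should be cancelled on the first failure, cancel them yourself, or use TaskGroup (section 6), which does this automatically.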

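3.3 asyncio.create_task(): Creating Background Tasks

create_task wraps a coroutine in a Task and schedules it on the event loop. The task starts running as soon as the current coroutine yields control at an await:

```python
import asyncio

async def background_task():
    while True:
        print("Background task running...")
        await asyncio.sleep(1)

async def main():
    # Create a background task; it starts once main() reaches its next await
    task = asyncio.create_task(background_task())
    # The main coroutine does other work
    await asyncio.sleep(3)
    print("Main task done")
    # Cancel the background task and wait for the cancellation to complete
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Background task cancelled")

asyncio.run(main())
```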
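Keeping the return value of create_task() matters. The asyncio documentation notes that the event loop keeps only weak references to tasks, so a fire-and-forget task with no other reference can disappear before it finishes. The pattern the docs recommend is to hold tasks in a set and discard each one when it is done; spawn() and tick() below are illustrative names.

```python
import asyncio
from typing import Any

background_tasks: set[asyncio.Task[Any]] = set()

def spawn(coro) -> asyncio.Task[Any]:
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # strong reference: the task cannot be garbage-collected mid-run
    task.add_done_callback(background_tasks.discard)  # release it once it finishes
    return task

async def tick(n: int, results: list[int]) -> None:
    await asyncio.sleep(0.01 * n)
    results.append(n)

async def main() -> tuple[int, list[int], int]:
    results: list[int] = []
    for n in range(3):
        spawn(tick(n, results))  # fire and forget; the set keeps the tasks alive
    in_flight = len(background_tasks)
    await asyncio.gather(*background_tasks)  # only so this demo can inspect the results
    await asyncio.sleep(0)  # give any pending done callbacks a chance to run
    return in_flight, results, len(background_tasks)

in_flight, results, remaining = asyncio.run(main())
print(in_flight, results, remaining)  # 3 [0, 1, 2] 0
```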

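3.4 asyncio.wait(): Finer-Grained Waiting

wait offers finer-grained control: it returns two sets, done and pending, and leaves handling results and cancellation to you:

```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.create_task(task(f"T{i}", i)) for i in range(5)]
    # FIRST_COMPLETED: return as soon as any task finishes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    print(f"Done: {[t.result() for t in done]}")  # ['T0'] (T0 sleeps 0 s)
    print(f"Pending: {len(pending)}")  # 4
    # Cancel the remaining tasks
    for t in pending:
        t.cancel()

asyncio.run(main())
```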

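return_when options

| Option | Description |
| --- | --- |
| ALL_COMPLETED | Return when all tasks have finished (default) |
| FIRST_COMPLETED | Return as soon as any task finishes |
| FIRST_EXCEPTION | Return as soon as any task raises; if none raises, same as ALL_COMPLETED |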
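FIRST_EXCEPTION is not exercised in the example above. A sketch with illustrative work() and fail() coroutines; note that wait() never raises a task's exception itself, so the finished tasks have to be inspected:

```python
import asyncio

async def work(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def fail(delay: float) -> None:
    await asyncio.sleep(delay)
    raise RuntimeError("failed")

async def main() -> tuple[int, int, list[str]]:
    tasks = [
        asyncio.create_task(work(0.01)),
        asyncio.create_task(fail(0.05)),
        asyncio.create_task(work(1.0)),
    ]
    # Returns once any task raises (or when all finish, if none raises)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # wait() does not re-raise; read exceptions from the finished tasks yourself
    errors = [str(t.exception()) for t in done if t.exception() is not None]
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)  # let the cancellations finish
    return len(done), len(pending), errors

n_done, n_pending, errors = asyncio.run(main())
print(n_done, n_pending, errors)  # 2 1 ['failed']
```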

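3.5 asyncio.sleep(): Asynchronous Waiting

```python
import asyncio

async def main():
    # Asynchronous wait: suspends this coroutine without blocking the thread
    await asyncio.sleep(1)  # 1 s
    # Sleeping for 0 s gives the event loop a chance to run other ready tasks
    await asyncio.sleep(0)  # yields control at once, similar to a generator's yield
asyncio.run(main())
```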
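Because asyncio is cooperative, a coroutine only gives up the thread at an await. This sketch (counter() is an illustrative name) shows two coroutines taking turns, each handing control back with await asyncio.sleep(0):

```python
import asyncio

async def counter(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        await asyncio.sleep(0)  # hand control back to the event loop

async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(counter("a", 3, log), counter("b", 3, log))
    return log

order = asyncio.run(main())
print(order)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without the sleep(0) line, counter("a", ...) would append all three of its entries before counter("b", ...) got a turn, since it would never reach an await.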

4. In Practice: Asynchronous HTTP Requests

4.1 Concurrent Requests with aiohttp

```python
import asyncio

import aiohttp

async def fetch_url(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            if response.status == 200:
                return await response.text()
            else:
                return f"Error: {response.status}"
    except asyncio.TimeoutError:
        return "Timeout"
    except Exception as e:
        return f"Exception: {e}"

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Example
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/headers",
]
results = asyncio.run(fetch_all(urls))
for r in results:
    print(r[:100])  # print only the first 100 characters of each response
```

4.2 POST Requests and JSON Handling

```python
import asyncio

import aiohttp

async def post_json():
    async with aiohttp.ClientSession() as session:
        payload = {"name": "test", "value": 123}
        async with session.post(
            "https://httpbin.org/post",
            json=payload,  # serialized to a JSON request body
            headers={"Content-Type": "application/json"},
        ) as response:
            data = await response.json()
            print(data["json"])  # {'name': 'test', 'value': 123}

asyncio.run(post_json())
```

4.3 Connection Pooling and Session Reuse

```python
import asyncio

import aiohttp

# Reuse one global session (recommended)
class HttpClient:
    _session = None

    @classmethod
    async def get_session(cls):
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30),
                connector=aiohttp.TCPConnector(limit=100),  # at most 100 simultaneous connections
            )
        return cls._session

    @classmethod
    async def close(cls):
        if cls._session:
            await cls._session.close()

async def fetch(url):
    session = await HttpClient.get_session()
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]
    try:
        results = await asyncio.gather(*[fetch(url) for url in urls])
    finally:
        await HttpClient.close()  # close the session even if a request fails
    print(f"Fetched {len(results)} responses")

asyncio.run(main())
```

5. In Practice: Asynchronous File Operations

5.1 Reading and Writing Files with aiofiles

```python
import asyncio

import aiofiles

async def write_file(path, content):
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(content)

async def read_file(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def main():
    await write_file("test.txt", "Hello aiofiles!")
    content = await read_file("test.txt")
    print(content)  # Hello aiofiles!

asyncio.run(main())
```

5.2 Batch Asynchronous File Processing

```python
import asyncio
import os

import aiofiles

async def process_file(filepath):
    async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
        content = await f.read()
    # Process the content: splitlines() does not count a trailing newline as an extra line
    lines = content.splitlines()
    return filepath, len(lines)

async def process_directory(directory):
    files = [f for f in os.listdir(directory) if f.endswith('.txt')]
    tasks = [process_file(os.path.join(directory, f)) for f in files]
    results = await asyncio.gather(*tasks)
    for filepath, count in results:
        print(f"{filepath}: {count} lines")

asyncio.run(process_directory("./data"))
```
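aiofiles works by delegating file operations to a thread pool. If you would rather avoid the dependency, a similar effect can be sketched with the standard library's asyncio.to_thread() (Python 3.9+). count_lines() and count_all() are illustrative names, and the demo writes its own temporary files:

```python
import asyncio
import tempfile
from pathlib import Path

def count_lines(path: Path) -> int:
    # Ordinary blocking read; it runs in a worker thread, not on the event loop
    with path.open(encoding="utf-8") as f:
        return sum(1 for _ in f)

async def count_all(directory: Path) -> dict[str, int]:
    paths = sorted(p for p in directory.iterdir() if p.suffix == ".txt")
    # One worker-thread job per file, awaited concurrently
    counts = await asyncio.gather(*(asyncio.to_thread(count_lines, p) for p in paths))
    return {p.name: n for p, n in zip(paths, counts)}

with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    (tmp_dir / "a.txt").write_text("one\ntwo\n", encoding="utf-8")
    (tmp_dir / "b.txt").write_text("only line\n", encoding="utf-8")
    (tmp_dir / "notes.md").write_text("ignored\n", encoding="utf-8")
    line_counts = asyncio.run(count_all(tmp_dir))

print(line_counts)  # {'a.txt': 2, 'b.txt': 1}
```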

6. TaskGroup (Python 3.11+)

Python 3.11 introduced TaskGroup, a safer way to run a group of related tasks: every task is awaited when the async with block exits, and a failure in one task cancels the others.

6.1 TaskGroup Basics

```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task("A", 1))
        t2 = tg.create_task(task("B", 2))
        t3 = tg.create_task(task("C", 1))
    # When the async with block exits, every task has finished
    print(f"Results: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(main())
```

6.2 TaskGroup's Advantage in Exception Handling

```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("task failed")

async def normal_task():
    await asyncio.sleep(1)
    return "completed normally"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(normal_task())
            tg.create_task(failing_task())
            tg.create_task(normal_task())
    except ExceptionGroup as eg:
        # The ExceptionGroup holds the exceptions raised by the group's tasks;
        # the remaining normal_task() instances were cancelled when failing_task() raised
        print(f"Caught exception group: {eg}")
        for exc in eg.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())
```

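TaskGroup vs. gather

| Feature | TaskGroup | gather |
| --- | --- | --- |
| Exception handling | Collects the tasks' exceptions into an ExceptionGroup | The first exception propagates by default; use return_exceptions=True to collect them as results |
| Cancellation | When any task fails, the remaining tasks are cancelled automatically | Other tasks keep running; cancel them yourself |
| Style | Structured concurrency, safer | Flexible, but needs more manual control |
| Python version | 3.11+ | 3.4+ |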

7. Concurrency Control: Semaphores and Rate Limiting

7.1 asyncio.Semaphore: Limiting Concurrency

```python
import asyncio

import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # wait for a free slot; at most max_concurrent requests run at once
        async with session.get(url) as response:
            return await response.text()

async def fetch_all_limited(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
    return results

# At most 5 concurrent requests: 20 one-second requests take roughly 4 s
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(20)]
asyncio.run(fetch_all_limited(urls, max_concurrent=5))
```
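Whether a semaphore really caps concurrency can be checked offline by counting how many jobs are inside the async with block at once. A sketch in which asyncio.sleep() stands in for the network call; limited_job(), run_jobs(), and the state dict are illustrative:

```python
import asyncio

async def limited_job(sem: asyncio.Semaphore, state: dict[str, int]) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for a network request
        state["active"] -= 1

async def run_jobs(limit: int, jobs: int) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, state) for _ in range(jobs)))
    return state["peak"]

peak = asyncio.run(run_jobs(limit=5, jobs=20))
print(peak)  # 5
```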

7.2 A Custom Rate Limiter

```python
import asyncio
import time

import aiohttp

class RateLimiter:
    """Spaces successive acquire() calls at least 1/rate_per_second apart.
    Designed for one sequential caller, as in the loop below."""

    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.last_time = 0.0

    async def acquire(self):
        current = time.monotonic()
        wait_time = self.last_time + self.interval - current
        if wait_time > 0:
            await asyncio.sleep(wait_time)
        self.last_time = time.monotonic()

async def fetch_rate_limited(urls, rate=2):
    limiter = RateLimiter(rate)
    results = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            await limiter.acquire()  # throttle: wait until the next slot is free
            async with session.get(url) as response:
                results.append(await response.text())
    return results

# At most 2 requests per second (urls is the list defined in 7.1)
asyncio.run(fetch_rate_limited(urls, rate=2))
```
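The RateLimiter above works for a single sequential caller. If several coroutines call acquire() concurrently, they all read the same last_time, sleep for the same interval, and then proceed together in a burst. One way to handle that, sketched here as a hypothetical LockedRateLimiter class, is to serialize acquire() with an asyncio.Lock so that slots are handed out one by one:

```python
import asyncio
import time

class LockedRateLimiter:
    """Hypothetical helper: spaces acquire() calls at least 1/rate_per_second apart,
    even when many coroutines call it at the same time."""

    def __init__(self, rate_per_second: float) -> None:
        self.interval = 1.0 / rate_per_second
        self.next_allowed = 0.0  # monotonic time at which the next caller may proceed
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:  # one caller at a time computes and claims its slot
            wait = self.next_allowed - time.monotonic()
            if wait > 0:
                await asyncio.sleep(wait)
            self.next_allowed = time.monotonic() + self.interval

async def stamped(limiter: LockedRateLimiter, start: float) -> float:
    await limiter.acquire()
    return time.monotonic() - start

async def main() -> list[float]:
    limiter = LockedRateLimiter(rate_per_second=20)  # one slot every 50 ms
    start = time.monotonic()
    stamps = await asyncio.gather(*(stamped(limiter, start) for _ in range(5)))
    return sorted(stamps)

stamps = asyncio.run(main())
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
print([round(g, 3) for g in gaps])  # each gap should be about 0.05 s
```

The trade-off is that waiting callers queue on the lock, so the limiter also serializes them; that is usually what a rate limit intends.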

8. Asynchronous Database Operations

8.1 Asynchronous SQLite (aiosqlite)

```python
import asyncio

import aiosqlite

async def init_db():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """)
        await db.commit()

async def insert_user(name, email):
    async with aiosqlite.connect("app.db") as db:
        # ? placeholders keep the values out of the SQL string
        await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            (name, email),
        )
        await db.commit()

async def get_all_users():
    async with aiosqlite.connect("app.db") as db:
        async with db.execute("SELECT * FROM users") as cursor:
            rows = await cursor.fetchall()
    return rows

async def main():
    await init_db()
    # email is UNIQUE, so running this script a second time raises IntegrityError
    await insert_user("Alice", "alice@example.com")
    await insert_user("Bob", "bob@example.com")
    users = await get_all_users()
    print(users)

asyncio.run(main())
```

8.2 Asynchronous PostgreSQL (asyncpg)

```python
import asyncio

import asyncpg

async def query_postgres():
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        user="postgres",
        password="password",  # placeholder credentials
        database="mydb",
    )
    try:
        # Run a query
        rows = await conn.fetch("SELECT * FROM users LIMIT 10")
        for row in rows:
            print(row)
        # Insert a row (asyncpg uses $1, $2 placeholders)
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "Charlie", "charlie@example.com",
        )
    finally:
        await conn.close()  # always release the connection

asyncio.run(query_postgres())
```

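9. Task Cancellation and Timeouts

9.1 Cancelling a Task

```python
import asyncio

async def long_running_task():
    try:
        print("Task started")
        for i in range(10):
            await asyncio.sleep(1)
            print(f"Progress: {i+1}/10")
        print("Task finished")
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # perform cleanup here
        raise  # re-raise so the caller knows the task was cancelled

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(3)
    task.cancel()  # request cancellation: CancelledError is raised at the task's current await
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())
```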

9.2 Handling Timeouts

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        # asyncio.wait_for() cancels slow_operation() and raises TimeoutError after 3 s
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```

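9.3 Using asyncio.timeout() (Python 3.11+)

```python
import asyncio

async def main():
    try:
        async with asyncio.timeout(3.0):
            await slow_operation()  # slow_operation() from 9.2; cancelled once time runs out
    except TimeoutError:  # same class as asyncio.TimeoutError in Python 3.11+
        print("Operation timed out")
asyncio.run(main())
```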

10. Synchronous vs. Asynchronous Performance

10.1 Network Requests

```python
import asyncio
import time

import aiohttp
import requests

# Synchronous version
def sync_fetch(urls):
    start = time.time()
    results = []
    for url in urls:
        results.append(requests.get(url, timeout=10).text)
    elapsed = time.time() - start
    return results, elapsed

# Asynchronous version
async def fetch_text(session, url):
    # async with releases the connection back to the pool once the body is read
    async with session.get(url) as response:
        return await response.text()

async def async_fetch(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_text(session, url) for url in urls))
    elapsed = time.time() - start
    return results, elapsed

# Benchmark
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
# Sync: about 10 s
_, sync_time = sync_fetch(urls)
print(f"Sync elapsed: {sync_time:.2f}s")
# Async: about 1 s
_, async_time = asyncio.run(async_fetch(urls))
print(f"Async elapsed: {async_time:.2f}s")
# Roughly a 10x speedup, depending on network conditions and the server
print(f"Speedup: {sync_time/async_time:.1f}x")
```
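These numbers depend on the network and on httpbin.org being reachable. The same effect can be reproduced offline by simulating latency, assuming time.sleep() stands in for a blocking request and asyncio.sleep() for an awaited one. DELAY and N are arbitrary illustrative values:

```python
import asyncio
import time

DELAY = 0.05  # simulated latency per request, in seconds (arbitrary)
N = 10        # number of simulated requests

def sync_simulated() -> float:
    start = time.perf_counter()
    for _ in range(N):
        time.sleep(DELAY)  # blocks the thread, like requests.get() would
    return time.perf_counter() - start

async def async_simulated() -> float:
    start = time.perf_counter()
    # all N waits overlap on one event loop
    await asyncio.gather(*(asyncio.sleep(DELAY) for _ in range(N)))
    return time.perf_counter() - start

sync_time = sync_simulated()
async_time = asyncio.run(async_simulated())
print(f"sync: {sync_time:.2f}s, async: {async_time:.2f}s, speedup: {sync_time / async_time:.1f}x")
```

The synchronous total grows with N × DELAY, while the asynchronous total stays close to a single DELAY, which is the same pattern as the network benchmark.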

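10.2 When to Use What

| Scenario | Recommended approach | Reason |
| --- | --- | --- |
| Network requests | asyncio | Most of the time is spent waiting, so requests can overlap |
| Database queries | asyncio (with an async driver) | Queries wait on I/O and can run concurrently |
| File I/O | asyncio + aiofiles or asyncio.to_thread | asyncio has no native async file API; these delegate disk I/O to threads |
| CPU-bound computation | multiprocessing | Needs multiple cores; async brings no benefit |
| Mixed workloads | asyncio + executors | Async for I/O; blocking calls in a thread pool, CPU-heavy work in a process pool (threads do not parallelize pure-Python CPU work because of the GIL) |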

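11. Common Pitfalls and Best Practices

❌ Pitfall 1: Calling a Coroutine from a Synchronous Function

```python
import asyncio

# ❌ Wrong: calling a coroutine function does not run it
async def my_coro():
    return "result"

def wrong_call():
    result = my_coro()  # a coroutine object, not the result
    print(result)  # <coroutine object my_coro at ...> (plus a "never awaited" RuntimeWarning)

# ✅ Correct (from synchronous code with no event loop running)
def correct_call():
    result = asyncio.run(my_coro())
    print(result)  # result
```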

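❌ Pitfall 2: Blocking the Event Loop

```python
import asyncio
import time

# ❌ Wrong: a blocking call inside a coroutine
async def bad_example():
    time.sleep(5)  # blocks the entire event loop!
    return "done"

# ✅ Correct: use the async equivalent
async def good_example():
    await asyncio.sleep(5)  # non-blocking; other tasks keep running
    return "done"

# ✅ Correct: run blocking code in a worker thread (Python 3.9+)
def blocking_function():  # stand-in for any blocking call
    time.sleep(5)
    return "done"

async def run_blocking():
    result = await asyncio.to_thread(blocking_function)
    return result
```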
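The cost of a blocking call can be measured. In this sketch, a heartbeat task that wakes every 10 ms stalls for the whole duration of a time.sleep() inside another coroutine, but keeps ticking when the same call is moved to asyncio.to_thread(). heartbeat(), blocking_step(), threaded_step(), and max_gap() are illustrative names.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable

async def heartbeat(gaps: list[float], beats: int) -> None:
    last = time.perf_counter()
    for _ in range(beats):
        await asyncio.sleep(0.01)
        now = time.perf_counter()
        gaps.append(now - last)  # how long this beat actually took
        last = now

async def blocking_step() -> None:
    time.sleep(0.2)  # freezes the event loop: the heartbeat cannot run meanwhile

async def threaded_step() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # same call, moved to a worker thread

async def max_gap(step: Callable[[], Awaitable[None]]) -> float:
    gaps: list[float] = []
    beat_task = asyncio.create_task(heartbeat(gaps, 15))
    await asyncio.sleep(0.02)  # let the heartbeat start ticking
    await step()
    await beat_task
    return max(gaps)

blocked = asyncio.run(max_gap(blocking_step))
threaded = asyncio.run(max_gap(threaded_step))
print(f"largest heartbeat gap: blocking {blocked:.2f}s, to_thread {threaded:.2f}s")
```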

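❌ Pitfall 3: Forgetting to Close the ClientSession

```python
import aiohttp

# ❌ Wrong: the session is never closed, which leaks connections
async def bad():
    session = aiohttp.ClientSession()
    await session.get("https://example.com")
    # no close()!

# ✅ Correct: use async with for the session (and the response)
async def good():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            await response.text()
    # the session is closed automatically here
```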

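❌ Pitfall 4: Unbounded Concurrency Exhausting Resources

```python
import asyncio

# ❌ Wrong: unbounded concurrency (fetch(url) is the helper from 4.3)
async def bad(urls):
    tasks = [fetch(url) for url in urls]  # possibly thousands
    await asyncio.gather(*tasks)  # may exhaust sockets or memory, or trigger server rate limits

# ✅ Correct: cap concurrency with a semaphore
async def good(urls):
    semaphore = asyncio.Semaphore(100)

    async def limited_fetch(url):
        async with semaphore:
            return await fetch(url)

    await asyncio.gather(*(limited_fetch(url) for url in urls))
```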

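✅ Best-Practice Checklist

| Principle | Description |
| --- | --- |
| Use async with | Ensures resources (sessions, files, locks) are released |
| Limit concurrency | Use a Semaphore to avoid exhausting resources |
| Avoid blocking calls | Wrap blocking functions with asyncio.to_thread |
| Handle timeouts | Use wait_for or asyncio.timeout |
| Cancel tasks correctly | Catch CancelledError to clean up, then re-raise it |
| Prefer TaskGroup | On Python 3.11+, TaskGroup is the safer choice |
| Reuse ClientSession | Avoid creating a new session for every request |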

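12. Mixing Async and Sync Code

12.1 asyncio.to_thread(): Running Blocking Functions from a Coroutine

```python
import asyncio
import time

def blocking_io():
    time.sleep(2)  # blocking operation
    return "blocking function result"

async def main():
    # Run the blocking function in a worker thread so the event loop stays free;
    # meanwhile other coroutines (here, asyncio.sleep) run concurrently
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1),
    )
    print(result)  # total time ≈ 2 s, not 3 s

asyncio.run(main())
```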

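12.2 loop.run_in_executor()

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def cpu_intensive_task(n):
    return sum(range(n))

async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Run the function in a thread pool. In standard CPython builds the GIL means
        # pure-Python CPU-bound work gains no parallelism here; use ProcessPoolExecutor for that
        result = await loop.run_in_executor(
            executor,
            cpu_intensive_task,
            10_000_000,
        )
    print(f"Result: {result}")

asyncio.run(main())
```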

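13. Summary: An asyncio Learning Path

| Stage | Topics | Key APIs |
| --- | --- | --- |
| Beginner | async/await syntax, coroutine concepts | async def, await, asyncio.run() |
| Fundamentals | Concurrent execution, task management | gather, create_task, sleep |
| Practice | HTTP requests, file operations | aiohttp, aiofiles, aiosqlite |
| Intermediate | Concurrency control, exception handling | Semaphore, wait_for, cancel |
| Advanced | TaskGroup, mixing in synchronous code | TaskGroup, to_thread, run_in_executor |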

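Mastering asyncio is key to high-concurrency programming in Python. From web scrapers to API services, and from data processing to real-time communication, asynchronous programming can substantially improve the throughput of I/O-bound programs. Start with simple concurrent requests, work up to the more complex scenarios, and eventually make asyncio a regular part of your everyday toolkit.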

https://971918.xyz/posts/python-guide/python-asyncio-guide/
Author: 九所长
Published: 2026-06-14
License: CC BY-NC-SA 4.0