Python 多线程与多进程并发编程完全指南:GIL 原理、threading、multiprocessing 到 concurrent.futures | Python 进阶核心知识
Python 的并发编程是进阶开发者必须掌握的技能。无论是并发爬虫、Web 服务、数据处理还是 GUI 应用,多线程和多进程都是核心武器。
本文将系统讲解 Python 并发编程的完整知识体系:
- GIL(全局解释器锁)原理——理解 Python 多线程的底层限制
threading模块——多线程编程与同步机制multiprocessing模块——真正的并行计算concurrent.futures——现代高级并发 API- 三种并发模型的选择决策
- 完整实战案例
前置阅读:本文假设你已了解 Python 基础语法。如果想先了解异步编程,请阅读:Python asyncio 异步编程完全指南
1. GIL 原理深度解析
1.1 什么是 GIL?
GIL(Global Interpreter Lock,全局解释器锁) 是 CPython 解释器中的一个互斥锁(mutex)。它的核心规则非常简单:
同一时刻,只有一个线程可以执行 Python 字节码。
这意味着,即使你的 CPU 有 16 个核心,CPython 的多线程在任意时刻也只能用一个核心执行 Python 代码。
1.2 为什么 Python 有 GIL?
GIL 的存在有其历史原因和实际意义:
历史原因: CPython 的内存管理(引用计数)不是线程安全的。如果没有 GIL,多个线程同时修改对象的引用计数会导致内存泄漏或崩溃。GIL 是最简单、最高效的保护方案。
实际意义:
- 让 CPython 的单线程性能很高(无需复杂的锁机制)
- 简化了 C 扩展的编写(C 扩展假设 GIL 存在)
- 对于 I/O 密集型任务影响很小(I/O 等待时 GIL 会释放)
1.3 GIL 的工作机制
import sys# 查看 GIL 切换间隔(Python 3.2+ 默认 5 毫秒)print(sys.getswitchinterval()) # 0.005 秒GIL 的释放和获取遵循以下规则:
- 时间片到期: 每个线程执行 Python 字节码到一定时间(默认 5ms),会释放 GIL 让其他线程运行
- I/O 操作: 线程执行 I/O 操作(网络请求、文件读写、sleep)时会主动释放 GIL
- 主动释放: 可以显式调用
time.sleep(0)来主动让出 GIL
1.4 GIL 的实际影响验证
让我们用一个实验来验证 GIL 对 CPU 密集型和 I/O 密集型任务的影响:
import timeimport threading
# === CPU 密集型任务:计算斐波那契 ===def cpu_bound(n=35): def fib(x): if x <= 1: return x return fib(x - 1) + fib(x - 2) return fib(n)
# === I/O 密集型任务:模拟网络请求 ===def io_bound(): time.sleep(0.1) # 模拟 100ms 的 I/O 等待 return "done"
# === 单线程基准测试 ===def benchmark_serial(task, count): start = time.time() for _ in range(count): task() return time.time() - start
# === 多线程测试 ===def benchmark_threaded(task, count): start = time.time() threads = [threading.Thread(target=task) for _ in range(count)] for t in threads: t.start() for t in threads: t.join() return time.time() - start
print("=== CPU 密集型(8 任务)===")print(f"单线程: {benchmark_serial(cpu_bound, 8):.2f}s")print(f"多线程: {benchmark_threaded(cpu_bound, 8):.2f}s")# 预期:多线程 ≈ 单线程(甚至可能更慢!)
print("\n=== I/O 密集型(8 任务)===")print(f"单线程: {benchmark_serial(io_bound, 8):.2f}s")print(f"多线程: {benchmark_threaded(io_bound, 8):.2f}s")# 预期:多线程 ≈ 0.1s(远快于单线程的 0.8s)输出示例:
=== CPU 密集型(8 任务)===单线程: 12.34s多线程: 13.15s ← 多线程反而更慢!
=== I/O 密集型(8 任务)===单线程: 0.80s多线程: 0.11s ← 多线程快了 7 倍!结论:
- CPU 密集型:多线程受 GIL 限制,无法加速,甚至因线程切换开销略微变慢
- I/O 密集型:多线程在 I/O 等待时释放 GIL,大幅提升并发效率
1.5 GIL 的未来
Python 3.13 引入了实验性的 free-threading 模式(PEP 703),允许在编译时禁用 GIL:
# Python 3.13+ 实验性 free-threading 编译./configure --disable-gilmake但目前(2026 年)CPython 默认仍带 GIL,且许多 C 扩展尚未适配 free-threading。在可预见的未来,GIL 仍将是 CPython 的默认特性。
2. threading 多线程模块详解
2.1 Thread 线程创建
Python 创建线程有两种方式:
import threadingimport time
# 方式一:传递目标函数def worker(name, delay): """工作线程函数""" print(f"[{name}] 开始工作") time.sleep(delay) print(f"[{name}] 工作完成,耗时 {delay}s")
# 创建线程(daemon=True 表示守护线程,主线程退出时自动结束)t1 = threading.Thread(target=worker, args=("worker-1", 2))t2 = threading.Thread(target=worker, kwargs={"name": "worker-2", "delay": 1})
t1.start()t2.start()
# 等待线程结束t1.join()t2.join()
print("所有线程完成")# 方式二:继承 Thread 类class MyThread(threading.Thread): def __init__(self, name, delay): super().__init__(name=name) # 设置线程名 self.delay = delay
def run(self): print(f"[{self.name}] 开始工作") time.sleep(self.delay) print(f"[{self.name}] 完成")
t = MyThread("custom-thread", 1.5)t.start()t.join()2.2 线程属性与方法
| 属性/方法 | 说明 |
|---|---|
threading.active_count() | 当前活跃线程数 |
threading.current_thread() | 获取当前线程对象 |
threading.main_thread() | 获取主线程对象 |
threading.enumerate() | 列出所有活跃线程 |
t.name | 线程名 |
t.daemon | 是否为守护线程 |
t.ident | 线程 ID(系统级) |
t.is_alive() | 线程是否存活 |
t.join(timeout) | 等待线程结束 |
import threading
def task(): current = threading.current_thread() print(f"名称: {current.name}") print(f"ID: {current.ident}") print(f"守护: {current.daemon}") print(f"总线程数: {threading.active_count()}")
t = threading.Thread(target=task, name="demo-thread", daemon=True)t.start()t.join()
# 输出:# 名称: demo-thread# ID: 123145307795456# 守护: True# 总线程数: 22.3 daemon 守护线程
守护线程的特殊行为:
- 主线程退出时,守护线程会立即被强制结束(不等待运行完成)
- 非守护线程(默认)会阻止主进程退出,直到所有非守护线程结束
import threadingimport time
def daemon_task(): for i in range(5): print(f"守护线程运行中... {i}") time.sleep(0.5)
def normal_task(): for i in range(3): print(f"普通线程运行中... {i}") time.sleep(0.5)
# 守护线程d = threading.Thread(target=daemon_task, daemon=True)d.start()
# 普通线程n = threading.Thread(target=normal_task)n.start()
print("主线程退出")# 主线程退出后,守护线程 d 会立即终止# 但主线程会等普通线程 n 结束后才真正退出最佳实践: 后台辅助任务(日志、心跳、监控)用守护线程;需要完整执行的任务用普通线程 +
join()。
3. 线程同步机制
多线程编程最大的陷阱是数据竞争(Race Condition)——多个线程同时修改共享数据导致不可预期的结果。
3.1 Lock 互斥锁
import threading
# 没有锁的经典问题:计数不准确counter = 0
def increment_without_lock(): global counter for _ in range(1_000_000): counter += 1 # 这不是原子操作!实际是:读→加→写 三步
threads = [threading.Thread(target=increment_without_lock) for _ in range(10)]for t in threads: t.start()for t in threads: t.join()
print(f"不加锁: counter = {counter}")# 预期 10,000,000,实际可能是 4,328,917(每次不同)# 使用 Lock 修复counter = 0lock = threading.Lock()
def increment_with_lock(): global counter for _ in range(1_000_000): with lock: # 自动 acquire/release counter += 1
threads = [threading.Thread(target=increment_with_lock) for _ in range(10)]for t in threads: t.start()for t in threads: t.join()
print(f"加锁后: counter = {counter}") # 正确: 10,000,000Lock 核心方法:
lock = threading.Lock()
# 阻塞获取(等待直到可用)lock.acquire()
# 非阻塞获取(立即返回是否成功)if lock.acquire(blocking=False): # 或 lock.acquire(False) # 成功获取 lock.release()
# 带超时获取if lock.acquire(timeout=2.0): # 2 秒内获取到 lock.release()else: print("超时!")
# 推荐用法:with 语句自动管理with lock: # 临界区代码 pass3.2 RLock 可重入锁
RLock 允许同一线程多次获取锁,适用于递归或嵌套调用场景:
import threading
rlock = threading.RLock()
class RecursiveCounter: def __init__(self): self.value = 0
def increment(self): with rlock: self.value += 1 return self.value
def reset_and_increment(self): with rlock: self.value = 0 # 同一线程再次调用 increment(),里面也会获取 rlock # Lock 会死锁,RLock 正常工作 return self.increment()
counter = RecursiveCounter()print(counter.reset_and_increment()) # 1选择建议: 一般场景用
Lock(性能略好),递归/嵌套场景用RLock。
3.3 Semaphore 信号量
Semaphore 限制同时访问某个资源的线程数量(类似停车场容量):
import threadingimport timeimport random
# 最多允许 3 个线程同时访问数据库db_semaphore = threading.Semaphore(3)
def database_query(query_id): with db_semaphore: print(f"查询 {query_id} 开始执行...") time.sleep(random.uniform(0.5, 1.5)) # 模拟查询 print(f"查询 {query_id} 完成")
# 启动 10 个并发查询for i in range(10): threading.Thread(target=database_query, args=(i,)).start()
# 最多 3 个同时执行,其余排队等待3.4 Event 事件
Event 用于线程间的事件通知——一个线程等待信号,另一个线程发送信号:
import threadingimport time
def downloader(stop_event): """下载线程:持续下载直到收到停止信号""" chunk = 0 while not stop_event.is_set(): chunk += 1 print(f"下载 chunk {chunk}...") time.sleep(0.5) print("收到停止信号,下载终止")
def controller(stop_event): """控制线程:5 秒后发送停止信号""" time.sleep(5) print("→ 发送停止信号") stop_event.set()
stop_event = threading.Event()
d = threading.Thread(target=downloader, args=(stop_event,))c = threading.Thread(target=controller, args=(stop_event,))
d.start()c.start()d.join()c.join()Event 方法:
event.set()— 设置事件(点亮信号灯)event.clear()— 清除事件(熄灭信号灯)event.wait(timeout)— 等待事件设置(可设超时)event.is_set()— 检查事件是否已设置
3.5 Condition 条件变量
Condition 比 Event 更强大,支持复杂的等待-通知模式(生产者-消费者模型):
import threadingimport timeimport random
class BoundedQueue: """有界队列:使用 Condition 实现生产者-消费者模式"""
def __init__(self, capacity): self.queue = [] self.capacity = capacity self.condition = threading.Condition()
def put(self, item): with self.condition: # 队列满时等待消费者腾出空间 while len(self.queue) >= self.capacity: print(f" 队列满,生产者等待...") self.condition.wait() self.queue.append(item) print(f" 生产: {item} (队列: {len(self.queue)}/{self.capacity})") self.condition.notify() # 通知一个等待的消费者
def get(self): with self.condition: # 队列空时等待生产者放入数据 while len(self.queue) == 0: print(f" 队列空,消费者等待...") self.condition.wait() item = self.queue.pop(0) print(f" 消费: {item} (队列: {len(self.queue)}/{self.capacity})") self.condition.notify() # 通知一个等待的生产者 return item
# 测试生产者-消费者q = BoundedQueue(3)
def producer(q, n): for i in range(n): time.sleep(random.uniform(0.1, 0.5)) q.put(f"P-{i}")
def consumer(q, n): for _ in range(n): time.sleep(random.uniform(0.2, 1.0)) q.get()
threading.Thread(target=producer, args=(q, 5)).start()threading.Thread(target=consumer, args=(q, 5)).start()3.6 Barrier 屏障
Barrier 让多个线程在某个点互相等待,全部到达后一起继续:
import threadingimport timeimport random
def racer(barrier, name): prep_time = random.uniform(0.5, 2.0) print(f"{name} 准备中...({prep_time:.1f}s)") time.sleep(prep_time) print(f"{name} 到达起跑线,等待其他选手...") barrier.wait() # 等待所有选手 print(f"{name} 起跑!")
# 5 个选手需要全部就位才能起跑barrier = threading.Barrier(5, timeout=10)for i in range(5): threading.Thread(target=racer, args=(barrier, f"选手-{i+1}"), daemon=True).start()3.7 Timer 定时器
import threading
def remind(msg): print(f"⏰ 提醒: {msg}")
# 5 秒后执行timer = threading.Timer(5.0, remind, args=["该休息了!"])timer.start()
# 如果不需要了可以取消# timer.cancel()3.8 local() 线程局部存储
import threading
# 每个线程有自己的独立存储空间,互不干扰thread_data = threading.local()
def process(): thread_data.value = threading.current_thread().name # 其他线程修改 thread_data.value 不会影响当前线程 print(f"{threading.current_thread().name}: value = {thread_data.value}")
for i in range(3): threading.Thread(target=process, name=f"thread-{i}").start()3.9 同步机制选择速查表
| 场景 | 推荐工具 |
|---|---|
| 保护共享变量不被并发修改 | Lock |
| 递归/嵌套调用中加锁 | RLock |
| 限制并发访问数量 | Semaphore |
| 等待某个事件发生 | Event |
| 生产者-消费者模式 | Condition / queue.Queue |
| 多线程同步等待 | Barrier |
| 定时执行 | Timer |
| 线程隔离存储 | local() |
4. queue.Queue 线程安全队列
queue.Queue 是 Python 内置的线程安全队列,比 Condition 更适合生产者-消费者场景:
import threadingimport queueimport time
# 创建有界队列q = queue.Queue(maxsize=5)
def producer(q, n): for i in range(n): time.sleep(0.3) item = f"item-{i}" q.put(item) # 阻塞直到有空位 print(f"生产: {item} (队列: {q.qsize()})")
def consumer(q, n): for _ in range(n): item = q.get() # 阻塞直到有数据 print(f" 消费: {item} (队列: {q.qsize()})") q.task_done() # 标记任务完成 time.sleep(0.8)
# 启动线程threading.Thread(target=producer, args=(q, 10), daemon=True).start()threading.Thread(target=consumer, args=(q, 10), daemon=True).start()
# 等待所有任务被处理q.join() # 阻塞直到队列中所有项都被 task_done()print("所有任务完成")Queue 类型:
queue.Queue(maxsize)— FIFO 队列(先进先出)queue.LifoQueue(maxsize)— LIFO 队列(后进先出,栈)queue.PriorityQueue(maxsize)— 优先级队列
import queue
# 优先级队列:数字越小优先级越高pq = queue.PriorityQueue()pq.put((2, "普通任务"))pq.put((1, "紧急任务"))pq.put((3, "低优先级任务"))
while not pq.empty(): print(pq.get()) # (1, '紧急任务') → (2, '普通任务') → (3, '低优先级任务')5. multiprocessing 多进程模块
5.1 为什么需要多进程?
多进程可以绕过 GIL 限制,实现真正的并行计算。每个进程有自己独立的 Python 解释器和内存空间。
5.2 Process 进程创建
import multiprocessingimport osimport time
def cpu_heavy(n): """CPU 密集型:计算素数和""" print(f"进程 {os.getpid()} 开始,参数 n={n}") total = 0 for i in range(2, n): is_prime = True for j in range(2, int(i ** 0.5) + 1): if i % j == 0: is_prime = False break if is_prime: total += i return total
# 创建进程(与 Thread API 几乎一样!)p1 = multiprocessing.Process(target=cpu_heavy, args=(200000,))p2 = multiprocessing.Process(target=cpu_heavy, args=(200000,))
start = time.time()p1.start()p2.start()p1.join()p2.join()print(f"2 进程耗时: {time.time() - start:.2f}s")# 真实并行,耗时接近单进程一半注意: Windows 下
multiprocessing必须在if __name__ == "__main__":块中使用,否则会递归创建进程。macOS/Linux 也建议添加此保护。
5.3 Pool 进程池
Pool 管理固定数量的工作进程,自动分配任务:
import multiprocessingimport timeimport random
def task(n): """模拟耗时计算""" time.sleep(random.uniform(0.5, 1.5)) result = n * n print(f"任务 {n}: {n}² = {result}") return result
if __name__ == "__main__": # 创建 4 个工作进程 with multiprocessing.Pool(processes=4) as pool: # map: 阻塞,按顺序返回 results = pool.map(task, range(10)) print(f"map 结果: {results}")
# imap: 惰性迭代器,按提交顺序返回 for result in pool.imap(task, range(5)): print(f"imap 得到: {result}")
# imap_unordered: 按完成顺序返回(非提交顺序) for result in pool.imap_unordered(task, range(5)): print(f"imap_unordered 得到: {result}")
# apply_async: 异步提交单个任务,返回 AsyncResult async_result = pool.apply_async(task, args=(42,)) # 可以做其他事情... result = async_result.get(timeout=10) # 阻塞获取结果 print(f"42² = {result}")
# map_async: 异步 map,返回 AsyncResult async_results = pool.map_async(task, range(8)) results = async_results.get() print(f"map_async 结果: {results}")Pool 方法选择:
| 方法 | 阻塞/异步 | 返回值 | 适用场景 |
|---|---|---|---|
pool.map() | 阻塞 | 列表 | 批量任务,等全部完成 |
pool.imap() | 惰性 | 迭代器 | 大量任务,边完成边处理 |
pool.apply() | 阻塞 | 单个值 | 单个任务 |
pool.apply_async() | 异步 | AsyncResult | 单个任务,不阻塞主进程 |
pool.map_async() | 异步 | AsyncResult | 批量任务,不阻塞主进程 |
pool.starmap() | 阻塞 | 列表 | 多参数任务(类似 itertools.starmap) |
5.4 Queue 与 Pipe 进程间通讯
进程间不能直接共享变量(独立内存空间),需要通过 IPC 机制:
import multiprocessing
# === 方法一:Queue(队列)===def producer(q): for i in range(5): q.put(f"msg-{i}") q.put("EOF") # 结束标志
def consumer(q): while True: msg = q.get() if msg == "EOF": break print(f"收到: {msg}")
if __name__ == "__main__": q = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(q,)) p2 = multiprocessing.Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join()# === 方法二:Pipe(管道)===if __name__ == "__main__": # Pipe 返回两个连接对象,全双工 parent_conn, child_conn = multiprocessing.Pipe()
def sender(conn): conn.send("Hello from child") conn.send([1, 2, 3]) conn.send({"key": "value"}) conn.close()
def receiver(conn): print(conn.recv()) # "Hello from child" print(conn.recv()) # [1, 2, 3] print(conn.recv()) # {"key": "value"} conn.close()
p = multiprocessing.Process(target=sender, args=(child_conn,)) p.start() receiver(parent_conn) p.join()5.5 Manager 共享数据管理器
Manager 提供了在进程间共享 Python 对象的机制:
import multiprocessing
def worker(mgr_dict, mgr_list, name, value): mgr_dict[name] = value mgr_list.append(f"{name}:{value}")
if __name__ == "__main__": with multiprocessing.Manager() as manager: # 创建进程间共享的数据结构 shared_dict = manager.dict() shared_list = manager.list()
processes = [] for i in range(5): p = multiprocessing.Process( target=worker, args=(shared_dict, shared_list, f"key-{i}", i * 100) ) processes.append(p) p.start()
for p in processes: p.join()
print(f"共享字典: {dict(shared_dict)}") print(f"共享列表: {list(shared_list)}")Manager 支持的类型: dict, list, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
5.6 Value 和 Array 共享内存
比 Manager 更高效的共享内存方式(直接使用 C 类型):
import multiprocessing
# Value: 单个值的共享内存counter = multiprocessing.Value("i", 0) # "i" = 有符号整数
# Array: 数组的共享内存shared_array = multiprocessing.Array("d", [1.0, 2.0, 3.0]) # "d" = double
def increment(val, arr): for _ in range(1000): with val.get_lock(): # 需要显式加锁 val.value += 1
if __name__ == "__main__": processes = [ multiprocessing.Process(target=increment, args=(counter, shared_array)) for _ in range(4) ] for p in processes: p.start() for p in processes: p.join()
print(f"计数器: {counter.value}") # 4000类型码:
"i"— 有符号 int"f"— float"d"— double"c"— char"b"— signed char(注意:不是布尔值)
6. concurrent.futures 高级并发 API
concurrent.futures 是 Python 3.2 引入的高级并发 API,提供了统一的线程池和进程池接口。
6.1 ThreadPoolExecutor 线程池
from concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport randomimport urllib.request
def fetch_url(url): """模拟抓取 URL""" time.sleep(random.uniform(0.3, 1.0)) # 模拟网络延迟 return f"{url} → 200 OK"
urls = [f"https://api.example.com/v1/item/{i}" for i in range(10)]
# 方式一:submit + as_completed(按完成顺序处理)with ThreadPoolExecutor(max_workers=5) as executor: # 提交所有任务 futures = {executor.submit(fetch_url, url): url for url in urls}
# 按完成顺序处理结果 for future in as_completed(futures): url = futures[future] try: result = future.result(timeout=5) print(f"✓ {result}") except Exception as e: print(f"✗ {url} 失败: {e}")# 方式二:map(保持提交顺序)with ThreadPoolExecutor(max_workers=5) as executor: results = executor.map(fetch_url, urls, timeout=10) for url, result in zip(urls, results): print(f"{url}: {result}")6.2 ProcessPoolExecutor 进程池
与线程池 API 一致,底层使用进程:
from concurrent.futures import ProcessPoolExecutorimport os
def cpu_task(n): return f"PID {os.getpid()}: {n}² = {n * n}"
if __name__ == "__main__": with ProcessPoolExecutor(max_workers=4) as executor: # submit 方式 futures = [executor.submit(cpu_task, i) for i in range(20)] for future in as_completed(futures): print(future.result())
# map 方式 results = executor.map(cpu_task, range(10, 15)) for r in results: print(r)6.3 Future 对象
Future 代表一个异步操作的结果,提供统一的查询和控制接口:
| 方法 | 说明 |
|---|---|
future.result(timeout) | 获取结果(阻塞) |
future.exception() | 获取异常(不阻塞) |
future.done() | 任务是否完成 |
future.running() | 任务是否正在执行 |
future.cancel() | 尝试取消任务(只能取消尚未开始的任务) |
future.add_done_callback(fn) | 添加完成回调 |
from concurrent.futures import ThreadPoolExecutorimport time
def check_done(future): """完成时的回调函数""" try: result = future.result() print(f"回调通知: 结果 = {result}") except Exception as e: print(f"回调通知: 异常 = {e}")
with ThreadPoolExecutor() as executor: future = executor.submit(time.sleep, 1) # 返回 None future.add_done_callback(check_done)
print("主线程继续执行其他工作...") time.sleep(1.5) print("主线程检查: done =", future.done()) # True6.4 wait 等待多任务
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETEDimport timeimport random
def task(n): time.sleep(random.uniform(0.5, 2.0)) return n * 2
with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(task, i) for i in range(10)]
# 等待第一个完成 done, not_done = wait(futures, return_when=FIRST_COMPLETED) print(f"第一个完成的: {[f.result() for f in done]}")
# 等待所有完成 done, not_done = wait(not_done, return_when=ALL_COMPLETED) print(f"剩下的: {[f.result() for f in done]}")7. 三种并发模型对比与选择
7.1 对比总结表
| 维度 | threading(多线程) | multiprocessing(多进程) | asyncio(异步) |
|---|---|---|---|
| 适用场景 | I/O 密集型 | CPU 密集型 | 高并发 I/O 密集型 |
| GIL 影响 | 受限制(只能并发 I/O) | 无影响(独立解释器) | 不受限(单线程) |
| 真正并行 | ❌(CPU 任务) | ✅ | ❌(单线程) |
| 内存开销 | 低(共享内存) | 高(独立内存) | 极低(协程栈) |
| 创建开销 | 低 | 高(fork 新进程) | 极低(协程切换) |
| 通讯方式 | 共享变量 + Lock | Queue / Pipe / Manager | 无需通讯(单线程) |
| 数据共享 | 简单(共享内存) | 复杂(需要 IPC) | N/A |
| 并发数量 | 数十到数百 | 数个到数十 | 数万到数十万 |
| 代码复杂度 | 中等(需要处理锁) | 中等(需要处理 IPC) | 较高(async/await 传染性) |
| 典型应用 | 爬虫、API 调用、文件处理 | 图像处理、科学计算、视频编码 | 高并发 Web 服务、WebSocket |
7.2 选择决策树
需要并发?├── CPU 密集型(计算为主)│ └── → multiprocessing / ProcessPoolExecutor│└── I/O 密集型(等待为主) ├── 简单任务 / 少量并发(< 100) │ └── → threading / ThreadPoolExecutor │ └── 高并发(> 1000)/ 需要精细控制 └── → asyncio7.3 混合使用:线程 + 进程
对于复杂场景,可以组合使用:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutorimport time
def cpu_heavy(n): """CPU 密集型工作""" return sum(i * i for i in range(n * 10000))
def io_heavy(n): """I/O 密集型工作""" time.sleep(0.1) return f"io-task-{n}"
# 进程池处理 CPU 任务,线程池处理 I/O 任务with ProcessPoolExecutor(max_workers=4) as cpu_pool, \ ThreadPoolExecutor(max_workers=10) as io_pool:
# 并行提交两种任务 cpu_futures = [cpu_pool.submit(cpu_heavy, i) for i in range(8)] io_futures = [io_pool.submit(io_heavy, i) for i in range(20)]
# 收集结果 for f in cpu_futures: print(f"CPU结果: {f.result()}") for f in io_futures: print(f"IO结果: {f.result()}")8. 实战案例
8.1 多线程爬虫加速
from concurrent.futures import ThreadPoolExecutor, as_completedimport threadingimport timeimport random
# 模拟爬取页面def crawl_page(url): """模拟爬取一个页面""" thread = threading.current_thread() delay = random.uniform(0.2, 0.8) time.sleep(delay) # 模拟网络延迟 return { "url": url, "status": 200, "size": random.randint(1000, 50000), "thread": thread.name, "time": delay, }
# URL 列表urls = [f"https://example.com/page/{i}" for i in range(50)]
print(f"开始爬取 {len(urls)} 个页面...\n")start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor: futures = {executor.submit(crawl_page, url): url for url in urls}
completed = 0 for future in as_completed(futures): completed += 1 result = future.result() print( f"[{completed:2d}/{len(urls)}] {result['url']:35s} | " f"状态={result['status']} | " f"{result['size']:5d}字节 | " f"{result['time']:.2f}s | " f"{result['thread']}" )
print(f"\n总耗时: {time.time() - start:.2f}s")# 50 个页面,10 线程并发,约 4-5 秒完成# 单线程需要约 0.5 * 50 = 25 秒8.2 多进程批量图片处理
from concurrent.futures import ProcessPoolExecutorimport osimport time
def process_image(args): """模拟图片处理:缩放、压缩、加水印""" filename, size = args time.sleep(0.3) # 模拟处理时间 new_size = (size[0] // 2, size[1] // 2) new_name = filename.replace(".jpg", "_thumb.jpg")
# 实际中这里会用 Pillow 处理图片 # from PIL import Image # img = Image.open(filename) # img.thumbnail(new_size) # img.save(new_name, quality=80)
return { "file": new_name, "old_size": size, "new_size": new_size, "pid": os.getpid(), }
if __name__ == "__main__": # 模拟 30 张图片 images = [(f"photo_{i}.jpg", (4000, 3000)) for i in range(30)]
print(f"开始处理 {len(images)} 张图片...\n") start = time.time()
with ProcessPoolExecutor(max_workers=6) as executor: futures = [executor.submit(process_image, img) for img in images]
for i, future in enumerate(futures, 1): result = future.result() print( f"[{i:2d}/{len(images)}] {result['file']:25s} | " f"{result['old_size']} → {result['new_size']} | " f"PID={result['pid']}" )
print(f"\n总耗时: {time.time() - start:.2f}s") # 30 张图,6 进程并行,约 1.5 秒 # 单进程需要 0.3 * 30 = 9 秒8.3 线程安全的连接池
import threadingimport timeimport randomfrom contextlib import contextmanager
class ConnectionPool: """ 线程安全的数据库连接池 使用 Semaphore 限制并发、Lock 保护数据结构 """
def __init__(self, pool_size=5): self.pool_size = pool_size self.semaphore = threading.Semaphore(pool_size) self.lock = threading.Lock() self.connections = [] # 可用连接 self.in_use = 0 # 使用中的连接数
# 初始化连接池 for i in range(pool_size): self.connections.append(self._create_connection(i))
def _create_connection(self, conn_id): return {"id": conn_id, "connected_at": time.time()}
@contextmanager def get_connection(self): """获取连接(上下文管理器,自动归还)""" self.semaphore.acquire() with self.lock: conn = self.connections.pop() self.in_use += 1 print(f" → 获取连接 #{conn['id']} (使用中: {self.in_use}/{self.pool_size})")
try: yield conn finally: with self.lock: self.connections.append(conn) self.in_use -= 1 print(f" ← 归还连接 #{conn['id']} (使用中: {self.in_use}/{self.pool_size})") self.semaphore.release()
# 使用连接池pool = ConnectionPool(pool_size=3)
def execute_query(query_id): with pool.get_connection() as conn: duration = random.uniform(0.5, 1.5) print(f"查询 {query_id} 执行中 (连接 #{conn['id']})...") time.sleep(duration) print(f"查询 {query_id} 完成 ({duration:.1f}s)")
# 8 个并发查询,但最多 3 个同时执行with ThreadPoolExecutor(max_workers=8) as executor: executor.map(execute_query, range(8))9. 常见陷阱与最佳实践
9.1 陷阱一:忘记 join()
# ❌ 错误:主线程退出,子线程被强制终止def long_task(): for i in range(100): print(f"进度: {i}%") time.sleep(0.1)
t = threading.Thread(target=long_task)t.start()# 程序直接结束了!子线程 work 没做完
# ✅ 正确t.start()t.join() # 等待子线程完成9.2 陷阱二:共享变量不加锁
# ❌ Python 的 += 不是原子操作counter = 0def increment(): global counter counter += 1 # 读 → 加 → 写,三步可能被打断
# ✅ 加锁或使用线程安全结构import threadingcounter = 0lock = threading.Lock()
def increment(): global counter with lock: counter += 19.3 陷阱三:死锁
# ❌ 经典死锁场景lock_a = threading.Lock()lock_b = threading.Lock()
def thread1(): with lock_a: time.sleep(0.1) with lock_b: # 等待 lock_b pass
def thread2(): with lock_b: time.sleep(0.1) with lock_a: # 等待 lock_a pass
# thread1 持有 lock_a 等 lock_b# thread2 持有 lock_b 等 lock_a# → 死锁!
# ✅ 修复:统一加锁顺序def thread1(): with lock_a: with lock_b: pass
def thread2(): with lock_a: # 先获取 lock_a(与 thread1 顺序一致) with lock_b: pass9.4 陷阱四:multiprocessing 在 Windows 上的坑
# ❌ Windows 下会无限递归创建进程import multiprocessing
def worker(): pass
p = multiprocessing.Process(target=worker)p.start() # Windows 会重新导入整个模块!
# ✅ 正确:使用 __name__ 保护if __name__ == "__main__": p = multiprocessing.Process(target=worker) p.start() p.join()9.5 陷阱五:线程池中传递不可序列化对象
# ❌ ProcessPoolExecutor 参数和返回值必须可 picklefrom concurrent.futures import ProcessPoolExecutor
lambda_fn = lambda x: x * 2with ProcessPoolExecutor() as executor: executor.submit(lambda_fn, 5) # TypeError: can't pickle lambda
# ✅ 使用普通函数def double(x): return x * 2
with ProcessPoolExecutor() as executor: f = executor.submit(double, 5) print(f.result()) # 109.6 最佳实践清单
- 线程安全数据结构优先: 能用
queue.Queue就不用list + Lock - 锁的范围最小化: 只锁关键代码,不要锁整个函数
- 使用
with语句: 避免忘记释放锁/信号量 - 统一加锁顺序: 防止死锁
- I/O 密集用 ThreadPoolExecutor,CPU 密集用 ProcessPoolExecutor
- ProcessPoolExecutor 统一加
if __name__ == "__main__":保护 - 设置合理的超时:
future.result(timeout=...)避免永久阻塞 - 异常处理: 用
try/except包裹任务函数,避免一个异常导致整个任务组崩溃
10. 总结
| 场景 | 推荐方案 | 关键理由 |
|---|---|---|
| 网络爬虫 / API 调用 | ThreadPoolExecutor | I/O 密集 + 简单 API |
| 图片/视频处理 | ProcessPoolExecutor | CPU 密集 + 真正并行 |
| 高并发 Web 服务 | asyncio + aiohttp | 超低开销 + 数万并发 |
| 简单并行任务 | ThreadPoolExecutor | 轻量 + 无 IPC 开销 |
| 数据库操作 | ThreadPoolExecutor | I/O 密集 + 线程安全 |
| 科学计算 / 数据处理 | ProcessPoolExecutor | 绕过 GIL |
| 混合任务(I/O + CPU) | ProcessPoolExecutor + ThreadPoolExecutor | 各取所长 |
记住三个核心规律:
- I/O 等得起 → 用线程(网络、文件、数据库)
- CPU 算不动 → 用进程(计算、编码、模型推理)
- 高并发等得多 → 用异步(协程、事件循环)
推荐阅读: