5967 字
30 分钟

Python 多线程与多进程并发编程完全指南:GIL 原理、threading、multiprocessing 到 concurrent.futures | Python 进阶核心知识

Python 的并发编程是进阶开发者必须掌握的技能。无论是并发爬虫、Web 服务、数据处理还是 GUI 应用,多线程和多进程都是核心武器。

本文将系统讲解 Python 并发编程的完整知识体系:

  • GIL(全局解释器锁)原理——理解 Python 多线程的底层限制
  • threading 模块——多线程编程与同步机制
  • multiprocessing 模块——真正的并行计算
  • concurrent.futures——现代高级并发 API
  • 三种并发模型的选择决策
  • 完整实战案例

前置阅读:本文假设你已了解 Python 基础语法。如果想先了解异步编程,请阅读:Python asyncio 异步编程完全指南

1. GIL 原理深度解析#

1.1 什么是 GIL?#

GIL(Global Interpreter Lock,全局解释器锁) 是 CPython 解释器中的一个互斥锁(mutex)。它的核心规则非常简单:

同一时刻,只有一个线程可以执行 Python 字节码。

这意味着,即使你的 CPU 有 16 个核心,CPython 的多线程在任意时刻也只能用一个核心执行 Python 代码。

1.2 为什么 Python 有 GIL?#

GIL 的存在有其历史原因和实际意义:

历史原因: CPython 的内存管理(引用计数)不是线程安全的。如果没有 GIL,多个线程同时修改对象的引用计数会导致内存泄漏或崩溃。GIL 是最简单、最高效的保护方案。

实际意义:

  • 让 CPython 的单线程性能很高(无需复杂的锁机制)
  • 简化了 C 扩展的编写(C 扩展假设 GIL 存在)
  • 对于 I/O 密集型任务影响很小(I/O 等待时 GIL 会释放)

1.3 GIL 的工作机制#

import sys
# 查看 GIL 切换间隔(Python 3.2+ 默认 5 毫秒)
print(sys.getswitchinterval()) # 0.005 秒

GIL 的释放和获取遵循以下规则:

  1. 时间片到期: 每个线程执行 Python 字节码到一定时间(默认 5ms),会释放 GIL 让其他线程运行
  2. I/O 操作: 线程执行 I/O 操作(网络请求、文件读写、sleep)时会主动释放 GIL
  3. 主动释放: 可以显式调用 time.sleep(0) 来主动让出 GIL

1.4 GIL 的实际影响验证#

让我们用一个实验来验证 GIL 对 CPU 密集型和 I/O 密集型任务的影响:

import time
import threading
# === CPU 密集型任务:计算斐波那契 ===
def cpu_bound(n=35):
def fib(x):
if x <= 1:
return x
return fib(x - 1) + fib(x - 2)
return fib(n)
# === I/O 密集型任务:模拟网络请求 ===
def io_bound():
time.sleep(0.1) # 模拟 100ms 的 I/O 等待
return "done"
# === 单线程基准测试 ===
def benchmark_serial(task, count):
start = time.time()
for _ in range(count):
task()
return time.time() - start
# === 多线程测试 ===
def benchmark_threaded(task, count):
start = time.time()
threads = [threading.Thread(target=task) for _ in range(count)]
for t in threads:
t.start()
for t in threads:
t.join()
return time.time() - start
print("=== CPU 密集型(8 任务)===")
print(f"单线程: {benchmark_serial(cpu_bound, 8):.2f}s")
print(f"多线程: {benchmark_threaded(cpu_bound, 8):.2f}s")
# 预期:多线程 ≈ 单线程(甚至可能更慢!)
print("\n=== I/O 密集型(8 任务)===")
print(f"单线程: {benchmark_serial(io_bound, 8):.2f}s")
print(f"多线程: {benchmark_threaded(io_bound, 8):.2f}s")
# 预期:多线程 ≈ 0.1s(远快于单线程的 0.8s)

输出示例:

=== CPU 密集型(8 任务)===
单线程: 12.34s
多线程: 13.15s ← 多线程反而更慢!
=== I/O 密集型(8 任务)===
单线程: 0.80s
多线程: 0.11s ← 多线程快了 7 倍!

结论:

  • CPU 密集型:多线程受 GIL 限制,无法加速,甚至因线程切换开销略微变慢
  • I/O 密集型:多线程在 I/O 等待时释放 GIL,大幅提升并发效率

1.5 GIL 的未来#

Python 3.13 引入了实验性的 free-threading 模式(PEP 703),允许在编译时禁用 GIL:

Terminal window
# Python 3.13+ 实验性 free-threading 编译
./configure --disable-gil
make

但目前(2026 年)CPython 默认仍带 GIL,且许多 C 扩展尚未适配 free-threading。在可预见的未来,GIL 仍将是 CPython 的默认特性。


2. threading 多线程模块详解#

2.1 Thread 线程创建#

Python 创建线程有两种方式:

import threading
import time
# 方式一:传递目标函数
def worker(name, delay):
"""工作线程函数"""
print(f"[{name}] 开始工作")
time.sleep(delay)
print(f"[{name}] 工作完成,耗时 {delay}s")
# 创建线程(daemon=True 表示守护线程,主线程退出时自动结束)
t1 = threading.Thread(target=worker, args=("worker-1", 2))
t2 = threading.Thread(target=worker, kwargs={"name": "worker-2", "delay": 1})
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("所有线程完成")
# 方式二:继承 Thread 类
class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__(name=name) # 设置线程名
self.delay = delay
def run(self):
print(f"[{self.name}] 开始工作")
time.sleep(self.delay)
print(f"[{self.name}] 完成")
t = MyThread("custom-thread", 1.5)
t.start()
t.join()

2.2 线程属性与方法#

属性/方法说明
threading.active_count()当前活跃线程数
threading.current_thread()获取当前线程对象
threading.main_thread()获取主线程对象
threading.enumerate()列出所有活跃线程
t.name线程名
t.daemon是否为守护线程
t.ident线程 ID(系统级)
t.is_alive()线程是否存活
t.join(timeout)等待线程结束
import threading
def task():
current = threading.current_thread()
print(f"名称: {current.name}")
print(f"ID: {current.ident}")
print(f"守护: {current.daemon}")
print(f"总线程数: {threading.active_count()}")
t = threading.Thread(target=task, name="demo-thread", daemon=True)
t.start()
t.join()
# 输出:
# 名称: demo-thread
# ID: 123145307795456
# 守护: True
# 总线程数: 2

2.3 daemon 守护线程#

守护线程的特殊行为:

  • 主线程退出时,守护线程会立即被强制结束(不等待运行完成)
  • 非守护线程(默认)会阻止主进程退出,直到所有非守护线程结束
import threading
import time
def daemon_task():
for i in range(5):
print(f"守护线程运行中... {i}")
time.sleep(0.5)
def normal_task():
for i in range(3):
print(f"普通线程运行中... {i}")
time.sleep(0.5)
# 守护线程
d = threading.Thread(target=daemon_task, daemon=True)
d.start()
# 普通线程
n = threading.Thread(target=normal_task)
n.start()
print("主线程退出")
# 主线程退出后,守护线程 d 会立即终止
# 但主线程会等普通线程 n 结束后才真正退出

最佳实践: 后台辅助任务(日志、心跳、监控)用守护线程;需要完整执行的任务用普通线程 + join()


3. 线程同步机制#

多线程编程最大的陷阱是数据竞争(Race Condition)——多个线程同时修改共享数据导致不可预期的结果。

3.1 Lock 互斥锁#

import threading
# 没有锁的经典问题:计数不准确
counter = 0
def increment_without_lock():
global counter
for _ in range(1_000_000):
counter += 1 # 这不是原子操作!实际是:读→加→写 三步
threads = [threading.Thread(target=increment_without_lock) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"不加锁: counter = {counter}")
# 预期 10,000,000,实际可能是 4,328,917(每次不同)
# 使用 Lock 修复
counter = 0
lock = threading.Lock()
def increment_with_lock():
global counter
for _ in range(1_000_000):
with lock: # 自动 acquire/release
counter += 1
threads = [threading.Thread(target=increment_with_lock) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"加锁后: counter = {counter}") # 正确: 10,000,000

Lock 核心方法:

lock = threading.Lock()
# 阻塞获取(等待直到可用)
lock.acquire()
# 非阻塞获取(立即返回是否成功)
if lock.acquire(blocking=False): # 或 lock.acquire(False)
# 成功获取
lock.release()
# 带超时获取
if lock.acquire(timeout=2.0):
# 2 秒内获取到
lock.release()
else:
print("超时!")
# 推荐用法:with 语句自动管理
with lock:
# 临界区代码
pass

3.2 RLock 可重入锁#

RLock 允许同一线程多次获取锁,适用于递归或嵌套调用场景:

import threading
rlock = threading.RLock()
class RecursiveCounter:
def __init__(self):
self.value = 0
def increment(self):
with rlock:
self.value += 1
return self.value
def reset_and_increment(self):
with rlock:
self.value = 0
# 同一线程再次调用 increment(),里面也会获取 rlock
# Lock 会死锁,RLock 正常工作
return self.increment()
counter = RecursiveCounter()
print(counter.reset_and_increment()) # 1

选择建议: 一般场景用 Lock(性能略好),递归/嵌套场景用 RLock

3.3 Semaphore 信号量#

Semaphore 限制同时访问某个资源的线程数量(类似停车场容量):

import threading
import time
import random
# 最多允许 3 个线程同时访问数据库
db_semaphore = threading.Semaphore(3)
def database_query(query_id):
with db_semaphore:
print(f"查询 {query_id} 开始执行...")
time.sleep(random.uniform(0.5, 1.5)) # 模拟查询
print(f"查询 {query_id} 完成")
# 启动 10 个并发查询
for i in range(10):
threading.Thread(target=database_query, args=(i,)).start()
# 最多 3 个同时执行,其余排队等待

3.4 Event 事件#

Event 用于线程间的事件通知——一个线程等待信号,另一个线程发送信号:

import threading
import time
def downloader(stop_event):
"""下载线程:持续下载直到收到停止信号"""
chunk = 0
while not stop_event.is_set():
chunk += 1
print(f"下载 chunk {chunk}...")
time.sleep(0.5)
print("收到停止信号,下载终止")
def controller(stop_event):
"""控制线程:5 秒后发送停止信号"""
time.sleep(5)
print("→ 发送停止信号")
stop_event.set()
stop_event = threading.Event()
d = threading.Thread(target=downloader, args=(stop_event,))
c = threading.Thread(target=controller, args=(stop_event,))
d.start()
c.start()
d.join()
c.join()

Event 方法:

  • event.set() — 设置事件(点亮信号灯)
  • event.clear() — 清除事件(熄灭信号灯)
  • event.wait(timeout) — 等待事件设置(可设超时)
  • event.is_set() — 检查事件是否已设置

3.5 Condition 条件变量#

Condition 比 Event 更强大,支持复杂的等待-通知模式(生产者-消费者模型):

import threading
import time
import random
class BoundedQueue:
"""有界队列:使用 Condition 实现生产者-消费者模式"""
def __init__(self, capacity):
self.queue = []
self.capacity = capacity
self.condition = threading.Condition()
def put(self, item):
with self.condition:
# 队列满时等待消费者腾出空间
while len(self.queue) >= self.capacity:
print(f" 队列满,生产者等待...")
self.condition.wait()
self.queue.append(item)
print(f" 生产: {item} (队列: {len(self.queue)}/{self.capacity})")
self.condition.notify() # 通知一个等待的消费者
def get(self):
with self.condition:
# 队列空时等待生产者放入数据
while len(self.queue) == 0:
print(f" 队列空,消费者等待...")
self.condition.wait()
item = self.queue.pop(0)
print(f" 消费: {item} (队列: {len(self.queue)}/{self.capacity})")
self.condition.notify() # 通知一个等待的生产者
return item
# 测试生产者-消费者
q = BoundedQueue(3)
def producer(q, n):
for i in range(n):
time.sleep(random.uniform(0.1, 0.5))
q.put(f"P-{i}")
def consumer(q, n):
for _ in range(n):
time.sleep(random.uniform(0.2, 1.0))
q.get()
threading.Thread(target=producer, args=(q, 5)).start()
threading.Thread(target=consumer, args=(q, 5)).start()

3.6 Barrier 屏障#

Barrier 让多个线程在某个点互相等待,全部到达后一起继续:

import threading
import time
import random
def racer(barrier, name):
prep_time = random.uniform(0.5, 2.0)
print(f"{name} 准备中...({prep_time:.1f}s)")
time.sleep(prep_time)
print(f"{name} 到达起跑线,等待其他选手...")
barrier.wait() # 等待所有选手
print(f"{name} 起跑!")
# 5 个选手需要全部就位才能起跑
barrier = threading.Barrier(5, timeout=10)
for i in range(5):
threading.Thread(target=racer, args=(barrier, f"选手-{i+1}"), daemon=True).start()

3.7 Timer 定时器#

import threading
def remind(msg):
print(f"⏰ 提醒: {msg}")
# 5 秒后执行
timer = threading.Timer(5.0, remind, args=["该休息了!"])
timer.start()
# 如果不需要了可以取消
# timer.cancel()

3.8 local() 线程局部存储#

import threading
# 每个线程有自己的独立存储空间,互不干扰
thread_data = threading.local()
def process():
thread_data.value = threading.current_thread().name
# 其他线程修改 thread_data.value 不会影响当前线程
print(f"{threading.current_thread().name}: value = {thread_data.value}")
for i in range(3):
threading.Thread(target=process, name=f"thread-{i}").start()

3.9 同步机制选择速查表#

场景推荐工具
保护共享变量不被并发修改Lock
递归/嵌套调用中加锁RLock
限制并发访问数量Semaphore
等待某个事件发生Event
生产者-消费者模式Condition / queue.Queue
多线程同步等待Barrier
定时执行Timer
线程隔离存储local()

4. queue.Queue 线程安全队列#

queue.Queue 是 Python 内置的线程安全队列,比 Condition 更适合生产者-消费者场景:

import threading
import queue
import time
# 创建有界队列
q = queue.Queue(maxsize=5)
def producer(q, n):
for i in range(n):
time.sleep(0.3)
item = f"item-{i}"
q.put(item) # 阻塞直到有空位
print(f"生产: {item} (队列: {q.qsize()})")
def consumer(q, n):
for _ in range(n):
item = q.get() # 阻塞直到有数据
print(f" 消费: {item} (队列: {q.qsize()})")
q.task_done() # 标记任务完成
time.sleep(0.8)
# 启动线程
threading.Thread(target=producer, args=(q, 10), daemon=True).start()
threading.Thread(target=consumer, args=(q, 10), daemon=True).start()
# 等待所有任务被处理
q.join() # 阻塞直到队列中所有项都被 task_done()
print("所有任务完成")

Queue 类型:

  • queue.Queue(maxsize) — FIFO 队列(先进先出)
  • queue.LifoQueue(maxsize) — LIFO 队列(后进先出,栈)
  • queue.PriorityQueue(maxsize) — 优先级队列
import queue
# 优先级队列:数字越小优先级越高
pq = queue.PriorityQueue()
pq.put((2, "普通任务"))
pq.put((1, "紧急任务"))
pq.put((3, "低优先级任务"))
while not pq.empty():
print(pq.get()) # (1, '紧急任务') → (2, '普通任务') → (3, '低优先级任务')

5. multiprocessing 多进程模块#

5.1 为什么需要多进程?#

多进程可以绕过 GIL 限制,实现真正的并行计算。每个进程有自己独立的 Python 解释器和内存空间。

5.2 Process 进程创建#

import multiprocessing
import os
import time
def cpu_heavy(n):
"""CPU 密集型:计算素数和"""
print(f"进程 {os.getpid()} 开始,参数 n={n}")
total = 0
for i in range(2, n):
is_prime = True
for j in range(2, int(i ** 0.5) + 1):
if i % j == 0:
is_prime = False
break
if is_prime:
total += i
return total
# 创建进程(与 Thread API 几乎一样!)
p1 = multiprocessing.Process(target=cpu_heavy, args=(200000,))
p2 = multiprocessing.Process(target=cpu_heavy, args=(200000,))
start = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
print(f"2 进程耗时: {time.time() - start:.2f}s")
# 真实并行,耗时接近单进程一半

注意: Windows 下 multiprocessing 必须在 if __name__ == "__main__": 块中使用,否则会递归创建进程。macOS/Linux 也建议添加此保护。

5.3 Pool 进程池#

Pool 管理固定数量的工作进程,自动分配任务:

import multiprocessing
import time
import random
def task(n):
"""模拟耗时计算"""
time.sleep(random.uniform(0.5, 1.5))
result = n * n
print(f"任务 {n}: {n}² = {result}")
return result
if __name__ == "__main__":
# 创建 4 个工作进程
with multiprocessing.Pool(processes=4) as pool:
# map: 阻塞,按顺序返回
results = pool.map(task, range(10))
print(f"map 结果: {results}")
# imap: 惰性迭代器,按提交顺序返回
for result in pool.imap(task, range(5)):
print(f"imap 得到: {result}")
# imap_unordered: 按完成顺序返回(非提交顺序)
for result in pool.imap_unordered(task, range(5)):
print(f"imap_unordered 得到: {result}")
# apply_async: 异步提交单个任务,返回 AsyncResult
async_result = pool.apply_async(task, args=(42,))
# 可以做其他事情...
result = async_result.get(timeout=10) # 阻塞获取结果
print(f"42² = {result}")
# map_async: 异步 map,返回 AsyncResult
async_results = pool.map_async(task, range(8))
results = async_results.get()
print(f"map_async 结果: {results}")

Pool 方法选择:

方法阻塞/异步返回值适用场景
pool.map()阻塞列表批量任务,等全部完成
pool.imap()惰性迭代器大量任务,边完成边处理
pool.apply()阻塞单个值单个任务
pool.apply_async()异步AsyncResult单个任务,不阻塞主进程
pool.map_async()异步AsyncResult批量任务,不阻塞主进程
pool.starmap()阻塞列表多参数任务(类似 itertools.starmap)

5.4 Queue 与 Pipe 进程间通讯#

进程间不能直接共享变量(独立内存空间),需要通过 IPC 机制:

import multiprocessing
# === 方法一:Queue(队列)===
def producer(q):
for i in range(5):
q.put(f"msg-{i}")
q.put("EOF") # 结束标志
def consumer(q):
while True:
msg = q.get()
if msg == "EOF":
break
print(f"收到: {msg}")
if __name__ == "__main__":
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
# === 方法二:Pipe(管道)===
if __name__ == "__main__":
# Pipe 返回两个连接对象,全双工
parent_conn, child_conn = multiprocessing.Pipe()
def sender(conn):
conn.send("Hello from child")
conn.send([1, 2, 3])
conn.send({"key": "value"})
conn.close()
def receiver(conn):
print(conn.recv()) # "Hello from child"
print(conn.recv()) # [1, 2, 3]
print(conn.recv()) # {"key": "value"}
conn.close()
p = multiprocessing.Process(target=sender, args=(child_conn,))
p.start()
receiver(parent_conn)
p.join()

5.5 Manager 共享数据管理器#

Manager 提供了在进程间共享 Python 对象的机制:

import multiprocessing
def worker(mgr_dict, mgr_list, name, value):
mgr_dict[name] = value
mgr_list.append(f"{name}:{value}")
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
# 创建进程间共享的数据结构
shared_dict = manager.dict()
shared_list = manager.list()
processes = []
for i in range(5):
p = multiprocessing.Process(
target=worker,
args=(shared_dict, shared_list, f"key-{i}", i * 100)
)
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"共享字典: {dict(shared_dict)}")
print(f"共享列表: {list(shared_list)}")

Manager 支持的类型: dict, list, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array

5.6 Value 和 Array 共享内存#

比 Manager 更高效的共享内存方式(直接使用 C 类型):

import multiprocessing
# Value: 单个值的共享内存
counter = multiprocessing.Value("i", 0) # "i" = 有符号整数
# Array: 数组的共享内存
shared_array = multiprocessing.Array("d", [1.0, 2.0, 3.0]) # "d" = double
def increment(val, arr):
for _ in range(1000):
with val.get_lock(): # 需要显式加锁
val.value += 1
if __name__ == "__main__":
processes = [
multiprocessing.Process(target=increment, args=(counter, shared_array))
for _ in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"计数器: {counter.value}") # 4000

类型码:

  • "i" — 有符号 int
  • "f" — float
  • "d" — double
  • "c" — char
  • "b" — signed char(注意:不是布尔值)

6. concurrent.futures 高级并发 API#

concurrent.futures 是 Python 3.2 引入的高级并发 API,提供了统一的线程池和进程池接口。

6.1 ThreadPoolExecutor 线程池#

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
import urllib.request
def fetch_url(url):
"""模拟抓取 URL"""
time.sleep(random.uniform(0.3, 1.0)) # 模拟网络延迟
return f"{url} → 200 OK"
urls = [f"https://api.example.com/v1/item/{i}" for i in range(10)]
# 方式一:submit + as_completed(按完成顺序处理)
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
futures = {executor.submit(fetch_url, url): url for url in urls}
# 按完成顺序处理结果
for future in as_completed(futures):
url = futures[future]
try:
result = future.result(timeout=5)
print(f"✓ {result}")
except Exception as e:
print(f"✗ {url} 失败: {e}")
# 方式二:map(保持提交顺序)
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch_url, urls, timeout=10)
for url, result in zip(urls, results):
print(f"{url}: {result}")

6.2 ProcessPoolExecutor 进程池#

与线程池 API 一致,底层使用进程:

from concurrent.futures import ProcessPoolExecutor
import os
def cpu_task(n):
return f"PID {os.getpid()}: {n}² = {n * n}"
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
# submit 方式
futures = [executor.submit(cpu_task, i) for i in range(20)]
for future in as_completed(futures):
print(future.result())
# map 方式
results = executor.map(cpu_task, range(10, 15))
for r in results:
print(r)

6.3 Future 对象#

Future 代表一个异步操作的结果,提供统一的查询和控制接口:

方法说明
future.result(timeout)获取结果(阻塞)
future.exception()获取异常(不阻塞)
future.done()任务是否完成
future.running()任务是否正在执行
future.cancel()尝试取消任务(只能取消尚未开始的任务)
future.add_done_callback(fn)添加完成回调
from concurrent.futures import ThreadPoolExecutor
import time
def check_done(future):
"""完成时的回调函数"""
try:
result = future.result()
print(f"回调通知: 结果 = {result}")
except Exception as e:
print(f"回调通知: 异常 = {e}")
with ThreadPoolExecutor() as executor:
future = executor.submit(time.sleep, 1) # 返回 None
future.add_done_callback(check_done)
print("主线程继续执行其他工作...")
time.sleep(1.5)
print("主线程检查: done =", future.done()) # True

6.4 wait 等待多任务#

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random
def task(n):
time.sleep(random.uniform(0.5, 2.0))
return n * 2
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
# 等待第一个完成
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(f"第一个完成的: {[f.result() for f in done]}")
# 等待所有完成
done, not_done = wait(not_done, return_when=ALL_COMPLETED)
print(f"剩下的: {[f.result() for f in done]}")

7. 三种并发模型对比与选择#

7.1 对比总结表#

维度threading(多线程)multiprocessing(多进程)asyncio(异步)
适用场景I/O 密集型CPU 密集型高并发 I/O 密集型
GIL 影响受限制(只能并发 I/O)无影响(独立解释器)不受限(单线程)
真正并行❌(CPU 任务)❌(单线程)
内存开销低(共享内存)高(独立内存)极低(协程栈)
创建开销高(fork 新进程)极低(协程切换)
通讯方式共享变量 + LockQueue / Pipe / Manager无需通讯(单线程)
数据共享简单(共享内存)复杂(需要 IPC)N/A
并发数量数十到数百数个到数十数万到数十万
代码复杂度中等(需要处理锁)中等(需要处理 IPC)较高(async/await 传染性)
典型应用爬虫、API 调用、文件处理图像处理、科学计算、视频编码高并发 Web 服务、WebSocket

7.2 选择决策树#

需要并发?
├── CPU 密集型(计算为主)
│ └── → multiprocessing / ProcessPoolExecutor
└── I/O 密集型(等待为主)
├── 简单任务 / 少量并发(< 100)
│ └── → threading / ThreadPoolExecutor
└── 高并发(> 1000)/ 需要精细控制
└── → asyncio

7.3 混合使用:线程 + 进程#

对于复杂场景,可以组合使用:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
def cpu_heavy(n):
"""CPU 密集型工作"""
return sum(i * i for i in range(n * 10000))
def io_heavy(n):
"""I/O 密集型工作"""
time.sleep(0.1)
return f"io-task-{n}"
# 进程池处理 CPU 任务,线程池处理 I/O 任务
with ProcessPoolExecutor(max_workers=4) as cpu_pool, \
ThreadPoolExecutor(max_workers=10) as io_pool:
# 并行提交两种任务
cpu_futures = [cpu_pool.submit(cpu_heavy, i) for i in range(8)]
io_futures = [io_pool.submit(io_heavy, i) for i in range(20)]
# 收集结果
for f in cpu_futures:
print(f"CPU结果: {f.result()}")
for f in io_futures:
print(f"IO结果: {f.result()}")

8. 实战案例#

8.1 多线程爬虫加速#

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random
# 模拟爬取页面
def crawl_page(url):
"""模拟爬取一个页面"""
thread = threading.current_thread()
delay = random.uniform(0.2, 0.8)
time.sleep(delay) # 模拟网络延迟
return {
"url": url,
"status": 200,
"size": random.randint(1000, 50000),
"thread": thread.name,
"time": delay,
}
# URL 列表
urls = [f"https://example.com/page/{i}" for i in range(50)]
print(f"开始爬取 {len(urls)} 个页面...\n")
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(crawl_page, url): url for url in urls}
completed = 0
for future in as_completed(futures):
completed += 1
result = future.result()
print(
f"[{completed:2d}/{len(urls)}] {result['url']:35s} | "
f"状态={result['status']} | "
f"{result['size']:5d}字节 | "
f"{result['time']:.2f}s | "
f"{result['thread']}"
)
print(f"\n总耗时: {time.time() - start:.2f}s")
# 50 个页面,10 线程并发,约 4-5 秒完成
# 单线程需要约 0.5 * 50 = 25 秒

8.2 多进程批量图片处理#

from concurrent.futures import ProcessPoolExecutor
import os
import time
def process_image(args):
"""模拟图片处理:缩放、压缩、加水印"""
filename, size = args
time.sleep(0.3) # 模拟处理时间
new_size = (size[0] // 2, size[1] // 2)
new_name = filename.replace(".jpg", "_thumb.jpg")
# 实际中这里会用 Pillow 处理图片
# from PIL import Image
# img = Image.open(filename)
# img.thumbnail(new_size)
# img.save(new_name, quality=80)
return {
"file": new_name,
"old_size": size,
"new_size": new_size,
"pid": os.getpid(),
}
if __name__ == "__main__":
# 模拟 30 张图片
images = [(f"photo_{i}.jpg", (4000, 3000)) for i in range(30)]
print(f"开始处理 {len(images)} 张图片...\n")
start = time.time()
with ProcessPoolExecutor(max_workers=6) as executor:
futures = [executor.submit(process_image, img) for img in images]
for i, future in enumerate(futures, 1):
result = future.result()
print(
f"[{i:2d}/{len(images)}] {result['file']:25s} | "
f"{result['old_size']}{result['new_size']} | "
f"PID={result['pid']}"
)
print(f"\n总耗时: {time.time() - start:.2f}s")
# 30 张图,6 进程并行,约 1.5 秒
# 单进程需要 0.3 * 30 = 9 秒

8.3 线程安全的连接池#

import threading
import time
import random
from contextlib import contextmanager
class ConnectionPool:
"""
线程安全的数据库连接池
使用 Semaphore 限制并发、Lock 保护数据结构
"""
def __init__(self, pool_size=5):
self.pool_size = pool_size
self.semaphore = threading.Semaphore(pool_size)
self.lock = threading.Lock()
self.connections = [] # 可用连接
self.in_use = 0 # 使用中的连接数
# 初始化连接池
for i in range(pool_size):
self.connections.append(self._create_connection(i))
def _create_connection(self, conn_id):
return {"id": conn_id, "connected_at": time.time()}
@contextmanager
def get_connection(self):
"""获取连接(上下文管理器,自动归还)"""
self.semaphore.acquire()
with self.lock:
conn = self.connections.pop()
self.in_use += 1
print(f" → 获取连接 #{conn['id']} (使用中: {self.in_use}/{self.pool_size})")
try:
yield conn
finally:
with self.lock:
self.connections.append(conn)
self.in_use -= 1
print(f" ← 归还连接 #{conn['id']} (使用中: {self.in_use}/{self.pool_size})")
self.semaphore.release()
# 使用连接池
pool = ConnectionPool(pool_size=3)
def execute_query(query_id):
with pool.get_connection() as conn:
duration = random.uniform(0.5, 1.5)
print(f"查询 {query_id} 执行中 (连接 #{conn['id']})...")
time.sleep(duration)
print(f"查询 {query_id} 完成 ({duration:.1f}s)")
# 8 个并发查询,但最多 3 个同时执行
with ThreadPoolExecutor(max_workers=8) as executor:
executor.map(execute_query, range(8))

9. 常见陷阱与最佳实践#

9.1 陷阱一:忘记 join()#

# ❌ 错误:主线程退出,子线程被强制终止
def long_task():
for i in range(100):
print(f"进度: {i}%")
time.sleep(0.1)
t = threading.Thread(target=long_task)
t.start()
# 程序直接结束了!子线程 work 没做完
# ✅ 正确
t.start()
t.join() # 等待子线程完成

9.2 陷阱二:共享变量不加锁#

# ❌ Python 的 += 不是原子操作
counter = 0
def increment():
global counter
counter += 1 # 读 → 加 → 写,三步可能被打断
# ✅ 加锁或使用线程安全结构
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
with lock:
counter += 1

9.3 陷阱三:死锁#

# ❌ 经典死锁场景
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread1():
with lock_a:
time.sleep(0.1)
with lock_b: # 等待 lock_b
pass
def thread2():
with lock_b:
time.sleep(0.1)
with lock_a: # 等待 lock_a
pass
# thread1 持有 lock_a 等 lock_b
# thread2 持有 lock_b 等 lock_a
# → 死锁!
# ✅ 修复:统一加锁顺序
def thread1():
with lock_a:
with lock_b:
pass
def thread2():
with lock_a: # 先获取 lock_a(与 thread1 顺序一致)
with lock_b:
pass

9.4 陷阱四:multiprocessing 在 Windows 上的坑#

# ❌ Windows 下会无限递归创建进程
import multiprocessing
def worker():
pass
p = multiprocessing.Process(target=worker)
p.start() # Windows 会重新导入整个模块!
# ✅ 正确:使用 __name__ 保护
if __name__ == "__main__":
p = multiprocessing.Process(target=worker)
p.start()
p.join()

9.5 陷阱五:线程池中传递不可序列化对象#

# ❌ ProcessPoolExecutor 参数和返回值必须可 pickle
from concurrent.futures import ProcessPoolExecutor
lambda_fn = lambda x: x * 2
with ProcessPoolExecutor() as executor:
executor.submit(lambda_fn, 5) # TypeError: can't pickle lambda
# ✅ 使用普通函数
def double(x):
return x * 2
with ProcessPoolExecutor() as executor:
f = executor.submit(double, 5)
print(f.result()) # 10

9.6 最佳实践清单#

  1. 线程安全数据结构优先: 能用 queue.Queue 就不用 list + Lock
  2. 锁的范围最小化: 只锁关键代码,不要锁整个函数
  3. 使用 with 语句: 避免忘记释放锁/信号量
  4. 统一加锁顺序: 防止死锁
  5. I/O 密集用 ThreadPoolExecutor,CPU 密集用 ProcessPoolExecutor
  6. ProcessPoolExecutor 统一加 if __name__ == "__main__": 保护
  7. 设置合理的超时: future.result(timeout=...) 避免永久阻塞
  8. 异常处理:try/except 包裹任务函数,避免一个异常导致整个任务组崩溃

10. 总结#

场景推荐方案关键理由
网络爬虫 / API 调用ThreadPoolExecutorI/O 密集 + 简单 API
图片/视频处理ProcessPoolExecutorCPU 密集 + 真正并行
高并发 Web 服务asyncio + aiohttp超低开销 + 数万并发
简单并行任务ThreadPoolExecutor轻量 + 无 IPC 开销
数据库操作ThreadPoolExecutorI/O 密集 + 线程安全
科学计算 / 数据处理ProcessPoolExecutor绕过 GIL
混合任务(I/O + CPU)ProcessPoolExecutor + ThreadPoolExecutor各取所长

记住三个核心规律:

  1. I/O 等得起 → 用线程(网络、文件、数据库)
  2. CPU 算不动 → 用进程(计算、编码、模型推理)
  3. 高并发等得多 → 用异步(协程、事件循环)

推荐阅读:

Python 多线程与多进程并发编程完全指南:GIL 原理、threading、multiprocessing 到 concurrent.futures | Python 进阶核心知识
https://971918.xyz/posts/python-guide/python-concurrency-guide/
作者
九所长
发布于
2026-06-30
许可协议
CC BY-NC-SA 4.0