3438 字
17 分钟

Python 迭代器与生成器完全指南:从原理到实战 | Python 进阶核心知识

迭代器和生成器是 Python 中最强大也最容易被忽视的特性之一。理解它们不仅能帮你写出更优雅的代码,还能掌握惰性求值这一重要的编程范式,大幅节省内存。

Python 迭代器与生成器

本文系统讲解迭代器与生成器的完整知识体系:

  • 迭代器协议与 __iter__ / __next__ 方法
  • iter()next() 内置函数
  • 生成器函数与 yield 关键字
  • yield from 语法与生成器组合
  • 生成器表达式
  • itertools 模块详解
  • 惰性求值与内存优化
  • 自定义迭代器与生成器实战
  • 生成器的高级用法(send/throw/close)

一、迭代器基础#

1.1 什么是迭代器?#

迭代器是实现了迭代器协议的对象,即同时实现了两个方法:

方法作用
__iter__()返回迭代器自身
__next__()返回下一个元素,没有更多元素时抛出 StopIteration

1.2 可迭代对象 vs 迭代器#

# 可迭代对象(Iterable):实现了 __iter__ 方法
# 迭代器(Iterator):实现了 __iter__ 和 __next__ 方法
# 列表是可迭代对象,但不是迭代器
nums = [1, 2, 3]
print(hasattr(nums, '__iter__')) # True
print(hasattr(nums, '__next__')) # False
# 获取迭代器
it = iter(nums)
print(hasattr(it, '__iter__')) # True
print(hasattr(it, '__next__')) # True
# 使用 next() 获取元素
print(next(it)) # 1
print(next(it)) # 2
print(next(it)) # 3
print(next(it)) # StopIteration

1.3 for 循环的本质#

# 这段 for 循环:
for item in [1, 2, 3]:
print(item)
# 实际上等价于:
it = iter([1, 2, 3])
while True:
try:
item = next(it)
print(item)
except StopIteration:
break

1.4 自定义迭代器#

class Countdown:
"""从 N 倒数到 1 的迭代器"""
def __init__(self, start):
self.current = start
def __iter__(self):
return self # 返回自身,因为本身就是迭代器
def __next__(self):
if self.current <= 0:
raise StopIteration
value = self.current
self.current -= 1
return value
# 使用
for i in Countdown(5):
print(i, end=' ')
# 输出: 5 4 3 2 1
# 手动使用
cd = Countdown(3)
print(next(cd)) # 3
print(next(cd)) # 2
print(next(cd)) # 1

1.5 可迭代对象(非迭代器)#

class Range:
"""自定义 range 类(可迭代对象,但不是迭代器)"""
def __init__(self, start, end, step=1):
self.start = start
self.end = end
self.step = step
def __iter__(self):
# 返回一个新的迭代器对象
return RangeIterator(self.start, self.end, self.step)
class RangeIterator:
"""Range 的迭代器"""
def __init__(self, start, end, step):
self.current = start
self.end = end
self.step = step
def __iter__(self):
return self
def __next__(self):
if self.current >= self.end:
raise StopIteration
value = self.current
self.current += self.step
return value
# 可以多次迭代(因为每次 __iter__ 都返回新迭代器)
r = Range(0, 5, 2)
for i in r:
print(i, end=' ') # 0 2 4
print()
for i in r:
print(i, end=' ') # 0 2 4(可以再次迭代)

二、生成器函数#

2.1 什么是生成器?#

生成器是使用 yield 关键字的函数,调用时返回一个生成器对象:

def countdown(n):
"""倒数生成器"""
while n > 0:
yield n
n -= 1
# 调用生成器函数返回生成器对象
gen = countdown(5)
print(type(gen)) # <class 'generator'>
# 生成器是迭代器
print(hasattr(gen, '__iter__')) # True
print(hasattr(gen, '__next__')) # True
# 使用 next() 逐个获取值
print(next(gen)) # 5
print(next(gen)) # 4
print(next(gen)) # 3

2.2 yield 的工作原理#

def simple_gen():
print("开始")
yield 1
print("暂停后恢复")
yield 2
print("再暂停")
yield 3
print("结束")
gen = simple_gen()
# 第一次调用 next():执行到第一个 yield
result = next(gen) # 输出 "开始"
print(result) # 1
# 第二次调用 next():从暂停处继续,执行到第二个 yield
result = next(gen) # 输出 "暂停后恢复"
print(result) # 2
# 第三次调用 next()
result = next(gen) # 输出 "再暂停"
print(result) # 3
# 第四次调用:函数执行完毕,抛出 StopIteration
next(gen) # 输出 "结束",然后 StopIteration

2.3 生成器的惰性求值#

# 生成器只在需要时才计算值
def fibonacci():
"""无限斐波那契数列生成器"""
a, b = 0, 1
while True:
yield b
a, b = b, a + b
# 无限序列也不会耗尽内存
fib = fibonacci()
# 按需获取前 10 个
for _ in range(10):
print(next(fib), end=' ')
# 输出: 1 1 2 3 5 8 13 21 34 55
# 内存对比:生成大序列
import sys
# 列表方式(占用大量内存)
big_list = list(range(1_000_000))
print(sys.getsizeof(big_list)) # ~8MB
# 生成器方式(几乎不占内存)
big_gen = (x for x in range(1_000_000))
print(sys.getsizeof(big_gen)) # ~128 bytes

2.4 生成器表达式#

# 列表推导式(立即计算,占用内存)
squares_list = [x ** 2 for x in range(10)]
print(squares_list) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 生成器表达式(惰性求值)
squares_gen = (x ** 2 for x in range(10))
print(squares_gen) # <generator object <genexpr> at 0x...>
# 可以迭代
for square in squares_gen:
print(square, end=' ')
# 0 1 4 9 16 25 36 49 64 81
# 生成器表达式的常见用法
# 1. 作为函数参数(不需要额外括号)
print(sum(x ** 2 for x in range(10))) # 285
print(max(x ** 2 for x in range(10))) # 81
# 2. 链式操作
nums = (x for x in range(10))
evens = (x for x in nums if x % 2 == 0)
doubles = (x * 2 for x in evens)
# 只有在迭代时才真正计算
print(list(doubles)) # [0, 4, 8, 12, 16]

三、yield from 语法#

3.1 基本用法#

# yield from 将可迭代对象的所有元素逐个产出
def chain(*iterables):
for it in iterables:
yield from it
# 等价于:
def chain_old(*iterables):
for it in iterables:
for item in it:
yield item
# 使用
print(list(chain([1, 2], [3, 4], [5, 6])))
# [1, 2, 3, 4, 5, 6]

3.2 嵌套生成器#

# 扁平化嵌套列表
def flatten(nested):
for item in nested:
if isinstance(item, list):
yield from flatten(item) # 递归调用
else:
yield item
nested = [1, [2, [3, 4], 5], 6, [7, 8]]
print(list(flatten(nested)))
# [1, 2, 3, 4, 5, 6, 7, 8]
# 遍历目录树
import os
def walk_files(root):
for entry in os.scandir(root):
if entry.is_file():
yield entry.path
elif entry.is_dir():
yield from walk_files(entry.path)
# 遍历指定目录下所有文件
# for file in walk_files('/path/to/dir'):
# print(file)

3.3 yield from 的双向通信#

# yield from 建立调用者与子生成器之间的双向通道
def sub_gen():
"""子生成器"""
val = yield "子生成器启动"
print(f"子生成器收到: {val}")
val2 = yield f"第一次收到: {val}"
print(f"子生成器再次收到: {val2}")
return "子生成器结束返回"
def main_gen():
"""主生成器,使用 yield from 委托子生成器"""
result = yield from sub_gen()
print(f"主生成器收到子生成器返回值: {result}")
yield result
# 使用
gen = main_gen()
print(next(gen)) # "子生成器启动"
print(gen.send("你好")) # "子生成器收到: 你好",然后 "第一次收到: 你好"
print(gen.send("再见")) # "子生成器再次收到: 再见",然后 "主生成器收到子生成器返回值: 子生成器结束返回"

3.4 获取子生成器返回值#

def inner():
yield 1
yield 2
return "inner done"
def outer():
# result 获取 inner 的 return 值
result = yield from inner()
print(f"inner 返回: {result}")
yield 3
print(list(outer()))
# 输出 "inner 返回: inner done"
# 输出: [1, 2, 3]

四、生成器的高级方法#

4.1 send():向生成器发送数据#

def echo():
"""回显生成器:接收并返回发送的值"""
msg = "初始值"
while True:
incoming = yield msg # yield 返回值就是 send() 发送的值
if incoming is None:
break
msg = f"收到: {incoming}"
gen = echo()
# 第一次必须用 next() 启动(或 send(None))
print(next(gen)) # "初始值"
# 使用 send() 发送数据
print(gen.send("Hello")) # "收到: Hello"
print(gen.send("World")) # "收到: World"
# 发送 None 结束
# gen.send(None) # 抛出 StopIteration

4.2 throw():向生成器抛异常#

def guarded_gen():
try:
yield 1
yield 2
yield 3
except ValueError:
print("捕获到 ValueError")
yield "错误已处理"
gen = guarded_gen()
print(next(gen)) # 1
print(gen.throw(ValueError("出错了"))) # 输出 "捕获到 ValueError",返回 "错误已处理"

4.3 close():关闭生成器#

def resource_gen():
print("打开资源")
try:
yield 1
yield 2
yield 3
finally:
print("关闭资源")
gen = resource_gen()
print(next(gen)) # "打开资源",1
gen.close() # "关闭资源"
# 关闭后再调用 next() 会抛出 StopIteration
# next(gen) # StopIteration

五、itertools 模块#

5.1 无限迭代器#

import itertools
# count(start, step):无限计数
counter = itertools.count(10, 2) # 从 10 开始,步长 2
print(next(counter)) # 10
print(next(counter)) # 12
print(next(counter)) # 14
# cycle(iterable):无限循环
cyc = itertools.cycle(['A', 'B', 'C'])
print(next(cyc)) # A
print(next(cyc)) # B
print(next(cyc)) # C
print(next(cyc)) # A
# repeat(obj, times):重复
repeater = itertools.repeat("Hello", 3)
print(list(repeater)) # ['Hello', 'Hello', 'Hello']

5.2 排列组合迭代器#

import itertools
# product:笛卡尔积
for a, b in itertools.product([1, 2], ['x', 'y']):
print(f"{a}-{b}", end=' ')
# 1-x 1-y 2-x 2-y
# permutations:排列(顺序重要)
for p in itertools.permutations([1, 2, 3], 2):
print(p, end=' ')
# (1, 2) (1, 3) (2, 1) (2, 3) (3, 1) (3, 2)
# combinations:组合(顺序不重要)
for c in itertools.combinations([1, 2, 3], 2):
print(c, end=' ')
# (1, 2) (1, 3) (2, 3)
# combinations_with_replacement:可重复组合
for c in itertools.combinations_with_replacement([1, 2], 2):
print(c, end=' ')
# (1, 1) (1, 2) (2, 2)

5.3 处理输入迭代器#

import itertools
# accumulate:累积
nums = [1, 2, 3, 4, 5]
print(list(itertools.accumulate(nums))) # [1, 3, 6, 10, 15]
# chain:连接多个可迭代对象
print(list(itertools.chain([1, 2], [3, 4], [5]))) # [1, 2, 3, 4, 5]
# chain.from_iterable:从嵌套可迭代对象扁平化
print(list(itertools.chain.from_iterable([[1, 2], [3, 4]]))) # [1, 2, 3, 4]
# compress:根据选择器过滤
data = ['A', 'B', 'C', 'D']
selectors = [True, False, True, False]
print(list(itertools.compress(data, selectors))) # ['A', 'C']
# dropwhile:丢弃满足条件的前缀元素
nums = [1, 3, 5, 2, 4, 6]
print(list(itertools.dropwhile(lambda x: x < 5, nums))) # [5, 2, 4, 6]
# takewhile:取满足条件的前缀元素
print(list(itertools.takewhile(lambda x: x < 5, nums))) # [1, 3]
# islice:切片(支持步长)
print(list(itertools.islice(range(10), 2, 8, 2))) # [2, 4, 6]
# starmap:将参数元组解包后传入函数
points = [(1, 2), (3, 4), (5, 6)]
print(list(itertools.starmap(lambda x, y: x + y, points))) # [3, 7, 11]

5.4 分组与分组迭代#

import itertools
# groupby:按键分组(注意:需要先排序)
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
# 按第一个元素分组
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(f"{key}: {list(group)}")
# A: [('A', 1), ('A', 2)]
# B: [('B', 3), ('B', 4)]
# A: [('A', 5)] (注意:相同键不连续时会分成多个组)
# tee:从一个迭代器创建多个独立迭代器
nums = range(5)
it1, it2 = itertools.tee(nums, 2)
print(list(it1)) # [0, 1, 2, 3, 4]
print(list(it2)) # [0, 1, 2, 3, 4]
# zip_longest:不等长 zip,用 fillvalue 填充
a = [1, 2, 3]
b = ['x', 'y']
print(list(itertools.zip_longest(a, b, fillvalue='-')))
# [(1, 'x'), (2, 'y'), (3, '-')]

六、实战场景#

6.1 大文件逐行读取#

def read_large_file(filepath):
"""逐行读取大文件,内存占用极低"""
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()
# 使用
# for line in read_large_file('huge_file.txt'):
# process(line)

6.2 数据流管道#

def gen_numbers():
"""生成数字"""
for i in range(1000000):
yield i
def filter_even(numbers):
"""过滤偶数"""
for n in numbers:
if n % 2 == 0:
yield n
def square(numbers):
"""平方"""
for n in numbers:
yield n * n
def take_first(numbers, n):
"""取前 n 个"""
for i, num in enumerate(numbers):
if i >= n:
break
yield num
# 构建管道
pipeline = take_first(square(filter_even(gen_numbers())), 5)
# 惰性求值:只计算需要的
print(list(pipeline)) # [0, 4, 16, 36, 64]

6.3 分页数据获取#

def fetch_all_pages(api_func, start_page=1, page_size=100):
"""分页获取所有数据"""
page = start_page
while True:
results = api_func(page, page_size)
if not results:
break
for item in results:
yield item
page += 1
if len(results) < page_size:
break
# 模拟 API
def mock_api(page, page_size):
if page > 3:
return []
return [f"item_{page}_{i}" for i in range(page_size)]
# 使用
all_items = fetch_all_pages(mock_api)
for item in all_items:
pass # 处理每个 item

6.4 状态机实现#

def traffic_light():
"""交通灯状态机"""
while True:
# 红灯
yield "🔴 红灯 - 停止"
# 绿灯
yield "🟢 绿灯 - 通行"
# 黄灯
yield "🟡 黄灯 - 注意"
light = traffic_light()
for _ in range(6):
print(next(light))
# 🔴 红灯 - 停止
# 🟢 绿灯 - 通行
# 🟡 黄灯 - 注意
# 🔴 红灯 - 停止
# ...

七、常见陷阱与最佳实践#

❌ 陷阱 1:生成器只能迭代一次#

def get_numbers():
yield from [1, 2, 3]
gen = get_numbers()
print(list(gen)) # [1, 2, 3]
print(list(gen)) # [] (生成器已耗尽)
# ✅ 解决方法:每次需要时重新创建生成器
print(list(get_numbers())) # [1, 2, 3]
print(list(get_numbers())) # [1, 2, 3]

❌ 陷阱 2:生成器表达式括号问题#

# ❌ 错误:在函数中使用生成器表达式需要注意
print(sum((x ** 2 for x in range(10)))) # 可以,但多一层括号
# ✅ 正确:作为唯一参数时可以省略括号
print(sum(x ** 2 for x in range(10))) # 285

❌ 陷阱 3:在生成器中修改列表#

# ❌ 错误:迭代时修改列表会导致意外行为
def remove_even(lst):
for item in lst:
if item % 2 == 0:
lst.remove(item) # 迭代时修改列表
return lst
nums = [1, 2, 3, 4, 5, 6]
print(remove_even(nums)) # [1, 3, 5] (可能跳过元素)
# ✅ 正确:创建新列表或使用列表推导式
nums = [1, 2, 3, 4, 5, 6]
result = [x for x in nums if x % 2 != 0]

❌ 陷阱 4:忘记启动生成器#

# ❌ 错误:直接 send 非 None 值
gen = echo()
# gen.send("Hello") # TypeError: can't send non-None value to a just-started generator
# ✅ 正确:先用 next() 或 send(None) 启动
gen = echo()
next(gen) # 启动
gen.send("Hi") # 正常

✅ 最佳实践清单#

原则说明
优先使用生成器处理大数据节省内存,流式处理
生成器表达式用于简单场景代码简洁,一行搞定
生成器函数用于复杂逻辑状态保存、多 yield 点
组合使用 itertools避免重复造轮子
注意生成器一次性用完即弃,需要多次迭代时重新创建
close() 清理资源使用 try/finally 确保资源释放
文档化生成器行为说明产出什么、何时结束

八、总结#

迭代器 vs 生成器对比#

特性迭代器生成器
创建方式实现 __iter____next__使用 yield 关键字
代码量较多简洁
状态管理手动管理自动保存
灵活性更高(send/throw/close)
适用场景复杂数据结构大多数场景

什么时候用生成器?#

✅ 处理大文件/大数据集 → 惰性求值节省内存
✅ 无限序列 → 斐波那契、计数器
✅ 数据流管道 → 多步转换、链式操作
✅ 分页 API → 逐页加载
✅ 状态机 → 保存状态、分步执行
✅ 协程基础 → send/throw/close 双向通信
❌ 需要随机访问 → 使用列表
❌ 需要多次迭代 → 使用可迭代对象(每次返回新迭代器)
❌ 数据量很小 → 直接用列表更简单

生成器是 Python 最优雅的特性之一。掌握迭代器和生成器,你就能用更少的内存处理更大的数据,写出更简洁高效的代码。从简单的生成器表达式开始,逐步探索 yield from、itertools 和高级方法,你会发现 Python 的迭代世界远比想象中精彩。

Python 迭代器与生成器完全指南:从原理到实战 | Python 进阶核心知识
https://971918.xyz/posts/python-guide/python-iterator-generator-guide/
作者
九所长
发布于
2026-06-24
许可协议
CC BY-NC-SA 4.0