3438 字
17 分钟
Python 迭代器与生成器完全指南:从原理到实战 | Python 进阶核心知识
迭代器和生成器是 Python 中最强大也最容易被忽视的特性之一。理解它们不仅能帮你写出更优雅的代码,还能掌握惰性求值这一重要的编程范式,大幅节省内存。

本文系统讲解迭代器与生成器的完整知识体系:
- 迭代器协议与
__iter__/__next__方法 iter()和next()内置函数- 生成器函数与
yield关键字 yield from语法与生成器组合- 生成器表达式
itertools模块详解- 惰性求值与内存优化
- 自定义迭代器与生成器实战
- 生成器的高级用法(send/throw/close)
一、迭代器基础
1.1 什么是迭代器?
迭代器是实现了迭代器协议的对象,即同时实现了两个方法:
| 方法 | 作用 |
|---|---|
__iter__() | 返回迭代器自身 |
__next__() | 返回下一个元素,没有更多元素时抛出 StopIteration |
1.2 可迭代对象 vs 迭代器
# 可迭代对象(Iterable):实现了 __iter__ 方法# 迭代器(Iterator):实现了 __iter__ 和 __next__ 方法
# 列表是可迭代对象,但不是迭代器nums = [1, 2, 3]print(hasattr(nums, '__iter__')) # Trueprint(hasattr(nums, '__next__')) # False
# 获取迭代器it = iter(nums)print(hasattr(it, '__iter__')) # Trueprint(hasattr(it, '__next__')) # True
# 使用 next() 获取元素print(next(it)) # 1print(next(it)) # 2print(next(it)) # 3print(next(it)) # StopIteration1.3 for 循环的本质
# 这段 for 循环:for item in [1, 2, 3]: print(item)
# 实际上等价于:it = iter([1, 2, 3])while True: try: item = next(it) print(item) except StopIteration: break1.4 自定义迭代器
class Countdown: """从 N 倒数到 1 的迭代器"""
def __init__(self, start): self.current = start
def __iter__(self): return self # 返回自身,因为本身就是迭代器
def __next__(self): if self.current <= 0: raise StopIteration
value = self.current self.current -= 1 return value
# 使用for i in Countdown(5): print(i, end=' ')# 输出: 5 4 3 2 1
# 手动使用cd = Countdown(3)print(next(cd)) # 3print(next(cd)) # 2print(next(cd)) # 11.5 可迭代对象(非迭代器)
class Range: """自定义 range 类(可迭代对象,但不是迭代器)"""
def __init__(self, start, end, step=1): self.start = start self.end = end self.step = step
def __iter__(self): # 返回一个新的迭代器对象 return RangeIterator(self.start, self.end, self.step)
class RangeIterator: """Range 的迭代器"""
def __init__(self, start, end, step): self.current = start self.end = end self.step = step
def __iter__(self): return self
def __next__(self): if self.current >= self.end: raise StopIteration value = self.current self.current += self.step return value
# 可以多次迭代(因为每次 __iter__ 都返回新迭代器)r = Range(0, 5, 2)for i in r: print(i, end=' ') # 0 2 4print()for i in r: print(i, end=' ') # 0 2 4(可以再次迭代)二、生成器函数
2.1 什么是生成器?
生成器是使用 yield 关键字的函数,调用时返回一个生成器对象:
def countdown(n): """倒数生成器""" while n > 0: yield n n -= 1
# 调用生成器函数返回生成器对象gen = countdown(5)print(type(gen)) # <class 'generator'>
# 生成器是迭代器print(hasattr(gen, '__iter__')) # Trueprint(hasattr(gen, '__next__')) # True
# 使用 next() 逐个获取值print(next(gen)) # 5print(next(gen)) # 4print(next(gen)) # 32.2 yield 的工作原理
def simple_gen(): print("开始") yield 1 print("暂停后恢复") yield 2 print("再暂停") yield 3 print("结束")
gen = simple_gen()
# 第一次调用 next():执行到第一个 yieldresult = next(gen) # 输出 "开始"print(result) # 1
# 第二次调用 next():从暂停处继续,执行到第二个 yieldresult = next(gen) # 输出 "暂停后恢复"print(result) # 2
# 第三次调用 next()result = next(gen) # 输出 "再暂停"print(result) # 3
# 第四次调用:函数执行完毕,抛出 StopIterationnext(gen) # 输出 "结束",然后 StopIteration2.3 生成器的惰性求值
# 生成器只在需要时才计算值def fibonacci(): """无限斐波那契数列生成器""" a, b = 0, 1 while True: yield b a, b = b, a + b
# 无限序列也不会耗尽内存fib = fibonacci()
# 按需获取前 10 个for _ in range(10): print(next(fib), end=' ')# 输出: 1 1 2 3 5 8 13 21 34 55
# 内存对比:生成大序列import sys
# 列表方式(占用大量内存)big_list = list(range(1_000_000))print(sys.getsizeof(big_list)) # ~8MB
# 生成器方式(几乎不占内存)big_gen = (x for x in range(1_000_000))print(sys.getsizeof(big_gen)) # ~128 bytes2.4 生成器表达式
# 列表推导式(立即计算,占用内存)squares_list = [x ** 2 for x in range(10)]print(squares_list) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 生成器表达式(惰性求值)squares_gen = (x ** 2 for x in range(10))print(squares_gen) # <generator object <genexpr> at 0x...>
# 可以迭代for square in squares_gen: print(square, end=' ')# 0 1 4 9 16 25 36 49 64 81
# 生成器表达式的常见用法# 1. 作为函数参数(不需要额外括号)print(sum(x ** 2 for x in range(10))) # 285print(max(x ** 2 for x in range(10))) # 81
# 2. 链式操作nums = (x for x in range(10))evens = (x for x in nums if x % 2 == 0)doubles = (x * 2 for x in evens)
# 只有在迭代时才真正计算print(list(doubles)) # [0, 4, 8, 12, 16]三、yield from 语法
3.1 基本用法
# yield from 将可迭代对象的所有元素逐个产出def chain(*iterables): for it in iterables: yield from it
# 等价于:def chain_old(*iterables): for it in iterables: for item in it: yield item
# 使用print(list(chain([1, 2], [3, 4], [5, 6])))# [1, 2, 3, 4, 5, 6]3.2 嵌套生成器
# 扁平化嵌套列表def flatten(nested): for item in nested: if isinstance(item, list): yield from flatten(item) # 递归调用 else: yield item
nested = [1, [2, [3, 4], 5], 6, [7, 8]]print(list(flatten(nested)))# [1, 2, 3, 4, 5, 6, 7, 8]
# 遍历目录树import os
def walk_files(root): for entry in os.scandir(root): if entry.is_file(): yield entry.path elif entry.is_dir(): yield from walk_files(entry.path)
# 遍历指定目录下所有文件# for file in walk_files('/path/to/dir'):# print(file)3.3 yield from 的双向通信
# yield from 建立调用者与子生成器之间的双向通道def sub_gen(): """子生成器""" val = yield "子生成器启动" print(f"子生成器收到: {val}") val2 = yield f"第一次收到: {val}" print(f"子生成器再次收到: {val2}") return "子生成器结束返回"
def main_gen(): """主生成器,使用 yield from 委托子生成器""" result = yield from sub_gen() print(f"主生成器收到子生成器返回值: {result}") yield result
# 使用gen = main_gen()print(next(gen)) # "子生成器启动"print(gen.send("你好")) # "子生成器收到: 你好",然后 "第一次收到: 你好"print(gen.send("再见")) # "子生成器再次收到: 再见",然后 "主生成器收到子生成器返回值: 子生成器结束返回"3.4 获取子生成器返回值
def inner(): yield 1 yield 2 return "inner done"
def outer(): # result 获取 inner 的 return 值 result = yield from inner() print(f"inner 返回: {result}") yield 3
print(list(outer()))# 输出 "inner 返回: inner done"# 输出: [1, 2, 3]四、生成器的高级方法
4.1 send():向生成器发送数据
def echo(): """回显生成器:接收并返回发送的值""" msg = "初始值" while True: incoming = yield msg # yield 返回值就是 send() 发送的值 if incoming is None: break msg = f"收到: {incoming}"
gen = echo()
# 第一次必须用 next() 启动(或 send(None))print(next(gen)) # "初始值"
# 使用 send() 发送数据print(gen.send("Hello")) # "收到: Hello"print(gen.send("World")) # "收到: World"
# 发送 None 结束# gen.send(None) # 抛出 StopIteration4.2 throw():向生成器抛异常
def guarded_gen(): try: yield 1 yield 2 yield 3 except ValueError: print("捕获到 ValueError") yield "错误已处理"
gen = guarded_gen()print(next(gen)) # 1print(gen.throw(ValueError("出错了"))) # 输出 "捕获到 ValueError",返回 "错误已处理"4.3 close():关闭生成器
def resource_gen(): print("打开资源") try: yield 1 yield 2 yield 3 finally: print("关闭资源")
gen = resource_gen()print(next(gen)) # "打开资源",1gen.close() # "关闭资源"
# 关闭后再调用 next() 会抛出 StopIteration# next(gen) # StopIteration五、itertools 模块
5.1 无限迭代器
import itertools
# count(start, step):无限计数counter = itertools.count(10, 2) # 从 10 开始,步长 2print(next(counter)) # 10print(next(counter)) # 12print(next(counter)) # 14
# cycle(iterable):无限循环cyc = itertools.cycle(['A', 'B', 'C'])print(next(cyc)) # Aprint(next(cyc)) # Bprint(next(cyc)) # Cprint(next(cyc)) # A
# repeat(obj, times):重复repeater = itertools.repeat("Hello", 3)print(list(repeater)) # ['Hello', 'Hello', 'Hello']5.2 排列组合迭代器
import itertools
# product:笛卡尔积for a, b in itertools.product([1, 2], ['x', 'y']): print(f"{a}-{b}", end=' ')# 1-x 1-y 2-x 2-y
# permutations:排列(顺序重要)for p in itertools.permutations([1, 2, 3], 2): print(p, end=' ')# (1, 2) (1, 3) (2, 1) (2, 3) (3, 1) (3, 2)
# combinations:组合(顺序不重要)for c in itertools.combinations([1, 2, 3], 2): print(c, end=' ')# (1, 2) (1, 3) (2, 3)
# combinations_with_replacement:可重复组合for c in itertools.combinations_with_replacement([1, 2], 2): print(c, end=' ')# (1, 1) (1, 2) (2, 2)5.3 处理输入迭代器
import itertools
# accumulate:累积nums = [1, 2, 3, 4, 5]print(list(itertools.accumulate(nums))) # [1, 3, 6, 10, 15]
# chain:连接多个可迭代对象print(list(itertools.chain([1, 2], [3, 4], [5]))) # [1, 2, 3, 4, 5]
# chain.from_iterable:从嵌套可迭代对象扁平化print(list(itertools.chain.from_iterable([[1, 2], [3, 4]]))) # [1, 2, 3, 4]
# compress:根据选择器过滤data = ['A', 'B', 'C', 'D']selectors = [True, False, True, False]print(list(itertools.compress(data, selectors))) # ['A', 'C']
# dropwhile:丢弃满足条件的前缀元素nums = [1, 3, 5, 2, 4, 6]print(list(itertools.dropwhile(lambda x: x < 5, nums))) # [5, 2, 4, 6]
# takewhile:取满足条件的前缀元素print(list(itertools.takewhile(lambda x: x < 5, nums))) # [1, 3]
# islice:切片(支持步长)print(list(itertools.islice(range(10), 2, 8, 2))) # [2, 4, 6]
# starmap:将参数元组解包后传入函数points = [(1, 2), (3, 4), (5, 6)]print(list(itertools.starmap(lambda x, y: x + y, points))) # [3, 7, 11]5.4 分组与分组迭代
import itertools
# groupby:按键分组(注意:需要先排序)data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]# 按第一个元素分组for key, group in itertools.groupby(data, key=lambda x: x[0]): print(f"{key}: {list(group)}")# A: [('A', 1), ('A', 2)]# B: [('B', 3), ('B', 4)]# A: [('A', 5)] (注意:相同键不连续时会分成多个组)
# tee:从一个迭代器创建多个独立迭代器nums = range(5)it1, it2 = itertools.tee(nums, 2)print(list(it1)) # [0, 1, 2, 3, 4]print(list(it2)) # [0, 1, 2, 3, 4]
# zip_longest:不等长 zip,用 fillvalue 填充a = [1, 2, 3]b = ['x', 'y']print(list(itertools.zip_longest(a, b, fillvalue='-')))# [(1, 'x'), (2, 'y'), (3, '-')]六、实战场景
6.1 大文件逐行读取
def read_large_file(filepath): """逐行读取大文件,内存占用极低""" with open(filepath, 'r', encoding='utf-8') as f: for line in f: yield line.strip()
# 使用# for line in read_large_file('huge_file.txt'):# process(line)6.2 数据流管道
def gen_numbers(): """生成数字""" for i in range(1000000): yield i
def filter_even(numbers): """过滤偶数""" for n in numbers: if n % 2 == 0: yield n
def square(numbers): """平方""" for n in numbers: yield n * n
def take_first(numbers, n): """取前 n 个""" for i, num in enumerate(numbers): if i >= n: break yield num
# 构建管道pipeline = take_first(square(filter_even(gen_numbers())), 5)
# 惰性求值:只计算需要的print(list(pipeline)) # [0, 4, 16, 36, 64]6.3 分页数据获取
def fetch_all_pages(api_func, start_page=1, page_size=100): """分页获取所有数据""" page = start_page while True: results = api_func(page, page_size) if not results: break for item in results: yield item page += 1 if len(results) < page_size: break
# 模拟 APIdef mock_api(page, page_size): if page > 3: return [] return [f"item_{page}_{i}" for i in range(page_size)]
# 使用all_items = fetch_all_pages(mock_api)for item in all_items: pass # 处理每个 item6.4 状态机实现
def traffic_light(): """交通灯状态机""" while True: # 红灯 yield "🔴 红灯 - 停止" # 绿灯 yield "🟢 绿灯 - 通行" # 黄灯 yield "🟡 黄灯 - 注意"
light = traffic_light()for _ in range(6): print(next(light))# 🔴 红灯 - 停止# 🟢 绿灯 - 通行# 🟡 黄灯 - 注意# 🔴 红灯 - 停止# ...七、常见陷阱与最佳实践
❌ 陷阱 1:生成器只能迭代一次
def get_numbers(): yield from [1, 2, 3]
gen = get_numbers()print(list(gen)) # [1, 2, 3]print(list(gen)) # [] (生成器已耗尽)
# ✅ 解决方法:每次需要时重新创建生成器print(list(get_numbers())) # [1, 2, 3]print(list(get_numbers())) # [1, 2, 3]❌ 陷阱 2:生成器表达式括号问题
# ❌ 错误:在函数中使用生成器表达式需要注意print(sum((x ** 2 for x in range(10)))) # 可以,但多一层括号
# ✅ 正确:作为唯一参数时可以省略括号print(sum(x ** 2 for x in range(10))) # 285❌ 陷阱 3:在生成器中修改列表
# ❌ 错误:迭代时修改列表会导致意外行为def remove_even(lst): for item in lst: if item % 2 == 0: lst.remove(item) # 迭代时修改列表 return lst
nums = [1, 2, 3, 4, 5, 6]print(remove_even(nums)) # [1, 3, 5] (可能跳过元素)
# ✅ 正确:创建新列表或使用列表推导式nums = [1, 2, 3, 4, 5, 6]result = [x for x in nums if x % 2 != 0]❌ 陷阱 4:忘记启动生成器
# ❌ 错误:直接 send 非 None 值gen = echo()# gen.send("Hello") # TypeError: can't send non-None value to a just-started generator
# ✅ 正确:先用 next() 或 send(None) 启动gen = echo()next(gen) # 启动gen.send("Hi") # 正常✅ 最佳实践清单
| 原则 | 说明 |
|---|---|
| 优先使用生成器处理大数据 | 节省内存,流式处理 |
| 生成器表达式用于简单场景 | 代码简洁,一行搞定 |
| 生成器函数用于复杂逻辑 | 状态保存、多 yield 点 |
| 组合使用 itertools | 避免重复造轮子 |
| 注意生成器一次性 | 用完即弃,需要多次迭代时重新创建 |
| close() 清理资源 | 使用 try/finally 确保资源释放 |
| 文档化生成器行为 | 说明产出什么、何时结束 |
八、总结
迭代器 vs 生成器对比
| 特性 | 迭代器 | 生成器 |
|---|---|---|
| 创建方式 | 实现 __iter__ 和 __next__ | 使用 yield 关键字 |
| 代码量 | 较多 | 简洁 |
| 状态管理 | 手动管理 | 自动保存 |
| 灵活性 | 高 | 更高(send/throw/close) |
| 适用场景 | 复杂数据结构 | 大多数场景 |
什么时候用生成器?
✅ 处理大文件/大数据集 → 惰性求值节省内存✅ 无限序列 → 斐波那契、计数器✅ 数据流管道 → 多步转换、链式操作✅ 分页 API → 逐页加载✅ 状态机 → 保存状态、分步执行✅ 协程基础 → send/throw/close 双向通信
❌ 需要随机访问 → 使用列表❌ 需要多次迭代 → 使用可迭代对象(每次返回新迭代器)❌ 数据量很小 → 直接用列表更简单生成器是 Python 最优雅的特性之一。掌握迭代器和生成器,你就能用更少的内存处理更大的数据,写出更简洁高效的代码。从简单的生成器表达式开始,逐步探索 yield from、itertools 和高级方法,你会发现 Python 的迭代世界远比想象中精彩。
Python 迭代器与生成器完全指南:从原理到实战 | Python 进阶核心知识
https://971918.xyz/posts/python-guide/python-iterator-generator-guide/