Python 中的协程(coroutine)简介

Python 中的协程(Coroutine)是一种比线程(Thread)更加轻量的代码执行机构。与线程不同的是,协程完全是由程序本身控制,不需要操作系统内核对其进行调度,因而没有线程切换的开销。同时也不需要多线程中数据同步所依赖的锁机制,执行效率与多线程相比要高出很多。
从句法上看,协程可以看作对生成器(Generator)的一种扩展,都是定义体中包含 yield 关键字的函数。启动生成器和协程所需的开销,与调用函数的开销相差无几。

英文中的 yield 有两个意思:产出和让步。 Python 生成器中的 yield 刚好符合了上述两个释义。yield item 会“产出”一个值提供给 next() 的调用方,同时做出“让步”,暂停生成器函数的执行,将程序控制权移交给调用方。直到调用方再次执行 next() 函数,生成器则继续“产出”下一个值。
而协程中的 yield 通常出现在表达式右边(如 data = yield),可以产出值,也可以不产出。调用方可以通过 .send(data) 方法向协程提供数据
不管数据如何流动,yield 都是一种用来实现协作式多任务流程控制工具。协程通过 yield 把控制器让步给中心调度程序,再由调度程序激活其他协程。

一、用作协程的生成器

一个最简单的协程实现代码如下:

1
2
3
4
5
# coroutine.py
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received: ', x)

在 Python Shell 中进行测试,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
>>> from coroutine import simple_coroutine
>>> my_coro = simple_coroutine()
>>> my_coro
<generator object simple_coroutine at 0x00000000021B4DC8>
>>> next(my_coro)
-> coroutine started
>>> my_coro.send(42)
-> coroutine received: 42
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>

上述代码的执行流程为:

  • 主函数调用 next() 函数启动生成器。生成器在 yield 语句处暂停,没有产出值(None
  • my_coro.send(42) 向协程发送数据 42,协程恢复运行,抛出 StopIteration 异常
协程的状态

可以使用 inspect.getgeneratorstate 获取协程的运行状态,共包含以下四种:

  • GEN_CREATED:等待开始执行
  • GEN_RUNNING:协程正在执行
  • GEN_SUSPENDED:在 yield 表达式处暂停
  • GEN_CLOSE:协程执行结束

参考如下代码:

1
2
3
4
5
6
def simple_coro2(a):
print('-> Started: a=', a)
b = yield a
print('-> Received: b=', b)
c = yield a + b
print('-> Received: c=', c)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> my_coro2 = simple_coro2(14)
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(my_coro2)
'GEN_CREATED'
>>> next(my_coro2)
-> Started: a= 14
14
>>> getgeneratorstate(my_coro2)
'GEN_SUSPENDED'
>>> my_coro2.send(28)
-> Received: b= 28
42
>>> my_coro2.send(99)
-> Received: c= 99
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>> getgeneratorstate(my_coro2)
'GEN_CLOSED'
>>>

协程 simple_coro2 的执行流程分为如下三个阶段:

  • 调用 next(my_coro2),协程启动,打印消息 Started: a= 14,执行 yield a,产出数字 14
  • 调用 my_coro2.send(28),把 28 赋值给 b,打印 Received: b= 28,执行 yield a + b,产出数字 42
  • 调用 my_coro2.send(99),把 99 赋值给 c,打印 Received: c= 99,协程终止

参考如下示意图:coroutine

二、使用协程计算移动平均值

1
2
3
4
5
6
7
8
9
10
# coroaverager.py
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count

在 Python Shell 中进行测试:

1
2
3
4
5
6
7
8
9
10
>>> from coroaverager import averager
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0
>>>

从执行结果看,只要调用方不断把数据发送给协程 averager(),协程就会一直接收值并返回平均值的计算结果。其中 yield 表达式用于暂停协程的执行,返回计算结果给调用方,同时等待调用方继续发送数据给协程以恢复循环。

使用装饰器预激协程

使用协程之前必须预激(即通过 next() 调用启动协程),可以创建如下的用于预激协程的装饰器:

1
2
3
4
5
6
7
8
9
10
# coroutil.py
from functools import wraps

def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer

此时 coroaverager.py 则可以改为如下版本:

1
2
3
4
5
6
7
8
9
10
11
12
from coroutil import coroutine

@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count

1
2
3
4
5
6
7
8
9
>>> from coroaverager import averager
>>> coro_avg = averager()
>>> coro_avg.send(10)
10.0
>>> coro_avg.send(20)
15.0
>>> coro_avg.send(15)
15.0
>>>

三、协程返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# coroaverager2.py
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average)
1
2
3
4
5
6
7
8
9
10
11
>>> from coroaverager2 import averager
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(20)
>>> coro_avg.send(15)
>>> coro_avg.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: Result(count=3, average=15.0)
>>>

此处的 yield 表达式只接收数据而不返回任何结果,直到协程收到 None,循环终止协程结束,返回最终结果 Result(...)
return 表达式返回的值会传递给调用方,赋值给 StopIteration 异常的一个属性。因此最终结果需要通过 try...except 语句来捕获。
好在 yield from 结构会在内部自动捕获 StopIteration 异常,并把 value 属性的值变成 yield from 表达式的值。

四、yield from

yield from 可以用来简化 for 循环中的 yield 表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i

print(list(gen()))
# => ['A', 'B', 1, 2]


def gen2():
yield from 'AB'
yield from range(1, 3)

print(list(gen2()))
# => ['A', 'B', 1, 2]

yield from 表示的含义为,在生成器 gen 中使用 yield from subgen() 时,subgen 会获得控制权,其产出的值传递给 gen 的调用方。同时 gen 会阻塞,等待 subgen 终止。

使用 yield from 可以连接多个可迭代对象:

1
2
3
4
5
6
7
8
9
def chain(*iterables):
for it in iterables:
yield from it

s = 'ABC'
t = tuple(range(3))

print(list(chain(s, t)))
# => ['A', 'B', 'C', 0, 1, 2]

yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,使得两者可以直接发送和产出值,而不必在位于中间的协程中添加大量处理异常(StopIteration)的代码。

使用 yield from 计算移动平均值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collections import namedtuple

Result = namedtuple('Result', 'count average')


# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>


# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>


# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # <9>
for value in values:
group.send(value) # <10>
group.send(None) # <11>

report(results)


# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))


data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}


if __name__ == '__main__':
main(data)

# => 9 boys averaging 40.42kg
# => 9 boys averaging 1.39m
# => 10 girls averaging 42.04kg
# => 10 girls averaging 1.43m

注释:

  • <1>:averager 协程作为被调用的子生成器,计算移动平均值
  • <2>:调用方函数 main 发送的值都会绑定给 term 变量
  • <3>:终止条件,若无此句代码,子生成器永不终止,yield from 也会一直阻塞
  • <4>:返回的 Result 对象将作为 grouper 函数中 yield from 表达式的值
  • <5>:grouper 函数作为委派生成器,相当于子生成器和调用方之间的“管道”
  • <6>:这里 while 循环的每次遍历都会创建一个 averager 协程实例
  • <7>:grouper 通过 .send 发送的每一个值都会被 yield from 导向给 averager 实例,待 averager 处理完所有 grouper 发送的值之后,最终的计算结果绑定给 results[key]while 循环则继续创建另一个 averager 实例用来处理更多的值
  • <8>:main 即客户端代码,子生成器的调用方
  • <9>:预激协程对象
  • <10>:发送数据给 grouper,该数据实际上对 grouper 并不可见,而是通过 yield from 传递给了 averager 中的 term = yield

我要吐了。再见

参考资料

Fluent Python