Python 中的协程(Coroutine)是一种比线程(Thread)更加轻量的代码执行机构。与线程不同的是,协程完全是由程序本身控制,不需要操作系统内核对其进行调度,因而没有线程切换的开销。同时也不需要多线程中数据同步所依赖的锁机制,执行效率与多线程相比要高出很多。
从句法上看,协程可以看作对生成器(Generator)的一种扩展,都是定义体中包含 yield 关键字的函数。启动生成器和协程所需的开销,与调用函数的开销相差无几。
英文中的 yield 有两个意思:产出和让步。 Python 生成器中的 yield 刚好符合了上述两个释义。yield item 会“产出”一个值提供给 next() 的调用方,同时做出“让步”,暂停生成器函数的执行,将程序控制权移交给调用方。直到调用方再次执行 next() 函数,生成器则继续“产出”下一个值。
而协程中的 yield 通常出现在表达式右边(如 data = yield),可以产出值,也可以不产出。调用方可以通过 .send(data) 方法向协程提供数据。
不管数据如何流动,yield 都是一种用来实现协作式多任务的流程控制工具。协程通过 yield 把控制器让步给中心调度程序,再由调度程序激活其他协程。
一、用作协程的生成器
一个最简单的协程实现代码如下:1
2
3
4
5# coroutine.py
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received: ', x)
在 Python Shell 中进行测试,结果如下:1
2
3
4
5
6
7
8
9
10
11
12from coroutine import simple_coroutine
my_coro = simple_coroutine()
my_coro
<generator object simple_coroutine at 0x00000000021B4DC8>
next(my_coro)
-> coroutine started
my_coro.send(42)
-> coroutine received: 42
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>
上述代码的执行流程为:
- 主函数调用
next()函数启动生成器。生成器在yield语句处暂停,没有产出值(None) my_coro.send(42)向协程发送数据 42,协程恢复运行,抛出StopIteration异常
协程的状态
可以使用 inspect.getgeneratorstate 获取协程的运行状态,共包含以下四种:
GEN_CREATED:等待开始执行GEN_RUNNING:协程正在执行GEN_SUSPENDED:在 yield 表达式处暂停GEN_CLOSE:协程执行结束
参考如下代码:1
2
3
4
5
6def simple_coro2(a):
print('-> Started: a=', a)
b = yield a
print('-> Received: b=', b)
c = yield a + b
print('-> Received: c=', c)
1 | my_coro2 = simple_coro2(14) |
协程 simple_coro2 的执行流程分为如下三个阶段:
- 调用
next(my_coro2),协程启动,打印消息Started: a= 14,执行yield a,产出数字 14 - 调用
my_coro2.send(28),把 28 赋值给 b,打印Received: b= 28,执行yield a + b,产出数字 42 - 调用
my_coro2.send(99),把 99 赋值给 c,打印Received: c= 99,协程终止
参考如下示意图:
二、使用协程计算移动平均值
1 | # coroaverager.py |
在 Python Shell 中进行测试:1
2
3
4
5
6
7
8
9
10from coroaverager import averager
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
10.0
coro_avg.send(30)
20.0
coro_avg.send(5)
15.0
>>>
从执行结果看,只要调用方不断把数据发送给协程 averager(),协程就会一直接收值并返回平均值的计算结果。其中 yield 表达式用于暂停协程的执行,返回计算结果给调用方,同时等待调用方继续发送数据给协程以恢复循环。
使用装饰器预激协程
使用协程之前必须预激(即通过 next() 调用启动协程),可以创建如下的用于预激协程的装饰器:1
2
3
4
5
6
7
8
9
10# coroutil.py
from functools import wraps
def coroutine(func):
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer
此时 coroaverager.py 则可以改为如下版本:1
2
3
4
5
6
7
8
9
10
11
12from coroutil import coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
1 | from coroaverager import averager |
三、协程返回值
1 | # coroaverager2.py |
1 | from coroaverager2 import averager |
此处的 yield 表达式只接收数据而不返回任何结果,直到协程收到 None,循环终止协程结束,返回最终结果 Result(...)。return 表达式返回的值会传递给调用方,赋值给 StopIteration 异常的一个属性。因此最终结果需要通过 try...except 语句来捕获。
好在 yield from 结构会在内部自动捕获 StopIteration 异常,并把 value 属性的值变成 yield from 表达式的值。
四、yield from
yield from 可以用来简化 for 循环中的 yield 表达式。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i
print(list(gen()))
# => ['A', 'B', 1, 2]
def gen2():
yield from 'AB'
yield from range(1, 3)
print(list(gen2()))
# => ['A', 'B', 1, 2]
yield from 表示的含义为,在生成器 gen 中使用 yield from subgen() 时,subgen 会获得控制权,其产出的值传递给 gen 的调用方。同时 gen 会阻塞,等待 subgen 终止。
使用 yield from 可以连接多个可迭代对象:1
2
3
4
5
6
7
8
9def chain(*iterables):
for it in iterables:
yield from it
s = 'ABC'
t = tuple(range(3))
print(list(chain(s, t)))
# => ['A', 'B', 'C', 0, 1, 2]
yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,使得两者可以直接发送和产出值,而不必在位于中间的协程中添加大量处理异常(StopIteration)的代码。
使用 yield from 计算移动平均值:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66from collections import namedtuple
Result = namedtuple('Result', 'count average')
# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>
# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>
# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # <9>
for value in values:
group.send(value) # <10>
group.send(None) # <11>
report(results)
# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
# => 9 boys averaging 40.42kg
# => 9 boys averaging 1.39m
# => 10 girls averaging 42.04kg
# => 10 girls averaging 1.43m
注释:
- <1>:
averager协程作为被调用的子生成器,计算移动平均值1> - <2>:调用方函数
main发送的值都会绑定给term变量2> - <3>:终止条件,若无此句代码,子生成器永不终止,
yield from也会一直阻塞3> - <4>:返回的
Result对象将作为grouper函数中yield from表达式的值4> - <5>:
grouper函数作为委派生成器,相当于子生成器和调用方之间的“管道”5> - <6>:这里
while循环的每次遍历都会创建一个averager协程实例6> - <7>:
grouper通过.send发送的每一个值都会被yield from导向给averager实例,待averager处理完所有grouper发送的值之后,最终的计算结果绑定给results[key]。while循环则继续创建另一个averager实例用来处理更多的值7> - <8>:
main即客户端代码,子生成器的调用方8> - <9>:预激协程对象9>
- <10>:发送数据给
grouper,该数据实际上对grouper并不可见,而是通过yield from传递给了averager中的term = yield。10>