Python 中的协程(Coroutine)是一种比线程(Thread)更加轻量的代码执行机构。与线程不同的是,协程完全是由程序本身控制,不需要操作系统内核对其进行调度,因而没有线程切换的开销。同时也不需要多线程中数据同步所依赖的锁机制,执行效率与多线程相比要高出很多。
从句法上看,协程可以看作对生成器(Generator)的一种扩展,都是定义体中包含 yield
关键字的函数。启动生成器和协程所需的开销,与调用函数的开销相差无几。
英文中的 yield
有两个意思:产出和让步。 Python 生成器中的 yield
刚好符合了上述两个释义。yield item
会“产出”一个值提供给 next()
的调用方,同时做出“让步”,暂停生成器函数的执行,将程序控制权移交给调用方。直到调用方再次执行 next()
函数,生成器则继续“产出”下一个值。
而协程中的 yield
通常出现在表达式右边(如 data = yield
),可以产出值,也可以不产出。调用方可以通过 .send(data)
方法向协程提供数据。
不管数据如何流动,yield
都是一种用来实现协作式多任务的流程控制工具。协程通过 yield
把控制器让步给中心调度程序,再由调度程序激活其他协程。
一、用作协程的生成器
一个最简单的协程实现代码如下:1
2
3
4
5# coroutine.py
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received: ', x)
在 Python Shell 中进行测试,结果如下:1
2
3
4
5
6
7
8
9
10
11
12from coroutine import simple_coroutine
my_coro = simple_coroutine()
my_coro
<generator object simple_coroutine at 0x00000000021B4DC8>
next(my_coro)
-> coroutine started
42) my_coro.send(
-> coroutine received: 42
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>
上述代码的执行流程为:
- 主函数调用
next()
函数启动生成器。生成器在yield
语句处暂停,没有产出值(None
) my_coro.send(42)
向协程发送数据 42,协程恢复运行,抛出StopIteration
异常
协程的状态
可以使用 inspect.getgeneratorstate
获取协程的运行状态,共包含以下四种:
GEN_CREATED
:等待开始执行GEN_RUNNING
:协程正在执行GEN_SUSPENDED
:在 yield 表达式处暂停GEN_CLOSE
:协程执行结束
参考如下代码:1
2
3
4
5
6def simple_coro2(a):
print('-> Started: a=', a)
b = yield a
print('-> Received: b=', b)
c = yield a + b
print('-> Received: c=', c)
1 | 14) my_coro2 = simple_coro2( |
协程 simple_coro2 的执行流程分为如下三个阶段:
- 调用
next(my_coro2)
,协程启动,打印消息Started: a= 14
,执行yield a
,产出数字 14 - 调用
my_coro2.send(28)
,把 28 赋值给 b,打印Received: b= 28
,执行yield a + b
,产出数字 42 - 调用
my_coro2.send(99)
,把 99 赋值给 c,打印Received: c= 99
,协程终止
参考如下示意图:
二、使用协程计算移动平均值
1 | # coroaverager.py |
在 Python Shell 中进行测试:1
2
3
4
5
6
7
8
9
10from coroaverager import averager
coro_avg = averager()
next(coro_avg)
10) coro_avg.send(
10.0
30) coro_avg.send(
20.0
5) coro_avg.send(
15.0
>>>
从执行结果看,只要调用方不断把数据发送给协程 averager()
,协程就会一直接收值并返回平均值的计算结果。其中 yield
表达式用于暂停协程的执行,返回计算结果给调用方,同时等待调用方继续发送数据给协程以恢复循环。
使用装饰器预激协程
使用协程之前必须预激(即通过 next()
调用启动协程),可以创建如下的用于预激协程的装饰器:1
2
3
4
5
6
7
8
9
10# coroutil.py
from functools import wraps
def coroutine(func):
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer
此时 coroaverager.py
则可以改为如下版本:1
2
3
4
5
6
7
8
9
10
11
12from coroutil import coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
1 | from coroaverager import averager |
三、协程返回值
1 | # coroaverager2.py |
1 | from coroaverager2 import averager |
此处的 yield
表达式只接收数据而不返回任何结果,直到协程收到 None
,循环终止协程结束,返回最终结果 Result(...)
。return
表达式返回的值会传递给调用方,赋值给 StopIteration
异常的一个属性。因此最终结果需要通过 try...except
语句来捕获。
好在 yield from
结构会在内部自动捕获 StopIteration
异常,并把 value 属性的值变成 yield from
表达式的值。
四、yield from
yield from
可以用来简化 for 循环中的 yield 表达式。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i
print(list(gen()))
# => ['A', 'B', 1, 2]
def gen2():
yield from 'AB'
yield from range(1, 3)
print(list(gen2()))
# => ['A', 'B', 1, 2]
yield from
表示的含义为,在生成器 gen
中使用 yield from subgen()
时,subgen
会获得控制权,其产出的值传递给 gen
的调用方。同时 gen
会阻塞,等待 subgen
终止。
使用 yield from
可以连接多个可迭代对象:1
2
3
4
5
6
7
8
9def chain(*iterables):
for it in iterables:
yield from it
s = 'ABC'
t = tuple(range(3))
print(list(chain(s, t)))
# => ['A', 'B', 'C', 0, 1, 2]
yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,使得两者可以直接发送和产出值,而不必在位于中间的协程中添加大量处理异常(StopIteration
)的代码。
使用 yield from 计算移动平均值:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66from collections import namedtuple
Result = namedtuple('Result', 'count average')
# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>
# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>
# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # <9>
for value in values:
group.send(value) # <10>
group.send(None) # <11>
report(results)
# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
# => 9 boys averaging 40.42kg
# => 9 boys averaging 1.39m
# => 10 girls averaging 42.04kg
# => 10 girls averaging 1.43m
注释:
- <1>:
averager
协程作为被调用的子生成器,计算移动平均值1> - <2>:调用方函数
main
发送的值都会绑定给term
变量2> - <3>:终止条件,若无此句代码,子生成器永不终止,
yield from
也会一直阻塞3> - <4>:返回的
Result
对象将作为grouper
函数中yield from
表达式的值4> - <5>:
grouper
函数作为委派生成器,相当于子生成器和调用方之间的“管道”5> - <6>:这里
while
循环的每次遍历都会创建一个averager
协程实例6> - <7>:
grouper
通过.send
发送的每一个值都会被yield from
导向给averager
实例,待averager
处理完所有grouper
发送的值之后,最终的计算结果绑定给results[key]
。while
循环则继续创建另一个averager
实例用来处理更多的值7> - <8>:
main
即客户端代码,子生成器的调用方8> - <9>:预激协程对象9>
- <10>:发送数据给
grouper
,该数据实际上对grouper
并不可见,而是通过yield from
传递给了averager
中的term = yield
。10>