Python 通过 concurrent.futures 模块以异步方式处理并发需求

对于计算机程序的执行流而言,I/O 操作通常是时间占比非常大的一块。在当前的硬件设备中,绝大多数 I/O 操作要比 CPU 慢上几个数量级。比如大约花费 1 毫秒写入一个网络 socket,对应到 2.4GHz 的处理器上,同样的时间则可以执行 24000000 条指令。

在一般的同步执行的程序中,当代码遇到 I/O 操作时(如读取一个文件或者写入一个网络 socket),必须暂时中止和内核的交互,去请求 I/O 并等待传输完成。这种因 I/O 阻塞而产生的等待在某些情况下往往导致执行效率的低下和响应的延迟。

而在异步执行的流程中,当一个程序进入 I/O 等待时,其控制权会被移交给程序的其他部分,直到 I/O 操作完成时才可以重新获取(这称为上下文切换)。
异步程序中一般会有一个事件循环用来监听事件并分派任务。比如用一个异步程序做一次网络写操作,该请求会立即返回(程序控制权移交给事件循环),即便写操作实际上并未发生。此时程序允许执行另外的函数和运算。
当写操作完成时,会触发一个特定的事件,由事件循环响应该事件并执行关联的操作。

同步程序下载网络资源

以下代码为使用同步的方式下载网络中存放的多张国旗图片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# flags.py
import os
import time
import sys

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
if not os.path.exists(DEST_DIR):
os.makedirs(DEST_DIR)
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)

def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content

def show(text):
print(text, end=' ')
sys.stdout.flush()

def download_many(cc_list):
for cc in sorted(cc_list):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')

return len(cc_list)

def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))


if __name__ == '__main__':
main(download_many)

# => BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
# => 20 flags downloaded in 218.70s

使用 concurrent.futures 模块下载

concurrent.futures 模块的主要特色是包含 ThreadPoolExecutorProcessPoolExecutor 两个类,它们实现的接口可以分别在不同的线程或进程中执行可调用的对象,并且它们内部都维护着一个工作线程(或进程)池和一个任务队列。

下载代码如下(引用了上一个源文件 flags.py 中的几个功能函数):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20

def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc

def download_many(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_one, sorted(cc_list))

return len(list(res))


if __name__ == "__main__":
main(download_many)

# => EG BD NG CD IN ET RU ID CN FR US PK PH MX IR VN BR JP DE TR
# => 20 flags downloaded in 83.36s

其中最关键的部分为 download_many 函数。
workers 变量用于指定 ThreadPoolExecutor 对象使用的工作线程的数量,取预设的最大线程数(MAX_WORKERS)和实际下载数目(len(cc_list))中的较小的值;
with 语句用于使用指定数量(workers)的工作线程初始化 ThreadPoolExecutor 对象;
map 方法类似于内置的 map 函数,目的是使 download_one 函数可以被多个工作线程并行地调用。它会返回一个生成器对象,该生成器可以被遍历以获取每一个 download_one 执行后的结果。

Future 对象

Python 标准库中包含两个名为 Future 的类:concurrent.futures.Futureasyncio.Future 。这两个类的实例都表示可能已经完成或者尚未完成的延迟计算
Future 对象并不是一个立即产生的实际结果,更像是一种“承诺”,需要等待其执行完毕并被我们期待的值所填充。
在等待“承诺”兑现的过程中程序可以同时执行其他运算。

Future 是 concurrent.futures 模块和 asyncio 库的重要组件,但是在上面的代码中并没有直接调用 Future 对象。
通常情况下,Future 不应该由用户显式地创建,而只能由并发框架实例化。Future 如同它的名字一样,代表将要发生的事情,而确定某件事未来会发生的唯一方式是其执行时间已经排定。Executor.submit() 方法会接收一个可调用对象作为参数并为其排期,返回一个 Future 对象。

用户代码也不应该改变 Future 对象的状态。并发框架会在 Future 代表的延迟计算结束后自动改变 Future 的状态,没有办法人为地控制延迟计算何时结束。
Future 具有非阻塞的 .done() 方法,返回布尔值表明 Future 对应的调用对象是否已经执行完毕。此外还有 .add_done_callback() 方法用于在 Future 运行结束后执行特定的回调函数。
concurrency.futures.Future 实例还有 .results() 方法用以获取 Future 执行的可调用对象的结果。该方法会阻塞调用方所在的线程,直到可调用对象运行结束并返回结果。result() 方法可以接收可选的 timeout 参数用于设定超时时间。

从更现实的角度理解 Future 对象,可以参考如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from concurrent import futures
from flags import save_flag, get_flag, show, main
import time

MAX_WORKERS = 20

def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc

def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do = []
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do.append(future)
msg = 'Scheduled for {}: {}'
print(msg.format(cc, future))

results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)

return len(results)


if __name__ == '__main__':
main(download_many)

此处的代码只通过 3 个工作线程获取 5 个国家的国旗图片。和之前的代码相比,将 download_many 函数中较抽象的 executor.map 替换成了两个 for 循环:

  • executor.submit 用于排定可调用对象(即 download_one(cc))给多个工作线程执行,返回一个 Future 对象表示这个待执行的操作。
  • futures.as_completed 则用于在 Future 运行结束后获取可执行对象(即 download_one(cc))返回的结果。

本例中的 future.result() 方法绝不会阻塞,因为 future 是由 as_completed 函数返回的。

最终输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
Scheduled for BR: <Future at 0x4524a48 state=running>
Scheduled for CN: <Future at 0x3ca46c8 state=running>
Scheduled for ID: <Future at 0x453f7c8 state=running>
Scheduled for IN: <Future at 0x2ab1308 state=pending>
Scheduled for US: <Future at 0x4530888 state=pending>
ID BR CN <Future at 0x453f7c8 state=finished returned str> result: 'ID'
<Future at 0x3ca46c8 state=finished returned str> result: 'CN'
<Future at 0x4524a48 state=finished returned str> result: 'BR'
IN <Future at 0x2ab1308 state=finished returned str> result: 'IN'
US <Future at 0x4530888 state=finished returned str> result: 'US'

5 flags downloaded in 1.57s

从输出中可以看出,前三个 Future 的状态是 running,后两个 Future 的状态是 pending,因为只有三个工作线程可供分配。同时,如果多运行几次,输出结果的顺序也是有变化的。

关于 GIL

Cython 解释器不是线程安全的,它通过 GIL(Global Interpreter Lock,全局解释器锁)强制性地一次只允许一个线程执行 Python 代码。所以通常一个 Python 进程并不能同时使用多个 CPU 核心,即不能够将一个 Python 进程拆分成多个独立执行的线程在多个 CPU 核心上以并行的方式运行。
但是 Python 标准库中所有执行阻塞型 I/O 操作的函数,在等待系统返回结果时都会释放 GIL。即在 I/O 密集型的需求场景下,Python 程序可以通过多线程来提升性能。

参考资料

Fluent Python