Effective Python 笔记 —— 并发与并行(subprocess、Thread、Lock)

使用 subprocess 管理子进程

由 Python 启动的子进程能够以并行的方式运行,从而最大化地利用 CPU 的多个核心。

可以借助 subprocess 内置模块调用子进程。

1
2
3
4
5
6
7
8
9
10
11
import subprocess

result = subprocess.run(
['echo', 'Hello from the child!'],
capture_output=True,
encoding='utf-8'
)

result.check_returncode()
print(result.stdout)
# => Hello from the child!

子进程相对于其父进程是独立地运行的。
如果使用 Popen 类创建一个子进程处理某个任务,则主程序能够在处理其他任务的同时,通过轮询的方式定期查看子进程的状态,确认其是否已经终止运行。
Popen 中的 poll 方法可以实时地检查子进程的运行状态。若子进程还在运行中,则返回 None;若子进程执行完毕,则返回一个 returncode 值。

1
2
3
4
5
6
7
8
9
10
11
import subprocess

proc = subprocess.Popen(['sleep', '1'])
while proc.poll() is None:
print('Working...')
print('Exit status', proc.poll())
# Working...
# Working...
# Working...
# ...
# Exit status 0

解耦子进程与父进程使得父进程可以同时调用多个并行执行的子程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
import subprocess

start = time.time()
sleep_procs = []
for _ in range(10):
proc = subprocess.Popen(['sleep', '1'])
sleep_procs.append(proc)

for proc in sleep_procs:
proc.communicate()

end = time.time()
print(f'Finished in {(end - start):.3} seconds')
# => Finished in 1.01 seconds

代码中的 communicate 方法可以用来与子进程通信并等待其终止,此处用于等待所有的子进程执行完毕。
如果上述代码中的子进程以顺序的方式执行,最终整体的延迟会达到 10s 以上。而实际的延迟只略大于 1s,即多个子进程之间是并行的关系。

可以通过管道从 Python 程序向调用的子进程传递数据,并获取子进程的输出内容。
比如调用如下形式的 Shell 测试脚本:

1
2
3
4
#!/bin/bash
echo input your name
read name
echo your name is $name

1
2
3
4
5
6
7
8
9
10
11
12
import subprocess

proc = subprocess.Popen('bash name.sh',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=True)
proc.stdin.write(b'john')
proc.stdin.flush()

stdout, stderr = proc.communicate()
print(stdout)
# b'input your name\nyour name is john\n'

其中在初始化 Popen 对象时,传入了 stdin=subprocess.PIPEstdout=subprocess.PIPE 两个参数,目的是将子进程的标准输入 STDIN 绑定到 proc 实例的 stdin 属性上,将标准输出 STDOUT 绑定到 proc 实例的 stdout 属性上。从而可以使用 proc.stdin.write() 方法向子进程传入数据。
proc 实例的 communicate 方法会等待子进程终止,并返回 stdoutstderr,即子进程的标准输出和标准错误输出。
若初始化 Popen 时未传入 stdout=subprocess.PIPE 参数,则上面返回的 stdoutNone

如果担心子程序永远不会终止或者长时间阻塞了输入和输出,可以向 communicate 方法传入 timeout 参数来指定等待的最长时间。

1
2
3
4
5
6
7
8
9
10
11
import subprocess

proc = subprocess.Popen(['sleep', '10'])
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()

print('Exit status', proc.poll())
# Exit status -15

知识点
  • subprocess 模块可以调用子进程,且能够管理子进程的输入流和输出流,达到源程序与子进程交互的目的
  • 子进程和 Python 解释器之间是并行运行的,因而可以最大化地利用 CPU 的多个核心
  • subprocess 模块提供的 run 函数可以完成简单的调用操作,而 Popen 类提供了类似 Unix 管线的高级功能
  • communicate 方法的 timeout 参数可以避免死锁及卡住的子进程

使用线程处理阻塞式 IO

Python 的标准实现叫做 CPython。CPython 在运行 Python 程序时,会首先解析源代码并将其编译为字节码,再通过一个基于栈的解释器来运行字节码。
CPython 通过一种称为 GIL 的机制来管理解释器自身的状态信息,强化其一致性。GIL 是一种可以阻止 CPython 解释器受抢占式多线程影响的互斥锁(mutex),从而使控制程序的线程不会被另一个线程意外中断,导致解释器的状态发生混乱。

但 GIL 有一个非常严重的负面影响。不像 C++ 或 Java 等语言可以利用多线程最大化多核心 CPU 的计算能力,Python 虽然支持多线程,但 GIL 会导致任一时刻实际上都只能有一个线程在推进
简单来说,Python 中的多线程不是并行计算,无法同时利用 CPU 的多个核心来提升计算密集型多任务的效率。

单线程处理计算密集型任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time


def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i


numbers = [21390799, 12147599, 15166379, 18522859, 12345678, 87654321]
start = time.time()

for number in numbers:
list(factorize(number))

end = time.time()
print(f'Took {(end - start):.3} seconds')
# Took 6.19 seconds

多线程处理计算密集型任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
from threading import Thread


def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i


class FactorizeThread(Thread):
def __init__(self, number):
super().__init__()
self.number = number

def run(self):
self.factors = list(factorize(self.number))


numbers = [21390799, 12147599, 15166379, 18522859, 12345678, 87654321]
start = time.time()
threads = []

for number in numbers:
thread = FactorizeThread(number)
thread.start()
threads.append(thread)

for thread in threads:
thread.join()

end = time.time()
print(f'Took {(end -start):.3} seconds')
# Took 6.3 seconds

可以看出,Python 中的单线程和多线程在应对计算密集型任务时,两者的处理时间没有相差多少。

但是对于 IO 密集 型的任务,比如从磁盘读写文件、网络传输等阻塞式 IO 操作,使用 Python 中的多线程对于效率的提升就会非常显著。
多线程使得 CPU 不必去等待缓慢的文件读写等 IO 操作。

单线程处理 IO 密集型任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
from urllib.request import urlopen


def get_example_page():
urlopen('https://example.org')


start = time.time()
for i in range(10):
get_example_page()

print(f'Took {time.time() - start} seconds')
# Took 6.853585243225098 seconds

多线程处理 IO 密集型任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from urllib.request import urlopen
from threading import Thread


def get_example_page():
urlopen('https://example.org')


start = time.time()
threads = []
for _ in range(10):
thread = Thread(target=get_example_page)
thread.start()
threads.append(thread)

for thread in threads:
thread.join()

print(f'Took {time.time() - start} seconds')
# Took 0.8039891719818115 seconds

知识点
  • 由于 GIL 的存在,Python 中的线程无法并行地在多个 CPU 核心上执行
  • Python 中的多线程能够并行地发起多个系统调用,因而可以同时处理计算任务和阻塞式 IO

使用 Lock 避免数据竞争

GIL 总是会阻止 Python 代码在多个 CPU 核心上并行执行,任意时刻都只能有一个 Python 线程处于活跃状态。
但 GIL 并不会保护代码不受数据竞争的影响。一个线程对于数据结构的操作仍有可能被 Python 解释器中邻近的字节码破坏,尤其是在通过多线程同步地去访问同一个对象的时候。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from threading import Thread


class Counter:
def __init__(self):
self.count = 0

def increment(self, offset):
self.count += offset


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)


how_many = 10 ** 5
counter = Counter()

threads = []
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
# Counter should be 500000, got 252472

上述代码模拟了一个从传感器网络并行地读取数据并计数的过程。对任意一个传感器,其数据的读取都属于阻塞式 IO,由独立的工作线程去处理,数据读取完成后该工作线程会调用一个计数器对象来累计结果。

但程序运行后,实际得到的计数结果与预期差距很大。
Python 解释器在执行多个线程时会确保这些线程之间的“平等关系”,令它们获得几乎相等的处理时间。这因此需要 Python 时不时地在线程间进行切换,暂时挂起一个正在运行的线程,转而去恢复执行另一个线程。
一个线程甚至有可能在看似符合原子性的操作中间被暂停。

比如 += 操作符在作用到实例的属性上时,类似这样的代码:

1
counter.count += 1

实际上等同于 Python 做出如下所示的三个分开的步骤:

1
2
3
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)

再加上线程切换,就有可能导致出现下面这种情况:

1
2
3
4
5
6
7
8
9
# Running in Thread A
value_a = getattr(counter, 'count')
# Context switch to Thread B
value_b = getattr(counter, 'count')
result_b = value_b + 1
setattr(counter, 'count', result_b)
# Context switch back to Thread A
result_a = value_a + 1
setattr(counter, 'count', result_a)

即原本应该计算两次的累加操作实际上只有一次生效了,最终导致出现错误的结果。

为避免上述情形中的数据竞争或者其他形式的数据结构损坏现象,可以借助 Lock 类保护特定的值不被多个线程同步访问。即任一时刻都只能有一个线程可以获得该数据的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import Thread, Lock


class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0

def increment(self, offset):
with self.lock:
self.count += offset


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)


how_many = 10 ** 5
counter = LockingCounter()

threads = []
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
# Counter should be 500000, got 500000

知识点
  • Python 有 GIL,但在编写代码时仍需关注多线程中的数据竞争
  • 允许多个线程修改同一个不加锁的对象,有可能会损坏数据结构
  • Lock 类可以保护多线程中数据的一致性

参考资料

Effective PYTHON Second Edition