使用 subprocess
管理子进程
由 Python 启动的子进程能够以并行的方式运行,从而最大化地利用 CPU 的多个核心。
可以借助 subprocess
内置模块调用子进程。1
2
3
4
5
6
7
8
9
10
11import subprocess
result = subprocess.run(
['echo', 'Hello from the child!'],
capture_output=True,
encoding='utf-8'
)
result.check_returncode()
print(result.stdout)
# => Hello from the child!
子进程相对于其父进程是独立地运行的。
如果使用 Popen
类创建一个子进程处理某个任务,则主程序能够在处理其他任务的同时,通过轮询的方式定期查看子进程的状态,确认其是否已经终止运行。Popen
中的 poll
方法可以实时地检查子进程的运行状态。若子进程还在运行中,则返回 None
;若子进程执行完毕,则返回一个 returncode 值。
1 | import subprocess |
解耦子进程与父进程使得父进程可以同时调用多个并行执行的子程序。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import time
import subprocess
start = time.time()
sleep_procs = []
for _ in range(10):
proc = subprocess.Popen(['sleep', '1'])
sleep_procs.append(proc)
for proc in sleep_procs:
proc.communicate()
end = time.time()
print(f'Finished in {(end - start):.3} seconds')
# => Finished in 1.01 seconds
代码中的 communicate
方法可以用来与子进程通信并等待其终止,此处用于等待所有的子进程执行完毕。
如果上述代码中的子进程以顺序的方式执行,最终整体的延迟会达到 10s 以上。而实际的延迟只略大于 1s,即多个子进程之间是并行的关系。
可以通过管道从 Python 程序向调用的子进程传递数据,并获取子进程的输出内容。
比如调用如下形式的 Shell 测试脚本:1
2
3
4
echo input your name
read name
echo your name is $name
1 | import subprocess |
其中在初始化 Popen
对象时,传入了 stdin=subprocess.PIPE
和 stdout=subprocess.PIPE
两个参数,目的是将子进程的标准输入 STDIN 绑定到 proc
实例的 stdin
属性上,将标准输出 STDOUT 绑定到 proc
实例的 stdout
属性上。从而可以使用 proc.stdin.write()
方法向子进程传入数据。proc
实例的 communicate
方法会等待子进程终止,并返回 stdout
和 stderr
,即子进程的标准输出和标准错误输出。
若初始化 Popen
时未传入 stdout=subprocess.PIPE
参数,则上面返回的 stdout
为 None
。
如果担心子程序永远不会终止或者长时间阻塞了输入和输出,可以向 communicate
方法传入 timeout
参数来指定等待的最长时间。1
2
3
4
5
6
7
8
9
10
11import subprocess
proc = subprocess.Popen(['sleep', '10'])
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()
print('Exit status', proc.poll())
# Exit status -15
知识点
subprocess
模块可以调用子进程,且能够管理子进程的输入流和输出流,达到源程序与子进程交互的目的- 子进程和 Python 解释器之间是并行运行的,因而可以最大化地利用 CPU 的多个核心
subprocess
模块提供的run
函数可以完成简单的调用操作,而Popen
类提供了类似 Unix 管线的高级功能communicate
方法的timeout
参数可以避免死锁及卡住的子进程
使用线程处理阻塞式 IO
Python 的标准实现叫做 CPython。CPython 在运行 Python 程序时,会首先解析源代码并将其编译为字节码,再通过一个基于栈的解释器来运行字节码。
CPython 通过一种称为 GIL 的机制来管理解释器自身的状态信息,强化其一致性。GIL 是一种可以阻止 CPython 解释器受抢占式多线程影响的互斥锁(mutex),从而使控制程序的线程不会被另一个线程意外中断,导致解释器的状态发生混乱。
但 GIL 有一个非常严重的负面影响。不像 C++ 或 Java 等语言可以利用多线程最大化多核心 CPU 的计算能力,Python 虽然支持多线程,但 GIL 会导致任一时刻实际上都只能有一个线程在推进。
简单来说,Python 中的多线程不是并行计算,无法同时利用 CPU 的多个核心来提升计算密集型多任务的效率。
单线程处理计算密集型任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import time
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i
numbers = [21390799, 12147599, 15166379, 18522859, 12345678, 87654321]
start = time.time()
for number in numbers:
list(factorize(number))
end = time.time()
print(f'Took {(end - start):.3} seconds')
# Took 6.19 seconds
多线程处理计算密集型任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import time
from threading import Thread
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i
class FactorizeThread(Thread):
def __init__(self, number):
super().__init__()
self.number = number
def run(self):
self.factors = list(factorize(self.number))
numbers = [21390799, 12147599, 15166379, 18522859, 12345678, 87654321]
start = time.time()
threads = []
for number in numbers:
thread = FactorizeThread(number)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time.time()
print(f'Took {(end -start):.3} seconds')
# Took 6.3 seconds
可以看出,Python 中的单线程和多线程在应对计算密集型任务时,两者的处理时间没有相差多少。
但是对于 IO 密集 型的任务,比如从磁盘读写文件、网络传输等阻塞式 IO 操作,使用 Python 中的多线程对于效率的提升就会非常显著。
多线程使得 CPU 不必去等待缓慢的文件读写等 IO 操作。
单线程处理 IO 密集型任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14import time
from urllib.request import urlopen
def get_example_page():
urlopen('https://example.org')
start = time.time()
for i in range(10):
get_example_page()
print(f'Took {time.time() - start} seconds')
# Took 6.853585243225098 seconds
多线程处理 IO 密集型任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import time
from urllib.request import urlopen
from threading import Thread
def get_example_page():
urlopen('https://example.org')
start = time.time()
threads = []
for _ in range(10):
thread = Thread(target=get_example_page)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print(f'Took {time.time() - start} seconds')
# Took 0.8039891719818115 seconds
知识点
- 由于 GIL 的存在,Python 中的线程无法并行地在多个 CPU 核心上执行
- Python 中的多线程能够并行地发起多个系统调用,因而可以同时处理计算任务和阻塞式 IO
使用 Lock 避免数据竞争
GIL 总是会阻止 Python 代码在多个 CPU 核心上并行执行,任意时刻都只能有一个 Python 线程处于活跃状态。
但 GIL 并不会保护代码不受数据竞争的影响。一个线程对于数据结构的操作仍有可能被 Python 解释器中邻近的字节码破坏,尤其是在通过多线程同步地去访问同一个对象的时候。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33from threading import Thread
class Counter:
def __init__(self):
self.count = 0
def increment(self, offset):
self.count += offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
how_many = 10 ** 5
counter = Counter()
threads = []
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
# Counter should be 500000, got 252472
上述代码模拟了一个从传感器网络并行地读取数据并计数的过程。对任意一个传感器,其数据的读取都属于阻塞式 IO,由独立的工作线程去处理,数据读取完成后该工作线程会调用一个计数器对象来累计结果。
但程序运行后,实际得到的计数结果与预期差距很大。
Python 解释器在执行多个线程时会确保这些线程之间的“平等关系”,令它们获得几乎相等的处理时间。这因此需要 Python 时不时地在线程间进行切换,暂时挂起一个正在运行的线程,转而去恢复执行另一个线程。
一个线程甚至有可能在看似符合原子性的操作中间被暂停。
比如 +=
操作符在作用到实例的属性上时,类似这样的代码:1
counter.count += 1
实际上等同于 Python 做出如下所示的三个分开的步骤:1
2
3value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
再加上线程切换,就有可能导致出现下面这种情况:1
2
3
4
5
6
7
8
9# Running in Thread A
value_a = getattr(counter, 'count')
# Context switch to Thread B
value_b = getattr(counter, 'count')
result_b = value_b + 1
setattr(counter, 'count', result_b)
# Context switch back to Thread A
result_a = value_a + 1
setattr(counter, 'count', result_a)
即原本应该计算两次的累加操作实际上只有一次生效了,最终导致出现错误的结果。
为避免上述情形中的数据竞争或者其他形式的数据结构损坏现象,可以借助 Lock
类保护特定的值不被多个线程同步访问。即任一时刻都只能有一个线程可以获得该数据的锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35from threading import Thread, Lock
class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0
def increment(self, offset):
with self.lock:
self.count += offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
how_many = 10 ** 5
counter = LockingCounter()
threads = []
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
# Counter should be 500000, got 500000
知识点
- Python 有 GIL,但在编写代码时仍需关注多线程中的数据竞争
- 允许多个线程修改同一个不加锁的对象,有可能会损坏数据结构
Lock
类可以保护多线程中数据的一致性