Python 进阶之并发编程中的多线程

Two events are concurrent if neither can causally affect the other.

从编程的角度讲,某个问题是可并发的,即代表它可以被完全或部分地分解成多个组件,且这几个组件之间是顺序独立的。
换句话说,一个事件被分解成多个相互之间无依赖关系的具体步骤,这些步骤可以独立地被完成,且不管各自完成的顺序如何,都不影响最终的结果。

就像华罗庚先生在《统筹方法》中提到的例子,喝茶需要清洗茶具和烧热水,这是两个相互独立的事件。可以先清洗茶具再烧壶热水,则最终的等待时间是两者独立完成所需的时长之和。
更有效率的方法为,先执行比较耗费时间的烧热水动作,并且在等待热水烧开的同时清洗好茶具,效果类似于两个准备步骤同时执行而不是按顺序依次执行。

在程序的世界中,类似的需求同样屡见不鲜。比如用户点击链接下载一个大的文件,通常会把文件下载任务放在后台执行,使得用户可以继续浏览网页,不需要等待下载任务完成。

多线程

线程是操作系统能够进行运算调度的最小单位,程序设计者可以将其工作划分成多个可同步运行的线程以应对某些场景和需求。不过在单核系统中,多线程并不会加速程序的运行。
而多核或多处理器系统可以将不同的线程分配给多个 CPU 核心同时进行处理,因而能够带来性能上的提升(但 Python 由于 GIL 机制使得这种提升仍有一定的限制)。

在单核系统中,可以通过一种名为时间分片(timeslicing)的机制来实现多线程。即 CPU 可以在多个线程间非常迅速地进行切换,达成一种多个线程“同时”在运行的假象。这种虚拟的并行方式虽然不能带来性能上的提升,但是仍然能够非常好地应对某些特定的需求场景。如:

  • 构建响应式接口:比如将长时间运行的任务放在后台执行,使其等待的过程不会阻塞用户其他的交互动作
  • 任务的分发与委派:对于依赖于第三方资源的进程(比如需要对远程 Web 服务进行大量的请求),多线程可以起到很好的加速效果
  • 构建多用户应用:多用户状态下的多线程类似于独立执行的进程,只不过在某些层面上更加易于管理(共享内存)

示例:多线程请求 Web 数据

requests 获取天气信息

以下是一个简单的 Web 请求示例,通过 requests 模块访问中国天气网
,获取部分城市的天气信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
import requests

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']
return weather_info

print(get_weather('101210101'))

# => {'city': '杭州', 'cityid': '101210101', 'temp': '24.8', 'WD': '东北风', 'WS': '小于3级', 'SD':
# '81%', 'AP': '1000.3hPa', 'njd': '暂无实况', 'WSE': '<3', 'time': '17:50', 'sm': '2.1', 'isRadar': '1', 'Radar': 'JC_RADAR_AZ9571_JB'}

获取多个城市天气
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import requests
import time

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']

print("%s (tmp/humi): %s/%s" % (
weather_info['city'],
weather_info['temp'],
weather_info['SD']
))

cityids = (
'101210101', '101010100', '101090201',
'101020100', '101280101', '101230201'
)

def main():
for id in cityids:
get_weather(id)

if __name__ == '__main__':
started = time.time()
main()
elapsed = time.time() - started
print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.16s
多线程获取多个城市的天气

完整代码如下,主要修改了 main 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from threading import Thread
import requests
import time

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']

print("%s (tmp/humi): %s/%s" % (
weather_info['city'],
weather_info['temp'],
weather_info['SD']
))

cityids = (
'101210101', '101010100', '101090201',
'101020100', '101280101', '101230201'
)

def main():
threads = []
for id in cityids:
thread = Thread(target=get_weather, args=[id])
thread.start()
threads.append(thread)

while threads:
threads.pop().join()

if __name__ == '__main__':
started = time.time()
main()
elapsed = time.time() - started
print("Time elapsed: {:.2f}s".format(elapsed))

# => 保定 (tmp/humi): 27.5/43%
# => 广州 (tmp/humi): 26.6/83%
# => 杭州 (tmp/humi): 24.8/81%
# => 上海 (tmp/humi): 23.5/80%
# => 北京 (tmp/humi): 27.9/28%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

与前一个版本(在单个线程中顺序地依次请求多个城市的天气信息)相比,此版本通过不同的线程同时请求多个城市的天气信息,每个线程负责一个城市。
虽然多线程在系统资源上增加了一定程度的消耗,但是相对于网络资源响应以及 IO 传输产生的延迟和等待,仍然在效率上有了一定程度的提升。

同时也可以从对输出结果的比较中看出,多线程方案获取到的天气信息并不是按顺序返回的。即表明在请求某个数据时,程序并没有等待上一个请求完全解决,而是在结果返回之前又发起了新的请求。从而在一定程度上减弱了网络延迟对整个程序的阻塞效果。

线程池

一个程序所能运行的线程数量并不是毫无限制的,很多时候需要构建一个固定大小的线程池来处理所有带并行需求的工作任务。这些并行任务可以先存储在一种名为队列(queue)的数据结构中,以先进先出(FIFO)的原则分配给线程池中固定数量的线程去处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from queue import Queue, Empty
from threading import Thread
import requests
import time

THREAD_POOL_SIZE = 4
cityids = (
'101210101', '101010100', '101090201',
'101020100', '101280101', '101230201'
)

def worker(work_queue):
while not work_queue.empty():
try:
item = work_queue.get(block=False)
except Empty:
break
else:
get_weather(item)
work_queue.task_done()

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']

print("%s (tmp/humi): %s/%s" % (
weather_info['city'],
weather_info['temp'],
weather_info['SD']
))

def main():
work_queue = Queue()

for id in cityids:
work_queue.put(id)

threads = [
Thread(target=worker, args=(work_queue,)) for _ in range(THREAD_POOL_SIZE)
]

for thread in threads:
thread.start()

work_queue.join()

while threads:
threads.pop().join()

if __name__ == '__main__':
started = time.time()
main()
elapsed = time.time() - started
print("Time elapsed: {:.2f}s".format(elapsed))

# => 保定 (tmp/humi): 27.5/43%
# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

其中 get_weather 函数可以通过 id 值获取对应城市的天气信息,所有需要请求的 id 参数保存在 main 中定义的队列 work_queue 里;
worker 函数是与线程相关联的工作代码,它可以逐个获取队列中存储的 id 参数并传递给 get_weather 函数;
main 函数中的 threads 列表则初始化了固定数量(THREAD_POOL_SIZE)的线程对象,所有的请求任务最终都由这些线程去处理。
队列中的任何一个 id 参数交由任何一个线程处理时都会立即从队列中弹出,因而保证了各线程间的相互独立(同一个任务不会被多个线程获取)。

简单来说,固定数量的线程完成了队列中大量任务的并行处理。可以适当修改线程数量以达到系统资源和执行效率的平衡。

双向队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import time
from queue import Queue, Empty
from threading import Thread
import requests

THREAD_POOL_SIZE = 4
cityids = (
'101210101', '101010100', '101090201',
'101020100', '101280101', '101230201'
)

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']
return weather_info

def present_result(weather_info):
print("%s (tmp/humi): %s/%s" % (
weather_info['city'],
weather_info['temp'],
weather_info['SD']
))

def worker(work_queue, results_queue):
while not work_queue.empty():
try:
item = work_queue.get(block=False)
except Empty:
break
else:
results_queue.put(
get_weather(item)
)
work_queue.task_done()

def main():
work_queue = Queue()
results_queue = Queue()

for id in cityids:
work_queue.put(id)

threads = [
Thread(target=worker, args=(work_queue, results_queue)) for _ in range(THREAD_POOL_SIZE)
]

for thread in threads:
thread.start()

work_queue.join()

while threads:
threads.pop().join()

while not results_queue.empty():
present_result(results_queue.get())

if __name__ == '__main__':
started = time.time()
main()
elapsed = time.time() - started
print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.08s

与上一个版本的方案不同,此处的程序除了创建包含请求信息的 work_queue 队列外,还创建了一个用于保存返回结果的 results_queue 队列。
即工作线程只用于对远程 API 发起请求并获取结果,而最终结果的整理及打印输出等则交由主线程来处理。此举可以减弱某些无关的操作可能对工作线程产生的不利影响。

错误处理

为了应对请求数据的过程中可能出现的错误,可以在之前代码的基础上添加异常捕获功能。即在 worker 函数中添加 try...except 语句,当执行成功时将返回的结果保存至 results_queue 队列;如有异常发生,则将异常对象保存至 results_queue 队列。
然后在 main 函数中对 results_queue 中的内容进行判断,是直接输出结果还是抛出异常对象。
最终代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import time
from queue import Queue, Empty
from threading import Thread
import requests

THREAD_POOL_SIZE = 4
cityids = (
'101210101', '101010100', '101090201',
'101020100', '101280101', '101230201'
)

def get_weather(cityid):
api_url = 'http://www.weather.com.cn/data/sk/' + cityid + '.html'
results = requests.get(api_url)
results.encoding = 'utf-8'
weather_info = results.json()['weatherinfo']
return weather_info

def present_result(weather_info):
print("%s (tmp/humi): %s/%s" % (
weather_info['city'],
weather_info['temp'],
weather_info['SD']
))

def worker(work_queue, results_queue):
while not work_queue.empty():
try:
item = work_queue.get(block=False)
except Empty:
break
else:
try:
result = get_weather(item)
except Exception as err:
results_queue.put(err)
else:
results_queue.put(result)
finally:
work_queue.task_done()

def main():
work_queue = Queue()
results_queue = Queue()

for id in cityids:
work_queue.put(id)

threads = [
Thread(target=worker, args=(work_queue, results_queue)) for _ in range(THREAD_POOL_SIZE)
]

for thread in threads:
thread.start()

work_queue.join()

while threads:
threads.pop().join()

while not results_queue.empty():
result = results_queue.get()

if isinstance(result, Exception):
raise result
present_result(result)

if __name__ == '__main__':
started = time.time()
main()
elapsed = time.time() - started
print("Time elapsed: {:.2f}s".format(elapsed))

# => 杭州 (tmp/humi): 24.8/81%
# => 北京 (tmp/humi): 27.9/28%
# => 保定 (tmp/humi): 27.5/43%
# => 上海 (tmp/humi): 23.5/80%
# => 广州 (tmp/humi): 26.6/83%
# => 厦门 (tmp/humi): 26.8/87%
# => Time elapsed: 0.07s

参考资料

Expert Python Programming - Second Edition