Python 通过 Celery 框架实现分布式任务队列

Celery 是一个简单、灵活且可靠的分布式消息处理系统,主要用来作为任务队列对海量消息数据进行实时的处理,在多个程序线程或者主机之间传递和分发工作任务。同时也支持计划任务等需求。

一、环境配置

Celery 框架自身并不对传入的消息进行存储,因此在使用前需要先安装第三方的 Message Broker。如 RabbitMQRedis 等。

安装 RabbitMQ

对于 Linux 系统,执行以下命令:

1
2
3
4
5
$ sudo apt-get install rabbitmq-server    # 安装 RabbitMQ
$ sudo rabbitmqctl add_user myuser mypassword # 添加用户 myuser/mypassword
$ sudo rabbitmqctl add_vhost myvhost # 添加 vhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" # 为用户 myuser 设置访问 myvhost 的权限

通过 Docker 安装的步骤如下:

1
2
3
4
5
6
$ docker pull rabbitmq:3.8-management    # 拉取 docker 镜像(包含 web 管理)
# 启动 rabbitmq 容器
$ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=myvhost \
-e RABBITMQ_DEFAULT_USER=myuser \
-e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management

安装 Redis

$ sudo apt-get install redis-server

安装 Celery

$ pip install celery

二、创建 Celery 应用

Celery 应用是该框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,须确保它可以被其他模块导入。
以下是一段简单的 Celery app 代码 tasks.py

1
2
3
4
5
6
7
8
9
10
# tasks.py
from celery import Celery

app = Celery('tasks',
broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
backend='redis://localhost:6379/0')

@app.task
def add(x, y):
return x + y

使用 RabbitMQ 作为 broker 接收和发送任务消息,使用 Redis 作为 backend 存储计算结果。

运行 Celery worker 服务

$ celery -A tasks worker --loglevel=info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ celery -A tasks worker --loglevel=info

-------------- celery@skitarniu-ubuntu18 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-4.15.0-60-generic-x86_64-with-debian-buster-sid 2019-11-01 07:21:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f4f30b84a90
- ** ---------- .> transport: amqp://myuser:**@localhost:5672/myvhost
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.add

[2019-11-01 07:21:35,316: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost
[2019-11-01 07:21:35,367: INFO/MainProcess] mingle: searching for neighbors
[2019-11-01 07:21:36,535: INFO/MainProcess] mingle: all alone
[2019-11-01 07:21:36,782: INFO/MainProcess] celery@skitarniu-ubuntu18 ready.
任务测试

进入 Python Shell,执行以下命令发布任务并获取结果:

1
2
3
4
5
6
7
8
9
10
>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result
<AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace>
>>> result.ready()
True
>>> result.get()
8
>>> result.traceback
>>>

delay() 方法用于发布任务消息,它是 apply_async() 方法的简写,即以异步的方式将任务需求提交给前面启动好的 worker 去处理。delay() 方法返回一个 AsyncResult 对象。
result.ready() 方法可以用来检查提交的任务是否已经完成,返回布尔值。

result.get() 方法则用于获取执行完成后的结果。如任务未完成,则程序会一直等待直到有结果返回。因此该方法是阻塞的,并不常用。可以传入 timeout 参数指定等待的时间上限。
result.get(timeout=1),尝试获取任务执行后的结果,等待 1 秒。若 1 秒之后结果仍未返回,抛出 celery.exceptions.TimeoutError: The operation timed out. 异常。

如果任务执行过程中有抛出异常,则使用 get() 方法获取结果时会重新抛出该异常导致程序中断。可以通过修改 propagate 参数避免此情况:
result.get(propagate=False)
result.traceback 则用于获取任务的 traceback 信息。

三、Calling Tasks

Celery 定义了一些可供 task 实例调用的通用的 Calling API,包括三个方法和一些标准的执行选项:

  • apply_async(args[, kwargs[, ...]]):发送任务消息给 worker
  • delay(*args, **kwargs):发送任务消息的简写形式,不支持执行选项
  • calling (__call__):即在本地进程中直接执行任务函数,不通过 worker 异步执行

以下是一些常见的调用示例:

  • T.delay(arg, kwarg=value)
  • T.apply_async((arg,), {'kwarg': value})
  • T.apply_async(countdown=10)
    10 秒之后开始执行某个任务
  • T.apply_async(eta=now + timedelta(seconds=10))
    10 秒之后开始执行某个任务
  • T.apply_async(countdown=60, expires=120)
    预计 1 分钟后开始执行,但 2 分钟后还未执行则失效
  • T.apply_async(expires=now + timedelta(days=2))
    2 天后失效

通过 countdown 设置任务的延迟执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> from tasks import add
>>> result = add.apply_async((2, 3))
>>> result.get()
5
>>> delay_result = add.apply_async((2, 3), countdown=15)
>>> delay_result.ready()
False
>>> delay_result.ready()
False
>>> delay_result.ready()
False
>>> delay_result.ready()
True
>>> delay_result.get()
5

还可以通过 eta(estimated time of arrival) 设置延迟执行的时间:

1
2
3
4
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 3), eta=tomorrow)
<AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>

此时 worker 在命令行的日志输出如下:

1
2
[2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709]
ETA:[2019-11-07 05:16:06.652736+00:00]

四、计划任务

Celery 允许像使用 crontab 那样按计划地定时执行某个任务。参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# tasks.py
from celery import Celery

app = Celery('tasks',
broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
backend='redis://localhost:6379/1')

app.conf.beat_schedule = {
'add-every-60-seconds': {
'task': 'tasks.add',
'schedule': 60.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'

@app.task
def add(x, y):
print(x + y)

运行 celery -A tasks worker -B 启动 worker 服务。
-B 选项表示 beat,即 celery beat 服务,负责执行计划任务。

输出如下(每隔一分钟执行一次):

1
2
3
4
5
6
7
8
$ celery -A tasks worker -B
...
[2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32
...

同时 Celery 也支持更复杂的 crontab 类型的时间规划:

1
2
3
4
5
6
7
8
9
10
from celery.schedules import crontab

app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}

Crontab 表达式支持的语法如下:

Example Meaning
crontab() 每分钟执行一次
crontab(minute=0, hour=0) 每天半夜 0 点执行
crontab(minute=0, hour='*/3') 每隔 3 小时执行一次(从 0 时开始)
crontab(minute=0, hour='0,3,6,9,12,15,18,21') 同上一条
crontab(day_of_week='sunday') 只在周日执行,每隔一分钟执行一次
crontab(minute='*', hour='*', day_of_week='sun') 同上一条
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') 只在周四、周五的 3、17、22 时执行,每隔 10 分钟执行一次
crontab(minute=0, hour='*/2,*/3') 只在能被 2 或者 3 整除的整点执行
crontab(minute=0, hour='*/3,8-17') 在能被 3 整除的整点,和 8-17 点之间的整点执行
crontab(0, 0, day_of_month='2') 在每个月的第二天的 0 时执行
crontab(0, 0, day_of_month='11', month_of_year='5') 在每年的 5 月 11 号 0 点执行

参考资料

Celery 4.3.0 documentation