Celery 是一个简单、灵活且可靠的分布式消息处理系统,主要用来作为任务队列对海量消息数据进行实时的处理,在多个程序线程或者主机之间传递和分发工作任务。同时也支持计划任务等需求。
一、环境配置
Celery 框架自身并不对传入的消息进行存储,因此在使用前需要先安装第三方的 Message Broker。如 RabbitMQ 和 Redis 等。
安装 RabbitMQ
对于 Linux 系统,执行以下命令:1
2
3
4
5 sudo apt-get install rabbitmq-server # 安装 RabbitMQ
sudo rabbitmqctl add_user myuser mypassword # 添加用户 myuser/mypassword
sudo rabbitmqctl add_vhost myvhost # 添加 vhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" # 为用户 myuser 设置访问 myvhost 的权限
通过 Docker 安装的步骤如下:1
2
3
4
5
6$ docker pull rabbitmq:3.8-management # 拉取 docker 镜像(包含 web 管理)
# 启动 rabbitmq 容器
$ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=myvhost \
-e RABBITMQ_DEFAULT_USER=myuser \
-e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management
安装 Redis
$ sudo apt-get install redis-server
安装 Celery
$ pip install celery
二、创建 Celery 应用
Celery 应用是该框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,须确保它可以被其他模块导入。
以下是一段简单的 Celery app 代码 tasks.py
:1
2
3
4
5
6
7
8
9
10# tasks.py
from celery import Celery
app = Celery('tasks',
broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
backend='redis://localhost:6379/0')
def add(x, y):
return x + y
使用 RabbitMQ 作为 broker 接收和发送任务消息,使用 Redis 作为 backend 存储计算结果。
运行 Celery worker 服务
$ celery -A tasks worker --loglevel=info
1 | celery -A tasks worker --loglevel=info |
任务测试
进入 Python Shell,执行以下命令发布任务并获取结果:1
2
3
4
5
6
7
8
9
10from tasks import add
4, 4) result = add.delay(
result
<AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace>
result.ready()
True
result.get()
8
result.traceback
>>>
delay()
方法用于发布任务消息,它是 apply_async()
方法的简写,即以异步的方式将任务需求提交给前面启动好的 worker 去处理。delay()
方法返回一个 AsyncResult
对象。result.ready()
方法可以用来检查提交的任务是否已经完成,返回布尔值。
result.get()
方法则用于获取执行完成后的结果。如任务未完成,则程序会一直等待直到有结果返回。因此该方法是阻塞的,并不常用。可以传入 timeout
参数指定等待的时间上限。
如 result.get(timeout=1)
,尝试获取任务执行后的结果,等待 1 秒。若 1 秒之后结果仍未返回,抛出 celery.exceptions.TimeoutError: The operation timed out.
异常。
如果任务执行过程中有抛出异常,则使用 get()
方法获取结果时会重新抛出该异常导致程序中断。可以通过修改 propagate
参数避免此情况:result.get(propagate=False)
result.traceback
则用于获取任务的 traceback 信息。
三、Calling Tasks
Celery 定义了一些可供 task 实例调用的通用的 Calling API,包括三个方法和一些标准的执行选项:
apply_async(args[, kwargs[, ...]])
:发送任务消息给 workerdelay(*args, **kwargs)
:发送任务消息的简写形式,不支持执行选项calling (__call__)
:即在本地进程中直接执行任务函数,不通过 worker 异步执行
以下是一些常见的调用示例:
T.delay(arg, kwarg=value)
T.apply_async((arg,), {'kwarg': value})
T.apply_async(countdown=10)
10 秒之后开始执行某个任务T.apply_async(eta=now + timedelta(seconds=10))
10 秒之后开始执行某个任务T.apply_async(countdown=60, expires=120)
预计 1 分钟后开始执行,但 2 分钟后还未执行则失效T.apply_async(expires=now + timedelta(days=2))
2 天后失效
通过 countdown
设置任务的延迟执行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from tasks import add
2, 3)) result = add.apply_async((
result.get()
5
2, 3), countdown=15) delay_result = add.apply_async((
delay_result.ready()
False
delay_result.ready()
False
delay_result.ready()
False
delay_result.ready()
True
delay_result.get()
5
还可以通过 eta
(estimated time of arrival) 设置延迟执行的时间:1
2
3
4from datetime import datetime, timedelta
1) tomorrow = datetime.utcnow() + timedelta(days=
2, 3), eta=tomorrow) add.apply_async((
<AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>
此时 worker 在命令行的日志输出如下:1
2[2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709]
ETA:[2019-11-07 05:16:06.652736+00:00]
四、计划任务
Celery 允许像使用 crontab
那样按计划地定时执行某个任务。参考代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# tasks.py
from celery import Celery
app = Celery('tasks',
broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
backend='redis://localhost:6379/1')
app.conf.beat_schedule = {
'add-every-60-seconds': {
'task': 'tasks.add',
'schedule': 60.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
def add(x, y):
print(x + y)
运行 celery -A tasks worker -B
启动 worker 服务。-B
选项表示 beat
,即 celery beat 服务,负责执行计划任务。
输出如下(每隔一分钟执行一次):1
2
3
4
5
6
7
8 celery -A tasks worker -B
...
[2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32
...
同时 Celery 也支持更复杂的 crontab
类型的时间规划:1
2
3
4
5
6
7
8
9
10from celery.schedules import crontab
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
Crontab 表达式支持的语法如下:
Example | Meaning |
---|---|
crontab() |
每分钟执行一次 |
crontab(minute=0, hour=0) |
每天半夜 0 点执行 |
crontab(minute=0, hour='*/3') |
每隔 3 小时执行一次(从 0 时开始) |
crontab(minute=0, hour='0,3,6,9,12,15,18,21') |
同上一条 |
crontab(day_of_week='sunday') |
只在周日执行,每隔一分钟执行一次 |
crontab(minute='*', hour='*', day_of_week='sun') |
同上一条 |
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') |
只在周四、周五的 3、17、22 时执行,每隔 10 分钟执行一次 |
crontab(minute=0, hour='*/2,*/3') |
只在能被 2 或者 3 整除的整点执行 |
crontab(minute=0, hour='*/3,8-17') |
在能被 3 整除的整点,和 8-17 点之间的整点执行 |
crontab(0, 0, day_of_month='2') |
在每个月的第二天的 0 时执行 |
crontab(0, 0, day_of_month='11', month_of_year='5') |
在每年的 5 月 11 号 0 点执行 |