An Introduction to Celery by Example

Celery is a distributed task queue based on message passing. Compared with a plain message queue, the messages passed through the queue describe tasks (Task/Job) rather than data, and Celery manages tasks on top of these messages. A similar project is Gearman, but it has not been updated for a long time, so Celery is the more recommended choice.

Celery supports both synchronous and asynchronous execution. In synchronous mode the caller waits for the task to finish, which is equivalent to RPC (Remote Procedure Call); in asynchronous mode the task runs in the background, the caller moves on to other work after submitting it and checks the result later as needed. Celery does not implement a message queue itself; instead it uses an existing message queue as the Broker. The officially recommended Broker is RabbitMQ; Redis, Beanstalkd, MongoDB and others are also supported, see the official documentation for details.
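
As a minimal sketch of the two modes (assuming an add task in a demo1 module, as defined in the example later in this post): delay() submits a task and returns an AsyncResult immediately, while get() blocks until the worker finishes, giving RPC-like behavior:

from demo1 import add

# Asynchronous: delay() returns an AsyncResult right away;
# the caller can poll it later with ready()/result.
r = add.delay(1, 2)
print r.ready()          # likely False immediately after submission

# Synchronous (RPC-style): get() blocks until the result arrives.
print add.delay(1, 2).get(timeout=10)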

Celery's overall architecture comprises three roles:

  • Celery client: the task producer; it sends tasks to the Broker.
  • Broker: distributes tasks to the appropriate celery workers.
  • Celery worker: the task executor, which carries out the actual business logic; concretely, this is a Python function.

Let's walk through Celery's usage with an example.

First, prepare the Broker environment; here we use RabbitMQ. After RabbitMQ is installed and started, create the necessary vhost and user, and grant the appropriate permissions:

rabbitmqctl add_vhost demo1
rabbitmqctl add_user demo demo
rabbitmqctl set_user_tags demo demotag
rabbitmqctl set_permissions -p demo1 demo ".*" ".*" ".*"
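
To verify the setup, you can list the permissions on the new vhost (a standard rabbitmqctl subcommand):

rabbitmqctl list_permissions -p demo1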

Create the celery worker code in a file named demo1.py:

from celery import Celery
import time

app = Celery('demo1',
             broker='amqp://demo:demo@localhost:5672/demo1',
             backend='rpc://')

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

@app.task
def sub(x, y):
    time.sleep(2)
    return x - y

demo1 implements two tasks, the add and sub functions. The @app.task decorator tells Celery that these functions are not executed on the celery client side; when they are called, only the invocation information is sent through the Broker to the celery workers. The backend parameter indicates that the results produced by the celery workers should be kept; rpc:// means the results are sent back through RabbitMQ in RPC (Remote Procedure Call) style. If no backend is specified, task results are discarded.
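
As an aside, delay() is shorthand for apply_async(), which takes the same task arguments plus execution options. A short sketch (countdown is a standard Celery option, shown here purely for illustration):

import demo1

# delay(1, 2) is equivalent to apply_async(args=(1, 2)).
# countdown=5 asks the worker to wait 5 seconds before executing.
result = demo1.add.apply_async(args=(1, 2), countdown=5)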

Next, prepare the celery client code. Create a file named client.py with the following content:

from celery import Celery
import demo1

import time

app1 = Celery('demo1',
              broker='amqp://demo:demo@localhost:5672/demo1',
              backend='rpc://')

x = demo1.add.delay(1, 2)
print "Ready? ", x.ready()
print "Result: ", x.result

time.sleep(3)

print "Ready? ", x.ready()
print "Result: ", x.result

Next, start the celery worker:

[root@centos1 celery]# celery  -A demo1 worker --loglevel=info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

uid=uid, euid=euid, gid=gid, egid=egid,

-------------- celery@centos1 v4.2.0 (windowlicker)
---- **** -----
--- * *** * -- Linux-3.10.0-229.14.1.el7.x86_64-x86_64-with-centos-7.1.1503-Core 2018-08-14 06:54:40
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: demo1:0x2af55d0
- ** ---------- .> transport: amqp://demo:**@localhost:5672/demo1
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. demo1.add
. demo1.sub

[2018-08-14 06:54:40,230: INFO/MainProcess] Connected to amqp://demo:**@127.0.0.1:5672/demo1
[2018-08-14 06:54:40,249: INFO/MainProcess] mingle: searching for neighbors
[2018-08-14 06:54:41,282: INFO/MainProcess] mingle: all alone
[2018-08-14 06:54:41,298: INFO/MainProcess] celery@centos1 ready.

Run the celery client:

[root@centos1 celery]# python client.py
Ready? False
Result: None
Ready? True
Result: 3

From the output we can see that the first call to x.ready() returns False because the task has not yet completed, and x.result is not yet available. After a 3-second delay, the second call shows the task has finished, and x.result holds the task's result.
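
Instead of sleeping and polling ready(), the client can simply block until the result arrives with get(), a standard AsyncResult method (the timeout here is illustrative):

x = demo1.add.delay(1, 2)
print "Result: ", x.get(timeout=10)  # blocks until the worker returns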

Looking at the worker's log output, we can see the task completed successfully:

[2018-08-14 06:56:06,695: INFO/MainProcess] Received task: demo1.add[ceef3d4f-c546-4853-bbf6-3e4c050a9957]
[2018-08-14 06:56:08,734: INFO/ForkPoolWorker-2] Task demo1.add[ceef3d4f-c546-4853-bbf6-3e4c050a9957] succeeded in 2.036982472s: 3

In the example above, although the celery client never executes the task code itself, it still has to import the module that implements the tasks. In some scenarios this is not appropriate. Celery also provides a way to operate on tasks directly over the AMQP protocol; for example, the app.send_task function can trigger a task by name. See the documentation for details.

Modify client.py as follows:

from celery import Celery
import time

app1 = Celery('demo1',
              broker='amqp://demo:demo@localhost:5672/demo1',
              backend='rpc://')

x = app1.send_task('demo1.add', args=[1, 2], kwargs={})
print "Ready? ", x.ready()
print "Result: ", x.result

time.sleep(3)

print "Ready? ", x.ready()
print "Result: ", x.result

Run the client again; the result is the same as before:

[root@centos1 celery]# python client.py
Ready? False
Result: None
Ready? True
Result: 3

Besides the asynchronous tasks above, Celery also supports periodic tasks (Periodic Tasks). These rely on a separate celery beat process that periodically sends tasks to the broker according to the configured schedule.

Let's schedule demo1.add to run every 10 seconds by adding the following to demo1.py:

app.conf.beat_schedule = {
    'add_every_10_seconds': {
        'task': 'demo1.add',
        'schedule': 10.0,
        'args': (1, 2),
    },
}
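
The schedule value can also be given as a datetime.timedelta instead of a float number of seconds; both are standard Celery schedule types. An equivalent sketch:

from datetime import timedelta

app.conf.beat_schedule = {
    'add_every_10_seconds': {
        'task': 'demo1.add',
        'schedule': timedelta(seconds=10),  # equivalent to 10.0
        'args': (1, 2),
    },
}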

Restart the celery worker (with the same command as before), and start celery beat:

[root@centos1 celery]# celery -A demo1 beat
celery beat v4.2.0 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2018-08-14 08:02:05
Configuration ->
. broker -> amqp://demo:**@localhost:5672/demo1
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)

We can see the periodic task executions in the worker log:

[2018-08-14 08:12:05,666: INFO/MainProcess] Received task: demo1.add[4d08d394-2d19-48ce-bb4b-d39c58f42f47]
[2018-08-14 08:12:07,669: INFO/ForkPoolWorker-2] Task demo1.add[4d08d394-2d19-48ce-bb4b-d39c58f42f47] succeeded in 2.002474438s: 3
[2018-08-14 08:12:35,686: INFO/MainProcess] Received task: demo1.add[55837f1d-b7dc-4c08-965b-215d92132394]
[2018-08-14 08:12:37,692: INFO/ForkPoolWorker-2] Task demo1.add[55837f1d-b7dc-4c08-965b-215d92132394] succeeded in 2.003697262s: 3
[2018-08-14 08:13:05,704: INFO/MainProcess] Received task: demo1.add[8d8591de-52cd-4271-b079-5f94b7f41af1]
[2018-08-14 08:13:07,709: INFO/ForkPoolWorker-2] Task demo1.add[8d8591de-52cd-4271-b079-5f94b7f41af1] succeeded in 2.003211596s: 3
[2018-08-14 08:13:35,725: INFO/MainProcess] Received task: demo1.add[1f546d46-fd5d-4572-9712-5848459a4cea]
[2018-08-14 08:13:37,730: INFO/ForkPoolWorker-2] Task demo1.add[1f546d46-fd5d-4572-9712-5848459a4cea] succeeded in 2.003228644s: 3

Besides this simple interval-based schedule, Celery also supports crontab-style schedules. When using them, pay attention to the timezone setting; see the documentation for the available parameters.
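
By default Celery interprets crontab schedules in UTC; the timezone can be set explicitly through app.conf.timezone, a standard Celery setting. A sketch:

from celery.schedules import crontab

app.conf.timezone = 'Asia/Shanghai'   # crontab times interpreted in this timezone

# Example: run every Monday at 7:30 in the configured timezone.
schedule = crontab(hour=7, minute=30, day_of_week=1)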

Now let's use a crontab schedule to run demo1.add once a minute. Modify demo1.py to:

from celery import Celery
from celery.schedules import crontab

import time

app = Celery('demo1',
             broker='amqp://demo:demo@localhost:5672/demo1',
             backend='rpc://')

app.conf.beat_schedule = {
    'every_minute': {
        'task': 'demo1.add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 2),
    },
}

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

@app.task
def sub(x, y):
    time.sleep(2)
    return x - y

Restart the worker and beat, then watch the worker log; the task runs roughly once a minute:

[2018-08-14 08:42:18,011: INFO/MainProcess] Received task: demo1.add[b532debb-49b4-4b9a-ac83-a3dad6ebe68c]
[2018-08-14 08:42:20,032: INFO/ForkPoolWorker-2] Task demo1.add[b532debb-49b4-4b9a-ac83-a3dad6ebe68c] succeeded in 2.019885588s: 3
[2018-08-14 08:43:00,039: INFO/MainProcess] Received task: demo1.add[9fa61f49-5bcb-47dc-9d18-7c921fd5ed18]
[2018-08-14 08:43:02,044: INFO/ForkPoolWorker-2] Task demo1.add[9fa61f49-5bcb-47dc-9d18-7c921fd5ed18] succeeded in 2.003939354s: 3
[2018-08-14 08:44:00,056: INFO/MainProcess] Received task: demo1.add[cd1f0f5b-9bea-4764-ad42-9a62102d419f]
[2018-08-14 08:44:02,062: INFO/ForkPoolWorker-2] Task demo1.add[cd1f0f5b-9bea-4764-ad42-9a62102d419f] succeeded in 2.00425067799s: 3

The scheduling activity is also visible in beat's debug-level log:

[root@centos1 celery]# celery -A demo1 beat --loglevel=debug
celery beat v4.2.0 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2018-08-14 08:42:46
Configuration ->
. broker -> amqp://demo:**@localhost:5672/demo1
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%DEBUG
. maxinterval -> 5.00 minutes (300s)
[2018-08-14 08:42:46,250: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-08-14 08:42:46,251: INFO/MainProcess] beat: Starting...
[2018-08-14 08:42:46,259: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: every_minute demo1.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>
[2018-08-14 08:42:46,259: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-08-14 08:42:46,259: DEBUG/MainProcess] beat: Waking up in 13.73 seconds.
[2018-08-14 08:43:00,003: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-08-14 08:43:00,024: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2018 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@centos1', 'platform': 'Erlang/OTP 21.0.3', 'version': '3.7.7'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2018-08-14 08:43:00,027: INFO/MainProcess] Scheduler: Sending due task every_minute (demo1.add)
[2018-08-14 08:43:00,030: DEBUG/MainProcess] using channel_id: 1
[2018-08-14 08:43:00,031: DEBUG/MainProcess] Channel open
[2018-08-14 08:43:00,037: DEBUG/MainProcess] demo1.add sent. id->9fa61f49-5bcb-47dc-9d18-7c921fd5ed18
[2018-08-14 08:43:00,037: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2018-08-14 08:44:00,051: INFO/MainProcess] Scheduler: Sending due task every_minute (demo1.add)
[2018-08-14 08:44:00,054: DEBUG/MainProcess] demo1.add sent. id->cd1f0f5b-9bea-4764-ad42-9a62102d419f
[2018-08-14 08:44:00,054: DEBUG/MainProcess] beat: Waking up in 59.94 seconds.

In real-world scenarios we usually need to watch task execution status and statistics. Celery provides several commands for monitoring workers and tasks.

The status command lists the workers:

[root@centos1 celery]# celery -A demo1 status
celery@centos1: OK

1 node online.

The inspect active command shows the tasks a worker is currently executing; with no task running it shows:

[root@centos1 celery]# celery -A demo1 inspect active
-> celery@centos1: OK
- empty -

When the worker is executing a task, the information on the task in progress is displayed:

[root@centos1 celery]# celery -A demo1 inspect active
-> celery@centos1: OK
* {u'args': u'[1, 2]', u'time_start': 1534227677.0447896, u'name': u'demo1.add', u'delivery_info': {u'priority': 0, u'redelivered': False, u'routing_key': u'celery', u'exchange': u''}, u'hostname': u'celery@centos1', u'acknowledged': True, u'kwargs': u'{}', u'type': u'demo1.add', u'id': u'92fd8829-b1a8-4239-95ac-aeabca724bc5', u'worker_pid': 25917}

The inspect stats command returns statistics about the worker. Its output is very rich; see the documentation for details:

[root@centos1 celery]# celery -A demo1 inspect stats
-> celery@centos1: OK
{
    "broker": {
        "alternates": [],
        "connect_timeout": 4,
        "failover_strategy": "round-robin",
        "heartbeat": 120.0,
        "hostname": "127.0.0.1",
        "insist": false,
        "login_method": "AMQPLAIN",
        "port": 5672,
        "ssl": false,
        "transport": "amqp",
        "transport_options": {},
        "uri_prefix": null,
        "userid": "demo",
        "virtual_host": "demo1"
    },
    "clock": "343",
    "pid": 25909,
    "pool": {
        "max-concurrency": 2,
        "max-tasks-per-child": "N/A",
        "processes": [
            25916,
            25917
        ],
        "put-guarded-by-semaphore": false,
        "timeouts": [
            0,
            0
        ],
        "writes": {
            "all": "100.00%",
            "avg": "100.00%",
            "inqueues": {
                "active": 0,
                "total": 2
            },
            "raw": "1",
            "strategy": "fair",
            "total": 1
        }
    },
    "prefetch_count": 8,
    "rusage": {
        "idrss": 0,
        "inblock": 88,
        "isrss": 0,
        "ixrss": 0,
        "majflt": 1,
        "maxrss": 31160,
        "minflt": 15953,
        "msgrcv": 0,
        "msgsnd": 0,
        "nivcsw": 20,
        "nsignals": 0,
        "nswap": 0,
        "nvcsw": 502,
        "oublock": 8,
        "stime": 0.509435,
        "utime": 0.327057
    },
    "total": {
        "demo1.add": 1
    }
}

In addition, flower, a companion project of Celery, provides real-time web-based monitoring.

flower can be installed directly with pip:

pip install flower

Start flower. By default it listens on 127.0.0.1:5555; the listen address can be changed with the address parameter:

celery -A demo1 flower --address=0.0.0.0 --port=5555

Open flower's address in a browser and you will see the flower Dashboard page.

flower offers more than monitoring: it also exposes a REST API for task operations, so tasks can be submitted directly via HTTP requests. For example:

[root@centos1 celery]# curl -X POST -d '{"args":[1,2]}' 'http://127.0.0.1:5555/api/task/async-apply/demo1.add?refresh=True'
{"task-id": "fde316ea-7aed-4a85-8255-06935c5b015a", "state": "PENDING”}

However, these APIs are still somewhat unstable and are not recommended for production use. See the documentation for API details.