Celery介绍(一)

celery 是一个用来处理大量消息的分布式系统,在系统内可以实时处理任务,还支持任务调度,本质是一个分布式任务队列。 ### 简易执行流程
在 celery 中有 3 个基本概念:task worker application
application 就是我们的项目,里面包含了定义好的 task,这些task和我们定义的 app 相关联,而 worker 是执行 application的单元,通过命令行来启动,形如celery -A [application] worker ,只需指定 application,它就会执行 我们定义的任务。可以开启多个worker。也可将代码复制到多台机器上,开启多个worker,实现分布式。

再来了解个概念:broker: celery 分布式的核心就在于 broker,每当我们调用task时,就会向broker发一个请求, 这个请求包含 调用的task路径和参数。这样就等于将这个任务 ”入队“,然后根据一些规则,这个任务就会分配给某个worker进行执行。

最简单的 demo

首先要新建一个文件来配置我们的应用,新建一个文件夹 - celery_proj,然后在里面 新建一个 celery_app.py 和 tasks.py。

1
2
3
4
5
6
7
# celery_app.py
from celery import Celery

app = Celery(broker="redis://47.111.175.222:6379/6", include=['celery_proj.tasks'])

if __name__ == '__main__':
    app.worker_main()
1
2
3
4
5
6
# tasks.py
from celery_proj.celery_app import app

@app.task
def add(a, b):
    return a + b

启动:celery -A celery_proj.celery_app worker -l info -P solo
调用:可以新建一个py文件,也可直接在控制台中执行。代码如下:

1
2
3
from celery_proj.tasks import add

add.delay(1,2)

正常的话,我们可以在worker的命令行界面看到任务的执行情况。


总结一下过程:首先我们再 celery_app.py 中定义了一个 celery 应用,主要定义了broker的地址 和 task的路径。 在 tasks.py 中我们定义了一个 task ,即用装饰器将函数注册到我们定义的应用中就是一个task了。至此,一个简单的应用就 配置完了,然后我们需要启动一个 worker 来”消费“任务。先切换地址到项目所在的目录下,执行上面的命令即可。命令的参数 的含义在 worker中会说明。

-P solo 是指启动一个进程。如果我们是在 windows 下执行一定要加这个参数。 因为 celery 并不完全支持 windows 系统,我们在 windows 下使用会碰到各种坑。

上面对 Celery 的整体运行流程有了大致的介绍,下面我们就深入到各个对象中去。

application

application 最主要作用就是 配置,添加配置的方式很多,但一般下面这三种就够了:

  • 在实例化一个 app 时配置,如 我们写的简易 demo
  • 定义一个配置类,如下: ```python from celery import Celery

app = Celery()

class Config: enable_utc = True timezone = ‘Europe/London’

app.config_from_object(Config)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 从一个模块中定义,app.config_from_object('module.path'),如在 app 的同级目录下新建一个 config.py,那么
我们只需 `app.config_form_object('config')`就行了。  
完整的配置清单可以参考 [celery 配置](http://docs.celeryproject.org/en/latest/userguide/configuration.html)
一个 配置 demo:
```python
# config.py
BROKER_URL = 'redis://127.0.0.1:6379/1'  # 使用Redis作为消息代理

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 把任务结果存在了Redis

# CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json'  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

CELERY_ACCEPT_CONTENT = ['json', 'msgpack']  # 指定接受的内容类型

最后,千万别忘了使用 include=['tasks.path'] 参数关联 task 到 app。

task

task 可以说是这里面最有趣也最复杂的。 task 是 celery 应用的基础,它扮演着两个角色:当任务被调用时我们可以做什么事情 和 任务交给 worker 执行时 worker 可以做什么。我们定义一个任务就是定义这些行为。

定义 task

通过前面的 demo 可以看到,一个 最简单的 task 只有我们定义的业务逻辑,我们并不能获取其他的有关 Task 的属性和方法, 意味着我们就不能定义(控制) 其他的行为。其实在我们将一个方法注册为 task 的装饰器是可以传参的,我们可以添加一个参数 @app.task(bound=self) ,这样我们通过 self 就可以获取 这个 task 的实例,进而就可以获取我们想要的信息了。 更直接的办法是: 我们直接继承 celery.Task 这个类来自定义,然后在装饰器中指定即可@app.task(base=CustomTask)。这种方案 非常灵活且扩展性高。

下面来具体看看一些主要的属性和方法。

  1. Task.request, 这个属性是 类属性,指向另一个类:celery.worker.request.Request, 里面包含了有关于此次任务的信息和任务状态。这些属性就不一一列举了,可以到 文档查阅。 这个 Request 类我们也可以重载的,非常灵活。

  2. Task.retry(),这是一个实例方法,可以在发生错误或则我们自定义的情况下重新执行任务,并且任务会发送到相同的队列, 任务的状态也会记录此次 重试记录,以便于我们追踪。
    但是,当我们调用这个方法同时,我们也 抛出(raise) 了一个 Retry 类型的”异常“,此后的代码不会执行。官方解释说,这个 ”异常“类似于 断言 ,是用来给 worker 通知说这个任务要重试,这样才能保存正确的任务状态。
    我们可以设置 重试延迟,也就是延迟多久后重试,最大重试次数等,我们可以在 装饰器参数中设置:@app.task(retry_kwargs={}) 参数中的的 自动重试设置(autoretry_for) 也是很强大的功能,有兴趣可以了解。
    关于 task 的其他配置选项可以参考文档

  3. Task.state,我们可以通过这个属性追踪任务的当前状态,前提是需要配置 结果后端(result backend),celery 内置了四种状态, 并且可以利用 实例的 Task.update_state() 方法更新状态以及 状态元数据(state meta-data)。同时我们也可自定义状态名。

  4. 在 Task 类中有 四个 事件回调方法,分别是 after_return on_failure on_success on_retry,在需要时可以定义。

调用 task

总共有 3 种调用方式:

  1. task_func.delay(),只能添加函数的参数
  2. task_func.apply_async(),除了函数的参数,还可以添加 很多选项
  3. task_func(),直接使用函数,这样只会本地执行,和一般的函数执行方式相同

下面默认以 apply_async 这种调用方式。

  • 延时执行
    countdown=5,5 秒后执行 eta = datetime.datetime.now() + datetime.timedelta(days=1) 一天后执行

  • 过期限制
    expires = datetime.datetime.now() + datetime.timedelta(days=1) 一天后任务过期,这样当 worker 从队列中获取 任务信息时,如果过期,就会将任务状态置为 REVOKED

  • 消息序列化方式
    serializer = ‘json’,可选的有:json pickle yaml msgpack

  • 消息压缩方式
    compression= ‘brotli’ ,可选的有很多,但是大都需要安装 python 环境时同时编译安装相应的包

  • 消息执行队列
    queue = ‘queue_name’,可以指定消息的传递的队列

  • 忽略结果
    ignore_result=False,可以选择是否存储执行结果。需要 result backend 的支持。

另外一个很有用的可选项是任务状态变化的回调,当将一个任务执行后,需要监控状态的变化,这时就可以使用 on_message 这个 回调功能了:

1
2
3
4
5
6
7
8
@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
1
2
3
4
5
def on_raw_message(body):
    print(body)

r = hello.apply_async()
print(r.get(on_message=on_raw_message, propagate=False))

每当任务状态有变化时,就会触发回调函数。