celery 是一个用来处理大量消息的分布式系统,在系统内可以实时处理任务,还支持任务调度,本质是一个分布式任务队列。 ### 简易执行流程
在 celery 中有 3 个基本概念:task
worker
application
application
就是我们的项目,里面包含了定义好的task
,这些task
和我们定义的 app 相关联,而worker
是执行application
的单元,通过命令行来启动,形如celery -A [application] worker
,只需指定 application,它就会执行 我们定义的任务。可以开启多个worker
。也可将代码复制到多台机器上,开启多个worker
,实现分布式。
再来了解个概念:broker
: celery 分布式的核心就在于 broker
,每当我们调用task
时,就会向broker
发一个请求,
这个请求包含 调用的task
路径和参数。这样就等于将这个任务 ”入队“,然后根据一些规则,这个任务就会分配给某个worker
进行执行。
最简单的 demo
首先要新建一个文件来配置我们的应用,新建一个文件夹 - celery_proj,然后在里面 新建一个 celery_app.py 和 tasks.py。
1 |
|
1 |
|
启动:celery -A celery_proj.celery_app worker -l info -P solo
调用:可以新建一个py
文件,也可直接在控制台中执行。代码如下:
1 |
|
正常的话,我们可以在worker
的命令行界面看到任务的执行情况。
总结一下过程:首先我们再 celery_app.py 中定义了一个 celery 应用,主要定义了broker
的地址 和 task
的路径。
在 tasks.py 中我们定义了一个 task
,即用装饰器将函数注册到我们定义的应用中就是一个task
了。至此,一个简单的应用就
配置完了,然后我们需要启动一个 worker
来”消费“任务。先切换地址到项目所在的目录下,执行上面的命令即可。命令的参数
的含义在 worker
中会说明。
-P solo 是指启动一个进程。如果我们是在 windows 下执行一定要加这个参数。 因为 celery 并不完全支持 windows 系统,我们在 windows 下使用会碰到各种坑。
上面对 Celery 的整体运行流程有了大致的介绍,下面我们就深入到各个对象中去。
application
application
最主要作用就是 配置,添加配置的方式很多,但一般下面这三种就够了:
- 在实例化一个 app 时配置,如 我们写的简易 demo
- 定义一个配置类,如下: ```python from celery import Celery
app = Celery()
class Config: enable_utc = True timezone = ‘Europe/London’
app.config_from_object(Config)
1 |
|
最后,千万别忘了使用 include=['tasks.path']
参数关联 task
到 app。
task
task 可以说是这里面最有趣也最复杂的。 task 是 celery 应用的基础,它扮演着两个角色:当任务被调用时我们可以做什么事情 和 任务交给 worker 执行时 worker 可以做什么。我们定义一个任务就是定义这些行为。
定义 task
通过前面的 demo 可以看到,一个 最简单的 task 只有我们定义的业务逻辑,我们并不能获取其他的有关 Task 的属性和方法,
意味着我们就不能定义(控制) 其他的行为。其实在我们将一个方法注册为 task 的装饰器是可以传参的,我们可以添加一个参数
@app.task(bound=self)
,这样我们通过 self
就可以获取 这个 task 的实例,进而就可以获取我们想要的信息了。
更直接的办法是: 我们直接继承 celery.Task
这个类来自定义,然后在装饰器中指定即可@app.task(base=CustomTask)
。这种方案
非常灵活且扩展性高。
下面来具体看看一些主要的属性和方法。
-
Task.request
, 这个属性是 类属性,指向另一个类:celery.worker.request.Request
, 里面包含了有关于此次任务的信息和任务状态。这些属性就不一一列举了,可以到 文档查阅。 这个 Request 类我们也可以重载的,非常灵活。 -
Task.retry()
,这是一个实例方法,可以在发生错误或则我们自定义的情况下重新执行任务,并且任务会发送到相同的队列, 任务的状态也会记录此次 重试记录,以便于我们追踪。
但是,当我们调用这个方法同时,我们也 抛出(raise) 了一个Retry
类型的”异常“,此后的代码不会执行。官方解释说,这个 ”异常“类似于 断言 ,是用来给 worker 通知说这个任务要重试,这样才能保存正确的任务状态。
我们可以设置 重试延迟,也就是延迟多久后重试,最大重试次数等,我们可以在 装饰器参数中设置:@app.task(retry_kwargs={})
参数中的的 自动重试设置(autoretry_for) 也是很强大的功能,有兴趣可以了解。
关于 task 的其他配置选项可以参考文档 -
Task.state
,我们可以通过这个属性追踪任务的当前状态,前提是需要配置 结果后端(result backend),celery 内置了四种状态, 并且可以利用 实例的 Task.update_state() 方法更新状态以及 状态元数据(state meta-data)。同时我们也可自定义状态名。 -
在 Task 类中有 四个 事件回调方法,分别是 after_return on_failure on_success on_retry,在需要时可以定义。
调用 task
总共有 3 种调用方式:
- task_func.delay(),只能添加函数的参数
- task_func.apply_async(),除了函数的参数,还可以添加 很多选项
- task_func(),直接使用函数,这样只会本地执行,和一般的函数执行方式相同
下面默认以 apply_async 这种调用方式。
-
延时执行
countdown=5,5 秒后执行 eta = datetime.datetime.now() + datetime.timedelta(days=1) 一天后执行 -
过期限制
expires = datetime.datetime.now() + datetime.timedelta(days=1) 一天后任务过期,这样当 worker 从队列中获取 任务信息时,如果过期,就会将任务状态置为 REVOKED -
消息序列化方式
serializer = ‘json’,可选的有:json pickle yaml msgpack -
消息压缩方式
compression= ‘brotli’ ,可选的有很多,但是大都需要安装 python 环境时同时编译安装相应的包 -
消息执行队列
queue = ‘queue_name’,可以指定消息的传递的队列 -
忽略结果
ignore_result=False,可以选择是否存储执行结果。需要 result backend 的支持。
另外一个很有用的可选项是任务状态变化的回调,当将一个任务执行后,需要监控状态的变化,这时就可以使用 on_message 这个 回调功能了:
1 |
|
1 |
|
每当任务状态有变化时,就会触发回调函数。