再次理解 Flask 中 Celery 的用法(一)

再次理解 Flask 中 Celery 的用法(一)

Celery 概览

Celery 是用 Python 编写的任务队列工具,它使用 Python 的多任务库来并行地执行任务。

在下面我们用 Flask Server 来发送任务请求,由消息队列储存并转发给 Celery。Celery 运行的结果也同样由消息队列传递回去。

消息队列是一个队列,用来在生产者进程和消费者进程之间传递信息。不过消息队列在生产者接收到消息之后就立即把数据丢弃了,这时候可以用数据库来保存任务结果。

配置 Celery 环境

安装 Celery

1
pip install celery

安装 Flask 扩展来辅助处理 Celery 的初始化(可选)

1
pip install flask-celery-helper

安装 Redis 来作为消息队列以及储存最终的结果

1
2
brew install redis
redis-server # 使用 redis

在 Celery 中创建任务

在 Celery 中,broker 意思是中间人,指的就是消息队列本身,而 result backend 顾名思义就是存储得到的结果。

单文件示例

1
2
3
4
5
6
7
8
9
10
11
12
"""task.py"""

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
print 'hello celery'
time.sleep(10)
return x + y

使用 task:

  • 新建一个 Python Shell,使用 delay 或 apply_async 来运行 add 方法,注意只有当 Celery 实例存在时运行结果才会被收到

    1
    2
    3
    >>> from task import add
    >>> add.delay(3, 5)
    >>> add.apply_acync([7, 8])
  • 再在另一个终端中运行 Celery worker

    1
    celery worker -A task --loglevel=info

    worker 表示使用 Celery worker
    -A 参数是一个含有 Celery 实例的模块名,Celery 通过它来运行

运行 Celery worker 之后就可以看到任务信息了,如果要在 shell 中得到返回的结果则还要配置 backend

修改上面的 app 为如下即可

1
2
3
4
5
app = Celery(
'tasks',
broker='redis://localhost:6379',
backend='redis://localhost:6379'
)

然后就可以在 Python Shell 查看 Celery 运行的结果和状态了

1
2
3
4
5
>>> rst = add.delay(8, 7)
>>> rst.state() # 查看任务现在的状态
>>> rst.ready() # 判断任务是否完成
>>> rst.get() # 得到运行结果
15

Flask 中使用示例

使用扩展来配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from flask import Flask
from flask_celery import Celery()

app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = Celery(app)


@celery.task()
def add_together(a, b):
return a + b

这里直接使用了上面安装的 flask-celery-helper 扩展,所以就直接接用扩展来初始化 Celery 实例即可。

手动配置

准备工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""celery_maker.py"""
from celery import Celery

def make_celery(app):
celery = Celery(
app.import_name,
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery

使用 Celery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from flask import Flask
from celery_maker import make_celery

app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


@celery.task()
def add_together(a, b):
return a + b

make_celery 的作用就是把每个对 Celery 任务的调用都包含到 Python 的 with 代码中,这样 Celery 在 Flask 的应用上下文中,就可以确保 Flask 扩展的调用都能正常工作了。

使用方法和之前的也是一样,不过在调用 task 的时候也必须在 Flask 的应用上下文中,不然就达不到预料的结果。

吐槽

我之前就是因为把配置文件的名字写错了导致一直提示 backend 不存在……不要盲目相信书上的内容……还是官方文档靠谱……