再次理解 Flask 中 Celery 的用法(一)

Author Avatar
patrickcty 9月 01, 2017

再次理解 Flask 中 Celery 的用法(一)

Celery 概览

Celery 是用 Python 编写的任务队列工具,它使用 Python 的多任务库来并行地执行任务。

在下面我们用 Flask Server 来发送任务请求,由消息队列储存并转发给 Celery。Celery 运行的结果也同样由消息队列传递回去。

消息队列是一个队列,用来在生产者进程和消费者进程之间传递信息。不过消息队列在生产者接收到消息之后就立即把数据丢弃了,这时候可以用数据库来保存任务结果。

配置 Celery 环境

安装 Celery

pip install celery

安装 Flask 扩展来辅助处理 Celery 的初始化(可选)

pip install flask-celery-helper

安装 Redis 来作为消息队列以及储存最终的结果

brew install redis
redis-server  # 使用 redis

在 Celery 中创建任务

在 Celery 中,broker 意思是中间人,指的就是消息队列本身,而 result backend 顾名思义就是存储得到的结果。

单文件示例

"""task.py"""

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
    print 'hello celery'
    time.sleep(10)
    return x + y

使用 task:

  • 新建一个 Python Shell,使用 delay 或 apply_async 来运行 add 方法,注意只有当 Celery 实例存在时运行结果才会被收到

    >>> from task import add
    >>> add.delay(3, 5)
    >>> add.apply_acync([7, 8])
  • 再在另一个终端中运行 Celery worker

    celery worker -A task --loglevel=info

    worker 表示使用 Celery worker
    -A 参数是一个含有 Celery 实例的模块名,Celery 通过它来运行

运行 Celery worker 之后就可以看到任务信息了,如果要在 shell 中得到返回的结果则还要配置 backend

修改上面的 app 为如下即可

app = Celery(
    'tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

然后就可以在 Python Shell 查看 Celery 运行的结果和状态了

>>> rst = add.delay(8, 7)
>>> rst.state()  # 查看任务现在的状态
>>> rst.ready()  # 判断任务是否完成
>>> rst.get()    # 得到运行结果
15

Flask 中使用示例

使用扩展来配置

from flask import Flask
from flask_celery import Celery()

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = Celery(app)


@celery.task()
def add_together(a, b):
    return a + b

这里直接使用了上面安装的 flask-celery-helper 扩展,所以就直接接用扩展来初始化 Celery 实例即可。

手动配置

准备工作

"""celery_maker.py"""
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name, 
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

使用 Celery

from flask import Flask
from celery_maker import make_celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


@celery.task()
def add_together(a, b):
    return a + b

make_celery 的作用就是把每个对 Celery 任务的调用都包含到 Python 的 with 代码中,这样 Celery 在 Flask 的应用上下文中,就可以确保 Flask 扩展的调用都能正常工作了。

使用方法和之前的也是一样,不过在调用 task 的时候也必须在 Flask 的应用上下文中,不然就达不到预料的结果。

吐槽

我之前就是因为把配置文件的名字写错了导致一直提示 backend 不存在……不要盲目相信书上的内容……还是官方文档靠谱……