再次理解 Flask 中 Celery 的用法(一)
再次理解 Flask 中 Celery 的用法(一)
Celery 概览
Celery 是用 Python 编写的任务队列工具,它使用 Python 的多任务库来并行地执行任务。
在下面我们用 Flask Server 来发送任务请求,由消息队列储存并转发给 Celery。Celery 运行的结果也同样由消息队列传递回去。
消息队列是一个队列,用来在生产者进程和消费者进程之间传递信息。不过消息队列在生产者接收到消息之后就立即把数据丢弃了,这时候可以用数据库来保存任务结果。
配置 Celery 环境
安装 Celery
pip install celery
安装 Flask 扩展来辅助处理 Celery 的初始化(可选)
pip install flask-celery-helper
安装 Redis 来作为消息队列以及储存最终的结果
brew install redis
redis-server # 使用 redis
在 Celery 中创建任务
在 Celery 中,broker 意思是中间人,指的就是消息队列本身,而 result backend 顾名思义就是存储得到的结果。
单文件示例
"""task.py"""
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def add(x, y):
print 'hello celery'
time.sleep(10)
return x + y
使用 task:
新建一个 Python Shell,使用 delay 或 apply_async 来运行 add 方法,注意只有当 Celery 实例存在时运行结果才会被收到
>>> from task import add >>> add.delay(3, 5) >>> add.apply_acync([7, 8])
再在另一个终端中运行 Celery worker
celery worker -A task --loglevel=info
worker 表示使用 Celery worker
-A 参数是一个含有 Celery 实例的模块名,Celery 通过它来运行
运行 Celery worker 之后就可以看到任务信息了,如果要在 shell 中得到返回的结果则还要配置 backend
修改上面的 app 为如下即可
app = Celery(
'tasks',
broker='redis://localhost:6379',
backend='redis://localhost:6379'
)
然后就可以在 Python Shell 查看 Celery 运行的结果和状态了
>>> rst = add.delay(8, 7)
>>> rst.state() # 查看任务现在的状态
>>> rst.ready() # 判断任务是否完成
>>> rst.get() # 得到运行结果
15
Flask 中使用示例
使用扩展来配置
from flask import Flask
from flask_celery import Celery()
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = Celery(app)
@celery.task()
def add_together(a, b):
return a + b
这里直接使用了上面安装的 flask-celery-helper 扩展,所以就直接接用扩展来初始化 Celery 实例即可。
手动配置
准备工作
"""celery_maker.py"""
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
使用 Celery
from flask import Flask
from celery_maker import make_celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)
@celery.task()
def add_together(a, b):
return a + b
make_celery 的作用就是把每个对 Celery 任务的调用都包含到 Python 的 with 代码中,这样 Celery 在 Flask 的应用上下文中,就可以确保 Flask 扩展的调用都能正常工作了。
使用方法和之前的也是一样,不过在调用 task 的时候也必须在 Flask 的应用上下文中,不然就达不到预料的结果。
吐槽
我之前就是因为把配置文件的名字写错了导致一直提示 backend 不存在……不要盲目相信书上的内容……还是官方文档靠谱……