再次理解 Flask 中 Celery 的用法(二)

Author Avatar
patrickcty 9月 03, 2017

再次理解 Flask 中 Celery 的用法(二)

运行 Celery task

我们之前都是用 Celery task 的 delay() 方法来运行任务,但是这只是 apply_async 方法的简略版本,后者的使用方法如下:

使用方法:

task.apply_async(
    args=[1, 2],  # 可选,表示参数列表,是一个 list
    kwargs={'kwarg1': '1', 'kwarg2': '2'},
    # countdown 表示指定秒数后后开始处理这个任务
    # 不是一定在那个时候执行,要根据其他任务来定
    countdown=600,
    # eta 表示指定时间开始处理这个任务
    # 执行时间和 countdown 一样不完全可靠
    # 我用这个参数来发邮件结果第二天才发出去……
    eta=datetime.datime.now() + datetime.timedelta(hours=1)
)

Celery 签名/子任务

签名可以把 task 生成函数来便于其他的函数进行调用。

>>> from celery import signature
>>> from webapp.tasks import multiply
# signature 的第一个参数要写全
>>> signature('webapp.tasks.multiply', args=[4, 4], countdown=10)
webapp.tasks.multiply(4, 4)  # 返回的是一个函数
# 和上面相同的功能
>>> multiply.subtask((4, 4), countdown=10)
webapp.tasks.multiply(4, 4)
# 上面的缩略版本,没有关键字参数
>>> multiply.s(4, 4)
webapp.tasks.multiply(4, 4)
>>> multiply.s(4, 4)()  # 调用函数
16
>>> multiply.s(4, 4).delay()  # 创建任务

偏函数

Celery 任务签名的第一个应用是偏函数。偏函数来源于一个要接受很多参数的函数,这个函数被施加某种操作之后,生成了一个新的函数,在调用这个函数的时候,前 n 个参数永远是一样的。

>>> partial = multiply.s(4)  # 第一个参数永远是 4
>>> partial(5)
20
>>> partial.delay(5)

回调函数

回调函数就是根据前一个任务执行的结果来执行的函数。在 apply_async() 中使用 link 参数来添加回调函数。

这个执行任务返回的值并不是回调函数的结果,而是前一个任务执行的结果。

# link 接受一个签名作为参数
# log 接受 multiply 的结果作为参数
# 这里是建立了两个 task,并且返回值都是 16
>>> multiply.apply_async((4, 4), link=log.s())
# 如果回调函数不接受输入或者不需要上一个任务的结果
# 那么签名必须使用 si 方法(设置为不可变类型)
# 结果为 16 而不是 hello
>>> multiply.apply_async((4, 4), link=log.si('hello'))

回调函数常常用来做两个联系紧密的任务,比如每次创建用户的任务之后都发送一封欢迎 email。

偏函数也可以和回调函数一起用:

>>> multiply.apply_async((4, 4), link=multiply.s(4))

任务组

任务组函数接受一组任务签名的列表,并生成一个函数,调用该函数可并行执行所有的任务签名,并返回所有结果的列表。

>>> from celery import group
>>> sig = group(multiply.s(i, i + 5) for i in range(10))
>>> rst = sig.delay()
>>> rst.get()
[0, 6, 14, 24, 36, 50, 66, 84, 104, 126]

任务链

任务链和回调函数有点相像,它接受一组任务签名,把每个签名的执行结果传给任务链的下一个,最后只会返回最后一个的结果。

>>> from celery import chain
# 事实上也是创建了多个任务,但是只会返回最终的结果
>>> sig = chain(multiply.s(10, 10), multiply.s(10), multiply.s(10))
# 另一种写法
>>> sig = (multiply.s(10, 10) | multiply.s(10) | multiply.s(10))

任务链也可以用来组合偏函数生成新的偏函数,这样的话任务链也可以嵌套了

>>> func = (multiply.s(10) | multiply.s(10))
>>> rst = func.delay(16)
>>> rst.get()
1600

复合任务

复合任务是函数生成一个任务签名的时候,会先执行一个任务组,然后把最终结果传给回调函数,最后回调函数的结果就是最终的结果,这里和上面的回调函数部分有些不同。

>>> from celery import chord
>>> sig = chord(
    group(multiply(i, i + 5) for i in range(10)),
    # 整个任务组执行的结果,也就是回调函数的参数是 list
    multiply.s(2) 
)
>>> rst = sig.delay()
>>> rst.get()
[0, 6, 14, 24, 36, 50, 66, 84, 104, 126, 0, 6, 14, 24, 36, 50, 66, 84, 104, 126]
# 还有另一种写法
>>> sig = (group([0, 6, 14, 24, 36, 50, 66, 84, 104, 126]) | multiply.s(2))

定期执行任务

和 Linux 的 cron 命令有点类似,不过这里完全是在 Flask 的上下文中来执行任务的。

要在 config 文件中配置:

CELERY_SCHEDULE = {
    'log-every-30-seconds': {
        'task': 'webapp.tasks.log',
        'schedule': datetime.timedelta(seconds=30),
        'args': ['hello',]
    },
}

这里是定义了 log 任务每隔 30s 执行一次

如果要运行定期任务,就要使用 Celery 的 beat 工作进程,当然任务也是在 worker 进程中执行的。

celery -A celery_runner beat

如果要在一个精确的时间执行,那么就要用 crontab 对象了,具体的用法参考 Celery 官方文档

from celery.schedules import crontab
# 每天凌晨
>>> crontab(minute=0, hour=0)
# 早上五点,十点,下午三点,八点
>>> crontab(minute=0, hour=[5, 10, 15, 20])

在 Flower 中通过网页监控

Flower 是针对 Celery 的基于网页的试试管理工具。

安装 Flower:

pip install flower

运行 Flower:

celery flower -A celery_runner --loglevel=info

最后

说实话第一次看到上面这些的时候真的十分头大,一脸懵逼,不过现在回头整理了一遍之后明了了许多,不过要吐槽的是 eta 为什么这么坑啊!