Jun 29, 2011

celery, django-celery

celery 使用说明
celery 是一个分布式任务队列,任务可以分发给多个worker,worker 可以在不同的进程,不同的服务器,还可以有route,故为分布式。
celery核心概念和类是任务,围绕任务,有任务分发,任务执行,结果保存,结果接收等过程
  • client 发出任务,是 producer,完成任务的 worker( Task 对象)是 comsumer。
  • 而任务结束后,如果结果要保存起来,则 worker 是 producer,而接收结果的 get_result 模块是consumer
这两部分的backend在celery中是分开的,分别的设置为
  • BROKER_BACKEND (默认为 "amqp")
  • CELERY_RESULT_BACKEND
BROKER_BACKEND推荐使用rabbitmq(rabbitmq 要做些设置)
你也可以使用Redis/Database
如果使用 rabbitmq,则queue的名字为 celery,一个任务是该queue上的一个message

CELERY_RESULT_BACKEND也可以用多种方法,且各有其优劣,请一定详细了解这些优劣。
1) AMQP Result Backend: 结果并没有保存起来,而是以消息方式传递,接受结果的模块不用pull,随时可以接受消息,实时性比较好。但是消息只能被提取一次,意味着你不能有两个模块接受同一个结果。
并且还有queue的问题,每一个task的结果对应一个自己的queue,该queue的名字为 task id,结果被接收后该queue被删除。你必须保证每一个结果都被接受,否则queue会越来越多,造成rabbitmq性能下降。也可以设置CELERY_AMQP_TASK_RESULT_EXPIRES,则一定时间后结果会被清除。
注意:rabbitmqctl 管理工具不提供删除queue的命令。你只能通过 amqp api 来删除,问题是如果queue过多,rabbitmq就停止响应了

查看queue
rabbitmqctl list_queues -p <myvhost> name messages consumers

2)采用数据库作为 result backend
3)可以指定task抛弃结果(有些任务结果并不重要,比如发送email)

get_result 模块要知道是 task id,才能获得结果。这可以采用client 或者 worker保存task id,get_result获得task id。下面例子中 task id 保存在文件中,生产环境中不推荐这样做。

# client
result = add.delay(4,4)
result.task_id

# worker
from celery.task import task

@task(name="sum-of-two-numbers", serializer="json")
def add(x, y):
    print("Executing task id %r, args: %r kwargs: %r" % (
        add.request.id, add.request.args, add.request.kwargs))
    r = x + y
    f = open('id.txt','w')
    f.write("%r"%add.request.id)
    f.close()
    return r

# get_result
from tasks import add

line = open('id.txt','r').read()
task_id = eval(line)

result = add.AsyncResult(task_id)
print result.get()
上面用到了一个python技巧,eval

你也可以在worker中使用 cache 来保存 task id 甚至 结果,而不让celery代劳.

First steps with Celery 在python下如何使用celery
在 windows 下
python -m celery.bin.celeryd
-m 把modele 作为 script来运行

First steps with Django
在 windows 下,要加 --settings=settings
python manage.py celeryd -l info --settings=settings

如果有新的task worker,要重新执行
python manage.py celeryd -l info --settings=settings
才会注册这些worker

0 comments: