celery 是一个分布式任务队列,任务可以分发给多个worker,worker 可以在不同的进程,不同的服务器,还可以有route,故为分布式。
celery核心概念和类是任务,围绕任务,有任务分发,任务执行,结果保存,结果接收等过程
- client 发出任务,是 producer,完成任务的 worker( Task 对象)是 comsumer。
- 而任务结束后,如果结果要保存起来,则 worker 是 producer,而接收结果的 get_result 模块是consumer
- BROKER_BACKEND (默认为 "amqp")
- CELERY_RESULT_BACKEND
你也可以使用Redis/Database
如果使用 rabbitmq,则queue的名字为 celery,一个任务是该queue上的一个message
CELERY_RESULT_BACKEND也可以用多种方法,且各有其优劣,请一定详细了解这些优劣。
1) AMQP Result Backend: 结果并没有保存起来,而是以消息方式传递,接受结果的模块不用pull,随时可以接受消息,实时性比较好。但是消息只能被提取一次,意味着你不能有两个模块接受同一个结果。
并且还有queue的问题,每一个task的结果对应一个自己的queue,该queue的名字为 task id,结果被接收后该queue被删除。你必须保证每一个结果都被接受,否则queue会越来越多,造成rabbitmq性能下降。也可以设置CELERY_AMQP_TASK_RESULT_EXPIRES,则一定时间后结果会被清除。
注意:rabbitmqctl 管理工具不提供删除queue的命令。你只能通过 amqp api 来删除,问题是如果queue过多,rabbitmq就停止响应了
查看queue
rabbitmqctl list_queues -p <myvhost> name messages consumers
2)采用数据库作为 result backend
3)可以指定task抛弃结果(有些任务结果并不重要,比如发送email)
get_result 模块要知道是 task id,才能获得结果。这可以采用client 或者 worker保存task id,get_result获得task id。下面例子中 task id 保存在文件中,生产环境中不推荐这样做。
# client result = add.delay(4,4) result.task_id # worker from celery.task import task @task(name="sum-of-two-numbers", serializer="json") def add(x, y): print("Executing task id %r, args: %r kwargs: %r" % ( add.request.id, add.request.args, add.request.kwargs)) r = x + y f = open('id.txt','w') f.write("%r"%add.request.id) f.close() return r # get_result from tasks import add line = open('id.txt','r').read() task_id = eval(line) result = add.AsyncResult(task_id) print result.get()上面用到了一个python技巧,eval
你也可以在worker中使用 cache 来保存 task id 甚至 结果,而不让celery代劳.
First steps with Celery 在python下如何使用celery
在 windows 下
python -m celery.bin.celeryd
-m 把modele 作为 script来运行
First steps with Django
在 windows 下,要加 --settings=settings
python manage.py celeryd -l info --settings=settings
如果有新的task worker,要重新执行
python manage.py celeryd -l info --settings=settings
才会注册这些worker
0 comments:
Post a Comment