celery介绍
- Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
Celery 专注于实时任务处理,支持任务调度。
说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。
关于celery的概念介绍
Brokers
- brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)
- 常见的 brokers 有 rabbitmq、redis、Zookeeper 等
Result Stores / backend
- 顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了
- 常见的 backend 有 redis、Memcached 甚至常用的数据都可以。
Workers
- 就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行
Tasks
celery安装
pyhon 环境是必须的
Celery 依赖于erlang,所以需要安装erlang。
Celery 需要消息队列配合使用,RabbitMQ, Redis都可以,以下在unbuntu中操作的命令。
其中,rabbitmq为我们选用的消息队列工具,sqlite为db,sqlalchemy为Python的ORM框架
如果是使用的django 需要安装 django-celery
Worker 需要额外安装task执行的时候的需要包,比如,requests
Flower 是celery的监控工具
Django配置Celery
假设工程目录如下:
celery 文件内容(Celery.py 文件主要用来设置celery)
要放在工程的目录,比如:/ProjectName/celery.py12345678910111213141516171819202122232425262728293031323334353637383940414243from __future__ import absolute_importfrom celery import Celeryfrom kombu import Queue, Exchangeimport osfrom django.conf import settings# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ProjectName.settings')# set message queue and tasks listapp = Celery('Appname',broker='amqp://guest:guest@192.168.143.133:5672//',backend='amqp://guest:guest@192.168.143.133:5672//',include=['Application.tasks'])# Using a string here means the worker will not have to# pickle the object when using Windows.app.config_from_object('django.conf:settings')app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)# Optional configuration, see the application user guide.app.conf.update(CELERY_TASK_RESULT_EXPIRES=60,CELERY_TASK_SERIALIZER='json',CELERY_ACCEPT_CONTENT=['json'], # Ignore other contentCELERY_RESULT_SERIALIZER='json',CELERY_TIMEZONE='Asia/Shanghai',CELERY_ENABLE_UTC=True,CELERY_QUEUES=(Queue('Env_Testing', Exchange('run'), routing_key='run.Testing'),Queue('Env_Regression', Exchange('run'), routing_key='run.Regression'),),CELERY_DEFAULT_QUEUE='default',CELERY_DEFAULT_EXCHANGE='default',CELERY_DEFAULT_EXCHANGE_TYPE='direct',CELERY_DEFAULT_ROUTING_KEY='default',CELERY_TRACK_STARTED=True,)if __name__ == '__main__':app.start()- task文件(task.py文件是task的内容,需要执行的代码)
对应的application目录下, 比如:/App/task.py123456789101112131415161718from __future__ import absolute_importfrom ProjectName.celery import appfrom celery.result import AsyncResultdef task1(ID):return task1(ID, "key")def task2(ID):run_task(ID)def error_handler(uuid):result = AsyncResult(uuid)exc = result.get(propagate=False)print('ERROR:Task {0} raised exception: {1!r}\n{2!r}'.format(uuid, exc, result.traceback))
- task文件(task.py文件是task的内容,需要执行的代码)
task执行
如何调用task?第一种:
task.delay- result = task1.delay(8, 8)
- result.wait() # wait for and return the result
第二种:
task.aplly_async
可以指定task放到哪个队列里面 queue=”xxx”
制定错误处理 link_error1234from InterfaceFrameWork.tasks import task1, task2, error_handlerre = task1.apply_async(args="123", queue="QueueName", link_error=error_handler.s())result = re.get(timeout=60)其他方式参见Celery官网:http://www.celeryproject.org/
启动Celery worker
启动Celery之前确保消息队列服务已经开启。
-Q 为队列名称,不制定执行所有多列的消息,指定后只执行指定的消息队列任务
-n 设置worker名字
启动Flower
启动Flower之前确保消息队列服务已经开启。
开启后默认端口5555可以访问,监控各个worker