celery在Django中的应用

celery介绍

  • Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
    Celery 专注于实时任务处理,支持任务调度。
    说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

关于celery的概念介绍

  • Brokers

    • brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)
    • 常见的 brokers 有 rabbitmq、redis、Zookeeper 等
  • Result Stores / backend

    • 顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了
    • 常见的 backend 有 redis、Memcached 甚至常用的数据都可以。
  • Workers

    • 就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行
  • Tasks

    • 就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。

celery安装

pyhon 环境是必须的
Celery 依赖于erlang,所以需要安装erlang。
Celery 需要消息队列配合使用,RabbitMQ, Redis都可以,以下在unbuntu中操作的命令。

1
2
3
4
5
apt-get install erlang
apt-get install rabbitmq-server
pip install celery
apt-get install sqlite
pip install sqlalchemy

其中,rabbitmq为我们选用的消息队列工具,sqlite为db,sqlalchemy为Python的ORM框架
如果是使用的django 需要安装 django-celery

1
pip install django-celery

Worker 需要额外安装task执行的时候的需要包,比如,requests

Flower 是celery的监控工具

1
pip install flower

Django配置Celery

假设工程目录如下:

1
2
3
4
5
+ /Django/
+ /ProjectName/
/celery.py
+ /Application
/task.py

  • celery 文件内容(Celery.py 文件主要用来设置celery)
    要放在工程的目录,比如:/ProjectName/celery.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    from __future__ import absolute_import
    from celery import Celery
    from kombu import Queue, Exchange
    import os
    from django.conf import settings
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ProjectName.settings')
    # set message queue and tasks list
    app = Celery('Appname',
    broker='amqp://guest:guest@192.168.143.133:5672//',
    backend='amqp://guest:guest@192.168.143.133:5672//',
    include=['Application.tasks'])
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    # Optional configuration, see the application user guide.
    app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Asia/Shanghai',
    CELERY_ENABLE_UTC=True,
    CELERY_QUEUES=(
    Queue('Env_Testing', Exchange('run'), routing_key='run.Testing'),
    Queue('Env_Regression', Exchange('run'), routing_key='run.Regression'),
    ),
    CELERY_DEFAULT_QUEUE='default',
    CELERY_DEFAULT_EXCHANGE='default',
    CELERY_DEFAULT_EXCHANGE_TYPE='direct',
    CELERY_DEFAULT_ROUTING_KEY='default',
    CELERY_TRACK_STARTED=True,
    )
    if __name__ == '__main__':
    app.start()
    • task文件(task.py文件是task的内容,需要执行的代码)
      对应的application目录下, 比如:/App/task.py
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      from __future__ import absolute_import
      from ProjectName.celery import app
      from celery.result import AsyncResult
      @app.task
      def task1(ID):
      return task1(ID, "key")
      @app.task
      def task2(ID):
      run_task(ID)
      @app.task
      def error_handler(uuid):
      result = AsyncResult(uuid)
      exc = result.get(propagate=False)
      print('ERROR:Task {0} raised exception: {1!r}\n{2!r}'.format(
      uuid, exc, result.traceback))
  • task执行
    如何调用task?

    • 第一种:
      task.delay

      1. result = task1.delay(8, 8)
      2. result.wait() # wait for and return the result
    • 第二种:
      task.aplly_async
      可以指定task放到哪个队列里面 queue=”xxx”
      制定错误处理 link_error

      1
      2
      3
      4
      from InterfaceFrameWork.tasks import task1, task2, error_handler
      re = task1.apply_async(args="123", queue="QueueName", link_error=error_handler.s())
      result = re.get(timeout=60)

      其他方式参见Celery官网:http://www.celeryproject.org/

启动Celery worker

启动Celery之前确保消息队列服务已经开启。

1
celery -A ProjectName.celery:app worker -l info -Q Env_Regression -n work1@M11

-Q 为队列名称,不制定执行所有多列的消息,指定后只执行指定的消息队列任务
-n 设置worker名字

启动Flower

启动Flower之前确保消息队列服务已经开启。

1
2
cd Django
flower -A ProjectName.celery:app

开启后默认端口5555可以访问,监控各个worker

更多应用

查看 http://www.open-open.com/lib/view/open1484208699906.html