标签归档:Celery

Django + Celery处理异步任务/计划任务

在Web开发的过程中,难免有一些操作是耗时任务,有时候我们不想阻塞Web进程,希望提交到后台处理,有结果了再通知前端;有时候用户不需要实时结果,可以异步处理的,比如发送邮件通知;有时候我们希望定时执行某些任务,复用已有的一些Web代码。对于第一种情况,可以是RPC或调用其他接口处理;第二种情况则是放入队列,由后台进程异步处理;第三种情况可以是在定时任务内处理,也可以是触发消息放入队列,由后台进程任务处理,同第二种情况。对于第一种情况,也可以是放入队列由后台进程处理,Web前端,定时轮询队列/接口是否有结果。这样三种情况都可以统一为一种情况,即不同来源事件(用户/定时器)触发消息,放入队列由后台任务进程异步处理,异步查询结果。
之前的一个Python项目即类似。不管代码是PHP还是Python思想都是类似,只是不同语言有不同工具/优势来处理。Web前端展示统计报表,后台进程定时查询Impala,分析统计数据;Web前端也可以触发事件比如统计、发送邮件报告等,后台进程监听并处理。Web前端选择Django,是Python写的一个MVT框架,提供了登录认证、管理后台模块,还有命令行助手工具可以生成项目,易于快速搭建并扩展Web站点。后台任务处理选择Celery,是Python写的一个分布式的任务处理平台,支持各种消息队列,比如RabbbitMQ、Redis,并提供Flower监控工具。Django本身是一个Web框架,生产环境最好使用性能更好的HTTP服务器,Gunicorn是Python写的一个WSGI HTTP服务器,用来监听HTTP请求,调用Django。Gunicorn前端最好使用Nginx来做代理,分发请求,结构如下:

安装Python 3和Django,并生成项目框架代码。注意:由于机器上面本来就有Pyton 2.7,所以要重命名下。

yum -y update
yum -y install yum-utils
yum -y groupinstall development
yum install zlib-devel bzip2-devel openssl-devel ncurese-devel expat-devel gdbm-devel readline-devel sqlite-devel libffi-devel
wget https://www.python.org/ftp/python/3.6.2/Python-3.6.2.tar.xz
tar Jxvf Python-3.6.2.tar.xz
mv Python-3.6.2 /opt/Python-3.6.2
cd /opt/Python-3.6.2
./configure --enable-shared --prefix=/usr/local/python3
make && make install
vim /etc/ld.so.conf
/usr/local/python3/lib/
/sbin/ldconfig -v
ln -s /usr/local/python3/bin/python3.6 /usr/bin/python3.6
ln -s /usr/bin/python3.6 /usr/bin/python3
#ln -s /usr/local/python3/bin/python3.6 /usr/bin/python
ln -s /usr/local/python3/bin/pip3 /usr/bin/pip3
python3 -V
pip3 -V
pip3 install Django
python3 -m django --version
ln -s /usr/local/python3/bin/django-admin /usr/bin/django-admin/
cd /home/rc
django-admin startproject qrd
cd qrd
python3 manage.py startapp master
vim qrd/settings.py

编辑settings.py,监听相应域名、IP的请求

ALLOWED_HOSTS = ['dev.example.com','localhost','127.0.0.1','10.1.*.*]
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'master',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

测试一下

python3 manage.py runserver 0:80

然后安装Gunicorn

$ pip3 install gunicorn
$ pip3 install greenlet
$ pip3 install ConfigParser
$ ln -s /usr/local/python3/bin/gunicorn /usr/bin/gunicorn

测试运行

$ cd ~/qrd/qrd
$ gunicorn -w 4 qrd.wsgi --bind  unix:/tmp/gunicorn.sock
$ sudo vim /etc/systemd/system/gunicorn.service

创建Gunicorn服务

[Unit]
Description=gunicorn daemon
After=network.target
[Service]
User=root
Group=root
WorkingDirectory=/home/rc/qrd/qrd
ExecStart=/usr/bin/gunicorn --access-logfile - --workers 3 --bind unix:/tmp/gunicorn.sock qrd.wsgi
[Install]
WantedBy=multi-user.target

开机启动

systemctl start gunicorn
systemctl enable gunicorn
#systemctl stop gunicorn

接下来安装Nginx

yum install nginx
vim /etc/nginx/conf.d/default.conf

配置Nginx与Gunicorn通过UNIX Sockect通信

server {
    listen       80;
    server_name  localhost;
    #charset koi8-r;
    #access_log  /var/log/nginx/host.access.log  main;
    #location / {
    #   root   /usr/share/nginx/html;
    #   index  index.html index.htm;
    #
    location / {
        proxy_pass http://unix:/tmp/gunicorn.sock;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

启动Nginx

systemctl start nginx
systemctl enable nginx

安装MySQL及RabbitMQ

yum install mysql-community-devel
pip3 install mysqlclient


yum install rabbitmq-server
rabbitmq-server -detached
rabbitmqctl status

创建RabbitMQ vhost

$ rabbitmqctl add_user qrdweb <password>
Creating user "qrdweb" ...
...done.
$ rabbitmqctl add_vhost qrdweb
Creating vhost "qrdweb" ...
...done.
$ rabbitmqctl set_user_tags qrdweb management
Setting tags for user "qrdweb" to [management] ...
...done.
$ rabbitmqctl set_permissions -p qrdweb qrdweb ".*" ".*" ".*"
Setting permissions for user "qrdweb" in vhost "qrdweb" ...
...done.
$ netstat -apn | grep rabbitmq
$ rabbitmqctl status

安装Celery

pip3 install celery
ln -s /usr/local/python3/bin/celery /usr/bin/celery

测试Celery的异步任务worker及计划任务schedule

$ cd /home/qrd/qrd/
$ ls
dashboard  db.sqlite3  manage.py  master  qrd  static
$ celery -A qrd worker -l info
/usr/local/python3/lib/python3.6/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
  uid=uid, euid=euid, gid=gid, egid=egid,
 -------------- [email protected] v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2017-09-22 08:19:39
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         qrd:0x7fda62e705c0
- ** ---------- .> transport:   amqp://qrdweb:**@localhost:5672/qrdweb
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
 
[tasks]
  . dashboard.tasks.debug_task
[2017-09-22 08:19:39,769: INFO/MainProcess] Connected to amqp://qrdweb:**@127.0.0.1:5672/qrdweb
[2017-09-22 08:19:39,781: INFO/MainProcess] mingle: searching for neighbors
[2017-09-22 08:19:40,811: INFO/MainProcess] mingle: all alone
[2017-09-22 08:19:40,860: WARNING/MainProcess] /usr/local/python3/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2017-09-22 08:19:40,860: INFO/MainProcess] [email protected] ready.
[2017-09-22 08:20:55,023: INFO/MainProcess] Received task: dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e]
[2017-09-22 08:20:55,027: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e] succeeded in 0.001253978000022471s: 'debug_task'
[2017-09-22 08:22:21,179: INFO/MainProcess] Received task: dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977]
[2017-09-22 08:22:21,180: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977] succeeded in 0.00018433199147693813s: 'debug_task'
 
 
$ celery -A qrd beat -l info -s /tmp/celerybeat-schedule
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-09-24 04:20:37
Configuration ->
    . broker -> amqp://qrdweb:**@localhost:5672/qrdweb
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> /tmp/celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2017-09-24 04:20:37,823: INFO/MainProcess] beat: Starting...
[2017-09-24 04:20:37,866: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:47,856: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:57,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:57,861: INFO/MainProcess] Scheduler: Sending due task qrd.celery.test('world') (qrd.celery.test)
[2017-09-24 04:21:07,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:21:17,859: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)

运行成功,可以使用Supervisord来守护监控Celery的运行,参考这里
Django项目结果如下

先配置Celery使用RabbitMQ作为Broker,使用Django DB来保存调用结果settings.py

import os
from configparser import RawConfigParser

#https://code.djangoproject.com/wiki/SplitSettings
config = RawConfigParser()
config.read('/home/qrd/setting/settings.ini')

STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'static')

CELERY_BROKER_URL = 'amqp://usr:pwd@localhost:5672/qrdweb'
CELERY_RESULT_BACKEND = 'django-db'

然后在Django项目下创建celery.py文件

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qrd.settings')

app = Celery('qrd')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


app.conf.beat_schedule = {
    'hue-tasks-debug_task': {
        'task': 'hue.tasks.debug_task',
        'schedule': 10.0,
        'args': ()
    },
}

并且在__init__.py引入Celery即可集成

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

Django的异步任务只能定义在各个app的task.py文件里,比如qrd.hue.tasks定义了一个定时任务

from celery import task

@task
def debug_task():
    #print(arg)
    return 'debug_task'


也可以在其模块里面调用

from tasks import debug_task

def save(data):
    debug_task.delay()

顺便推荐一个Bootstrap管理后台模板:gentelella

参考链接:
异步任务神器 Celery
Django配置celery执行异步任务和定时任务
淺談 Gunicorn 各個 worker type 適合的情境