在Web开发的过程中,难免有一些操作是耗时任务,有时候我们不想阻塞Web进程,希望提交到后台处理,有结果了再通知前端;有时候用户不需要实时结果,可以异步处理的,比如发送邮件通知;有时候我们希望定时执行某些任务,复用已有的一些Web代码。对于第一种情况,可以是RPC或调用其他接口处理;第二种情况则是放入队列,由后台进程异步处理;第三种情况可以是在定时任务内处理,也可以是触发消息放入队列,由后台进程任务处理,同第二种情况。对于第一种情况,也可以是放入队列由后台进程处理,Web前端,定时轮询队列/接口是否有结果。这样三种情况都可以统一为一种情况,即不同来源事件(用户/定时器)触发消息,放入队列由后台任务进程异步处理,异步查询结果。
之前的一个Python项目即类似。不管代码是PHP还是Python思想都是类似,只是不同语言有不同工具/优势来处理。Web前端展示统计报表,后台进程定时查询Impala,分析统计数据;Web前端也可以触发事件比如统计、发送邮件报告等,后台进程监听并处理。Web前端选择Django,是Python写的一个MVT框架,提供了登录认证、管理后台模块,还有命令行助手工具可以生成项目,易于快速搭建并扩展Web站点。后台任务处理选择Celery,是Python写的一个分布式的任务处理平台,支持各种消息队列,比如RabbbitMQ、Redis,并提供Flower监控工具。Django本身是一个Web框架,生产环境最好使用性能更好的HTTP服务器,Gunicorn是Python写的一个WSGI HTTP服务器,用来监听HTTP请求,调用Django。Gunicorn前端最好使用Nginx来做代理,分发请求,结构如下:
安装Python 3和Django,并生成项目框架代码。注意:由于机器上面本来就有Pyton 2.7,所以要重命名下。
yum -y update yum -y install yum-utils yum -y groupinstall development yum install zlib-devel bzip2-devel openssl-devel ncurese-devel expat-devel gdbm-devel readline-devel sqlite-devel libffi-devel wget https://www.python.org/ftp/python/3.6.2/Python-3.6.2.tar.xz tar Jxvf Python-3.6.2.tar.xz mv Python-3.6.2 /opt/Python-3.6.2 cd /opt/Python-3.6.2 ./configure --enable-shared --prefix=/usr/local/python3 make && make install vim /etc/ld.so.conf /usr/local/python3/lib/ /sbin/ldconfig -v ln -s /usr/local/python3/bin/python3.6 /usr/bin/python3.6 ln -s /usr/bin/python3.6 /usr/bin/python3 #ln -s /usr/local/python3/bin/python3.6 /usr/bin/python ln -s /usr/local/python3/bin/pip3 /usr/bin/pip3 python3 -V pip3 -V pip3 install Django python3 -m django --version ln -s /usr/local/python3/bin/django-admin /usr/bin/django-admin/ cd /home/rc django-admin startproject qrd cd qrd python3 manage.py startapp master vim qrd/settings.py
编辑settings.py,监听相应域名、IP的请求
ALLOWED_HOSTS = ['dev.example.com','localhost','127.0.0.1','10.1.*.*] INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'master', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', ]
测试一下
python3 manage.py runserver 0:80
然后安装Gunicorn
$ pip3 install gunicorn $ pip3 install greenlet $ pip3 install ConfigParser $ ln -s /usr/local/python3/bin/gunicorn /usr/bin/gunicorn
测试运行
$ cd ~/qrd/qrd $ gunicorn -w 4 qrd.wsgi --bind unix:/tmp/gunicorn.sock $ sudo vim /etc/systemd/system/gunicorn.service
创建Gunicorn服务
[Unit] Description=gunicorn daemon After=network.target [Service] User=root Group=root WorkingDirectory=/home/rc/qrd/qrd ExecStart=/usr/bin/gunicorn --access-logfile - --workers 3 --bind unix:/tmp/gunicorn.sock qrd.wsgi [Install] WantedBy=multi-user.target
开机启动
systemctl start gunicorn systemctl enable gunicorn #systemctl stop gunicorn
接下来安装Nginx
yum install nginx vim /etc/nginx/conf.d/default.conf
配置Nginx与Gunicorn通过UNIX Sockect通信
server { listen 80; server_name localhost; #charset koi8-r; #access_log /var/log/nginx/host.access.log main; #location / { # root /usr/share/nginx/html; # index index.html index.htm; # location / { proxy_pass http://unix:/tmp/gunicorn.sock; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }
启动Nginx
systemctl start nginx systemctl enable nginx
安装MySQL及RabbitMQ
yum install mysql-community-devel pip3 install mysqlclient yum install rabbitmq-server rabbitmq-server -detached rabbitmqctl status
创建RabbitMQ vhost
$ rabbitmqctl add_user qrdweb <password> Creating user "qrdweb" ... ...done. $ rabbitmqctl add_vhost qrdweb Creating vhost "qrdweb" ... ...done. $ rabbitmqctl set_user_tags qrdweb management Setting tags for user "qrdweb" to [management] ... ...done. $ rabbitmqctl set_permissions -p qrdweb qrdweb ".*" ".*" ".*" Setting permissions for user "qrdweb" in vhost "qrdweb" ... ...done. $ netstat -apn | grep rabbitmq $ rabbitmqctl status
安装Celery
pip3 install celery ln -s /usr/local/python3/bin/celery /usr/bin/celery
测试Celery的异步任务worker及计划任务schedule
$ cd /home/qrd/qrd/ $ ls dashboard db.sqlite3 manage.py master qrd static $ celery -A qrd worker -l info /usr/local/python3/lib/python3.6/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the -u option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- [email protected] v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2017-09-22 08:19:39 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: qrd:0x7fda62e705c0 - ** ---------- .> transport: amqp://qrdweb:**@localhost:5672/qrdweb - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . dashboard.tasks.debug_task [2017-09-22 08:19:39,769: INFO/MainProcess] Connected to amqp://qrdweb:**@127.0.0.1:5672/qrdweb [2017-09-22 08:19:39,781: INFO/MainProcess] mingle: searching for neighbors [2017-09-22 08:19:40,811: INFO/MainProcess] mingle: all alone [2017-09-22 08:19:40,860: WARNING/MainProcess] /usr/local/python3/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2017-09-22 08:19:40,860: INFO/MainProcess] [email protected] ready. [2017-09-22 08:20:55,023: INFO/MainProcess] Received task: dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e] [2017-09-22 08:20:55,027: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e] succeeded in 0.001253978000022471s: 'debug_task' [2017-09-22 08:22:21,179: INFO/MainProcess] Received task: dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977] [2017-09-22 08:22:21,180: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977] succeeded in 0.00018433199147693813s: 'debug_task' $ celery -A qrd beat -l info -s /tmp/celerybeat-schedule celery beat v4.1.0 (latentcall) is starting. __ - ... __ - _ LocalTime -> 2017-09-24 04:20:37 Configuration -> . broker -> amqp://qrdweb:**@localhost:5672/qrdweb . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> /tmp/celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [2017-09-24 04:20:37,823: INFO/MainProcess] beat: Starting... [2017-09-24 04:20:37,866: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test) [2017-09-24 04:20:47,856: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test) [2017-09-24 04:20:57,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test) [2017-09-24 04:20:57,861: INFO/MainProcess] Scheduler: Sending due task qrd.celery.test('world') (qrd.celery.test) [2017-09-24 04:21:07,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test) [2017-09-24 04:21:17,859: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
运行成功,可以使用Supervisord来守护监控Celery的运行,参考这里。
Django项目结果如下
先配置Celery使用RabbitMQ作为Broker,使用Django DB来保存调用结果settings.py
import os from configparser import RawConfigParser #https://code.djangoproject.com/wiki/SplitSettings config = RawConfigParser() config.read('/home/qrd/setting/settings.ini') STATIC_URL = '/static/' STATIC_ROOT = os.path.join(BASE_DIR, 'static') CELERY_BROKER_URL = 'amqp://usr:pwd@localhost:5672/qrdweb' CELERY_RESULT_BACKEND = 'django-db'
然后在Django项目下创建celery.py文件,
from __future__ import absolute_import, unicode_literals import os from celery import Celery from celery.schedules import crontab # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qrd.settings') app = Celery('qrd') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() app.conf.beat_schedule = { 'hue-tasks-debug_task': { 'task': 'hue.tasks.debug_task', 'schedule': 10.0, 'args': () }, }
并且在__init__.py引入Celery即可集成
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
Django的异步任务只能定义在各个app的task.py文件里,比如qrd.hue.tasks定义了一个定时任务
from celery import task @task def debug_task(): #print(arg) return 'debug_task'
也可以在其模块里面调用
from tasks import debug_task def save(data): debug_task.delay()
顺便推荐一个Bootstrap管理后台模板:gentelella。
参考链接:
异步任务神器 Celery
Django配置celery执行异步任务和定时任务
淺談 Gunicorn 各個 worker type 適合的情境