标签归档:Python

使用SkPy创建Skype群组会话

每一次产品发布或故障支持,都需要将相关人员拉到同一个Skype群聊会话,以便讨论、测试、支持。如果每次都需要手动做这些动作是比较累;如果复用已有会话,又会影响本次无关人员。于是有了这样一个需求:在Web后台定义相关组员及关联关系,在Web前台点击即可以创建或加入相关会话。这要求提供一个HTTP的接口,接收会话人员及主题,创建聊天室。搜了一圈,发现SkPy这个库最简单,支持创建会话,发送/接收消息,事件监听等等,其他的库要么功能太简单不满足,要么需要安装Skype客户端,要么不支持最新(live)注册的Skype用户,决定使用这个来开发。由于只是一个简单的HTTP接口,决定使用web.py
首先安装SkPy和web.py,注意如果是CentOS 6,Python certifi版本只能是2015.04.28,否则会报错

sudo pip install SkPy
sudo pip install web.py
#you need to do follow steps on Centos 6.x, to make requests works
sudo pip uninstall -y certifi
sudo pip install certifi==2015.04.28

web.py只要单个文件就可以工作了,创建chat.py如下

import web
from skpy import Skype
from skpy import SkypeAuthException
import logging
import hashlib
import os.path
import io
 
 
urls = (
    '/', 'index',
    '/chat', 'chat'
)
'''
try:
    import http.client as http_client
except ImportError:
    # Python 2
    import httplib as http_client
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
'''
 
 
class SkypeService:
    def __init__(self):
        self.username = '<skype account>'
        self.password = '<skype password>'
        self.token_file="/tmp/tokens-app"
        self.skype = Skype(connect=False)
        self.skype.conn.setTokenFile(self.getTokenFile())
 
    def getTokenFile(self):
        if not os.path.isfile(self.token_file):
            with io.open(self.token_file, 'a') as file:
                file.close()
        return self.token_file
 
    def connect(self):
        try:
            self.skype.conn.readToken()
        except SkypeAuthException:
            self.skype.conn.setUserPwd(self.username, self.password)
            self.skype.conn.getSkypeToken()
 
    def creatChatRoom(self, member, topic):
        ch = self.skype.chats.create(members=member)
        ch.setTopic(topic)
        return ch
 
    def getShardLink(self, channel):
        return channel.joinUrl
 
    def createAndGetSharedLink(self, member, topic):
        self.connect()
        ch = self.creatChatRoom(member, topic)
        # ch.sendMsg("welcome")
        # return {"id": ch.id, "url": self.getShardLink(ch)}
        return self.getShardLink(ch)

    def getConversationIdByUrl(self, url):
        id = self.skype.chats.urlToIds(url)["Resource"]
        return id
    
    def getChatroomByUrl(self, url):
        id = self.getConversationIdByUrl(url)
        ch = getChatroomByConversationId(id)
        return ch

    def getChatroomByConversationId(self, id):
        ch = self.skype.chats.chat(id)
        return ch

    def sendMessageByConversationId(self, id, message):
        ch = self.getChatroomByConversationId(id)
        return ch.sendMsg("Hello world!")

    def getMessagesByConversationId(self, id):
        ch = self.getChatroomByConversationId(id)
        return ch.getMsgs()
 
 
class Storage:
    def __init__(self):
        self.cache_path = '/tmp/'
 
    def set(self, key, value):
        cache_file = self.cache_path + key
        try:
            with io.open(cache_file, 'w') as file:
                file.write(value)
        except:
            raise Exception('file: {0} write failure'.format(cache_file))
        return True
 
    def get(self, key):
        cache_file = self.cache_path + key
        try:
            with io.open(cache_file) as file:
                value = file.read()
        except:
            raise Exception('file: {0} not exists'.format(cache_file))
        return value
 
 
class index:
    def GET(self):
        return "Hello, world!"
 
 
class chat:
    def GET(self):
        url = web.ctx.home + web.ctx.path + web.ctx.query
        key = hashlib.md5(url).hexdigest()
        storage = Storage()
        try:
            join_url = storage.get(key)
        except:
            param = web.input()
            users = param.user
            member = tuple(users.split(','))
            topic = param.topic
            sk = SkypeService()
            join_url = sk.createAndGetSharedLink(member, topic)
            storage.set(key, join_url)
 
        return join_url
 
 
if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

然后运行chat.py,默认监听8080端口

python chat.py [port]

在浏览器访问,

http://127.0.0.1:8080/chat?user=user1,user2&topic=19.5Release

即可以创建一个聊天会话,并且返回join url,点击这个URL会尝试打开Skype应用。注意这个会话默认是开放的,允许任何人加入

https://join.skype.com/LRRUuan7kNH3

去掉logging注释,可以看到API调用的过程,作者也作了详细的协议文档,可以看出登录流程相当复杂,也可以根据这个开放出其他语言的SDK。
注意这个bot运行的是个人账号,使用的是与web.skype.com相同的HTTP API,最好是在Skype 开发者平台上注册,官方也提供了NodeJS的SDK
之前许多QQ机器人使用都是Web QQ的接口,目前已关闭。同时官方的API,发现并没有创建任意群聊的API。对比国外软件,国内的API真的是不开放。国外公司甚至有专门的API Platform团队,负责API开发开放,以及与第三方平台的集成。

参考连接:
SkPy

Django + Celery处理异步任务/计划任务

在Web开发的过程中,难免有一些操作是耗时任务,有时候我们不想阻塞Web进程,希望提交到后台处理,有结果了再通知前端;有时候用户不需要实时结果,可以异步处理的,比如发送邮件通知;有时候我们希望定时执行某些任务,复用已有的一些Web代码。对于第一种情况,可以是RPC或调用其他接口处理;第二种情况则是放入队列,由后台进程异步处理;第三种情况可以是在定时任务内处理,也可以是触发消息放入队列,由后台进程任务处理,同第二种情况。对于第一种情况,也可以是放入队列由后台进程处理,Web前端,定时轮询队列/接口是否有结果。这样三种情况都可以统一为一种情况,即不同来源事件(用户/定时器)触发消息,放入队列由后台任务进程异步处理,异步查询结果。
之前的一个Python项目即类似。不管代码是PHP还是Python思想都是类似,只是不同语言有不同工具/优势来处理。Web前端展示统计报表,后台进程定时查询Impala,分析统计数据;Web前端也可以触发事件比如统计、发送邮件报告等,后台进程监听并处理。Web前端选择Django,是Python写的一个MVT框架,提供了登录认证、管理后台模块,还有命令行助手工具可以生成项目,易于快速搭建并扩展Web站点。后台任务处理选择Celery,是Python写的一个分布式的任务处理平台,支持各种消息队列,比如RabbbitMQ、Redis,并提供Flower监控工具。Django本身是一个Web框架,生产环境最好使用性能更好的HTTP服务器,Gunicorn是Python写的一个WSGI HTTP服务器,用来监听HTTP请求,调用Django。Gunicorn前端最好使用Nginx来做代理,分发请求,结构如下:

安装Python 3和Django,并生成项目框架代码。注意:由于机器上面本来就有Pyton 2.7,所以要重命名下。

yum -y update
yum -y install yum-utils
yum -y groupinstall development
yum install zlib-devel bzip2-devel openssl-devel ncurese-devel expat-devel gdbm-devel readline-devel sqlite-devel libffi-devel
wget https://www.python.org/ftp/python/3.6.2/Python-3.6.2.tar.xz
tar Jxvf Python-3.6.2.tar.xz
mv Python-3.6.2 /opt/Python-3.6.2
cd /opt/Python-3.6.2
./configure --enable-shared --prefix=/usr/local/python3
make && make install
vim /etc/ld.so.conf
/usr/local/python3/lib/
/sbin/ldconfig -v
ln -s /usr/local/python3/bin/python3.6 /usr/bin/python3.6
ln -s /usr/bin/python3.6 /usr/bin/python3
#ln -s /usr/local/python3/bin/python3.6 /usr/bin/python
ln -s /usr/local/python3/bin/pip3 /usr/bin/pip3
python3 -V
pip3 -V
pip3 install Django
python3 -m django --version
ln -s /usr/local/python3/bin/django-admin /usr/bin/django-admin/
cd /home/rc
django-admin startproject qrd
cd qrd
python3 manage.py startapp master
vim qrd/settings.py

编辑settings.py,监听相应域名、IP的请求

ALLOWED_HOSTS = ['dev.example.com','localhost','127.0.0.1','10.1.*.*]
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'master',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

测试一下

python3 manage.py runserver 0:80

然后安装Gunicorn

$ pip3 install gunicorn
$ pip3 install greenlet
$ pip3 install ConfigParser
$ ln -s /usr/local/python3/bin/gunicorn /usr/bin/gunicorn

测试运行

$ cd ~/qrd/qrd
$ gunicorn -w 4 qrd.wsgi --bind  unix:/tmp/gunicorn.sock
$ sudo vim /etc/systemd/system/gunicorn.service

创建Gunicorn服务

[Unit]
Description=gunicorn daemon
After=network.target
[Service]
User=root
Group=root
WorkingDirectory=/home/rc/qrd/qrd
ExecStart=/usr/bin/gunicorn --access-logfile - --workers 3 --bind unix:/tmp/gunicorn.sock qrd.wsgi
[Install]
WantedBy=multi-user.target

开机启动

systemctl start gunicorn
systemctl enable gunicorn
#systemctl stop gunicorn

接下来安装Nginx

yum install nginx
vim /etc/nginx/conf.d/default.conf

配置Nginx与Gunicorn通过UNIX Sockect通信

server {
    listen       80;
    server_name  localhost;
    #charset koi8-r;
    #access_log  /var/log/nginx/host.access.log  main;
    #location / {
    #   root   /usr/share/nginx/html;
    #   index  index.html index.htm;
    #
    location / {
        proxy_pass http://unix:/tmp/gunicorn.sock;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

启动Nginx

systemctl start nginx
systemctl enable nginx

安装MySQL及RabbitMQ

yum install mysql-community-devel
pip3 install mysqlclient


yum install rabbitmq-server
rabbitmq-server -detached
rabbitmqctl status

创建RabbitMQ vhost

$ rabbitmqctl add_user qrdweb <password>
Creating user "qrdweb" ...
...done.
$ rabbitmqctl add_vhost qrdweb
Creating vhost "qrdweb" ...
...done.
$ rabbitmqctl set_user_tags qrdweb management
Setting tags for user "qrdweb" to [management] ...
...done.
$ rabbitmqctl set_permissions -p qrdweb qrdweb ".*" ".*" ".*"
Setting permissions for user "qrdweb" in vhost "qrdweb" ...
...done.
$ netstat -apn | grep rabbitmq
$ rabbitmqctl status

安装Celery

pip3 install celery
ln -s /usr/local/python3/bin/celery /usr/bin/celery

测试Celery的异步任务worker及计划任务schedule

$ cd /home/qrd/qrd/
$ ls
dashboard  db.sqlite3  manage.py  master  qrd  static
$ celery -A qrd worker -l info
/usr/local/python3/lib/python3.6/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
  uid=uid, euid=euid, gid=gid, egid=egid,
 -------------- celery@localhost.localdomain v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2017-09-22 08:19:39
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         qrd:0x7fda62e705c0
- ** ---------- .> transport:   amqp://qrdweb:**@localhost:5672/qrdweb
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
 
[tasks]
  . dashboard.tasks.debug_task
[2017-09-22 08:19:39,769: INFO/MainProcess] Connected to amqp://qrdweb:**@127.0.0.1:5672/qrdweb
[2017-09-22 08:19:39,781: INFO/MainProcess] mingle: searching for neighbors
[2017-09-22 08:19:40,811: INFO/MainProcess] mingle: all alone
[2017-09-22 08:19:40,860: WARNING/MainProcess] /usr/local/python3/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2017-09-22 08:19:40,860: INFO/MainProcess] celery@localhost.localdomain ready.
[2017-09-22 08:20:55,023: INFO/MainProcess] Received task: dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e]
[2017-09-22 08:20:55,027: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[71e6c0e1-92e1-494e-b5e9-163eeb7bd24e] succeeded in 0.001253978000022471s: 'debug_task'
[2017-09-22 08:22:21,179: INFO/MainProcess] Received task: dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977]
[2017-09-22 08:22:21,180: INFO/ForkPoolWorker-1] Task dashboard.tasks.debug_task[b81fe9a0-1725-4702-ba0e-13196c9b5977] succeeded in 0.00018433199147693813s: 'debug_task'
 
 
$ celery -A qrd beat -l info -s /tmp/celerybeat-schedule
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-09-24 04:20:37
Configuration ->
    . broker -> amqp://qrdweb:**@localhost:5672/qrdweb
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> /tmp/celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2017-09-24 04:20:37,823: INFO/MainProcess] beat: Starting...
[2017-09-24 04:20:37,866: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:47,856: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:57,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:20:57,861: INFO/MainProcess] Scheduler: Sending due task qrd.celery.test('world') (qrd.celery.test)
[2017-09-24 04:21:07,858: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)
[2017-09-24 04:21:17,859: INFO/MainProcess] Scheduler: Sending due task add every 10 (qrd.celery.test)

运行成功,可以使用Supervisord来守护监控Celery的运行,参考这里
Django项目结果如下

先配置Celery使用RabbitMQ作为Broker,使用Django DB来保存调用结果settings.py

import os
from configparser import RawConfigParser

#https://code.djangoproject.com/wiki/SplitSettings
config = RawConfigParser()
config.read('/home/qrd/setting/settings.ini')

STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'static')

CELERY_BROKER_URL = 'amqp://usr:pwd@localhost:5672/qrdweb'
CELERY_RESULT_BACKEND = 'django-db'

然后在Django项目下创建celery.py文件

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qrd.settings')

app = Celery('qrd')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


app.conf.beat_schedule = {
    'hue-tasks-debug_task': {
        'task': 'hue.tasks.debug_task',
        'schedule': 10.0,
        'args': ()
    },
}

并且在__init__.py引入Celery即可集成

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

Django的异步任务只能定义在各个app的task.py文件里,比如qrd.hue.tasks定义了一个定时任务

from celery import task

@task
def debug_task():
    #print(arg)
    return 'debug_task'


也可以在其模块里面调用

from tasks import debug_task

def save(data):
    debug_task.delay()

顺便推荐一个Bootstrap管理后台模板:gentelella

参考链接:
异步任务神器 Celery
Django配置celery执行异步任务和定时任务
淺談 Gunicorn 各個 worker type 適合的情境

Python使用Kerberos认证查询Impala

最近做QoS报告,数据来源于Impala,客户端认证采用的是Kerberos。
Impala是Cloudera公司开发并开源的一款基于HDFS/Hbase的MPP SQL引擎,它提供SQL语义,能够查询存储在Hadoop的HDFS和HBase中的PB级大数据。Kerberous本身是一个网络认证授权协议,借由中心服务器认证,对通信双方的客户端/服务端进行授权而不需要传递双方的密码。Kerberos的认证流程比较有意思,分为三个阶段

  • 客户端认证
    • 1 客户端发送自己用户名
    • 2 认证服务器返回使用客户端密钥加密的Client/TGS会话密钥和使用票据授权服务器密钥加密的TGT, 包括sessions key,用户信息及有效期
    • 3 客户端使用自己的密钥解密出Client/TGS会话密钥
  • 服务授权
    • 1 客户端发送两条消息:接收到的TGT和所请求的服务ID;使用Client/TGS会话密钥加密的用户ID和时间戳
    • 2 票据授权服务器使用自己的密钥解密TGT得到客户端的Client/TGS会话密钥,然后使用它解密出用户ID并进行认证。返回使用所请求服务端密钥加密的client-server票据和使用Client/TGS会话密钥加密的Client/Server会话密钥
    • 3 客户端使用Client/TGS会话密钥(Client/TGS Session Key)解密出Client/Server会话密钥
  • 服务请求
    • 1 客户端发送两条消息:使用所请求服务端密钥加密的client-server票据及使用Client/Server会话密钥加密的用户ID和时间戳
    • 2 服务端使用自己的密钥解密client-server票据从而得到Client/Server会话密钥,使用该密钥解密获得用户信息并认证。返回使用Client/Server会话密钥的新时间戳
    • 3 客户端使用Client/Server会话密钥解密该消息,认证结束并请求服务
    • 4 服务端提供服务

在CentOS上安装Kerberos

yum install krb5-devel pam_krb5 krb5-libs krb5-workstation

编辑配置

vim /etc/krb5.conf

配置KDC,认证服务器

[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = true
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
default_tkt_enctypes = rc4-hmac
default_tgs_enctypes = rc4-hmac
permitted_enctypes = rc4-hmac
[realms]
EXAMPLE.COM = {
default_domain = example.com
kdc = kdc01.example.com
kdc = kdc02.example.com
admin_server = adc01.example.com
admin_server = adc02.example.com
}
[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM

测试一下

[root@localhost rc]# kinit abc.xyz@EXAMPLE.COM
Password for abc.xyz@EXAMPLE.COM:

注意这个配置文件每行前面的空格被删掉,是因为在VirtualBox里面每行开头有莫名其妙的乱码,Linux下并不可见,在EditPlus下面才发现,否则会乱报错

kinit: Improper format of Kerberos configuration file while initializing Kerberos 5 library
kinit: Cannot find KDC for realm "EXAMPLE.COM" while getting initial credentials

查看一下认证的ticket

[root@localhost vagrant]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: abc.xyz@EXAMPLE.COM

Valid starting       Expires              Service principal
09/21/2017 08:30:50  09/21/2017 18:30:50  krbtgt/EXAMPLE.COM@EXAMPLE.COM
        renew until 09/28/2017 08:30:42

这个ticket在28号就会过期了,到时候又要输入密码,这样也不便于自动化程序使用。可以使用ktutil创建keytab文件

$ ktutil
ktutil:  addent -password -p abc.xyz@EXAMPLE.COM -k 1 -e RC4-HMAC
Password for abc.xyz@EXAMPLE.COM:
ktutil:  wkt abc.xyz.keytab
ktutil:  q
$ ls
abc.xyz.keytab

测试一下

$ kinit -kt abc.xyz.keytab abc.xyz@EXAMPLE.COM
$ klist -k abc.xyz.keytab
Keytab name: FILE:abc.xyz.keytab
KVNO Principal
---- --------------------------------------------------------------------------
  1 abc.xyz@EXAMPLE.COM

之后便可以使用kinit自动更新ticket了。注意,如果更换了密码,需要重新生成新的keytab。
另外,相同用户生成的授权ticket在任意一台机器上都是相同的, kinit时会自动同步回来的。
公司的大数据平台使用Hue来提供基于web界面的查询,Impala也支持使用ODBC方式查询。在Python里使用的是impyla来查询,首先安装sasl的依赖

yum install libgsasl-devel cyrus-sasl-devel cyrus-sasl-gssapi
pip install impyla thrift_sasl

测试脚本

from impala.dbapi import connect
conn = connect(host="impalad.example.com", port=21050, auth_mechanism='GSSAPI', kerberos_service_name='impala', database='acme')
cur =  conn.cursor()
cur.execute(r'SELECT * FROM acme WHERE dt="2017-09-12" LIMIT 5')
print(cur.fetchall())

运行下

python test.py

如下报错,则是服务器不能连接,检查一下网络,DNS/hosts及VPN

thriftpy.transport.TTransportException: TTransportException(type=1, message="Could not connect to ('impalad.example.com', 21050)")

如下报错,CentOS则是需要cyrus-sasl-gssapi模块

thriftpy.transport.TTransportException: TTransportException(type=1, message="Could not start SASL: b'Error in sasl_client_start (-4) SASL(-4): no mechanism available: No worthy mechs found'")

参考链接:
Impala:新一代开源大数据分析引擎
大数据时代快速SQL引擎-Impala
CDH 5.2中Impala认证集成LDAP和Kerberos
Kerberos
Configuring Kerberos Authentication for Windows
Speaking Kerberos with KNIME Big Data Extensions

使用Supervisor 管理监控进程

Supervisor是用Python写的一款应用程监控管理工具,能够启动,停止,重启死进程,提供web管理界面,XML-RPC接口及事件监听。通常我们写了一些脚本都不会带有daemon功能,而是加&或者nohub,screen什么的丢到后台去运行,同时使用corntab定时检测脚本是否存活,以便重新运行脚本。使用Supervisor可以将这些脚本,程序转为守护进程,自动重启它们;还可以监控机器的进程运行状况,输出警报等。
Supervisor只能运行于Python 2.x的环境,但子进程可以为其他任意程序,比如Python 3,PHP等。这里使用pip来安装

$ wget https://bootstrap.pypa.io/get-pip.py
$ python -V
$ sudo python get-pip.py
$ sudo pip install supervisor

生成配置文件及日志目录

$ sudo echo_supervisord_conf > /etc/supervisord.conf
$ mkdir /var/log/supervisor
$ chmod 655 /var/log/supervisor

启动supervisord

$ sudo supervisord -c /etc/supervisord.conf
$ supervisorctl
$ Server requires authentication
$ Username:user
$ Password:

$ supervisor> status
$ supervisor> help

default commands (type help <topic>):
=====================================
add    exit      open  reload  restart   start   tail
avail  fg        pid   remove  shutdown  status  update
clear  maintail  quit  reread  signal    stop    version
$ supervisor> quit

这里没有任何进程。以下为常用命令:

  • supervisorctl stop program 停止某个进程
  • supervisorctl start program 启动某个进程
  • supervisorctl restart program 重启某个进程
  • supervisorctl stop group 重启属于group分组的所有进程(start,restart同理)
  • supervisorctl stop all 停止全部进程,注:start、restart、stop都不会载入最新的配置文件
  • supervisorctl reload 载入最新配置文件,停止原有进程并按新配置启动进程
  • supervisorctl update 根据最新配置文件,启动新配置或有改动的进程,没有改动的进程不受影响

编辑supervisord.conf启用web界面,账号密码为web及supervisorctl共用,必须更改

$ sudo vim /etc/supervisord.conf
#取消以下行的注释
[inet_http_server]         ; inet (TCP) server disabled by default
port=*:9002           ; ip_address:port specifier, *:port for all iface
username=user              ; default is no username (open server)
password=123               ; default is no password (open server)

#添加新应用qrd
[program:qrd]
command = /usr/bin/gunicorn --access-logfile - --workers 3 --bind 127.0.0.1:8000 qrd.wsgi
directory = /home/qrd/qrd/
user = root
autostart = true
autorestart = true
startsecs = 5
startretries = 3
stdout_logfile = /var/log/supervisor/qrd.log
stderr_logfile = /var/log/supervisor/qrd.log

重启使HTTP配置生效

ps aux | grep supervisord 
kill 
supervisord -c /etc/supervisord.conf

关于supervisord.conf配置项,命令释义可以参考这里

  • command 为要运行的脚本或程序
  • directory 为脚本或程序运行时的工作目录
  • user 为脚本或程序运行时的用户
  • autostart 随supervisord启动
  • startsecs 启动后等待时间,过了这个时间没起来就重新启动
  • startretries 启动失败后重试的次数
  • stdout_logfile,stderr_logfile 为输出日志

重新加载所有应用

$ supervisorctl
Server requires authentication
Username:user
Password:

supervisor> reload
Really restart the remote supervisord process y/N? y
Restarted supervisord
supervisor> status
qrd                              RUNNING   pid 3861, uptime 0:00:22

可以看到定义的qrd程序已经起来了。如果qrd程序意外退出了,那么supervisord将会重启它。如果杀掉了supervisord,那么qrd对应的进程也将被杀死。也可以去web界面查看http://127.0.0.1:9002/

可以通过web页面来启动,停止进程,查看日志等。
再增加下Celery的Worker,Beat配置

; ==================================
;  celery worker supervisor example
; ==================================

[program:qrdworker]
; Set full path to celery program if using virtualenv
command=/usr/bin/celery worker -A qrd --loglevel=INFO

; Alternatively,
;command=celery --app=your_app.celery:app worker --loglevel=INFO -n worker.%%h
; Or run a script
;command=celery.sh

directory=/home/qrd/qrd/
user=nobody
numprocs=1
stdout_logfile=/var/log/supervisor/qrdworker.log
stderr_logfile=/var/log/supervisor/qrdworker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; Causes supervisor to send the termination signal (SIGTERM) to the whole process group.
stopasgroup=true

; Set Celery priority higher than default (999)
; so, if rabbitmq is supervised, it will start first.
priority=1000

; ================================
;  celery beat supervisor example
; ================================

[program:qrdbeat]
; Set full path to celery program if using virtualenv
command=/usr/bin/celery -A qrd beat -l info -s /tmp/celerybeat-schedule

; remove the -A myapp argument if you aren't using an app instance

directory=/home/qrd/qrd/
user=nobody
numprocs=1
stdout_logfile=/var/log/supervisor/beat.log
stderr_logfile=/var/log/supervisor/beat.log
autostart=true
autorestart=true
startsecs=10

; Causes supervisor to send the termination signal (SIGTERM) to the whole process group.
stopasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=999

然后启用它们

supervisor> update
qrdbeat: added process group
supervisor> status
qrd                              RUNNING   pid 4468, uptime 0:03:49
qrdbeat                          BACKOFF   Exited too quickly (process log may have details)
qrdworker                        RUNNING   pid 4469, uptime 0:03:49

查看下进程

$ sudo ps aux | grep python
root      1038  0.0  3.1 562392 15720 ?        Ssl  01:49   0:01 /usr/bin/python -Es /usr/sbin/tuned -l -P
root      3992  0.1  3.0 222124 15224 ?        Ss   03:49   0:00 /bin/python /bin/supervisord -c /etc/supervisord.conf
root      3993  0.3  3.8 211868 19296 ?        S    03:49   0:00 /usr/local/python3/bin/python3.6 /usr/bin/gunicorn --access-logfile - --workers 3 --bind 127.0.0.1:8000 qrd.wsgi
root      3996  0.3  6.9 264048 34780 ?        S    03:49   0:00 /usr/local/python3/bin/python3.6 /usr/bin/gunicorn --access-logfile - --workers 3 --bind 127.0.0.1:8000 qrd.wsgi
root      3998  0.3  6.9 264056 34784 ?        S    03:49   0:00 /usr/local/python3/bin/python3.6 /usr/bin/gunicorn --access-logfile - --workers 3 --bind 127.0.0.1:8000 qrd.wsgi
root      3999  0.4  6.9 264024 34784 ?        S    03:49   0:00 /usr/local/python3/bin/python3.6 /usr/bin/gunicorn --access-logfile - --workers 3 --bind 127.0.0.1:8000 qrd.wsgi
root      4014  0.0  0.1 112660   976 pts/0    R+   03:50   0:00 grep --color=auto python

除了program标签,Supervisor也支持group标签,用来启动一系列的program,比如先启动数据库,再启动web服务器。
Supervisor也可以用来管理PHP进程,比如使用 Supervisor 管理 Laravel 队列进程
Docker官方推荐一个docker容器只运行一个服务,如果你想启动多个脚本或程序,可以使用Supervisor会更简单点
如果需要Supervisor开启启动,可以使用GitHub上的脚本和配置。CentOS 7上可以配置为service,由systemctl来管理:

$ sudo vim /etc/systemd/system/supervisord.service

内容如下:

# supervisord service for systemd (CentOS 7.0+)
# by ET-CS (https://github.com/ET-CS)
[Unit]
Description=Supervisor daemon

[Service]
Type=forking
ExecStart=/usr/bin/supervisord
ExecStop=/usr/bin/supervisorctl $OPTIONS shutdown
ExecReload=/usr/bin/supervisorctl $OPTIONS reload
KillMode=process
Restart=on-failure
RestartSec=42s

[Install]
WantedBy=multi-user.target

然后启用

$ sudo systemctl enable supervisord.service
Created symlink from /etc/systemd/system/multi-user.target.wants/supervisord.service to /etc/systemd/system/supervisord.service.
$ sudo systemctl start supervisord.service
$ sudo ps aux | grep python

虽然supervisorctl及web界面都只能管理本机,但是Supervisor提供了XML-RPC接口,可以获取进程运行信息,因此诞生了许多监控平台,以便监控服务器集群的进程的运行状况。
Supervisor还提供了event listener配置,在这里Supervisor将会通知脚本其他进程的运行状态变更的事件,可以用来发送警报监控等。

参考链接:
使用Supervisor3.2.1基于Mac10.10.3对系统进程进行管理
supervisor的配置浅析
Python 进程管理工具 Supervisor 使用教程
Supervisor进程监护使用指南
supervisord 的 XML-RPC API 使用说明
Supervisor Event Listener
Dockerizing Nginx and SSH using Supervisord

PHP MongoDB Replica Set应用

MongoDB是个面向文档管理的NoSQL数据库,能够直接存取JSON数据,支持海量数据。公司内部的一个单点登录系统使用MongoDB来存储用户的session,以便在不同应用,服务器之间共享登录信息,解决了服务器切换用户状态丢失(被登出)的问题。单点登录系统前端采用PHP,与LDAP交互认证用户信息,提供登录界面,API等。MongoDB则采用Replica Set模式以便支持高可用。
在CentOS 6.5上安装MongoDB,首先添加源仓库

$ suod vim /etc/yum.repos.d/mongodb.repo
[mongodb]
name=MongoDB Repository
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/
gpgcheck=0
enabled=0

安装,启动,连接MongoDB

$ sudo yum --enablerepo=mongodb install mongodb-org
$ sudo /sbin/chkconfig --levels 235 mongod on
$ sudo mongod --port 27017 --dbpath /data/db1
$ mongo --port 27017

创建用户SSOReplUser

> use admin 
> db.createUser( { user: "SSOReplUser", pwd: "<password>", roles: [ { role: "root", db: "admin" } ] });

创建集群要用到的key

$ sudo openssl rand -base64 741 > /home/sso/mongodb-keyfile 
$ sudo chmod 600 /home/sso/mongodb-keyfile

编辑MongoDB配置文件

$ sudo vim /etc/mongod.conf
# Replication Options 
# in replicated mongo databases, specify the replica set name here 
replSet=SSOReplSet 
# maximum size in megabytes for replication operation log 
#oplogSize=1024 
# path to a key file storing authentication info for connections # between replica set members keyFile=/home/sso/mongodb-keyfile

重启mongod服务

$ sudo /etc/init.d/mongod stop   
$ sudo /usr/bin/mongod -f /etc/mongod.conf

使用SSOReplUser登录

> use admin 
> db.auth("SSOReplUser", "<password>");

初始化集群

> rs.initiate()
> rs.conf()   
{
  "_id": "SSOReplSet",
  "version": 1,
  "members": [
    {
      "_id": 1,
      "host": "192.168.33.10:27017"
    }
  ]
}

在其他机器上按照上面步骤安装MongoDB,更改配置,复制对应的mongodb-keyfile并启动。
回到192.168.33.10并添加机器

SSOReplSet:PRIMARY> rs.add("192.168.33.11:27017") 
SSOReplSet:PRIMARY> rs.add("192.168.33.12:27017") 
SSOReplSet:PRIMARY> rs.add("192.168.33.13:27017")
#SSOReplSet:PRIMARY> rs.remove("192.168.33.13:27017")

在从机上认证查看

SSOReplSet:SECONDARY> use admin 
SSOReplSet:SECONDARY> db.auth("SSOReplUser", "<password>");   
SSOReplSet:SECONDARY> rs.status()

MongoDB的Replica set模式最少要求3台机器,以便在PRIMARY机器故障时,能够自我选举出新的PRIMARY,但需要n/2+1的机器投票同意。例如刚才配置了4台机器,那么需要4/2+1=3台机器投票,能够承受一台机器故障。如果是集群有5台机器,则能够承受2台机器故障。因此再配置一台不存储数据的Arbiter机器比较合理。
按照上面的步骤安装,复制mongodb-keyfile文件,但需要更改mongod.conf

$sudo vim /etc/mongod.conf
nojournal=true

然后启动mongod服务并在PRIMARY机器上将这台机器加入机器

SSOReplSet:PRIMARY> use admin   
SSOReplSet:PRIMARY> db.auth("SSOReplUser", "<password>");   
SSOReplSet:PRIMARY> rs.addArb("<ip of arbiter>");

PHP的mongo扩展安装比较简单,下载对应版本,启用即可。然而在应用过程中发现登录有点慢,启用看PHP mongo扩展profiling功能,查看PHP日志

<?php
\MongoLog::setLevel(\MongoLog::ALL);
\MongoLog::setModule(\MongoLog::ALL);
try{
    echo  microtime(true) . PHP_EOL;
    echo "aaa01(primary)" . PHP_EOL;
    $m = new \MongoClient("mongodb://admin:XXXXXXXXX@192.168.33.10:27017/?replicaSet=SSOReplSet ");
    echo  microtime(true) . PHP_EOL;
    echo "aaa01(primay), bbb01(secondary),ccc01(secondary),ddd01(secondary)" . PHP_EOL;
    $m = new \MongoClient("mongodb://admin:XXXXXXXXX@192.168.33.10:27017,192.168.33.11:27017,192.168.33.12:27017,192.168.33.13:27017/?replicaSet=SSOReplSet ");
    echo  microtime(true) . PHP_EOL;
} catch (\MongoConnectionException $e) {
    var_dump($e);
}

发现PHP连接MongoDB集群是这样子的

即:

  • 1)在连接里面配置了几个MongoDB连接服务器,PHP每个都会创建连接去查询
  • 2)从每台服务器上查询出整个集群的服务器列表,再分别ping和连接这些服务器,如果连接不存在或不匹配则创建,无效的则销毁
  • 3)汇总所有服务器返回集群列表
  • 4)选择离自己最近的服务器并使用该与该服务器的连接
  • MongoDB 的写操作默认在Primary节点上操作完成即返回成功;读操作默认是也是从Primary上读取,所以需要去查询服务器。由于SSO应用部署在多个数据中心,网络抖动会造成较大影响,跨数据中心的查询并不会很快,如果每次都去连接并查询整个列表是比较耗时。另外如果在php里面配置的是IP而MongoDB Replica Set里面配置的是域名,则连接名会出现不匹配,而创建新的连接并销毁旧连接,也耗时。如果配置的是域名,则需要DNS解析。由于每台PHP服务器均已配置HA检测,最终每个应用只配置了一台服务器,并统一配置MongoDB集群为IP。而连接最快的是在Primary机器上,可以根据本机是否Primary来做HA:

    #!/usr/bin/env bash
    count=`ps -fe |grep "mongod" | grep -v "grep" | wc -l`
    FILE="/home/scripts-bits/master.conf"
    SERVER=`hostname`
    
    if [ $count -lt 1 ]; then
        rm -f $FILE
    else
        PRIMAY=`/usr/bin/mongo ${SERVER}:27017 --quiet --eval 'printjson(db.isMaster().ismaster);'`
        if [ "$PRIMAY" == "true" ]; then
        	if [ ! -f "$FILE" ]; then
        		touch "$FILE"
        	fi
        	REMOVE=`/usr/bin/mongo ${SERVER}:27017/admin --quiet /home/scripts-bits/mongo_status.js`
        fi
    fi
    

    删除故障节点(not reachable/healthy)的脚本mongo_status.js:

    db.auth('admin','<password>');
    conf=rs.status();
    members=conf["members"];
    for(i in members){
    	if(members[i]["state"] == 8){
    		rs.remove(members[i]["name"]);
    	}
    }
    

    这中间也出现过因为网络/机器故障,导致PHP等待连接超时的情况的,将该机器从集群中移除即可。然而当服务器是启动的(可以到达)情况下,MongoDB故障,则不会超时。可以更改connectTimeoutMS,以便减少等待。
    MongoDB的日志默认都是记录在/var/log/mongodb/下面并且会越来越大。创建Python脚本来定时清除它:

    #!/bin/env python
    import commands
    import datetime,time
    
    def rotate_log(path, expire = 30):
        str_now = time.strftime("%Y-%m-%d")
        dat_now = time.strptime(str_now, "%Y-%m-%d")
        array_dat_now = datetime.datetime(dat_now[0], dat_now[1], dat_now[2])
        lns = commands.getoutput("/bin/ls --full-time %s|awk '{print $6, $9}'" % path)
        for ln in lns.split('\n'):
            ws = ln.split()
            if len(ws) != 2:
                continue
            ws1 = time.strptime(ws[0], "%Y-%m-%d")
            ws2 = datetime.datetime(ws1[0], ws1[1], ws1[2])
            if (array_dat_now - ws2).days > expire:
                v_del = commands.getoutput("/bin/rm -rf %s/%s" % (path, ws[1]))
    
    
    def rotate_mongo():
        # get mongo pid
        mongo_pid = commands.getoutput("/sbin/pidof mongod")
        #print mongo_pid
        # send Sig to mongo
        if mongo_pid != '':
            cmd = "/bin/kill -USR1 %s" % (mongo_pid)
            # print cmd
            mongo_rotate = commands.getoutput(cmd)
        else:
            print "mongod is not running..."
    
    if __name__ == "__main__":
        log_path = "/var/log/mongodb/"
        expire = 30
        rotate_mongo()
        rotate_log(log_path, expire)
    
    

    加入到crontab里面去执行

    10 1 * * * /usr/bin/python /home/sso/mongo_rotate.py > /dev/null 2>&1
    

    参考连接
    mongodb与mysql相比的优缺点
    PHP MongoDB 复制集合
    MongoDB Replication
    MongoDB Enable Auth
    MongoDB Rotate Log Files
    Blocking connect() leads to cumulative timeouts for multiple inaccessible servers
    How Raft consensus algorithm will make replication even better in MongoDB 3.2
    Write Concern for Replica Sets
    MongoDB Read Preference