分布式系统

好久以前写的了。在InfoQ上学习了分布式监控系统的设计与实现微信朋友圈技术之道,在架构设计的基础思想并不复杂。
首先是微信朋友圈技术之道,介绍了微信朋友圈团队的开发,团队仅4个人,因为他们站在巨人的肩膀上:
基础环境
全部才用C++开发
混合部署普通服务器
海量带宽

强大的基础设施
腾讯CDN,图片和视频上传、存储、分发
RPC框架
Key-Value(KV)存储系统
强大、方便、灵活的部署系统
强大的RPC框架
C++的框架
支持protobuf描述接口
支持进程/线程/协程多种模式:支持数十万的并发协程,方便编写和调试“同步”的网络调用和服务
从CGI到叶子服务器,全系统透明支持过载保护和QOS
为每次CGI调用自动生成全系统调用关系图
每个服务自动内建500多个监控项
高性能Key-value存储系统
三机一组,三机间支持数据一致性,容忍一台机器出错、自动切换
三机分布在一个数据中心的三个独立园区,在任意一个园区提供本区读写服务,容忍一个园区网络隔离

微信架构
接入层,维持长连接,避免重复连接开销;推送消息
逻辑层,注册/登录,消息,朋友圈,LBS;批处理,群聊,通知,好友推荐
存储代理层,账号,消息,关系链,朋友圈,群管理
存储层,Key-value存储
性能水平扩展(sharding)
相册,按照用户做水平扩展
发表,按照发表key做水平扩展
评论,按照评论key做水平扩展
时间线,按照用户做水平扩展

发表流程
上传图片到CDN,查找最近节点
调用朋友圈CGI
添加新发表
在相册增加新发表索引
添加时间线更新任务,批处理,给好友时间线添加新索引
返回发表成功

浏览流程
调用朋友圈CGI
拉取时间线
拉取新发表元数据
拉取CDN上的图片

数据单副本、索引写扩散、检查更新单读取
数据(发表)单副本:减少内存开销
索引写扩散、检查更新单读取:减少检查更新时的读扩散
写比较慢,失败可以重试;读不可等待

赞、评论与浏览
往赞、评论表插入发表索引和对应赞、评论

微信部署、接入与容灾
1地3园区对等部署,对等接入(电信、移动、联通);数据对等分布于同步;对等服务
容灾,任何两个园区都可以提供全量无损服务
数据中心分布:上海、深圳、香港、加拿大
通信优先专线服务;其次公网(加密)
各数据中心可独立提供服务
各数据中心通过idc queue异步同步写入(自动重试)

核心数据读写分析
相册与发表
本地idc写入,单向同步到其他idc
key的全局唯一性保证无冲突(根据idc预先生成)
时间线
本地idc只保存本地用户时间线的key,无需同步

跨洋同步的挑战和应对
挑战:大带宽延迟、丢包、乱序、低可靠性(专心中断)
应对:延迟、丢包、乱序达到一定程度自动从tcp切换到udp;专线中断自动切换到公网(AES加密)

评论、赞写冲突
因果一致性(不同数据中心保证key不重复,比如不同idc取模)

参考链接:
分布式监控系统的设计与实现
微信朋友圈技术之道

MySql 慢日志分析

最近老是碰上MySql报错:1203:User already has more than ‘max_user_connections’ active,之前都没出现过,感觉应该是慢查询导致的。向运维拷贝慢日志分析,慢日志开、启配置参考这里
拷贝出来的日志很大,需要按故障时间点做一下切割,以便缩小排查范围。按照这里提供的cutlogbytime.pl脚本运行却报错

[vagrant@centos64 mysql-log-filter-1.9]$ ./cutlogbytime.pl slow.log 1443103200 1443117600 > yestoday.log
: command not foundline 1:
: command not foundline 4:
./cutlogbytime.pl: line 5: use: command not found
: command not foundline 5:
./cutlogbytime.pl: line 6: use: command not found
: command not foundline 6:
: command not foundline 7:
'/cutlogbytime.pl: line 8: syntax error near unexpected token `{
'/cutlogbytime.pl: line 8: `if (@ARGV<2){

去掉顶行的空格后再运行,还是报错

[vagrant@centos64 mysql-log-filter-1.9]$ ./cutlogbytime.pl slow.log 1443103200 1443117600 > today.log
-bash: ./cutlogbytime.pl: /usr/bin/perl^M: bad interpreter: No such file or directory

最后参考stackoverflow上面的答案更改运行方式为Perl(而不是shell),就可以了。

[vagrant@centos64 mysql-log-filter-1.9]$ perl cutlogbytime.pl slow.log 1443103200 1443117600 > today.log

利用mysqlslowdump(perl脚本)来分析日志,-s参数表示排序方式:r表示影响行数(Rows),t表示耗时(Time),c表示查询次数(Count)

[vagrant@entos64 mysql-log-filter-1.9]$  perl mysqldumpslow.pl -s r -t 10 today4.log

Reading mysql slow query log from today4.log
Count: 1  Time=190.48s (190s)  Lock=0.00s (0s)  Rows=21829854.0 (21829854), xx[xxxx]@[192.168.10.139]
  SELECT /*!N SQL_NO_CACHE */ * FROM `errormessage`

Count: 32791  Time=40.95s (1342865s)  Lock=0.05s (1512s)  Rows=1.0 (32791), xx[xxxx]@10hosts
  select  *  from connectinfo where  ID=N  and AppType=N  ORDER BY CreateDatetime DESC LIMIT N

Count: 3  Time=3.71s (11s)  Lock=0.02s (0s)  Rows=300.0 (900), xx[xxxx]@2hosts
  select SeverName from errormessage where  ID='S'  and ServerType=N  and level=N  and MsgType <= N

第一个语句返回行数21829854,查看具体慢日志,之后需要插入这张表的进程均处于等待状态。

# Time: 150924  1:03:12
# User@Host: xx[xxxx] @  [192.168.10.139]  Id: 1493761
# Query_time: 190.479062  Lock_time: 0.000000 Rows_sent: 21829854  Rows_examined: 21829854
SET timestamp=1443027792;
SELECT /*!40001 SQL_NO_CACHE */ * FROM `errormessage`;
# Time: 150924  1:03:14
# User@Host: xx[xxxx] @  [192.168.10.168]  Id: 1498010
# Query_time: 59.669817  Lock_time: 57.159403 Rows_sent: 0  Rows_examined: 0
SET timestamp=1443027794;
insert into errormessage (`ID`,`ServerType`,`MsgType`,`Level`,`dev`,`content`,`EventTime`,`SeverName`) values ( '1217', '3', '4', '4', '827', 'erc:start erc error,songid=46243,keymd5=ee1275b26762e85a7f00e9890bdc092e,ercmd5=abbc3ea9102dbd003b7aa0547dcbf6fa', '2015-09-23 21:49:27', '192.168.15.117');
# User@Host: xx[xxxx] @  [192.168.10.205]  Id: 1494756
# Query_time: 157.211158  Lock_time: 154.673647 Rows_sent: 0  Rows_examined: 0
SET timestamp=1443027794;
insert into errormessage (`ID`,`ServerType`,`MsgType`,`Level`,`dev`,`content`,`EventTime`,`SeverName`) values ( '865', '3', '1', '2', '106', '检测正常!', '2015-09-24 01:01:18', '192.168.0.33');
# User@Host: xx[xxxx] @  [192.168.10.213]  Id: 1496479
# Query_time: 100.733230  Lock_time: 98.210902 Rows_sent: 0  Rows_examined: 0
SET timestamp=1443027794;
insert into errormessage (`ID`,`ServerType`,`MsgType`,`Level`,`dev`,`content`,`EventTime`,`SeverName`) values ( '2472', '3', '2', '4', '809', 'videoseripnoconfig', '2015-09-24 01:02:26', '192.168.0.18');

分析这几天的日志,发现故障时间点附近都是这个语句引起后面的SQL堵塞。原来是每天早上1点开始备份并同步全表数据,锁住了这个表导致后面的所有这个表的insert操作处于等待状态。mysqldump应该使用–single-transaction来避免锁表,类似下面这个

mysqldump –uuser -p --skip-opt -q -R  --single-transaction --default-character-set=utf8 --master-data=2  --create-option --no-autocommit –S ${sock} -B ${DBName}  > backup.sql

但是这样仍然是全表扫描进行备份,如果能够增量备份的话,影响要小很多,或者数据做冷热/新旧之分,定期将新(每天)/热数据转入旧(历史)/冷数据中。后面运维的解决方案是:升级数据库机器从虚拟机变为实体机,配置从机,并从从机进行备份同步。

上面mysqlslowdump使用影响行数来排序,事实上用另外两个类型(时间,次数)分析结果是connectinfo比较频繁,一直以来都认为是这个表的操作引起的。这里还尝试了其他工具来分析,使用mysqlsla.pl进行分析,相关参数选项参考这里

[vagrant@centos64 mysql-log-filter-1.9]$  perl mysqlsla.pl today.log
Auto-detected logs as slow logs
Report for slow logs: today4.log
60.57k queries total, 17 unique
Sorted by 't_sum'
Grand Totals: Time 5.38M s, Lock 3.22M s, Rows sent 21.86M, Rows Examined 184.46M


______________________________________________________________________ 001 ___
Count         : 25.59k  (42.24%)
Time          : 3905525.574451 s total, 152.643069 s avg, 113.07488 s to 2720.338946 s max  (72.64%)
  95% of Time : 3260112.482495 s total, 134.12789 s avg, 113.07488 s to 282.366041 s max
Lock Time (s) : 3168076.975558 s total, 123.820721 s avg, 108.548105 s to 311.639359 s max  (98.45%)
  95% of Lock : 2961933.212121 s total, 121.860167 s avg, 108.548105 s to 123.487106 s max
Rows sent     : 0 avg, 0 to 0 max  (0.00%)
Rows examined : 54 avg, 0 to 4.92k max  (0.75%)
Database      :
Users         :
        xx@ 192.168.10.147 : 10.65% (2724) of query, 10.26% (6215) of all users
        xx@ 192.168.10.209 : 10.33% (2643) of query, 10.16% (6156) of all users
        xx@ 192.168.10.205 : 10.16% (2599) of query, 9.97% (6036) of all users
        xx@ 192.168.10.211 : 10.13% (2591) of query, 9.98% (6042) of all users
        xx@ 192.168.10.207 : 9.93% (2541) of query, 9.95% (6024) of all users
        xx@ 192.168.10.161 : 9.83% (2515) of query, 9.84% (5960) of all users
        xx@ 192.168.10.149 : 9.81% (2510) of query, 9.95% (6028) of all users
        xx@ 192.168.10.215 : 9.76% (2498) of query, 9.85% (5963) of all users
        xx@ 192.168.10.168 : 9.71% (2485) of query, 9.69% (5868) of all users
        xx@ 192.168.10.213 : 9.69% (2480) of query, 9.66% (5851) of all users

Query abstract:
SET timestamp=N; UPDATE connectinfo SET devicetag='S', connectipaddress='S', updatedatetime=now() WHERE ID=N AND apptype=N;

Query sample:
SET timestamp=1443027797;
update connectinfo set DeviceTag='1070A416AF000000', ConnectIPAddress='60.174.116.165', UpdateDatetime=now() where ID=5358 and AppType=0;

______________________________________________________________________ 002 ___
Count         : 32.79k  (54.14%)
Time          : 1344378.871914 s total, 40.99841 s avg, 2.000747 s to 1944.548192 s max  (25.01%)
  95% of Time : 587407.556704 s total, 18.85678 s avg, 2.000747 s to 233.465042 s max
Lock Time (s) : 1512.917798 s total, 46.138 ms avg, 76 ▒s to 114.302 ms max  (0.05%)
  95% of Lock : 1414.978902 s total, 45.423 ms avg, 76 ▒s to 50.514 ms max
Rows sent     : 1 avg, 1 to 1 max  (0.15%)
Rows examined : 4.92k avg, 4.92k to 4.92k max  (87.41%)
Database      :
Users         :
        xx@ 192.168.10.209 : 10.24% (3359) of query, 10.16% (6156) of all users
        xx@ 192.168.10.149 : 10.16% (3331) of query, 9.95% (6028) of all users
        xx@ 192.168.10.147 : 10.11% (3315) of query, 10.26% (6215) of all users
        xx@ 192.168.10.211 : 10.03% (3288) of query, 9.98% (6042) of all users
        xx@ 192.168.10.207 : 10.02% (3285) of query, 9.95% (6024) of all users
        xx@ 192.168.10.161 : 9.97% (3268) of query, 9.84% (5960) of all users
        xx@ 192.168.10.215 : 9.96% (3266) of query, 9.85% (5963) of all users
        xx@ 192.168.10.205 : 9.92% (3254) of query, 9.97% (6036) of all users
        xx@ 192.168.10.168 : 9.86% (3234) of query, 9.69% (5868) of all users
        xx@ 192.168.10.213 : 9.73% (3191) of query, 9.66% (5851) of all users

Query abstract:
SET timestamp=N; SELECT * FROM connectinfo WHERE ID=N AND apptype=N ORDER BY createdatetime DESC LIMIT N;

Query sample:
SET timestamp=1443027795;
select  *  from connectinfo where  ID=7646  and AppType=0  ORDER BY CreateDatetime DESC LIMIT 1;

______________________________________________________________________ 003 ___
Count         : 842  (1.39%)
Time          : 66663.314786 s total, 79.172583 s avg, 2.011408 s to 673.604537 s max  (1.24%)
  95% of Time : 56684.989954 s total, 70.944919 s avg, 2.011408 s to 193.623235 s max
Lock Time (s) : 48221.988255 s total, 57.27077 s avg, 69 ▒s to 185.402303 s max  (1.50%)
  95% of Lock : 40627.196184 s total, 50.847555 s avg, 69 ▒s to 166.67704 s max
Rows sent     : 0 avg, 0 to 0 max  (0.00%)
Rows examined : 0 avg, 0 to 0 max  (0.00%)
Database      :
Users         :
        xx@ 192.168.10.207 : 11.64% (98) of query, 9.95% (6024) of all users
        xx@ 192.168.10.205 : 11.28% (95) of query, 9.97% (6036) of all users
        xx@ 192.168.10.213 : 10.93% (92) of query, 9.66% (5851) of all users
        xx@ 192.168.10.161 : 10.45% (88) of query, 9.84% (5960) of all users
        xx@ 192.168.10.149 : 10.33% (87) of query, 9.95% (6028) of all users
        xx@ 192.168.10.211 : 9.74% (82) of query, 9.98% (6042) of all users
        xx@ 192.168.10.147 : 9.38% (79) of query, 10.26% (6215) of all users
        xx@ 192.168.10.215 : 9.38% (79) of query, 9.85% (5963) of all users
        xx@ 192.168.10.168 : 9.03% (76) of query, 9.69% (5868) of all users
        xx@ 192.168.10.209 : 7.84% (66) of query, 10.16% (6156) of all users

Query abstract:
SET timestamp=N; INSERT INTO errormessage (id,servertype,msgtype,level,dev,content,eventtime,severname) VALUES ( 'S', 'S', 'S', 'S', 'S', 'S', 'S', 'S')1;

使用mysqlsla可以看SQL语句的执行数量/比例,影响行数,用户,占比等。,从这里看很可能认为是connectinfo表(95%以上)引起,SHOW PROCESSLIST也是如此 。
由于这个文件我是单独下载回来的,运行mysqlsla.pl时候碰到很多错误,逐个安装解决了

[vagrant@centos64 mysql-log-filter-1.9]$  perl mysqlsla.pl today.log
Can't locate Time/HiRes.pm in @INC (@INC contains: /usr/local/lib64/perl5 /usr/local/share/perl5 /usr/lib64/perl5/vendor_perl /usr/share/perl5/vendor_perl /usr/lib64/perl5 /usr/share/perl5 .) at mysqlsla.pl line 2092.
BEGIN failed--compilation aborted at mysqlsla.pl line 2092.
#解决方法
[vagrant@centos64 mysql-log-filter-1.9]$ sudo yum install perl-Time-HiRes

[vagrant@centos64 mysql-log-filter-1.9]$ perl -MCPAN -e 'install DBI'
Can't locate CPAN.pm in @INC (@INC contains: /usr/local/lib64/perl5 /usr/local/share/perl5 /usr/lib64/perl5/vendor_perl /usr/share/perl5/vendor_perl /usr/lib64/perl5 /usr/share/perl5 .).
BEGIN failed--compilation aborted.

#解决方法
[vagrant@centos64 mysql-log-filter-1.9]$ sudo yum install perl-DBI

正确的方法是应该检测对应根目录下面的Makefile.PL,比如

[vagrant@centos64 percona-toolkit-2.2.15]$ perl Makefile.PL
#如果报以下错误,需要先安装对应模块,简单点就是
#sudo yum install perl-devel
Can't locate ExtUtils/MakeMaker.pm in @INC (@INC contains: /usr/local/lib64/perl5 /usr/local/share/perl5 /usr/lib64/perl5/vendor_perl /usr/share/perl5/vendor_perl /usr/lib64/perl5 /usr/share/perl5 .) at Makefile.PL line 1.
BEGIN failed--compilation aborted at Makefile.PL line 1.

然后安装对应模块,参考这里
使用Percona公司的工具pt-query-digest来分析也得到了同mysqlsla类似的结果

[vagrant@vagrant-centos64 bin]$ pt-query-digest ../../today4.log

# 9.8s user time, 700ms system time, 21.05M rss, 73.66M vsz
# Current date: Mon Oct  5 05:52:01 2015
# Hostname: vagrant-centos64.vagrantup.com
# Files: ../../today.log
# Overall: 60.57k total, 17 unique, 4.54 QPS, 402.68x concurrency ________
# Time range: 2015-09-23 22:17:29 to 2015-09-24 02:00:00
# Attribute          total     min     max     avg     95%  stddev  median
# ============     ======= ======= ======= ======= ======= ======= =======
# Exec time        5376198s      2s   2720s     89s    258s    118s     57s
# Lock time        3217840s       0    312s     53s    118s     60s    48ms
# Rows sent         20.85M       0  20.82M  361.00    0.99  84.46k    0.99
# Rows examine     175.91M       0  20.82M   2.97k   4.71k  84.48k   4.71k
# Query size         7.85M      64     597  135.90  151.03   27.56  112.70

# Profile
# Rank Query ID           Response time      Calls R/Call   V/M   Item
# ==== ================== ================== ===== ======== ===== ========
#    1 0xF1132168DB0BFC57 3905525.5745 72.6% 25586 152.6431 61.61 UPDATE connectinfo
#    2 0xD4B317E755A0ABD7 1344378.8719 25.0% 32791  40.9984 30... SELECT connectinfo
#    3 0xE23849EE6FB19DAE   66663.3148  1.2%   842  79.1726 62.99 INSERT errormessage
...

# Query 1: 7.52 QPS, 1.15kx concurrency, ID 0xF1132168DB0BFC57 at byte 16243195
# This item is included in the report because it matches --limit.
# Scores: V/M = 61.61
# Time range: 2015-09-24 01:03:17 to 02:00:00
# Attribute    pct   total     min     max     avg     95%  stddev  median
# ============ === ======= ======= ======= ======= ======= ======= =======
# Count         42   25586
# Exec time     72 3905526s    113s   2720s    153s    271s     97s    124s
# Lock time     98 3168077s    109s    312s    124s    118s     14s    118s
# Rows sent      0       0       0       0       0       0       0       0
# Rows examine   0   1.33M       0   4.80k   54.39       0  504.65       0
# Query size    48   3.78M     149     157  154.94  151.03    0.52  151.03
# String:
# Hosts        192.168.10.147 (2724/10%)... 9 more
# Users        gate
# Query_time distribution
#   1us
#  10us
# 100us
#   1ms
#  10ms
# 100ms
#    1s
#  10s+  ################################################################
# Tables
#    SHOW TABLE STATUS LIKE 'connectinfo'\G
#    SHOW CREATE TABLE `connectinfo`\G
update connectinfo set DeviceTag='10705BDDCD000000', ConnectIPAddress='115.231.63.78', UpdateDatetime=now() where ID=6912 and AppType=0\G
# Converted for EXPLAIN
# EXPLAIN /*!50100 PARTITIONS*/
select  DeviceTag='10705BDDCD000000', ConnectIPAddress='115.231.63.78', UpdateDatetime=now() from connectinfo where  ID=6912 and AppType=0\G

PS:这个问题解决后没几天,数据库又出问题,以为又是慢SQL什么的,结果是交换机网口问题。。。数据传输太慢,导致主从不同步。。。

参考链接:
mysql 慢日志分析
慢日志按时间截取
MySQL 之 slow log
MySQL优化—工欲善其事,必先利其器(2)
日志常用统计技巧
性能优化之MySQL优化(一)
mysqlsla的安装与使用
安装DBI组件。 Can’t locate DBI.pm in @INC-mysql接口
Issue 12: Can’t locate Time/HiRes.pm
打开MySQL的慢查询记录
“Can’t locate ExtUtils/MakeMaker.pm” while compile git
analysing slow MySQL queries with pt-query-digest
mysqldump备份原理
mysqldump –single-transaction, yet update queries are waiting for the backup
mysql 利用binlog增量备份,还原实例
MySQl备份恢复策略(完全+增量备份策略)

PHP 处理行结束符

我们平台有个功能支持导入一行一个编号的文本,进行批量处理,类似CSV文件,代码是这样的:

$arrLine = file($strFilePath);
foreach($arrLine as $k => $v){
}

一直用都用的好好的,今天有同事导入却不正常了。用记事本打开要导入的文件来看,发现文件里面的内容都是只有一行;用EditPlus打开来看却又是正常的,不过显示是Mac平台的文件,猜测应该是换行符不一样导致的。Google了一下,发现果然如此,不同操作系统的行结束符是不一样的:

  • Windows\DOS:\r\n <\li>
  • Unix\Linux:\n <\li>
  • Macintosh:\r <\li>

另外,TCP/IP协议传输文本结束符也是\r\n。PHP的文件系统函数file和fgets默认不检测行结束符,所以不能正确处理。可以将这些换行符处理成其他的,再来断行

$strFileContent = str_replace(array("/r/n", "/r", "/n"), ",", $strFileContent); 
$arrLine = explode(',' , $strFileContent);

PHP官网也提供了相应的解决方法,更改php.ini配置,或者ini_set(‘auto_detect_line_endings’, ‘On’)即可。

Note: 在读取在 Macintosh 电脑中或由其创建的文件时, 如果 PHP 不能正确的识别行结束符,启用运行时配置可选项 auto_detect_line_endings 也许可以解决此问题。

auto_detect_line_endings boolean
当设为 On 时,PHP 将检查通过 fgets() 和 file() 取得的数据中的行结束符号是符合 Unix,MS-DOS,还是 Macintosh 的习惯。

这使得 PHP 可以和 Macintosh 系统交互操作,但是默认值是 Off,因为在检测第一行的 EOL 习惯时会有很小的性能损失,而且在 Unix 系统下使用回车符号作为项目分隔符的人们会遭遇向下不兼容的行为。

PHP从4.3开始使用常量PHP_EOL来代替不同平台的换行符,在代码中也应该使用PHP_EOL而不是写\r\n,否则会造成不必要的平台环境问题。

参考链接:
Difference between \n and \r?
what is the difference between windows csv and mac csv?
PHP Fucntion file
PHP auto-detect-line-endings
When do I use the PHP constant “PHP_EOL”?

Git及GitHub 使用

在前面的文章里面,经常会看到使用Git来下载代码,比如

$ git clone https://github.com/zeromq/zeromq4-x.git

Git 是有Linux开发者Linus Torvalds开发的分布式版本控制软件,最初用于Linux内核代码管理,它不需要服务器端软件,就可以进行版本控制,使得离线/本地开发非常方便。与SVN不同,Git基本在本机工作(就像在本机也搭建了一个SVN服务器),具有以下优点:

  • 可以向本地提交代码,当服务器不可用时,仍然可以离线提交代码
  • 可以向本地提交代码,在本地功能开发完成后,再向服务器提交有用的变更,项目代码不会有频繁变更
  • 分布式存储,每个开发者均有一份代码,不用担心服务器上的代码丢失
  • 支持Pull Request,功能完成后通知其他成员进行代码审查

Git提倡使用主干作为稳定可发布的代码,使用分支进行开发,与我们的软件管理思路大致一致:分支开发,分支测试,分支/主干发布。多人协作项目,需要有良好的分支开发意识,以免在主干上互相冲突和影响软件稳定。
通过MINGW32安装Git,在Windows下面也可以使用Git。它提供了在右键菜单里面集成了Git管理的一些操作,也提供了Bash进行交互。Git常用操作包括:

# 获取帮助
$ git --help
# 获取某个git命令的帮助信息,使用默认浏览器查看
$ git help branch

# 初始化当前目录作为git仓库
$ git init
# 复制一个远程仓库到本地
$ git clone <url>

# 切换到主分支master(默认)
$ git checkout master

# 设置个人账号信息,提交代码时会同时提交这些信息
$ git config --global user.name "mystery"
$ git config --global user.email mystery@example.com:

# 提交全部代码变更
$ git add --all
# 仅提交boot.php
$ git add boot.php
# 仅提交路径generator/lib
$ git add generator/lib

# 仅提交路径generator/lib
$ git add generator/lib

# 删除文件 test.php
$ git rm test.php

# 移动文件 test.php
$ git mvtest.php

# 查看当前代码是否有未提交的变更
$ git status
# 比较代码差异
$ git diff
# 确认提交代码并备注
$ git commit -m 'test comment '

# 自动提交所有文件变更,不需要上面的步骤了
$ git commit -a -m 'added new comments'
# 查看提交历史记录
$ git log
# 查看历史记录前2条,并显示变更
$ git log -p -2

# 撤销上一次commit修改
$ git reset HEAD

# 与远程仓库库同步
$ git fetch origin
$ git rebase origin/master

# 多个本地合并commit合并成一个
$ git rebase -i origin/master

# 推送本地更新到远程仓库
$ git push --force origin feature-x

# 在master分支上创建分支develop,-b则表示创建分支并切换到该分支
$ git checkout -b develop master
# 在develop分支上创建feature-x分支
$ git checkout -b feature-x develop

# 列出所有分支
$ git branch
# 创建master分支
$ git branch master
# 删除分支feature-x
$ git branch -d feature-x

# 在mater上面合并feature-x分支代码
$ git checkout master
$ git merge feature-x

# 合并远程仓库代码到本地
$ git pull origin master
$ git fetch origin
$ git merge --no-ff origin/master

# 查看当前镜像
$ git tag
# 为当前仓库创建一个镜像标签
$ git tag -a v1.0 -m 'publish '

在已有项目目录上使用git

git init .
git remote add origin <repository-url>
git pull origin master
git add .
git commit -am "commit message"
git push origin master

Git还提供了一个工具,可以将svn仓库代码导入git仓库,参考这里

GitHub是一个著名的在线代码托管/分享/交流网站,提供免费的代码托管服务,支持分布式版本控制软件Git管理的工程代码,方便交流和协作。在这里可以找到各种各样的开源软件代码,比如PHP-SRC,对于程序员的帮助也是显而易见的,反之从一个程序员关注的项目也可以看出一个人的开发方向和水平。

GitHub开发了一个客户端用于与GitHub网站进行交互,隐藏了一些操作的细节,相比SVN要大气,方便一些。但是在线安装的方式对于国内并不方面。。
通过GitHub客户端可以直接创建、管理GitHub线上托管的代码。
git-1
git-2
与TortoiseSVN类似,也有TortoiseGit用于Windows下面的Git管理。
作公司内部的代码管理,可以使用GitLab来搭建私有的代码托管服务器,参考这里

参考链接:
Git 使用规范流程
Github使用指南
Pro Git(中文版)
使用 Git 管理源代码
Git工作流程
Git 分支管理
介绍一个成功的 Git 分支模型
从其他代码管理工具迁移到Git
Git分支管理策略
[Sever Hacks] 搭建私有 GitLab 代码托管服务器

PHP C扩展框架Phalcon

Phalcon是一个C语言写的高性能PHP框架,相比PHP写的框架,它作为PHP的扩展在进程开启时便加载了,节省了每一次请求时的类库加载、MVC框架分派的开销。前面提到的使用Zephir来开发PHP扩展也是这个项目贡献的。
在CentOS 上安装Phalcon,因为之前安装过zephire了,所以直接下载cphalcon来编译安装就可以了,可以参照这里安装所需其他lib

git clone --depth=1 git://github.com/phalcon/cphalcon.git
cd cphalcon/build/
sudo ./install

sudo vim /etc/php.d/phalcon.ini

创建phalcon.ini增加以下内容

[phalcon]
extension=phalcon.so

这里单独把phalcon.ini单独配置,是因为Phalcon扩展加载要在PDO扩展之后,而我的PDO也是单独配置的。如果不是的话会提示这个错误:

$ php -m | grep phalcon
PHP Warning:  PHP Startup: Unable to load dynamic library '/usr/lib64/php/modules/phalcon.so' - /usr/lib64/php/modules/phalcon.so: undefined symbol: php_pdo_get_dbh_ce in Unknown on line 0

Zend Framwork类似,Phalcon也提供开发工具用于快速生成项目骨架

git clone https://github.com/phalcon/phalcon-devtools.git
cd phalcon-devtools
./phalcon.sh
sudo ln -s ~/phalcon-devtools/phalcon.php /usr/bin/phalcon
sudo chmod ugo+x /usr/bin/phalcon

phalcon commands

注意,这个开发工具也是需要依赖Phalcon扩展的,否则会提示

$ phalcon command
PHP Fatal error:  Class 'Phalcon\Script' not found in /home/vagrant/phalcon-devtools/phalcon.php on line 40
PHP Stack trace:
PHP   1. {main}() /home/vagrant/phalcon-devtools/phalcon.php:0

生成一个测试项目store

phalcon scaffold store

按照官方的Nginx说明配置

server {

    listen   8005;
    server_name store.localhost;

    index index.php index.html index.htm;
    set $root_path '/usr/share/nginx/html/tutorial/store/public';
    root $root_path;

    try_files $uri $uri/ @rewrite;

    location @rewrite {
        rewrite ^/(.*)$ /index.php?_url=/$1;
    }

    location ~ \.php {
        fastcgi_pass unix:/tmp/php5-fpm.sock;
        fastcgi_index /index.php;

        include fastcgi.conf;

        fastcgi_split_path_info       ^(.+\.php)(/.+)$;
        fastcgi_param PATH_INFO       $fastcgi_path_info;
        fastcgi_param PATH_TRANSLATED $document_root$fastcgi_path_info;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
    }

    location ~* ^/(css|img|js|flv|swf|download)/(.+)$ {
        root $root_path;
    }

    location ~ /\.ht {
        deny all;
    }
}

访问一下192.168.33.14:8005,提示:Access denied。。。查看Nginx错误日志

[root@vagrant nginx]# tail -n 20 error.log
2015/07/28 03:41:42 [error] 7343#0: *1 FastCGI sent in stderr: "Access to the script '/usr/share/nginx/html/tutorial/store/public' has been denied (see security.limit_extensions)" while reading response header from upstream, client: 192.168.33.1, server: store.localhost, request: "GET / HTTP/1.1", upstream: "fastcgi://unix:/tmp/php5-fpm.sock:", host: "192.168.33.14:8005"

一开始还以为是PHP-FPM配置security.limit_extensions引起的,后来发现这个这个参数默认就是注释掉的,没限制。 最后在这里找到答案:原来是之前安装Nginx时配置了:cgi.fix_pathinfo=0,将它改回1(默认值)就可以了。当这个参数为1时,会把 PATH_TRANSLATED 转换为 SCRIPT_FILENAME。当然把这个参数去掉也是可以的

        ;fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;

重启Nginx

sudo service nginx restart

与其他PHP框架一样,Phalcon 支持MVC应用开发,提供了诸多组件如,数据库,缓存,队列等。
Phalcon\DI 作为全局的服务管理,用于服务注册,比如数据库,缓存,文件等。将这些资源统一起来管理,以便其他地方可以调用,各个需要的地方不再需要是new 一个对象,解除耦合关系。
Phalcon\Events\Manager作为全局的事件管理,用于事件注册,触发,以便自定义消息通知,如数据库查询事件,初始化事件。
Phalcon还有一个比较有特色的地方支持注释解析: Phalcon\Annotations 。官方教程举例如何使用注释做输出缓存。首先在Dispatcher服务中注册一个监听dispactch事件的缓存插件

<?php

$di['dispatcher'] = function () {

    $eventsManager = new \Phalcon\Events\Manager();

    // 添加插件到dispatch事件中
    $eventsManager->attach('dispatch', new CacheEnablerPlugin());

    $dispatcher = new \Phalcon\Mvc\Dispatcher();
    $dispatcher->setEventsManager($eventsManager);
    return $dispatcher;
};

然后实现监听的缓存插件

<?php

/**
 * 为视图启动缓存,如果被执行的action带有@Cache 注释单元。
 *
 */
class CacheEnablerPlugin extends \Phalcon\Mvc\User\Plugin
{

    /**
     * 这个事件在dispatcher中的每个路由被执行前执行
     *
     */
    public function beforeExecuteRoute($event, $dispatcher)
    {

        // 解析目前访问的控制的方法的注释
        $annotations = $this->annotations->getMethod(
            $dispatcher->getActiveController(),
            $dispatcher->getActiveMethod()
        );

        // 检查是否方法中带有注释名称‘Cache’的注释单元
        if ($annotations->has('Cache')) {

            // 这个方法带有‘Cache’注释单元
            $annotation = $annotations->get('Cache');

            // 获取注释单元的‘lifetime’参数
            $lifetime = $annotation->getNamedParameter('lifetime');

            $options = array('lifetime' => $lifetime);

            // 检查注释单元中是否有用户定义的‘key’参数
            if ($annotation->hasNamedParameter('key')) {
                $options['key'] = $annotation->getNamedParameter('key');
            }

            // 为当前dispatcher访问的方法开启cache
            $this->view->cache($options);
        }

    }

}

现在就可以使用注释来进行缓存了

<?php

class NewsController extends \Phalcon\Mvc\Controller
{

    public function indexAction()
    {

    }

    /**
     * This is a comment
     *
     * @Cache(lifetime=86400)
     */
    public function showAllAction()
    {
        $this->view->article = Articles::find();
    }

    /**
     * This is a comment
     *
     * @Cache(key="my-key", lifetime=86400)
     */
    public function showAction($slug)
    {
        $this->view->article = Articles::findFirstByTitle($slug);
    }

}

为Action增加了缓存,但对Action的代码改动为0(无侵入)并且非常简洁。数据模型里面也可以使用注释来对字段进行处理

<?php

use Phalcon\Mvc\Model;

class Robots extends Model
{
    /**
     * @Primary
     * @Identity
     * @Column(type="integer", nullable=false)
     */
    public $id;

    /**
     * @Column(type="string", length=70, nullable=false)
     */
    public $name;

    /**
     * @Column(type="string", length=32, nullable=false)
     */
    public $type;

    /**
     * @Column(type="integer", nullable=false)
     */
    public $year;
}

路由解析也可以使用RouterAnnotations反射来实现

?php

/**
 * @RoutePrefix("/api/products")
 */
class ProductsController
{

    /**
     * @Get("/")
     */
    public function indexAction()
    {

    }

    /**
     * @Get("/edit/{id:[0-9]+}", name="edit-robot")
     */
    public function editAction($id)
    {

    }

    /**
     * @Route("/save", methods={"POST", "PUT"}, name="save-robot")
     */
    public function saveAction()
    {

    }

    /**
     * @Route("/delete/{id:[0-9]+}", methods="DELETE",
     *      conversors={id="MyConversors::checkId"})
     */
    public function deleteAction($id)
    {

    }

    public function infoAction($id)
    {

    }

}

是不是跟Java annotation有点像?之前介绍的PHPUnit,Zend Framework,还有Symfony 2 Doctrine 2已经在PHP层面上支持,使用ReflectionClass便可实现。
Phalcon也支持创建命令行应用,可以将PHP应用打包成Phar,直接运行,也可以结合命令行颜色进行输出。

参考链接:
Nginx 安装说明(Nginx Installation Notes)
Can’t upgrade to Phalcon 1.3.0
Nginx/PHP-FPM “Access denied.” error
Access denied (403) for PHP files with Nginx + PHP-FPM
Phalcon 开发工具(Phalcon Developer Tools)
IDE autocomplete for PhalconPHP
Annotations in PHP: They Exist
PHP Annotations Are a Horrible Idea
PHP CLI Colors – PHP Class Command Line Colors (bash)

使用Phar打包发布PHP程序

上一遍介绍PHPUnit时,下载的PHPUnit是个Phar格式的文件并且可以独立运行。Phar归档是PHP 5.3增加的新特性,借鉴了JAVA中的JAR归档,可以将整个目录下的文件打包成单个可执行文件。虽然单个PHP文件也是可执行(Composer的install就是单个PHP文件执行创建对应的Phar),但是显得不方便。
Phar的创建、引用、解压、转换均可以在PHP中完成。要创建Phar需要更改php.ini如下

;phar.readonly = On
phar.readonly = Off

首先在swoole\server目录下创建一个需要打包的文件swoole.php

<?php
$serv = new swoole_server("127.0.0.1", 9002);
$serv->set(array(
    'worker_num' => 8,   //工作进程数量
    'daemonize' => true, //是否作为守护进程
));
$serv->on('connect', function ($serv, $fd){
    echo "Client:Connect.\n";
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    $serv->send($fd, 'Swoole: '.$data);
    $serv->close($fd);
});
$serv->on('close', function ($serv, $fd) {
    echo "Client: Close.\n";
});
$serv->start();

在swoole目录下面创建index.php

<?php
require 'server/swoole.php';

测试一下是否能正常运行

[root@vagrant-centos64 swoole]php index.php
[root@vagrant-centos64 swoole]# netstat  -apn | grep 9002
tcp        0      0 127.0.0.1:9002              0.0.0.0:*                   LISTEN      8436/php

在swoole目录下创建执行打包操作的文件build.php

<?php
$dir = __DIR__;             // 需要打包的目录
$file = 'swoole.phar';      // 包的名称, 注意它不仅仅是一个文件名, 在stub中也会作为入口前缀
$phar = new Phar(__DIR__ . '/' . $file, FilesystemIterator::CURRENT_AS_FILEINFO | FilesystemIterator::KEY_AS_FILENAME, $file);
// 开始打包
$phar->startBuffering();
$phar->buildFromDirectory($dir);
$phar->delete('build.php');
// 设置入口
$phar->setStub("<?php
Phar::mapPhar('{$file}');
require 'phar://{$file}/index.php';
__HALT_COMPILER();
?>");
$phar->stopBuffering();
// 打包完成
echo "Finished {$file}\n";

进行打包

[root@vagrant-centos64 swoole]$ php build.php
Finished swoole.phar

测试执行一下

[root@vagrant-centos64 swoole]$ php swoole.phar
[root@vagrant-centos64 swoole]# netstat  -apn | grep 9002
tcp        0      0 127.0.0.1:9002              0.0.0.0:*                   LISTEN      8635/php

到这里swoole.phar可以单独发布执行力。也可以在其他项目(比如WEB)里面引用,例如

<?php
include 'swoole.phar';
//引用里面的文件
//include 'phar://swoole.phar/server/swoole.php';

SegmentFault便是将PHP打包成Phar进行发布的。
打包完成后,也可以将Phar转成Zip格式或解压出来,比如解压PHPUnit

<?php
$phar = new Phar('phpunit.phar');
$phar->convertToExecutable(Phar::ZIP);
//$phar->extractTo('phpunit'); 

刚才我们这样运行的:php swoole.phar,就是还需要在PharP包前面还要加一个php。如果想做成直接可运行的Phar包,可以像单个PHP可执行文件那样在文件开头加上:#!/usr/bin/env php

$phar->setStub("#!/usr/bin/env php
<?php
Phar::mapPhar('{$file}');
require 'phar://{$file}/index.php';
__HALT_COMPILER();
?>");

运行一下

./swoole.phar

注意如果你在window下面编辑刚才段代码的话,可能会报错:-bash: ./swoole.phar: /usr/bin/env: bad interpreter: No such file or directory。解决办法便是更改换行符为Unix格式,参见这里

GitHub上有个项目:phar-composer可以利用Composer来打包相应的项目为单个可执行文件。

参考链接:
使用phar上线你的代码包
PHP的Phar简介
使用Phar来打包发布PHP程序
Packaging Your Apps with Phar
PHP V5.3 中的新特性,第 4 部分: 创建并使用 Phar 归档
rpm打包利器rpm_create简介

PHP测试框架初探:PHPUnit

最近开始一个新的项目,涉及到了网络通信的客户/端服务端,刚开始都是边写边调试,完成之后觉得有必要留下相关的测试示例,以便日后使用调试。一些PHP项目会把示例写在文件开头的注释里面,但这样又没办法验证是否能正确运行。
workerman是在文件最后留下的运行代码,如RpcClient.php

// ==以下调用示例==
if(PHP_SAPI == 'cli' && isset($argv[0]) && $argv[0] == basename(__FILE__))
{
    //这里是RpcClient的测试用例
}

PHPUnit则是这样的

#!/usr/bin/env php
<?php
if (__FILE__ == realpath($GLOBALS['_SERVER']['SCRIPT_NAME'])) {
    $phar    = realpath($GLOBALS['_SERVER']['SCRIPT_NAME']);
    $execute = true;
} else {
    $files   = get_included_files();
    $phar    = $files[0];
    $execute = false;
}

这有点像python

if __name__ == "__main__":
    //这里运行相关调用

这样就可以对单个文件的代码边看边测试,但是这会在相关的代码里面混入不必要的测试用例代码,当有依赖于外部文件又不是很方便,于是便想把这部分测例代码独立出来。
PHPUnit是xUnit测试家族的一员,可以方便对测试代码进行管理,还可以结合其他扩展。依照官网步骤开始:

wget https://phar.phpunit.de/phpunit.phar
chmod +x phpunit.phar
sudo mv phpunit.phar /usr/local/bin/phpunit
phpunit --version
#PHPUnit 4.7.7 by Sebastian Bergmann and contributors.

写一个测试代码(PHPUnit也可以自动生成对应的测试代码)

<?php
namespace Test\Client;
use Client\RPC;
class RPCTest extends \PHPUnit_Framework_TestCase{
	public $pClient = null;
	public $nMatchID = 567;
	public function __construct(){
		parent::__construct();
		
		$arrAddress = array(
				'tcp://127.0.0.1:9001'
		);
		// 配置服务端列表
		RPC::config($arrAddress);
		$this->nMatchID = 567;
		$this->pClient = RPC::instance('\Prize\Service');
	}
	public function testGet(){
		$mxRes = $this->pClient->get($this->nMatchID);
		$this->assertTrue(is_array($mxRes));
		$this->assertGreaterThan(0, count($mxRes));
	}
	public function testSendGet(){
		$this->assertTrue($this->pClient->send_get($this->nMatchID));
	}
	/**
	 * 使用依赖,以便保证在同一个socket链接之内
	 * @depends testSendGet
	 */
	public function testRecvGet(){
		$mxRes = $this->pClient->recv_get($this->nMatchID);
		$this->assertTrue(is_array($mxRes));
		$this->assertGreaterThan(0, count($mxRes));
	}
}

这边的使用了外部类RPC,于是在Bootstrap.php里面定义相关的自动加载规则,以便测试时自动加载

<?php
define('STIE_PATH', getcwd().'/');
define('ROOT_PATH', STIE_PATH);
define('LIB_PATH', ROOT_PATH.'Lib/');
define('TEST_PATH',	ROOT_PATH.'Test/');

spl_autoload_register(function($p_strClassName){
	$arrClassName = explode('\\', trim($p_strClassName, '\\'));
	if(!empty($arrClassName)){
		if($arrClassName[0] == 'Test'){
			$strPath = ROOT_PATH.implode(DIRECTORY_SEPARATOR, $arrClassName).'.php';
		}
		else{
			$strPath = LIB_PATH.implode(DIRECTORY_SEPARATOR, $arrClassName).'.php';
		}
	}
	if(is_file($strPath)){
		include $strPath;
	}
	else{
		throw new Exception("File not find : ".$strPath);
	}
});

现在运行测试代码

phpunit --bootstrap Test/Bootstrap.php Test/

出现了一些警报和错误。其中一个提示找不到PHPUnit_Extensions_Story_TestCase文件:

PHP Warning:  include(/usr/share/nginx/html/tutorial/mars/Lib/PHPUnit_Extensions_Story_TestCase.php): failed to open stream: No such file or directory in /usr/share/nginx/html/tutorial/mars/Test/Bootstrap.php on line 20

这文件是一个PHPUnit的基于故事的行为驱动开发(BDD)扩展,于是上网找了下想安装。

PHPUnit现在只支持Composer安装,如果用PEAR安装会提示失败

sudo pear channel-discover pear.phpunit.de
Discovering channel pear.phpunit.de over http:// failed with message: channel-add: Cannot open "http://pear.phpunit.de/channel.xml" (File http://pear.phpunit.de:80/channel.xml not valid (received: HTTP/1.1 410 Gone
))
Trying to discover channel pear.phpunit.de over https:// instead
Discovery of channel "pear.phpunit.de" failed (channel-add: Cannot open "https://pear.phpunit.de/channel.xml" (File https://pear.phpunit.de:443/channel.xml not valid (received: HTTP/1.1 410 Gone
)))

先安装Composer

curl -sS https://getcomposer.org/installer | php
mv composer.phar /usr/local/bin/composer
composer -V

创建一个composer.json,以便自动下载相关的库

{
    "require-dev": {
        "phpunit/phpunit": "4.7.*",
        "phpunit/phpunit-selenium": ">=1.4",
        "phpunit/dbunit": ">=1.3",
        "phpunit/phpunit-story": "*",
        "phpunit/php-invoker": "*"
    },
    "autoload": {
        "psr-0": {"": "src"}
    },
    "config": {
        "bin-dir": "bin/"
    }
}

运行composer,注意:composer在国内慢的要死,可以参照这里

composer install --dev

便会在同级目录下面生成一个vendor目录,各自的依赖库都下载在里面,并且生成了autolaod.php,用于加载相关的类库。编辑Bootstarp.php增加一行

include ROOT_PATH."vendor/autoload.php";

再次运行PHPUnit

[vagrant@vagrant-centos64 mars]$ phpunit --bootstrap Test/Bootstrap.php Test/KM/Client/RPCTest
PHPUnit 4.7.7 by Sebastian Bergmann and contributors.

EES

Time: 540 ms, Memory: 14.25Mb

There were 2 errors:

1) Test\KM\Client\RPCTest::testGet
stream_socket_client(): unable to connect to tcp://127.0.0.1:9001 (Connection refused)

/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:19
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:16
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:174
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:141
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:130
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:21
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:21

2) Test\KM\Client\RPCTest::testSendGet
stream_socket_client(): unable to connect to tcp://127.0.0.1:9001 (Connection refused)

/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:19
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:16
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:174
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:141
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:116
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:26
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:26

FAILURES!
Tests: 2, Assertions: 0, Errors: 2, Skipped: 1.

这下子没有警报了,只有详细的测试错误报告。刚才的测试代码里面testRecvGet方法依赖于testSendGet,一旦后者失败了,前者便会被跳过。
启动服务端再次运行测试代码

[vagrant@vagrant-centos64 mars]$ phpunit --colors --bootstrap Test/Bootstrap.php Test/
PHPUnit 4.7.7 by Sebastian Bergmann and contributors.

...

Time: 496 ms, Memory: 14.25Mb

OK (3 tests, 5 assertions)

这下子全部测试通过了。

前面运行测试是这样的:phpunit –bootstrap Test/Bootstrap.php Test/。–bootstrap是指在运行测试时首先加载的文件可以在这里面定义一些配置,自动加载规则,最后一个参数Test/是指要测试的目录,也可以指明运行某个测试用例,比如Test/Client/RPCTest,指运行RPCTest这个测试用例。

PHPUnit还有许多其他的命令行参数选项,如果每次这么输入也挺麻烦的,可以做成配置文件,加载运行就好了,比如phpunit.xml.dist

<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
         backupStaticAttributes="false"
         colors="true"
         convertErrorsToExceptions="true"
         convertNoticesToExceptions="true"
         convertWarningsToExceptions="true"
         processIsolation="false"
         stopOnFailure="false"
         syntaxCheck="false"
         bootstrap="Test/Bootstrap.php"
>
    <testsuites>
        <testsuite name="Lib Test Suite">
            <directory>./Test/</directory>
        </testsuite>
    </testsuites>
</phpunit>

然后运行:phpunit -c phpunit.xml.dist 就可以了,更多XML配置参考这里。还可以在这基础上更改执行配置,比如phpunit -c phpunit.xml.dist Test/Client/RPCTest,便是依照配置仅运行RPCTest。

以前挺纠结要不要写测试代码,一方面有测试代码的话,每次变更可以自动检测是否通过,另一方面,详细的测试用例需要大量时间去考虑,并且多数情况下都是服务模块之间的互相调用,难以预估结果,超出单元测试范围。现在看来写的这部门测试代码除了验证程序是否准确外,还可以作为示例文档。

参考链接:
PHPUnit: Failed opening required `PHPUnit_Extensions_Story_TestCase.php`
Issues while installing pear.phpunit.de/PHPUnit
单元测试
“单元测试要做多细?”
我讨厌单元测试:滕振宇谈如何进行单元测试
自动化单元测试实践之路
Unit Testing Tutorial Part I: Introduction to PHPUnit

MySQL秒杀优化

今天学习了楼方鑫先生《基于SQL的秒杀解决方案》,讲解了如何定位和优化秒杀业务中问题。
首先介绍了库存业务,库存可以分为前端库存,后端库存,实体库存。秒杀时,存在的主要问题

  • 库存数据不准确,下单、付款后,得知零库存;超卖或少卖
  • 废单较多,只下单不付款,转化率低
  • 热点商品,拖垮整个站

秒杀过程中,需要解决的技术点包括

  • 余额减一
  • 操作明细,方便追溯对账,防止一个帐号多次参与
  • 完整事务,保障记录明细与扣减库存同时完成
  • 数据落地,内存数据不可靠

针对库存技术要求,做了多个库存解决方案,比如Mysql + Read /Write Cache 。Read Cache方案不足是读有延迟影响用户体验;Write Cache方案存在多个APP写数据不一致性。Mysql + Cache + NoSQL方案则太复杂未实现。

于是又重新回到优化Mysql上。Mysql优势在于事务机制成熟,程序稳定。存在技术难点:单行并发,热点商品,瞬间压力,前一分钟,千万用户,容易堵塞,拖垮网站。于是从以下几个方面进行优化

  • 事务优化,单行更新
  • 并发优化,最大并发数
  • 排队优化,抢同一商品

分析秒杀时的处理逻辑,扫描系统代码,发现大部分程序都在等待确认Update记录数,才提交事务。

  • 开启事务
  • Insert库存明细
  • Update库存余额
  • 提交事务

在良好设计下,Mysql的Insert操作,不使用自增列是不会阻塞请求。但是Mysql的Update同一条记录是串行的,需要等远程客户端发送提交命令后才能释放锁,让其他会话继续。简单的更新操作,不考虑IO和锁冲突,一条语句执行的时间大约是0.1ms,一般条件下的网络时延为0.4-0.8ms,即等待事务提交通知的时间比真正SQL执行的时间长数倍。
于是扩展了SQL语法(OneSQL),指定在Update执行完后自动提交,不需要等待客户端发送提交命令,从而节约这一个网络来回的事务等待时间,提升网络性能。

秒杀时如果遇到大量请求需要进行排队,以免太多的请求拖垮Mysql

  • 在应用层排队的缺点,应用需要改造,使用统一框架(需要考虑跨语言),应用集群扩容时,控制不准确(连接数分配)
  • 在Mysql排队的优点,应用改造极少,只需修改少量SQL语句,无需统一框架,排队精确,发挥InnoDB性能。

于是开发了兼容Mysql的分布式数据访问层(OneProxy),为并发请求进行排队。

另外,还对热点商品进行独立数据库拆分和优化。目前,双十一前商品便已挂出,用户可以收藏或预购,对于商家而言可以准备更多商品;对于平台而言可以预先发现热点商品做优化。

总结,对于业务优化,需要循序渐进,深入了解业务逻辑和技术点,比较不同的解决方案,就算是平常的update操作也有优化空间;同时需要从其他方面进行特定优化,如高并发排队,热点数据分离等。

除了后端数据库优化,对于秒杀抽奖业务,问题的解决核心就是控制单位时间内的流量,使其不超过后端的处理能力。前端的做法包括

  • 分批次(少量多次)进行秒杀
  • 先玩游戏再抢购,如抽奖
  • 随机过滤掉部分请求,仅部分进入系统,如1/10
  • 阈值控制,一旦达到阈值,不再接收新请求
  • 预约排号,未排号用户返回失败(用户分类)
  • 验证码验证

另外,OneProxy 提供的连接池功能对于PHP非常有用。PHP运行在CGI下面,每一个请求到来便需要重新创建一个数据库连接与Mysql进行交互,并发量大量的情况下便会出现:too many connetion,乃至拖垮数据库:mysql server has gone away,影响其他业务。因此Mysql连接池,对于PHP显得非常重要。

更新:小米网在开发抢购系统的时候,最早使用PHP + Mysql碰到了一些问题,例如并发性能,数据一致性,在OneSQL上面都已经做了改进优化,只是小米自己使用Go语言重构,开发大秒系统(BigTap)。

参考链接:
限量秒杀等高并发活动的正确性如何保证?
MySQL 5.6.17/Percona5.6.16/MariaDB 10.0.11/OneSQL 5.6.16 TpmC测试
由12306.cn谈谈网站性能技术
“米粉节”背后的故事——小米网抢购系统开发实践
Web系统大规模并发——电商秒杀与抢购
OneProxy : 如何给PHP页面以及其他Ruby/Python/Go程序添加连接池功能?
基于Swoole实现的Mysql连接池

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install

#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib

sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);


$arrRead = $arrWrite = array();
while(true){
	$nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
		foreach($arrRead as $pSocket){
			if($pSocket === $pFrontend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
			else if ($pSocket === $pBackend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
		}
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");

while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");

while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);

$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");

$pPush->send("Hello Client 1");

客户端Pull.php

<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);

//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");

var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
	$message = $pServer->recv();
	echo $message . PHP_EOL;
	$pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");


$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());

var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

<?php
class ServerSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	protected $pSocket = null;
	protected $pClient = null;

	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()&&$this->_bind()){
			$this->_listen();
		}
	}
	protected function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	protected function _bind(){
		$bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	protected function _listen(){
		$bRes  = socket_listen($this->pSocket, 10) ;
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function accept(){
		$this->pClient = socket_accept($this->pSocket);
		if(!$this->pClient){
			$this->_log();
		}
		return $this->pClient;
	}
	protected function _connect(){
		$this->accept();
			
		if(socket_getpeername($this->pClient, $address, $port)){
			echo "Client $address : $port is now connected to us. \n";
		}
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if ($mxData == false) {
			socket_close($this->pClient);

			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client: ".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function read(){
		$mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
		if($mxMessage === false){
			$this->_log();
		}
		return $mxMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	protected function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

<?php
class ClientSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	private $pSocket = null;
	public $strErrorCode = "";
	public $strErrorMsg  = "";

	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()){
			$this->_connect();
		}
	}
	private function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	private function _connect(){
		$pSocket = $this->_create();
		$bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function read(){
		$strMessage = "";
		$strBuffer = "";
		while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
			$strMessage .= $strBuffer;
		}
		return $strMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
	}
	public function send($p_strMessage){
		$bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
		if(!$bRes){
			$this->_log();
		}
		return true;
	}
	public function recv(){
		$strMessage = "";
		$strBuffer = "";
		$bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
		if(!$bRes){
			$this->_log();
		}
		$strMessage .=$strBuffer;
		return $strMessage;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	private function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$pClient = new ClientSocket($strHost, $nPort);

var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());

/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

php serversocket.php

在另一个终端里运行

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

class SelectServerSocket extends ServerSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
	
		while(true){
			$arrRead = $arrClient;
			if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			foreach ($arrRead as $pSocket){
				if($pSocket == $this->pSocket){
					$this->_connect();
						
					$arrClient[] = $this->pClient;
				}
				else{
					$bRes = $this->_reply();
					if($bRes === false){
						$nKey = array_search($this->pClient, $arrClient);
						unset($arrClient[$nKey]);
						continue;
					}
				}
	
			}
		}
		//usleep(100);
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

	public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}
	public function setBlock($p_bType = true){
		if($p_bType){
			socket_set_block($this->pSocket);
		}
		else{
			socket_set_nonblock($this->pSocket);
		}
	}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

$pClient = new ClientSocket($strHost, $nPort);

$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);

//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

	protected function _setNoBlock($p_pSocket){
		socket_set_nonblock($p_pSocket);
	}
	protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
		$this->_setNoBlock($this->pSocket);
	
		while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

class EvServerSocket extends ServerSocket{
	protected function _onConnect(){
		$this->_connect();
	
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。


class MulProcessServerSocket extends EvServerSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		//父进程和子进程都会执行下面代码
		if ($pid == -1) {
			//错误处理:创建子进程失败时返回-1.
			die('could not fork');
		} else if ($pid) {
			//父进程会得到子进程号,所以这里是父进程执行的逻辑
			pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
		} else {
			//子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

<?php
class ClientStreamSocket{
	private $pConnetion = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nTimeOut   = 3;
	protected $nFlag      = STREAM_CLIENT_CONNECT;
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	const BLOCK   = 1;
	const NOBLOCK = 0;
	public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
		$this->strAddress = $p_strAddress;
		$this->nTimeOut   = $p_nTimeOut;
		$this->nFlag      = $p_nFlag;
		$this->_connect();
	}
	private function _connect(){
		$this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
		if(!$this->pConnetion){
			throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pConnetion;
	}
	public function write($p_strMessage){
		if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pConnetion);
		//指定长度读取
		//$strMessage = fread($this->pConnetion, 1024);
		$strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
		//$strMessage = stream_get_contents($this->pConnetion);

		return $strMessage;
	}
	public function close(){
		fclose($this->pConnetion);
		$this->pConnetion = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_nTimeOut = 1){
		$bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
	}
	public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
		$bRes = stream_set_blocking($this->pConnetion, $p_nMode);
	}
	public function __destruct(){
		if($this->pConnetion){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

<?php
class ServerStreamSocket{
	protected $pServer = null;
	protected $pClient = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nFlag      = STREAM_SERVER_LISTEN;
	
	const BLOCK   = 1;
	const NOBLOCK = 0;
	
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
		$this->strAddress = $p_strAddress;
		$this->nFlag = $p_nFlag;
		$this->_create();
	}
	protected function _create(){
		$this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
		if(!$this->pServer ){
			throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pServer ;
	}
	public function accept(){
		$this->pClient = stream_socket_accept($this->pServer);
		if(!$this->pClient ){
			return false;
		}
		return $this->pClient ;
	}
	protected function _connect(){
		$this->accept();
		echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if($mxData == false){
			fclose($this->pClient);
				
			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client:".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function write($p_strMessage){
		//$nLen = fwrite($this->pClient, $p_strMessage);
		$nLen = stream_socket_sendto($this->pClient, $p_strMessage);
		if($nLen !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pClient);
		//指定长度读取
		//$strMessage = fread($this->pClient, 1024);
		$strMessage = stream_socket_recvfrom($this->pClient, 1024);
		//$strMessage = stream_get_contents($this->pClient);

		return $strMessage;
	}
	public function close(){
		fclose($this->pServer);
		
		$this->pServer = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
		$bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
	}
	public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
		$bRes = stream_set_blocking($p_pConnetction, $p_nMode);
	}
	public function __destruct(){
		if($this->pServer){
			$this->close();
		}
	}
}
class SelectServerStreamSocket extends ServerStreamSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pServer);
		while(true){
			$arrRead = $arrClient;
			if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			if(in_array($this->pServer, $arrRead)){
				$this->_connect();
	
				$arrClient[] = $this->pClient;
	
				$nKey = array_search($this->pServer, $arrRead);
				unset($arrRead[$nKey]);
			}
			foreach($arrRead as $pConnetcion){
				$bRes = $this->_reply();
				if($bRes === false){
					$nKey = array_search($this->pClient, $arrClient);
					unset($arrClient[$nKey]);;
					continue;
				}
			}
		}
		//usleep(100);
	}
}
class EvServerStreamSocket extends ServerStreamSocket{
	protected function _onConnect(){
		$this->_connect();
		
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		if ($pid == -1) {
			die('could not fork');
		} else if ($pid) {
			pcntl_wait($status);
		} else {
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pServer = new EvServerStreamSocket($strAddress);

$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?