标签归档:php

php

使用Phar打包发布PHP程序

上一遍介绍PHPUnit时,下载的PHPUnit是个Phar格式的文件并且可以独立运行。Phar归档是PHP 5.3增加的新特性,借鉴了JAVA中的JAR归档,可以将整个目录下的文件打包成单个可执行文件。虽然单个PHP文件也是可执行(Composer的install就是单个PHP文件执行创建对应的Phar),但是显得不方便。
Phar的创建、引用、解压、转换均可以在PHP中完成。要创建Phar需要更改php.ini如下

1
2
;phar.readonly = On
phar.readonly = Off

首先在swoole\server目录下创建一个需要打包的文件swoole.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?php
$serv = new swoole_server("127.0.0.1", 9002);
$serv->set(array(
    'worker_num' => 8,   //工作进程数量
    'daemonize' => true, //是否作为守护进程
));
$serv->on('connect', function ($serv, $fd){
    echo "Client:Connect.\n";
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    $serv->send($fd, 'Swoole: '.$data);
    $serv->close($fd);
});
$serv->on('close', function ($serv, $fd) {
    echo "Client: Close.\n";
});
$serv->start();

在swoole目录下面创建index.php

1
2
<?php
require 'server/swoole.php';

测试一下是否能正常运行

1
2
3
[root@vagrant-centos64 swoole]php index.php
[root@vagrant-centos64 swoole]# netstat  -apn | grep 9002
tcp        0      0 127.0.0.1:9002              0.0.0.0:*                   LISTEN      8436/php

在swoole目录下创建执行打包操作的文件build.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?php
$dir = __DIR__;             // 需要打包的目录
$file = 'swoole.phar';      // 包的名称, 注意它不仅仅是一个文件名, 在stub中也会作为入口前缀
$phar = new Phar(__DIR__ . '/' . $file, FilesystemIterator::CURRENT_AS_FILEINFO | FilesystemIterator::KEY_AS_FILENAME, $file);
// 开始打包
$phar->startBuffering();
$phar->buildFromDirectory($dir);
$phar->delete('build.php');
// 设置入口
$phar->setStub("<?php
Phar::mapPhar('{$file}');
require 'phar://{$file}/index.php';
__HALT_COMPILER();
?>");
$phar->stopBuffering();
// 打包完成
echo "Finished {$file}\n";

进行打包

1
2
[root@vagrant-centos64 swoole]$ php build.php
Finished swoole.phar

测试执行一下

1
2
3
[root@vagrant-centos64 swoole]$ php swoole.phar
[root@vagrant-centos64 swoole]# netstat  -apn | grep 9002
tcp        0      0 127.0.0.1:9002              0.0.0.0:*                   LISTEN      8635/php

到这里swoole.phar可以单独发布执行力。也可以在其他项目(比如WEB)里面引用,例如

1
2
3
4
<?php
include 'swoole.phar';
//引用里面的文件
//include 'phar://swoole.phar/server/swoole.php';

SegmentFault便是将PHP打包成Phar进行发布的。
打包完成后,也可以将Phar转成Zip格式或解压出来,比如解压PHPUnit

1
2
3
4
<?php
$phar = new Phar('phpunit.phar');
$phar->convertToExecutable(Phar::ZIP);
//$phar->extractTo('phpunit');

刚才我们这样运行的:php swoole.phar,就是还需要在PharP包前面还要加一个php。如果想做成直接可运行的Phar包,可以像单个PHP可执行文件那样在文件开头加上:#!/usr/bin/env php

1
2
3
4
5
6
$phar->setStub("#!/usr/bin/env php
<?php
Phar::mapPhar('{$file}');
require 'phar://{$file}/index.php';
__HALT_COMPILER();
?>");

运行一下

1
./swoole.phar

注意如果你在window下面编辑刚才段代码的话,可能会报错:-bash: ./swoole.phar: /usr/bin/env: bad interpreter: No such file or directory。解决办法便是更改换行符为Unix格式,参见这里

GitHub上有个项目:phar-composer可以利用Composer来打包相应的项目为单个可执行文件。

参考链接:
使用phar上线你的代码包
PHP的Phar简介
使用Phar来打包发布PHP程序
Packaging Your Apps with Phar
PHP V5.3 中的新特性,第 4 部分: 创建并使用 Phar 归档
rpm打包利器rpm_create简介

PHP测试框架初探:PHPUnit

最近开始一个新的项目,涉及到了网络通信的客户/端服务端,刚开始都是边写边调试,完成之后觉得有必要留下相关的测试示例,以便日后使用调试。一些PHP项目会把示例写在文件开头的注释里面,但这样又没办法验证是否能正确运行。
workerman是在文件最后留下的运行代码,如RpcClient.php

1
2
3
4
5
// ==以下调用示例==
if(PHP_SAPI == 'cli' && isset($argv[0]) && $argv[0] == basename(__FILE__))
{
    //这里是RpcClient的测试用例
}

PHPUnit则是这样的

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env php
<?php
if (__FILE__ == realpath($GLOBALS['_SERVER']['SCRIPT_NAME'])) {
    $phar    = realpath($GLOBALS['_SERVER']['SCRIPT_NAME']);
    $execute = true;
} else {
    $files   = get_included_files();
    $phar    = $files[0];
    $execute = false;
}

这有点像python

1
2
if __name__ == "__main__":
    //这里运行相关调用

这样就可以对单个文件的代码边看边测试,但是这会在相关的代码里面混入不必要的测试用例代码,当有依赖于外部文件又不是很方便,于是便想把这部分测例代码独立出来。
PHPUnit是xUnit测试家族的一员,可以方便对测试代码进行管理,还可以结合其他扩展。依照官网步骤开始:

1
2
3
4
5
wget https://phar.phpunit.de/phpunit.phar
chmod +x phpunit.phar
sudo mv phpunit.phar /usr/local/bin/phpunit
phpunit --version
#PHPUnit 4.7.7 by Sebastian Bergmann and contributors.

写一个测试代码(PHPUnit也可以自动生成对应的测试代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?php
namespace Test\Client;
use Client\RPC;
class RPCTest extends \PHPUnit_Framework_TestCase{
    public $pClient = null;
    public $nMatchID = 567;
    public function __construct(){
        parent::__construct();
         
        $arrAddress = array(
                'tcp://127.0.0.1:9001'
        );
        // 配置服务端列表
        RPC::config($arrAddress);
        $this->nMatchID = 567;
        $this->pClient = RPC::instance('\Prize\Service');
    }
    public function testGet(){
        $mxRes = $this->pClient->get($this->nMatchID);
        $this->assertTrue(is_array($mxRes));
        $this->assertGreaterThan(0, count($mxRes));
    }
    public function testSendGet(){
        $this->assertTrue($this->pClient->send_get($this->nMatchID));
    }
    /**
     * 使用依赖,以便保证在同一个socket链接之内
     * @depends testSendGet
     */
    public function testRecvGet(){
        $mxRes = $this->pClient->recv_get($this->nMatchID);
        $this->assertTrue(is_array($mxRes));
        $this->assertGreaterThan(0, count($mxRes));
    }
}

这边的使用了外部类RPC,于是在Bootstrap.php里面定义相关的自动加载规则,以便测试时自动加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
define('STIE_PATH', getcwd().'/');
define('ROOT_PATH', STIE_PATH);
define('LIB_PATH', ROOT_PATH.'Lib/');
define('TEST_PATH', ROOT_PATH.'Test/');
 
spl_autoload_register(function($p_strClassName){
    $arrClassName = explode('\\', trim($p_strClassName, '\\'));
    if(!empty($arrClassName)){
        if($arrClassName[0] == 'Test'){
            $strPath = ROOT_PATH.implode(DIRECTORY_SEPARATOR, $arrClassName).'.php';
        }
        else{
            $strPath = LIB_PATH.implode(DIRECTORY_SEPARATOR, $arrClassName).'.php';
        }
    }
    if(is_file($strPath)){
        include $strPath;
    }
    else{
        throw new Exception("File not find : ".$strPath);
    }
});

现在运行测试代码

1
phpunit --bootstrap Test/Bootstrap.php Test/

出现了一些警报和错误。其中一个提示找不到PHPUnit_Extensions_Story_TestCase文件:

1
PHP Warning:  include(/usr/share/nginx/html/tutorial/mars/Lib/PHPUnit_Extensions_Story_TestCase.php): failed to open stream: No such file or directory in /usr/share/nginx/html/tutorial/mars/Test/Bootstrap.php on line 20

这文件是一个PHPUnit的基于故事的行为驱动开发(BDD)扩展,于是上网找了下想安装。

PHPUnit现在只支持Composer安装,如果用PEAR安装会提示失败

1
2
3
4
5
6
sudo pear channel-discover pear.phpunit.de
Discovering channel pear.phpunit.de over http:// failed with message: channel-add: Cannot open "http://pear.phpunit.de/channel.xml" (File http://pear.phpunit.de:80/channel.xml not valid (received: HTTP/1.1 410 Gone
))
Trying to discover channel pear.phpunit.de over https:// instead
Discovery of channel "pear.phpunit.de" failed (channel-add: Cannot open "https://pear.phpunit.de/channel.xml" (File https://pear.phpunit.de:443/channel.xml not valid (received: HTTP/1.1 410 Gone
)))

先安装Composer

1
2
3
curl -sS https://getcomposer.org/installer | php
mv composer.phar /usr/local/bin/composer
composer -V

创建一个composer.json,以便自动下载相关的库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
    "require-dev": {
        "phpunit/phpunit": "4.7.*",
        "phpunit/phpunit-selenium": ">=1.4",
        "phpunit/dbunit": ">=1.3",
        "phpunit/phpunit-story": "*",
        "phpunit/php-invoker": "*"
    },
    "autoload": {
        "psr-0": {"": "src"}
    },
    "config": {
        "bin-dir": "bin/"
    }
}

运行composer,注意:composer在国内慢的要死,可以参照这里

1
composer install --dev

便会在同级目录下面生成一个vendor目录,各自的依赖库都下载在里面,并且生成了autolaod.php,用于加载相关的类库。编辑Bootstarp.php增加一行

1
include ROOT_PATH."vendor/autoload.php";

再次运行PHPUnit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[vagrant@vagrant-centos64 mars]$ phpunit --bootstrap Test/Bootstrap.php Test/KM/Client/RPCTest
PHPUnit 4.7.7 by Sebastian Bergmann and contributors.
 
EES
 
Time: 540 ms, Memory: 14.25Mb
 
There were 2 errors:
 
1) Test\KM\Client\RPCTest::testGet
stream_socket_client(): unable to connect to tcp://127.0.0.1:9001 (Connection refused)
 
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:19
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:16
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:174
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:141
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:130
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:21
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:21
 
2) Test\KM\Client\RPCTest::testSendGet
stream_socket_client(): unable to connect to tcp://127.0.0.1:9001 (Connection refused)
 
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:19
/usr/share/nginx/html/tutorial/mars/Lib/Client/Stream.php:16
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:174
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:141
/usr/share/nginx/html/tutorial/mars/Lib/Client/RPC.php:116
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:26
/usr/share/nginx/html/tutorial/mars/Test/Client/RPCTest.php:26
 
FAILURES!
Tests: 2, Assertions: 0, Errors: 2, Skipped: 1.

这下子没有警报了,只有详细的测试错误报告。刚才的测试代码里面testRecvGet方法依赖于testSendGet,一旦后者失败了,前者便会被跳过。
启动服务端再次运行测试代码

1
2
3
4
5
6
7
8
[vagrant@vagrant-centos64 mars]$ phpunit --colors --bootstrap Test/Bootstrap.php Test/
PHPUnit 4.7.7 by Sebastian Bergmann and contributors.
 
...
 
Time: 496 ms, Memory: 14.25Mb
 
OK (3 tests, 5 assertions)

这下子全部测试通过了。

前面运行测试是这样的:phpunit –bootstrap Test/Bootstrap.php Test/。–bootstrap是指在运行测试时首先加载的文件可以在这里面定义一些配置,自动加载规则,最后一个参数Test/是指要测试的目录,也可以指明运行某个测试用例,比如Test/Client/RPCTest,指运行RPCTest这个测试用例。

PHPUnit还有许多其他的命令行参数选项,如果每次这么输入也挺麻烦的,可以做成配置文件,加载运行就好了,比如phpunit.xml.dist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
         backupStaticAttributes="false"
         colors="true"
         convertErrorsToExceptions="true"
         convertNoticesToExceptions="true"
         convertWarningsToExceptions="true"
         processIsolation="false"
         stopOnFailure="false"
         syntaxCheck="false"
         bootstrap="Test/Bootstrap.php"
>
    <testsuites>
        <testsuite name="Lib Test Suite">
            <directory>./Test/</directory>
        </testsuite>
    </testsuites>
</phpunit>

然后运行:phpunit -c phpunit.xml.dist 就可以了,更多XML配置参考这里。还可以在这基础上更改执行配置,比如phpunit -c phpunit.xml.dist Test/Client/RPCTest,便是依照配置仅运行RPCTest。

以前挺纠结要不要写测试代码,一方面有测试代码的话,每次变更可以自动检测是否通过,另一方面,详细的测试用例需要大量时间去考虑,并且多数情况下都是服务模块之间的互相调用,难以预估结果,超出单元测试范围。现在看来写的这部门测试代码除了验证程序是否准确外,还可以作为示例文档。

参考链接:
PHPUnit: Failed opening required `PHPUnit_Extensions_Story_TestCase.php`
Issues while installing pear.phpunit.de/PHPUnit
单元测试
“单元测试要做多细?”
我讨厌单元测试:滕振宇谈如何进行单元测试
自动化单元测试实践之路
Unit Testing Tutorial Part I: Introduction to PHPUnit

MySQL秒杀优化

今天学习了楼方鑫先生《基于SQL的秒杀解决方案》,讲解了如何定位和优化秒杀业务中问题。
首先介绍了库存业务,库存可以分为前端库存,后端库存,实体库存。秒杀时,存在的主要问题

  • 库存数据不准确,下单、付款后,得知零库存;超卖或少卖
  • 废单较多,只下单不付款,转化率低
  • 热点商品,拖垮整个站

秒杀过程中,需要解决的技术点包括

  • 余额减一
  • 操作明细,方便追溯对账,防止一个帐号多次参与
  • 完整事务,保障记录明细与扣减库存同时完成
  • 数据落地,内存数据不可靠

针对库存技术要求,做了多个库存解决方案,比如Mysql + Read /Write Cache 。Read Cache方案不足是读有延迟影响用户体验;Write Cache方案存在多个APP写数据不一致性。Mysql + Cache + NoSQL方案则太复杂未实现。

于是又重新回到优化Mysql上。Mysql优势在于事务机制成熟,程序稳定。存在技术难点:单行并发,热点商品,瞬间压力,前一分钟,千万用户,容易堵塞,拖垮网站。于是从以下几个方面进行优化

  • 事务优化,单行更新
  • 并发优化,最大并发数
  • 排队优化,抢同一商品

分析秒杀时的处理逻辑,扫描系统代码,发现大部分程序都在等待确认Update记录数,才提交事务。

  • 开启事务
  • Insert库存明细
  • Update库存余额
  • 提交事务

在良好设计下,Mysql的Insert操作,不使用自增列是不会阻塞请求。但是Mysql的Update同一条记录是串行的,需要等远程客户端发送提交命令后才能释放锁,让其他会话继续。简单的更新操作,不考虑IO和锁冲突,一条语句执行的时间大约是0.1ms,一般条件下的网络时延为0.4-0.8ms,即等待事务提交通知的时间比真正SQL执行的时间长数倍。
于是扩展了SQL语法(OneSQL),指定在Update执行完后自动提交,不需要等待客户端发送提交命令,从而节约这一个网络来回的事务等待时间,提升网络性能。

秒杀时如果遇到大量请求需要进行排队,以免太多的请求拖垮Mysql

  • 在应用层排队的缺点,应用需要改造,使用统一框架(需要考虑跨语言),应用集群扩容时,控制不准确(连接数分配)
  • 在Mysql排队的优点,应用改造极少,只需修改少量SQL语句,无需统一框架,排队精确,发挥InnoDB性能。

于是开发了兼容Mysql的分布式数据访问层(OneProxy),为并发请求进行排队。

另外,还对热点商品进行独立数据库拆分和优化。目前,双十一前商品便已挂出,用户可以收藏或预购,对于商家而言可以准备更多商品;对于平台而言可以预先发现热点商品做优化。

总结,对于业务优化,需要循序渐进,深入了解业务逻辑和技术点,比较不同的解决方案,就算是平常的update操作也有优化空间;同时需要从其他方面进行特定优化,如高并发排队,热点数据分离等。

除了后端数据库优化,对于秒杀抽奖业务,问题的解决核心就是控制单位时间内的流量,使其不超过后端的处理能力。前端的做法包括

  • 分批次(少量多次)进行秒杀
  • 先玩游戏再抢购,如抽奖
  • 随机过滤掉部分请求,仅部分进入系统,如1/10
  • 阈值控制,一旦达到阈值,不再接收新请求
  • 预约排号,未排号用户返回失败(用户分类)
  • 验证码验证

另外,OneProxy 提供的连接池功能对于PHP非常有用。PHP运行在CGI下面,每一个请求到来便需要重新创建一个数据库连接与Mysql进行交互,并发量大量的情况下便会出现:too many connetion,乃至拖垮数据库:mysql server has gone away,影响其他业务。因此Mysql连接池,对于PHP显得非常重要。

更新:小米网在开发抢购系统的时候,最早使用PHP + Mysql碰到了一些问题,例如并发性能,数据一致性,在OneSQL上面都已经做了改进优化,只是小米自己使用Go语言重构,开发大秒系统(BigTap)。

参考链接:
限量秒杀等高并发活动的正确性如何保证?
MySQL 5.6.17/Percona5.6.16/MariaDB 10.0.11/OneSQL 5.6.16 TpmC测试
由12306.cn谈谈网站性能技术
“米粉节”背后的故事——小米网抢购系统开发实践
Web系统大规模并发——电商秒杀与抢购
OneProxy : 如何给PHP页面以及其他Ruby/Python/Go程序添加连接池功能?
基于Swoole实现的Mysql连接池

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

1
2
3
4
5
6
7
8
9
10
11
12
git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install
 
#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib
 
sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

1
2
3
4
5
6
7
8
git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install
 
sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

1
2
[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

1
2
3
4
5
6
<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

1
2
3
4
#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);
 
 
$arrRead = $arrWrite = array();
while(true){
    $nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
        foreach($arrRead as $pSocket){
            if($pSocket === $pFrontend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
            else if ($pSocket === $pBackend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
        }
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

1
2
3
4
5
6
7
8
9
10
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

1
2
3
4
5
6
7
8
9
10
11
12
<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");
 
while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");
 
while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);
 
$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");
 
$pPush->send("Hello Client 1");

客户端Pull.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);
 
//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");
 
var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

1
2
3
4
5
6
7
8
9
10
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
    $message = $pServer->recv();
    echo $message . PHP_EOL;
    $pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

1
2
3
4
5
6
7
8
<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

1
2
3
4
5
6
7
8
9
$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");
 
 
$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());
 
var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
class ServerSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    protected $pSocket = null;
    protected $pClient = null;
 
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()&&$this->_bind()){
            $this->_listen();
        }
    }
    protected function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    protected function _bind(){
        $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    protected function _listen(){
        $bRes  = socket_listen($this->pSocket, 10) ;
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function accept(){
        $this->pClient = socket_accept($this->pSocket);
        if(!$this->pClient){
            $this->_log();
        }
        return $this->pClient;
    }
    protected function _connect(){
        $this->accept();
             
        if(socket_getpeername($this->pClient, $address, $port)){
            echo "Client $address : $port is now connected to us. \n";
        }
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if ($mxData == false) {
            socket_close($this->pClient);
 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client: ".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function read(){
        $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
        if($mxMessage === false){
            $this->_log();
        }
        return $mxMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    protected function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
class ClientSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    private $pSocket = null;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
 
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()){
            $this->_connect();
        }
    }
    private function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    private function _connect(){
        $pSocket = $this->_create();
        $bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function read(){
        $strMessage = "";
        $strBuffer = "";
        while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
            $strMessage .= $strBuffer;
        }
        return $strMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
    }
    public function send($p_strMessage){
        $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
        if(!$bRes){
            $this->_log();
        }
        return true;
    }
    public function recv(){
        $strMessage = "";
        $strBuffer = "";
        $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
        if(!$bRes){
            $this->_log();
        }
        $strMessage .=$strBuffer;
        return $strMessage;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    private function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$pClient = new ClientSocket($strHost, $nPort);
 
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
 
/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

1
php serversocket.php

在另一个终端里运行

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

1
php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelectServerSocket extends ServerSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pSocket);
     
        while(true){
            $arrRead = $arrClient;
            if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            foreach ($arrRead as $pSocket){
                if($pSocket == $this->pSocket){
                    $this->_connect();
                         
                    $arrClient[] = $this->pClient;
                }
                else{
                    $bRes = $this->_reply();
                    if($bRes === false){
                        $nKey = array_search($this->pClient, $arrClient);
                        unset($arrClient[$nKey]);
                        continue;
                    }
                }
     
            }
        }
        //usleep(100);
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
    if($p_bType){
        socket_set_block($this->pSocket);
    }
    else{
        socket_set_nonblock($this->pSocket);
    }
}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

1
2
3
4
5
6
$pClient = new ClientSocket($strHost, $nPort);
 
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
 
//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
protected function _setNoBlock($p_pSocket){
    socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

1
2
3
4
5
6
7
public function loop(){
    $arrRead = array();
    $arrWrite = $arrExp = null;
    $arrClient = array($this->pSocket);
    $this->_setNoBlock($this->pSocket);
 
    while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EvServerSocket extends ServerSocket{
    protected function _onConnect(){
        $this->_connect();
     
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MulProcessServerSocket extends EvServerSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        //父进程和子进程都会执行下面代码
        if ($pid == -1) {
            //错误处理:创建子进程失败时返回-1.
            die('could not fork');
        } else if ($pid) {
            //父进程会得到子进程号,所以这里是父进程执行的逻辑
            pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
        } else {
            //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
class ClientStreamSocket{
    private $pConnetion = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nTimeOut   = 3;
    protected $nFlag      = STREAM_CLIENT_CONNECT;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    const BLOCK   = 1;
    const NOBLOCK = 0;
    public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
        $this->strAddress = $p_strAddress;
        $this->nTimeOut   = $p_nTimeOut;
        $this->nFlag      = $p_nFlag;
        $this->_connect();
    }
    private function _connect(){
        $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
        if(!$this->pConnetion){
            throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pConnetion;
    }
    public function write($p_strMessage){
        if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pConnetion);
        //指定长度读取
        //$strMessage = fread($this->pConnetion, 1024);
        $strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
        //$strMessage = stream_get_contents($this->pConnetion);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pConnetion);
        $this->pConnetion = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_nTimeOut = 1){
        $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
    }
    public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
        $bRes = stream_set_blocking($this->pConnetion, $p_nMode);
    }
    public function __destruct(){
        if($this->pConnetion){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?php
class ServerStreamSocket{
    protected $pServer = null;
    protected $pClient = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nFlag      = STREAM_SERVER_LISTEN;
     
    const BLOCK   = 1;
    const NOBLOCK = 0;
     
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
        $this->strAddress = $p_strAddress;
        $this->nFlag = $p_nFlag;
        $this->_create();
    }
    protected function _create(){
        $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
        if(!$this->pServer ){
            throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pServer ;
    }
    public function accept(){
        $this->pClient = stream_socket_accept($this->pServer);
        if(!$this->pClient ){
            return false;
        }
        return $this->pClient ;
    }
    protected function _connect(){
        $this->accept();
        echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if($mxData == false){
            fclose($this->pClient);
                 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client:".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function write($p_strMessage){
        //$nLen = fwrite($this->pClient, $p_strMessage);
        $nLen = stream_socket_sendto($this->pClient, $p_strMessage);
        if($nLen !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pClient);
        //指定长度读取
        //$strMessage = fread($this->pClient, 1024);
        $strMessage = stream_socket_recvfrom($this->pClient, 1024);
        //$strMessage = stream_get_contents($this->pClient);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pServer);
         
        $this->pServer = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
        $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
    }
    public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
        $bRes = stream_set_blocking($p_pConnetction, $p_nMode);
    }
    public function __destruct(){
        if($this->pServer){
            $this->close();
        }
    }
}
class SelectServerStreamSocket extends ServerStreamSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pServer);
        while(true){
            $arrRead = $arrClient;
            if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            if(in_array($this->pServer, $arrRead)){
                $this->_connect();
     
                $arrClient[] = $this->pClient;
     
                $nKey = array_search($this->pServer, $arrRead);
                unset($arrRead[$nKey]);
            }
            foreach($arrRead as $pConnetcion){
                $bRes = $this->_reply();
                if($bRes === false){
                    $nKey = array_search($this->pClient, $arrClient);
                    unset($arrClient[$nKey]);;
                    continue;
                }
            }
        }
        //usleep(100);
    }
}
class EvServerStreamSocket extends ServerStreamSocket{
    protected function _onConnect(){
        $this->_connect();
         
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            pcntl_wait($status);
        } else {
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pServer = new EvServerStreamSocket($strAddress);
 
$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?