标签归档:事件

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
class ServerSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    protected $pSocket = null;
    protected $pClient = null;
 
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()&&$this->_bind()){
            $this->_listen();
        }
    }
    protected function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    protected function _bind(){
        $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    protected function _listen(){
        $bRes  = socket_listen($this->pSocket, 10) ;
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function accept(){
        $this->pClient = socket_accept($this->pSocket);
        if(!$this->pClient){
            $this->_log();
        }
        return $this->pClient;
    }
    protected function _connect(){
        $this->accept();
             
        if(socket_getpeername($this->pClient, $address, $port)){
            echo "Client $address : $port is now connected to us. \n";
        }
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if ($mxData == false) {
            socket_close($this->pClient);
 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client: ".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function read(){
        $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
        if($mxMessage === false){
            $this->_log();
        }
        return $mxMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    protected function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
class ClientSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    private $pSocket = null;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
 
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()){
            $this->_connect();
        }
    }
    private function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    private function _connect(){
        $pSocket = $this->_create();
        $bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function read(){
        $strMessage = "";
        $strBuffer = "";
        while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
            $strMessage .= $strBuffer;
        }
        return $strMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
    }
    public function send($p_strMessage){
        $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
        if(!$bRes){
            $this->_log();
        }
        return true;
    }
    public function recv(){
        $strMessage = "";
        $strBuffer = "";
        $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
        if(!$bRes){
            $this->_log();
        }
        $strMessage .=$strBuffer;
        return $strMessage;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    private function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$pClient = new ClientSocket($strHost, $nPort);
 
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
 
/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

1
php serversocket.php

在另一个终端里运行

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

1
php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelectServerSocket extends ServerSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pSocket);
     
        while(true){
            $arrRead = $arrClient;
            if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            foreach ($arrRead as $pSocket){
                if($pSocket == $this->pSocket){
                    $this->_connect();
                         
                    $arrClient[] = $this->pClient;
                }
                else{
                    $bRes = $this->_reply();
                    if($bRes === false){
                        $nKey = array_search($this->pClient, $arrClient);
                        unset($arrClient[$nKey]);
                        continue;
                    }
                }
     
            }
        }
        //usleep(100);
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
    if($p_bType){
        socket_set_block($this->pSocket);
    }
    else{
        socket_set_nonblock($this->pSocket);
    }
}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

1
2
3
4
5
6
$pClient = new ClientSocket($strHost, $nPort);
 
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
 
//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
protected function _setNoBlock($p_pSocket){
    socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

1
2
3
4
5
6
7
public function loop(){
    $arrRead = array();
    $arrWrite = $arrExp = null;
    $arrClient = array($this->pSocket);
    $this->_setNoBlock($this->pSocket);
 
    while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EvServerSocket extends ServerSocket{
    protected function _onConnect(){
        $this->_connect();
     
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MulProcessServerSocket extends EvServerSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        //父进程和子进程都会执行下面代码
        if ($pid == -1) {
            //错误处理:创建子进程失败时返回-1.
            die('could not fork');
        } else if ($pid) {
            //父进程会得到子进程号,所以这里是父进程执行的逻辑
            pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
        } else {
            //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
class ClientStreamSocket{
    private $pConnetion = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nTimeOut   = 3;
    protected $nFlag      = STREAM_CLIENT_CONNECT;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    const BLOCK   = 1;
    const NOBLOCK = 0;
    public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
        $this->strAddress = $p_strAddress;
        $this->nTimeOut   = $p_nTimeOut;
        $this->nFlag      = $p_nFlag;
        $this->_connect();
    }
    private function _connect(){
        $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
        if(!$this->pConnetion){
            throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pConnetion;
    }
    public function write($p_strMessage){
        if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pConnetion);
        //指定长度读取
        //$strMessage = fread($this->pConnetion, 1024);
        $strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
        //$strMessage = stream_get_contents($this->pConnetion);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pConnetion);
        $this->pConnetion = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_nTimeOut = 1){
        $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
    }
    public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
        $bRes = stream_set_blocking($this->pConnetion, $p_nMode);
    }
    public function __destruct(){
        if($this->pConnetion){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?php
class ServerStreamSocket{
    protected $pServer = null;
    protected $pClient = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nFlag      = STREAM_SERVER_LISTEN;
     
    const BLOCK   = 1;
    const NOBLOCK = 0;
     
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
        $this->strAddress = $p_strAddress;
        $this->nFlag = $p_nFlag;
        $this->_create();
    }
    protected function _create(){
        $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
        if(!$this->pServer ){
            throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pServer ;
    }
    public function accept(){
        $this->pClient = stream_socket_accept($this->pServer);
        if(!$this->pClient ){
            return false;
        }
        return $this->pClient ;
    }
    protected function _connect(){
        $this->accept();
        echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if($mxData == false){
            fclose($this->pClient);
                 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client:".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function write($p_strMessage){
        //$nLen = fwrite($this->pClient, $p_strMessage);
        $nLen = stream_socket_sendto($this->pClient, $p_strMessage);
        if($nLen !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pClient);
        //指定长度读取
        //$strMessage = fread($this->pClient, 1024);
        $strMessage = stream_socket_recvfrom($this->pClient, 1024);
        //$strMessage = stream_get_contents($this->pClient);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pServer);
         
        $this->pServer = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
        $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
    }
    public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
        $bRes = stream_set_blocking($p_pConnetction, $p_nMode);
    }
    public function __destruct(){
        if($this->pServer){
            $this->close();
        }
    }
}
class SelectServerStreamSocket extends ServerStreamSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pServer);
        while(true){
            $arrRead = $arrClient;
            if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            if(in_array($this->pServer, $arrRead)){
                $this->_connect();
     
                $arrClient[] = $this->pClient;
     
                $nKey = array_search($this->pServer, $arrRead);
                unset($arrRead[$nKey]);
            }
            foreach($arrRead as $pConnetcion){
                $bRes = $this->_reply();
                if($bRes === false){
                    $nKey = array_search($this->pClient, $arrClient);
                    unset($arrClient[$nKey]);;
                    continue;
                }
            }
        }
        //usleep(100);
    }
}
class EvServerStreamSocket extends ServerStreamSocket{
    protected function _onConnect(){
        $this->_connect();
         
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            pcntl_wait($status);
        } else {
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pServer = new EvServerStreamSocket($strAddress);
 
$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?

PHP 事件驱动开发

最近在学习PHP的系统事件驱动(event-base)开发,发现PHP有好几个event扩展,根据底层库依赖分为两类:libeventlibev。libevent可以为文件描述符、信号、超时设定等事件提供了监听回调,支持poll/kqueue/event port/select/epoll。libevent 库的其他组件提供其他功能,包括缓冲的事件系统(用于缓冲发送到客户端/从客户端接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。libev提供了各种监听器,包括子进程监听,超时设定,定时器,IO监听,信号监听,文件监视等,支持epoll/kqueue/event ports/inotify/eventfd/signalfd,更快的时钟管理,时间变化检测和修正。PHP依赖libevent扩展有libeventevent,PHP依赖libev扩展则有Evlibev

libevent在PHP事件驱动开发上应用广泛,比如workermanphpDaemonReactPHPKellner。CentOS上PHP 5.4安装libevent扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sudo yum install libevent-devel
 
wget https://pecl.php.net/get/libevent-0.1.0.tgz
tar -zxvf libevent-0.1.0.tgz
cd libevent-0.1.0
phpize
./configure
sudo make
sudo make install
 
#增加libevent.so
sudo vim /etc/php.ini
 
#是否安装成功
php -m | grep libevent

前面介绍过使用ticks和pcntl_signal来做定时器,然而tick运行机制是PHP解释器每执行 N 条可计时的低级语句就会发生的事件,如果tick值设置小了,会产生频繁的系统调用,设置大了又不能保证及时。使用libevent来设置一个定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
function print_dot(){
    echo ".";
}
 
class Timer{
    protected  $pEventBase;
    protected $pEvent;
    public $nInterval = 1;
    public function __construct(){
        $this->pEventBase = event_base_new();
    }
    public function addEvent($p_pFunc, $p_mxArgs = null){
        $this->pEvent = event_new();
        event_set($this->pEvent, 0, EV_TIMEOUT, $p_pFunc, $p_mxArgs);
        event_base_set($this->pEvent, $this->pEventBase);
    }
    public function loop(){
        event_add($this->pEvent, $this->nInterval*1000000);
        event_base_loop($this->pEventBase);
    }
}
 
$pTimer = new Timer();
$pTimer->addEvent("print_dot");
while(1){
    $pTimer->loop();
}

libevent使用也很简单:

  • 使用event_base_new和event_new分别创建event_base和event
  • 使用event_set为event设置要监听文件描述符fd,比如文件、socke、信号,超时则fd为0,事件类型和回调函数
  • 使用event_base_set关联event_base和event
  • 使用event_add将设置好的event加入事件监听器
  • 调用event_base_loop开始处理事件

官网上有个例子用来做socket监听处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?php
$socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
stream_set_blocking($socket, 0);
$base = event_base_new();
$event = event_new();
event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);
 
$GLOBALS['connections'] = array();
$GLOBALS['buffers'] = array();
 
function ev_accept($socket, $flag, $base) {
    static $id = 0;
     
    $connection = stream_socket_accept($socket);
    stream_set_blocking($connection, 0);
     
    $id += 1;
     
    $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $id);
    event_buffer_base_set($buffer, $base);
    event_buffer_timeout_set($buffer, 30, 30);
    event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
    event_buffer_priority_set($buffer, 10);
    event_buffer_enable($buffer, EV_READ | EV_PERSIST);
     
    // we need to save both buffer and connection outside
    $GLOBALS['connections'][$id] = $connection;
    $GLOBALS['buffers'][$id] = $buffer;
}
 
function ev_error($buffer, $error, $id) {
    event_buffer_disable($GLOBALS['buffers'][$id], EV_READ | EV_WRITE);
    event_buffer_free($GLOBALS['buffers'][$id]);
    fclose($GLOBALS['connections'][$id]);
    unset($GLOBALS['buffers'][$id], $GLOBALS['connections'][$id]);
}
 
function ev_read($buffer, $id) {
    while ($read = event_buffer_read($buffer, 256)) {
        var_dump($read);
    }
}

相比libevent,event扩展提供了面向对象的方法,支持libevent 2+ 的特性,对HTTP,DNS,OpenSSL等协议操作进行封装。Kellner框架比较有意思,在PHP的libevent扩展基础上将http请求处理封装成了扩展,使用cli模式处理http请求,并给出了基于Zend Framework 2的示例。

libev自称libevent的替代者,克服了libevent的一些不利影响,开销更小,Node JS便是利用它来做事件驱动。相比基于libeventd的扩展,基于libev的ev扩展更新比较积极,支持设置各种的监听器,为感兴趣的事件注册回调,比如文件变化,超时。CentOS上PHP 5.4安装ev扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
wget https://pecl.php.net/get/ev-0.2.15.tgz
tar -zxvf ev-0.2.15
cd ev-0.2.15
phpize
./configure
sudo make
sudo make install
 
#增加ev.so
sudo vim /etc/php.ini
 
#是否安装成功
php -m | grep ev

libev封装了各种监视器,操作也比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
/**
 * 延迟1秒后执行,不重复
 */
$pDelay = new EvTimer(1, 0, function () {
    echo "1 delay \n";
});
/**
 * 每隔一秒执行一次的定时器,0秒后执行
 */
$pTimer = new EvTimer(0, 1, function () {
    echo "1 seconds \n";
});
/**
 * 如果没有其他更高等级的监视器,那么就执行EvIdle,处于低优先级则不执行
 */
$pIdle = new EvIdle(function(){
    sleep(1);
    echo "idle timer \n";
},0,2);
/**
 * 每一次loop开始都会执行
 */
$pPrepare = new EvPrepare(function(){
    echo "before timer \n";
},0);
/**
 * 每一次loop都会执行,可以通过优先级调整执行顺序
 */
$c = new EvCheck(function(){
    echo "after timer \n";
},0,-1);
/**
 * 定时器,每隔1.5秒后执行一次,0秒后开始
 */
$pPeriod = new EvPeriodic(0., 1.5, NULL, function ($w, $revents) {
    echo time(), PHP_EOL;
});
/**
 * IO输入事件监听,可以拿去监听socket的Ev::WRITE和Ev::READ事件
 */
$pReadWatcher = new EvIo(STDIN, Ev::READ, function ($watcher, $revents) {
    echo "STDIN is readable\n";
});
 
/**
 * 注册监听感兴趣的信号
 */
$pSignal = new EvSignal(SIGTERM, function ($watcher) {
    echo "SIGTERM received\n";
    $watcher->stop();
});
/**
 * 文件变化监听器,10秒监测一次
 */
$pStatWatcher = new EvStat("/var/log/messages", 10, function ($w) {
    echo "/var/log/messages changed\n";
     
    $attr = $pStatWatcher->attr();
     
    if ($attr['nlink']) {
        printf("Current size: %ld\n", $attr['size']);
        printf("Current atime: %ld\n", $attr['atime']);
        printf("Current mtime: %ld\n", $attr['mtime']);
    } else {
        fprintf(STDERR, "`messages` file is not there!");
        $pStatWatcher->stop();
    }
});
     
/**
 * 开始执行Ev::RUN_ONCE则立即执行Ev::RUN_NOWAIT则非阻塞执行
 */
Ev::run();

也可以监听子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$pid = pcntl_fork();
 
if ($pid == -1) {
    fprintf(STDERR, "pcntl_fork failed\n");
} elseif ($pid) {
    $w = new EvChild($pid, FALSE, function ($w, $revents) {
        $w->stop();
 
        printf("Process %d exited with status %d\n", $w->rpid, $w->rstatus);
    });
 
    Ev::run();
 
    // Protect against Zombies
    pcntl_wait($status);
} else {
    //Forked child
    exit(2);
}

php的libev扩展也实现了libev的所有监视器,提供类似的用法,但比较久没更新了。

在网络编程中,使用事件驱动模型监听感兴趣的事件,结合异步处理,能够大大提高服务器性能。传统服务器模型如Apache为每一个请求生成一个子进程。当用户连接到服务器的一个子进程就产生,并处理连接。每个连接获得一个单独的线程和子进程。当用户请求数据返回时,子进程开始等待数据库操作返回。如果此时另一个用户也请求返回数据,这时就产生了阻塞。以下引用自《使用事件驱动模型实现高效稳定的网络服务器程序》

简单网络编程模型里面,服务器与客户端都是一应一答,大部分的 socket 接口都是阻塞型的。在面对多个客户端的请求时候,最简单的解决方式是在服务器端使用多线程(或多进程)。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。
于是便有了“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。
但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用 IO 接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。对付可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。

于是便有了基于事件驱动的非阻塞型服务器,比如Nginx,Node.js。Nginx采用事件驱动,使用epoll事件模型,充分使用异步逻辑,削减了上下文调度开销,并发服务能力更强。Node.js 的异步机制是基于事件的,所有的磁盘 I/O、网络通信、数据库查询都以非阻塞的方式请求,返回的结果由事件循环来处理。Node.js 在执行的过程中会维护一个事件队列,程序在执行时进入事件循环等待下一个事件到来,每个异步式 I/O 请求完成后会被推送到事件队列,等待程序进程进行处理。

参考链接:
libev – a high performance full-featured event loop written in C
Working with events
使用 libevent 和 libev 提高网络应用性能
为什么事件驱动服务器这么火
Asynchronous PHP and Real-time Messaging
react.php 中的异步实现