标签归档:异步

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
class ServerSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    protected $pSocket = null;
    protected $pClient = null;
 
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()&&$this->_bind()){
            $this->_listen();
        }
    }
    protected function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    protected function _bind(){
        $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    protected function _listen(){
        $bRes  = socket_listen($this->pSocket, 10) ;
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function accept(){
        $this->pClient = socket_accept($this->pSocket);
        if(!$this->pClient){
            $this->_log();
        }
        return $this->pClient;
    }
    protected function _connect(){
        $this->accept();
             
        if(socket_getpeername($this->pClient, $address, $port)){
            echo "Client $address : $port is now connected to us. \n";
        }
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if ($mxData == false) {
            socket_close($this->pClient);
 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client: ".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function read(){
        $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
        if($mxMessage === false){
            $this->_log();
        }
        return $mxMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    protected function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
class ClientSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    private $pSocket = null;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
 
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()){
            $this->_connect();
        }
    }
    private function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    private function _connect(){
        $pSocket = $this->_create();
        $bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function read(){
        $strMessage = "";
        $strBuffer = "";
        while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
            $strMessage .= $strBuffer;
        }
        return $strMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
    }
    public function send($p_strMessage){
        $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
        if(!$bRes){
            $this->_log();
        }
        return true;
    }
    public function recv(){
        $strMessage = "";
        $strBuffer = "";
        $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
        if(!$bRes){
            $this->_log();
        }
        $strMessage .=$strBuffer;
        return $strMessage;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    private function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$pClient = new ClientSocket($strHost, $nPort);
 
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
 
/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

1
php serversocket.php

在另一个终端里运行

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

1
php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelectServerSocket extends ServerSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pSocket);
     
        while(true){
            $arrRead = $arrClient;
            if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            foreach ($arrRead as $pSocket){
                if($pSocket == $this->pSocket){
                    $this->_connect();
                         
                    $arrClient[] = $this->pClient;
                }
                else{
                    $bRes = $this->_reply();
                    if($bRes === false){
                        $nKey = array_search($this->pClient, $arrClient);
                        unset($arrClient[$nKey]);
                        continue;
                    }
                }
     
            }
        }
        //usleep(100);
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
    if($p_bType){
        socket_set_block($this->pSocket);
    }
    else{
        socket_set_nonblock($this->pSocket);
    }
}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

1
2
3
4
5
6
$pClient = new ClientSocket($strHost, $nPort);
 
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
 
//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
protected function _setNoBlock($p_pSocket){
    socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

1
2
3
4
5
6
7
public function loop(){
    $arrRead = array();
    $arrWrite = $arrExp = null;
    $arrClient = array($this->pSocket);
    $this->_setNoBlock($this->pSocket);
 
    while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EvServerSocket extends ServerSocket{
    protected function _onConnect(){
        $this->_connect();
     
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MulProcessServerSocket extends EvServerSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        //父进程和子进程都会执行下面代码
        if ($pid == -1) {
            //错误处理:创建子进程失败时返回-1.
            die('could not fork');
        } else if ($pid) {
            //父进程会得到子进程号,所以这里是父进程执行的逻辑
            pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
        } else {
            //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
class ClientStreamSocket{
    private $pConnetion = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nTimeOut   = 3;
    protected $nFlag      = STREAM_CLIENT_CONNECT;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    const BLOCK   = 1;
    const NOBLOCK = 0;
    public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
        $this->strAddress = $p_strAddress;
        $this->nTimeOut   = $p_nTimeOut;
        $this->nFlag      = $p_nFlag;
        $this->_connect();
    }
    private function _connect(){
        $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
        if(!$this->pConnetion){
            throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pConnetion;
    }
    public function write($p_strMessage){
        if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pConnetion);
        //指定长度读取
        //$strMessage = fread($this->pConnetion, 1024);
        $strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
        //$strMessage = stream_get_contents($this->pConnetion);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pConnetion);
        $this->pConnetion = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_nTimeOut = 1){
        $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
    }
    public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
        $bRes = stream_set_blocking($this->pConnetion, $p_nMode);
    }
    public function __destruct(){
        if($this->pConnetion){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?php
class ServerStreamSocket{
    protected $pServer = null;
    protected $pClient = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nFlag      = STREAM_SERVER_LISTEN;
     
    const BLOCK   = 1;
    const NOBLOCK = 0;
     
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
        $this->strAddress = $p_strAddress;
        $this->nFlag = $p_nFlag;
        $this->_create();
    }
    protected function _create(){
        $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
        if(!$this->pServer ){
            throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pServer ;
    }
    public function accept(){
        $this->pClient = stream_socket_accept($this->pServer);
        if(!$this->pClient ){
            return false;
        }
        return $this->pClient ;
    }
    protected function _connect(){
        $this->accept();
        echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if($mxData == false){
            fclose($this->pClient);
                 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client:".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function write($p_strMessage){
        //$nLen = fwrite($this->pClient, $p_strMessage);
        $nLen = stream_socket_sendto($this->pClient, $p_strMessage);
        if($nLen !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pClient);
        //指定长度读取
        //$strMessage = fread($this->pClient, 1024);
        $strMessage = stream_socket_recvfrom($this->pClient, 1024);
        //$strMessage = stream_get_contents($this->pClient);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pServer);
         
        $this->pServer = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
        $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
    }
    public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
        $bRes = stream_set_blocking($p_pConnetction, $p_nMode);
    }
    public function __destruct(){
        if($this->pServer){
            $this->close();
        }
    }
}
class SelectServerStreamSocket extends ServerStreamSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pServer);
        while(true){
            $arrRead = $arrClient;
            if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            if(in_array($this->pServer, $arrRead)){
                $this->_connect();
     
                $arrClient[] = $this->pClient;
     
                $nKey = array_search($this->pServer, $arrRead);
                unset($arrRead[$nKey]);
            }
            foreach($arrRead as $pConnetcion){
                $bRes = $this->_reply();
                if($bRes === false){
                    $nKey = array_search($this->pClient, $arrClient);
                    unset($arrClient[$nKey]);;
                    continue;
                }
            }
        }
        //usleep(100);
    }
}
class EvServerStreamSocket extends ServerStreamSocket{
    protected function _onConnect(){
        $this->_connect();
         
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            pcntl_wait($status);
        } else {
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pServer = new EvServerStreamSocket($strAddress);
 
$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?

PHP RPC开发之Thrift

Apache Thrift是一个跨语言的服务部署框架,通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(支持C++,Java,Python,PHP, GO,Javascript,Ruby,Erlang,Perl, Haskell, C#等),并由生成的代码负责RPC协议层和传输层的实现。

在CentOS 6.5上安装Thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
sudo yum -y update
sudo yum -y groupinstall "Development Tools"
 
#升级autoconf,必须2.65以上
wget http://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
tar xvf autoconf-2.69.tar.gz
cd autoconf-2.69
./configure --prefix=/usr
make
sudo make install
cd ..
 
#升级automake必须1.14以上
wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
tar xvf automake-1.14.tar.gz
cd automake-1.14
./configure --prefix=/usr
make
sudo make install
cd ..
 
#升级bsion
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gz
tar xvf bison-2.5.1.tar.gz
cd bison-2.5.1
./configure --prefix=/usr
make
sudo make install
cd ..
 
#安装boost
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz
tar xvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
sudo ./b2 install
cd ..
 
#安装thrift,编译会比较久,内存最好1024M以上
git clone https://git-wip-us.apache.org/repos/asf/thrift.git
cd thrift
./bootstrap.sh
./configure
make
sudo make install
cd ..
 
#查看版本
thrift -version
 
#安装thrift_protocol扩展,仅支持二进制读写
cd thrift/lib/php/src/ext/thrift_protocol
phpize
./configure
sudo make
sudo make install
#这里不需要更改php.ini,已自动在/etc/php.d/thrift_protocol.ini里面添加
php -m | grep thrift

Thrift的PHP类库位于thrift/lib/php/lib/Thrift目录下面,Thrift对于数据传输格式、数据传输方式,服务器模型均做了定义,方便自行扩展。

数据传输格式(protocol)是定义的了传输内容,对Thrift Type的打包解包,包括

  • TBinaryProtocol,二进制格式,TBinaryProtocolAccelerated则是依赖于thrift_protocol扩展的快速打包解包。
  • TCompactProtocol,压缩格式
  • TJSONProtocol,JSON格式
  • TMultiplexedProtocol,利用前三种数据格式与支持多路复用协议的服务端(同时提供多个服务,TMultiplexedProcessor)交互

数据传输方式(transport),定义了如何发送(write)和接收(read)数据,包括

  • TBufferedTransport,缓存传输,写入数据并不立即开始传输,直到刷新缓存。
  • TSocket,使用socket传输
  • TFramedTransport,采用分块方式进行传输,具体传输实现依赖其他传输方式,比如TSocket
  • TCurlClient,使用curl与服务端交互
  • THttpClient,采用stream方式与HTTP服务端交互
  • TMemoryBuffer,使用内存方式交换数据
  • TPhpStream,使用PHP标准输入输出流进行传输
  • TNullTransport,关闭数据传输
  • TSocketPool在TSocket基础支持多个服务端管理(需要APC支持),自动剔除无效的服务器
  • TNonblockingSocket,非官方实现非阻塞socket

服务模型,定义了当PHP作为服务端如何监听端口处理请求

  • TForkingServer,采用子进程处理请求
  • TSimpleServer,在TServerSocket基础上处理请求
  • TNonblockingServer,基于libevent的非官方实现非阻塞服务端,与TNonblockingServerSocket,TNonblockingSocket配合使用

另外还定义了一些工厂,以便在Server模式下对数据传输格式和传输方式进行绑定

  • TProtocolFactory,数据传输格式工厂类,对protocol的工厂化生产,包括TBinaryProtocolFactory,TCompactProtocolFactory,TJSONProtocolFactory
  • TTransportFactory,数据传输方式工厂类,对transport的工厂化生产,作为server时,需要自行实现
  • TStringFuncFactory,字符串处理工厂类

其他文件便是异常,字符串处理,自动加载器的定义等等。

现在开始编写一个简单接IDL文件HelloWorld.thrift

1
2
3
4
5
namespace php Services.HelloWorld
service HelloWorld
{
    string sayHello(1:string name);
}

然后通过生成器生成PHP文件

1
2
#不指明:server不生成processor。。
thrift --gen php:server HelloWorld.thrift

生成文件在gen-php目录下面的Services/HelloWord/HelloWorld.php(目录与namesapce定义一致),这是个公共文件,服务端和客户端都需要包括它。其中客户端调用的代码(HelloWorldClient )已经生成好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//服务端需要继承该接口
interface HelloWorldIf {
  /**
   * @param string $name
   * @return string
   */
  public function sayHello($name);
}
//提供给客户端调用的方法
class HelloWorldClient implements \Services\HelloWorld\HelloWorldIf {
  public function sayHello($name)
  {
    $this->send_sayHello($name);
    return $this->recv_sayHello();
  }
  public function send_sayHello($name)
  {
  }
  public function recv_sayHello()
  {
  }
}
//HelloWord类sayHello方法参数读取
class HelloWorld_sayHello_args {
}
//HelloWord类sayHello方法结果写入
class HelloWorld_sayHello_result {
}
//作为服务端才会生成
class HelloWorldProcessor {
}

而服务端的服务实现代码则需要继承HelloWorldIf 实现代码HelloWorldHandler.php

1
2
3
4
5
6
7
8
9
<?php
namespace Services\HelloWorld;
 
class HelloWorldHandler implements HelloWorldIf {
  public function sayHello($name)
  {
      return "Hello $name";
  }
}

编写服务端代码Server.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?php
namespace Services\HelloWorld;
 
error_reporting(E_ALL);
 
define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';
 
use Thrift\ClassLoader\ThriftClassLoader;
 
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();
 
use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TBufferedTransportFactory;
 
use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;
 
//use Thrift\Server\TNonblockingServerSocket;
//use Thrift\Server\TNonblockingServer;
 
//use Thrift\Protocol\TBinaryProtocol;
//use Thrift\Transport\TPhpStream;
//use Thrift\Transport\TBufferedTransport;
 
 
try {
    require_once 'HelloWorldHandler.php';
    $handler = new \Services\HelloWorld\HelloWorldHandler();
    $processor = new \Services\HelloWorld\HelloWorldProcessor($handler);
     
    $transportFactory = new TBufferedTransportFactory();
    $protocolFactory = new TBinaryProtocolFactory(true, true);
     
    //作为cli方式运行,监听端口,官方实现
    $transport = new TServerSocket('localhost', 9090);
    $server = new TSimpleServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
    $server->serve();
     
    //作为cli方式运行,非阻塞方式监听,基于libevent实现,非官方实现
    //$transport = new TNonblockingServerSocket('localhost', 9090);
    //$server = new TNonblockingServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
    //$server->serve();
 
    //客户端和服务端在同一个输入输出流上
    //使用方式
    //1) cli 方式:php Client.php | php Server.php
    //2) cgi 方式:利用Apache或nginx监听http请求,调用php-fpm处理,将请求转换为PHP标准输入输出流
    //$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
    //$protocol = new TBinaryProtocol($transport, true, true);
    //$transport->open();
    //$processor->process($protocol, $protocol);
    //$transport->close();
     
} catch (TException $tx) {
    print 'TException: '.$tx->getMessage()."\n";
}

服务端创建的步骤:

  • 首先初始化服务提供者handler
  • 然后利用该handler初始化自动生成的processor
  • 初始化数据传输方式transport
  • 利用该传输方式初始化数据传输格式protocol
  • 开始服务

编写客户端代码Client.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php
namespace Services\HelloWorld;
 
error_reporting(E_ALL);
 
define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';
 
use Thrift\ClassLoader\ThriftClassLoader;
 
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();
 
//use Thrift\Transport\TPhpStream;
 
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;
 
try {
    //仅在与服务端处于同一输出输出流有用
    //使用方式:php Client.php | php Server.php
    //$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
     
    //socket方式连接服务端
    //数据传输格式和数据传输方式与服务端一一对应
    //如果服务端以http方式提供服务,可以使用THttpClient/TCurlClient数据传输方式
    $transport = new TBufferedTransport(new TSocket('localhost', 9090));
    $protocol = new TBinaryProtocol($transport);
    $client = new \Services\HelloWorld\HelloWorldClient($protocol);
 
    $transport->open();
     
    //同步方式进行交互
    $recv = $client->sayHello('Courages');
    echo "\n sayHello11dd:".$recv." \n";
     
    //异步方式进行交互
    $client->send_sayHello('Us');
    echo "\n send_sayHello \n";
    $recv = $client->recv_sayHello();
    echo "\n recv_sayHello:".$recv." \n";
     
    $transport->close();
} catch (TException $tx) {
    print 'TException: '.$tx->getMessage()."\n";
}
    

客户端调用的步骤:

  • 初始化数据传输方式transport,与服务端对应
  • 利用该传输方式初始化数据传输格式protocol,与服务端对应
  • 实例化自动生成的Client对象
  • 开始调用

在终端上运行

1
2
3
4
5
6
7
8
9
#以cli方式运行TPhpStream
#php Client.php | php Server.php
 
#先运行Server.php
#要不然会报错:TException: TSocket: Could not connect to localhost:9090 (Connection refused [111])
php Server.php
 
#在另外一个终端运行
php Client.ph

官方给的例子,PHP作为服务端是以web方式进行提供的,在cli方式下并不能运行。

Thrift作为一个跨语言的服务框架,方便不同语言、模块之间互相调用,解耦服务逻辑代码,拓展了PHP的处理能力(如与Hbase交互),使得WEB架构更具弹性。与基于 SOAP 消息格式的 Web Service和基于 JSON 消息格式的 RESTful 服务不同,Thrif数据传输格式默认采用二进制传格式,对 XML 和 JSON 体积更小,但对于服务端的CPU占用比JSON、XML要高。PHP虽然有thrift_protocol扩展,但仅仅作为二进制数据传输格式化使用,其他文件的加载仍然为PHP,需要更多的开销。

如果由PHP来做为Thrift的服务端,仅仅这样子做仍然是不够的,Thrift仅仅实现的数据定义和传输,未实现RPC架构

  • 需要避免重复加载各类文件,是否做成PHP扩展
  • 数据传输格式和方式是否适需要自行扩展
  • 客户端要能够自动连可使用的服务端,剔除失效的服务器
  • 服务端需要处理客户端并发情况,是否多进程/异步处理
  • 服务端需要监控服务是否正常

workerman-thrift-rpc对这些问题进行了解决,基于thrift提供了一个可靠性的RPC框架。对客户端和服务端的调用做了封装,提供统一入口,利用workerman做socket中转,当客户端发出请求时,将给socket转给服务端使用,提供服务。workerman-json-rpc与workerman-thrift-rpc类似,采用异步(分步)收发,但简单多了,更像是一种约定。数据格式,发送时仅发送class,function,parameters三个参数,接收时,仅code,msg,data三个返回值,在格式约束及跨语言上,需要自行处理;不需要thrift那样依赖于生成器所生成的文件,客户端完全独立于服务端。

注:以上示例使用修改过的代码,附上代码:thrift

参考链接:
Apache Thrift – 可伸缩的跨语言服务开发框架
Thirft框架介绍
Apache Thrift
Building Apache Thrift on CentOS 6.5
PHP Tutorial
Creating a public API with Apache Thrift
hadoop + Hbase + thrift + php 安裝設定與程式設計
php实现的thrift socket server
Our own “Hello World!”

PHP 进程控制PCNTL

PHP的进程控制PCNTL支持实现了Unix方式的进程创建, 程序执行,信号处理以及进程的中断。 PCNTL只支持linux平台下cli模式,不支持Windows平台,也不能被应用在Web服务器环境(cgi等),当其被用于Web服务环境时可能会带来意外的结果。通常,PCNTL会结合另外一个扩展来使用POSIX来开发(也不支持Windows平台)。

pcntl_fork可以创建一个子进程,父进程和子进程 都从fork的位置开始向下继续执行。创建成功时,父进程得到的返回值是子进程号而子进程得到的返回值是0;创建失败时,父进程得到返回值是-1,不会创建子进程,并触发一个PHP错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php
 
$pid = pcntl_fork();
//父进程和子进程都会执行下面代码
if ($pid == -1) {
    //错误处理:创建子进程失败时返回-1.
     die('could not fork');
} else if ($pid) {
     //父进程会得到子进程号,所以这里是父进程执行的逻辑
     pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
} else {
     //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
}

在对应的父进程结束执行后,子进程就会变成孤儿进程,但之后会立即由init进程(进程ID为1)“收养”为其子进程。

某一子进程终止执行后,若其父进程未提前调用wait,则内核会持续保留子进程的退出状态等信息,以使父进程可以wait获取之[2] 。而因为在这种情况下,子进程虽已终止,但仍在消耗系统资源,所以其亦称僵尸进程。wait常于SIGCHLD信号的处理函数中调用。

为避免产生僵尸进程,一般采取的方式是:将父进程中对SIGCHLD信号的处理函数设为SIG_IGN(忽略信号);fork两次并杀死一级子进程,令二级子进程成为孤儿进程而被init所“收养”、清理。

采用二次创建子进程的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
    $pid = pcntl_fork();
    if($pid) {
        //创建成功,在父进程中执行
        echo "run in parent process";//pcntl_wait($status);
    } else if($pid == -1) {
        //创建失败,在父进程中处理
        echo "Couldn't create child process.";
    } else {
        //创建成功,在子进程中执行
        //再次创建子进程,即孙进程
        $pid = pcntl_fork();
        if($pid == 0) {
            //在孙进程中执行
            if(-1 == posix_setsid())
            {
                // 出错退出
                exit("Setsid fail");
            }
            echo "run in grandchild process";
        } else if($pid == -1) {
            echo "Couldn’t create child process.";
        } else {
            //在子进程中处理
            echo "run in child process.";//posix_kill(posix_getpid(), SIGUSR1);
            exit;
        }
    }

通常还会把子进程的pid收集以来,以便监控、回收,如workerman。二次创建子进程通常应用在PHP多进程,守护进程上,比如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
defined('DEAMON_LOCK_FILE') ||
define('DEAMON_LOCK_FILE', 'run/deamon.pid');
 
if($_SERVER['argc'] >= 2 && $_SERVER['argv'][1] == 'kill')
{
    $fh = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'r');
    $pid = fread($fh, 8);
 
    if( $pid )
        posix_kill($pid, SIGTERM);
 
    exit;
}
 
global $DEAMON_LOCK_HANDLER;
 
function daemonize($signalHandler = false ) {
    global $DEAMON_LOCK_HANDLER;
 
    if( ! deamon_file_lock() ) {
        printf("Deamon is already running...\n");
        exit();
    }
 
    umask(0);
 
    $pid = pcntl_fork();
 
    if( $pid < 0 ) {
        printf("Can't fork\n");
        exit;
    }
    else if( $pid ) {
        exit;
    }
 
    $sid = posix_setsid();
 
    if( $sid < 0 ) {
        printf("Can't set session leader\n");
        exit;
    }
 
    deamon_bind_signals($signalHandler);
 
    $pid = pcntl_fork();
 
    if( $pid < 0 || $pid ) {
        exit;
    }
 
    ftruncate($DEAMON_LOCK_HANDLER, 0);
    fwrite($DEAMON_LOCK_HANDLER, posix_getpid());
 
    chdir('/');
 
    fclose( STDIN );
    fclose( STDOUT );
    fclose( STDERR );
}
 
function deamon_bind_signals($signalHandler = false) {
    $signalHandler = !$signalHandler ? "deamon_signal_handler" : $signalHandler;
 
    pcntl_signal(SIGTERM, $signalHandler);
    pcntl_signal(SIGHUP,  $signalHandler);
    pcntl_signal(SIGUSR1, $signalHandler);
    pcntl_signal(SIGINT, $signalHandler);
}
 
function deamon_file_lock() {
    global $DEAMON_LOCK_HANDLER;
    $DEAMON_LOCK_HANDLER = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'c');
 
    if( ! $DEAMON_LOCK_HANDLER ) {
        printf("Can't open lock file\n");
        die();
    }
    if( !flock( $DEAMON_LOCK_HANDLER, LOCK_EX | LOCK_NB ) ) {
        return false;
    }
    return true;
}
 
function deamon_signal_handler($signo) {
    switch( $signo ) {
        case SIGTERM:
        case SIGHUP:
        case SIGUSR1:
            break;
    }
}
 
function sighandler($sig) {
        //do something
    if( $sig == SIGTERM ) {
        global $DEAMON_LOCK_HANDLER;
        fclose( $DEAMON_LOCK_HANDLER );
        exit;
    }
}
daemonize("sighandler");
 
while( 1 ) {
    pcntl_signal_dispatch();
    // do something here
    sleep( 1 );
}

可以通过ps -ef | grep php查看过程中的php进程产生情况,CentOS下安装PHP5.4的Posix扩展为:sudo yum instal php54w-process。

pcntl_signal可以注册信号处理函数,捕获信号后交给对应回调函数处理,实现信号通信,例如当某一子进程结束、中断或恢复执行时,内核会发送SIGCHLD信号予其父进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php
declare(ticks = 1);
 
pcntl_signal(SIGCHLD, "signal_handler");
 
function signal_handler($signal) {
    switch($signal) {
        case SIGCHLD:
            while (pcntl_waitpid(0, $status) != -1) {
                $status = pcntl_wexitstatus($status);
                echo "Child $status completed\n";
            }
 
            exit;
    }
}
 
for ($i = 1; $i <= 5; ++$i) {
    $pid = pcntl_fork();
 
    if (!$pid) {
        sleep(1);
        print "In child $i\n";
        exit($i);
    }
}
 
while(1) {
    // parent does processing here...
}

pcntl_alarm创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号,结合pcntl_signal和pcntl_alarm可以做一个秒级的定时器(注意:pcntl_alarm是一次性消耗,需要再次设置)

1
2
3
4
5
6
7
8
9
10
11
12
13
declare(ticks = 1);
 
function signal_handler($signal) {
    //do your work here
    print "Caught SIGALRM\n";
    pcntl_alarm(3);
}
 
pcntl_signal(SIGALRM, "signal_handler", true);
pcntl_alarm(3);
 
while(1) {
}

利用PHP的进程控制便可以实现守护进程监控,如socke端口监听;多进程处理,如socke请求事件处理、任务并行、异步处理,提升PHP程序性能。

参考链接:
PHP 进程控制
Getting into multiprocessing
Timing your signals
PHP Deamon
PHP中利用pcntl进行多进程并发控制
PHP高级编程之守护进程
PHP多进程编程一,PHP多进程编程二。
PHP的ticks机制
PHP如何将进程作为守护进程
Daemonising a PHP cli script on a posix system
异步毫秒定时器
The declare() function and ticks
子进程

PHP队列开发之Beanstalk

Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。

相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:

  • 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
  • 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
  • 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
  • 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick);

一个任务如果没有被删除,那么它就可以被重新获取。下面是大多数任务的生命周期:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
put with delay               release with delay
----------------> [DELAYED] <------------.
                      |                   |
                      | (time passes)     |
                      |                   |
 put                  v     reserve       |       delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
                     ^  ^                |  |
                     |   \  release      |  |
                     |    `-------------'   |
                     |                      |
                     | kick                 |
                     |                      |
                     |       bury           |
                  [BURIED] <---------------'
                     |
                     |  delete
                      `--------> *poof*

CentOS下安装Beanstalkd

1
2
3
4
sudo yum install beanstalkd
#启动beanstalk
sudo service beanstalkd start
#beanstalkd -l 192.168.33.14 -p 11300

PHP下面有个C扩展beanstalk库可以使用,基于libbeanstalkclient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
git clone https://github.com/bergundy/libbeanstalkclient.git
cd libbeanstalkclient
mkdir m4
#开始编译
sudo ./autogen.sh
 
#创建libbeanstalkclient.conf,内容为/usr/lib
sudo vim /etc/ld.so.conf.d/libbeanstalkclient.conf
#使配置生效
sudo ldconfig
 
git clone https://github.com/nil-zhang/php-beanstalk.git
cd php-beanstalk
phpize
./configure
sudo make
sudo make install
sudo vim /etc/php.ini

编辑php.ini增加以下内容

1
2
[beanstalk]
extension = "beanstalk.so"

查看是否加载成功

1
2
3
php -m
#加载成功则重启php-fpm
sudo service php-fpm restart

PHP测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php
    $bsc = new Beanstalk();
 
    $bsc->addserver("192.168.33.14", 11300);
    $bsc->addserver("192.168.33.12", 11300);
 
    $tubes = $bsc->list_tubes();
    print_r($tubes);
 
    for($i = 0; $i < 10; $i++)
    {
        $key = "key".$i;
        $value = "value".$i;
 
        $bsc->use($key);
        $bsc->put($key, $value);
        echo "$key\t$value\n";
 
        $bsc->watch($key);
        $job = $bsc->reserve($key);
        print_r($job);
 
        if($bsc->bury($job['id'], $key))
            echo "bury ok\n";
        else
            echo "bury failed\n";
 
        $bsc->kick(100, $key);
        if($bsc->delete($job['id'], $key))
            echo "delete ok\n";
        else
            echo "delete failed \n";
 
        $bsc->ignore($key);
        echo "\n";
    }
 
    echo "done\n";

注意由于Beanstalk服务端实现的比较简单,协议特性需要客户端支持,不同的实现可能效果不一样,这个客户端并没有实现延时发送(delay),超时重试(TTR)。需要这些特性建议使用这个库:PHP Beanstalkd。前台生产者创建任务:

1
2
3
4
5
6
7
8
9
<?php
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.14', 11300);
$bean->addServer('192.168.33.12', 11300);
$bean->useTube('my-tube');
$bean->put('Hello World!', 1024);
$bean->put('Hello World!2', 1023);
$bean->put(json_encode(array('what','how')), 1000, 1, 1);

后台消费者处理任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.12', 11300);
$bean->addServer('192.168.33.14', 11300);
$bean->watchTube('my-tube');
 
while (true)
{
    try
    {
        $job = $bean->reserve($timeout = 10);
 
        /* process job ... */
        var_dump($job);
        //var_dump($job->getMessage());
 
        $job->delete();
    }
    catch (BeanstalkException $e)
    {
        switch ($e->getCode())
        {
            case BeanstalkException::TIMED_OUT:
                echo "Timed out waiting for a job.  Retrying in 1 second.";
                sleep(1);
                continue;
                break;
            default:
                throw $e;
                break;
        }
    }
}

注意:客户端获取任务(reverse)是阻塞的(blocking),直到超时;同一个队列(tube)的任务按FIFO进行处理(除非指定优先级);任务内容长度不能超过65536;作为内存队列需要注意是否会内存超出,可以快速处理到Mysql。

使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。

参考链接:
Scalable Work Queues with Beanstalk
Beanstalk Protocol
Frequently Asked Questions for beanstalkd
Getting Started with Beanstalkd
Queue your work
Asynchronous Processing in Web Applications, Part 2: Developers Need to Understand Message Queues

PHP异步IO

为了能更快的响应请求,耗时任务的执行不能阻塞当前脚本执行,而是放在最后执行,比如fastcgi_finsh_request。现在又多了一个方法,PHP也可以进行异步IO处理了。

PHP扩展eio是基于C语言的libeio库中开发的异步IO处理,可用于异步文件读写,自定义异步任务执行。Nodejs使用的libuv库封装了libeio和libev(libev也有对应的PHP扩展:ev),前者提供异步IO,后者提供高性能的事件驱动,进行高性能的请求的处理。另外,PHP异步多线程框架Swoole也提供了异步IO处理的能力。

在CentOS下编译安装PHP eio扩展:

1
2
3
4
5
6
7
wget https://pecl.php.net/get/eio-1.2.5.tgz
tar -zxvf eio-1.2.5.tzg
cd eio-1.2.5
phpize
./configure
sudo make
sudo make install

更改php.ini添加扩展

1
2
[eio]
extension=eio.so

查看是否安装成功
eio0
注意:eio扩展必须在socket扩展之后加载,否则会提示’ undefined symbol: php_sockets_le_socket in Unknown on line 0’。

PHP eio支持libeio的所有操作,当调用eio_poll/eio_event_loop时,触发IO处理,处理完成进行回调;未主动调用eio_poll/eio_event_loop则不阻塞当前程序执行,等待脚本执行结束后在提交异步处理。使用eio_customeio_nop可以自定义异步任务调用。

以下以写文件进行比较,示例一是常规的文件写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$strBasePath = dirname(__FILE__);
$strFileName = $strBasePath.'/file.log';
$strContent = 'something to log ...';
 
$nStart = microtime(true);
$rFile = fopen($strFileName ,'a+');
for($i=0;$i<1000;$i++){
    $nLength = fwrite($rFile, $strContent);
}
fclose($rFile);
echo "done\r\n";
 
register_shutdown_function(function(){
    global $nStart;
    echo microtime(true) - $nStart;
    echo "\r\n";
});

示例二是PHP eio的异步写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
$strBasePath = dirname(__FILE__);
$strFileName = $strBasePath.'/file.log';
$strContent = 'something to log ...';
$nLenth = strlen($strContent);
 
function eioOpenCallBack($p_mxData ,$p_mxResult){
    //echo "Open.\r\n";
    if ($p_mxResult > 0) {
        eio_fstat($p_mxResult, EIO_PRI_DEFAULT, "eioStatCallBack", $p_mxResult);
    }
}
function eioStatCallBack($p_mxData ,$p_mxResult){
    global $strContent ,$nLenth;
    //echo "Stat.\r\n";
    if ($p_mxResult > 0) {
        eio_write($p_mxData, $strContent, $nLenth, $p_mxResult['size'], EIO_PRI_DEFAULT, "eioWriteCallBack", $p_mxData);
    }
}
function eioWriteCallBack($p_mxData ,$p_mxResult){
    //echo "Write.\r\n";
    if ($p_mxResult > 0) {
        eio_close($p_mxData, EIO_PRI_DEFAULT, "eioCloseCallBack", $p_mxData);
    }
}
function eioCloseCallBack($p_mxData ,$p_mxResult){
    //echo "Close.\r\n";
    if($p_mxResult == 0){
        //echo "End\r\n";
    }
}
 
$nStart = microtime(true);
for($i=0;$i<1000;$i++){
    eio_open($strFileName, EIO_O_CREAT | EIO_O_RDWR, EIO_S_IRUSR | EIO_S_IWUSR,EIO_PRI_DEFAULT, "eioOpenCallBack", $strFileName);
     
    //echo "Begin\r\n";
     
    //eio_event_loop();
}
echo "done\r\n";
register_shutdown_function(function(){
    global $nStart;
    echo microtime(true) - $nStart;
    echo "\r\n";
});

示例三是swoole扩展的异步写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$strBasePath = dirname(__FILE__);
$strFileName = $strBasePath.'/file.log';
$strContent = 'something to log ...';
 
$nStart = microtime(true);
for($i=0;$i<1000;$i++){
    swoole_async_write($strFileName ,$strContent);
}
echo "done\r\n";
register_shutdown_function(function(){
    global $nStart;
    echo microtime(true) - $nStart;
    echo "\r\n";
});

对比情况
eio1
可以看出,eio和swoole提交异步IO处理后,处理非常快,并未阻塞当前进程运行;eio更像是只花了循环的时间。swoole只能运行于CLI模式,使用了Linux Native AIO,处理非常快,写法也比较简单。eio可以运行于CGI和CLI模式,提交异步处理后会创建至多4个线程(eio_nthreads)进行处理,处理完成后仍然会返回主线程,所以PHP脚本执行结束后,主线程仍然会在在那里等待。eio的异步处理通过回调进行保证,写法上更加复杂。这就需要对异步回调进行包装,提供类似同步的代码,参见协程

参考链接:
PHP异步执行长时间任务
PHP的生命周期
libeio源码学习
PHP新增eio扩展,可以写类似node.js一样的异步IO了
深入浅出Node.js(五):初探Node.js的异步I/O实现
异步AIO的研究
关于C10K、异步回调、协程、同步阻塞
协程
linux AIO (异步IO) 那点事儿
什么程序设计语言机制是处理异步 IO 最恰当的抽象?
nodejs异步IO的实现
向facebook学习,通过协程实现mysql查询的异步化
Python 中的进程、线程、协程、同步、异步、回调
一个“蝇量级” C 语言协程库