标签归档:thrift

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
class ServerSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    protected $pSocket = null;
    protected $pClient = null;
 
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()&&$this->_bind()){
            $this->_listen();
        }
    }
    protected function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    protected function _bind(){
        $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    protected function _listen(){
        $bRes  = socket_listen($this->pSocket, 10) ;
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function accept(){
        $this->pClient = socket_accept($this->pSocket);
        if(!$this->pClient){
            $this->_log();
        }
        return $this->pClient;
    }
    protected function _connect(){
        $this->accept();
             
        if(socket_getpeername($this->pClient, $address, $port)){
            echo "Client $address : $port is now connected to us. \n";
        }
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if ($mxData == false) {
            socket_close($this->pClient);
 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client: ".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function read(){
        $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
        if($mxMessage === false){
            $this->_log();
        }
        return $mxMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    protected function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
class ClientSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    private $pSocket = null;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
 
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()){
            $this->_connect();
        }
    }
    private function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    private function _connect(){
        $pSocket = $this->_create();
        $bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function read(){
        $strMessage = "";
        $strBuffer = "";
        while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
            $strMessage .= $strBuffer;
        }
        return $strMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
    }
    public function send($p_strMessage){
        $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
        if(!$bRes){
            $this->_log();
        }
        return true;
    }
    public function recv(){
        $strMessage = "";
        $strBuffer = "";
        $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
        if(!$bRes){
            $this->_log();
        }
        $strMessage .=$strBuffer;
        return $strMessage;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    private function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$pClient = new ClientSocket($strHost, $nPort);
 
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
 
/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

1
php serversocket.php

在另一个终端里运行

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

1
php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelectServerSocket extends ServerSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pSocket);
     
        while(true){
            $arrRead = $arrClient;
            if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            foreach ($arrRead as $pSocket){
                if($pSocket == $this->pSocket){
                    $this->_connect();
                         
                    $arrClient[] = $this->pClient;
                }
                else{
                    $bRes = $this->_reply();
                    if($bRes === false){
                        $nKey = array_search($this->pClient, $arrClient);
                        unset($arrClient[$nKey]);
                        continue;
                    }
                }
     
            }
        }
        //usleep(100);
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
    if($p_bType){
        socket_set_block($this->pSocket);
    }
    else{
        socket_set_nonblock($this->pSocket);
    }
}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

1
2
3
4
5
6
$pClient = new ClientSocket($strHost, $nPort);
 
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
 
//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
protected function _setNoBlock($p_pSocket){
    socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

1
2
3
4
5
6
7
public function loop(){
    $arrRead = array();
    $arrWrite = $arrExp = null;
    $arrClient = array($this->pSocket);
    $this->_setNoBlock($this->pSocket);
 
    while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EvServerSocket extends ServerSocket{
    protected function _onConnect(){
        $this->_connect();
     
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MulProcessServerSocket extends EvServerSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        //父进程和子进程都会执行下面代码
        if ($pid == -1) {
            //错误处理:创建子进程失败时返回-1.
            die('could not fork');
        } else if ($pid) {
            //父进程会得到子进程号,所以这里是父进程执行的逻辑
            pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
        } else {
            //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
class ClientStreamSocket{
    private $pConnetion = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nTimeOut   = 3;
    protected $nFlag      = STREAM_CLIENT_CONNECT;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    const BLOCK   = 1;
    const NOBLOCK = 0;
    public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
        $this->strAddress = $p_strAddress;
        $this->nTimeOut   = $p_nTimeOut;
        $this->nFlag      = $p_nFlag;
        $this->_connect();
    }
    private function _connect(){
        $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
        if(!$this->pConnetion){
            throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pConnetion;
    }
    public function write($p_strMessage){
        if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pConnetion);
        //指定长度读取
        //$strMessage = fread($this->pConnetion, 1024);
        $strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
        //$strMessage = stream_get_contents($this->pConnetion);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pConnetion);
        $this->pConnetion = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_nTimeOut = 1){
        $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
    }
    public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
        $bRes = stream_set_blocking($this->pConnetion, $p_nMode);
    }
    public function __destruct(){
        if($this->pConnetion){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?php
class ServerStreamSocket{
    protected $pServer = null;
    protected $pClient = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nFlag      = STREAM_SERVER_LISTEN;
     
    const BLOCK   = 1;
    const NOBLOCK = 0;
     
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
        $this->strAddress = $p_strAddress;
        $this->nFlag = $p_nFlag;
        $this->_create();
    }
    protected function _create(){
        $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
        if(!$this->pServer ){
            throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pServer ;
    }
    public function accept(){
        $this->pClient = stream_socket_accept($this->pServer);
        if(!$this->pClient ){
            return false;
        }
        return $this->pClient ;
    }
    protected function _connect(){
        $this->accept();
        echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if($mxData == false){
            fclose($this->pClient);
                 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client:".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function write($p_strMessage){
        //$nLen = fwrite($this->pClient, $p_strMessage);
        $nLen = stream_socket_sendto($this->pClient, $p_strMessage);
        if($nLen !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pClient);
        //指定长度读取
        //$strMessage = fread($this->pClient, 1024);
        $strMessage = stream_socket_recvfrom($this->pClient, 1024);
        //$strMessage = stream_get_contents($this->pClient);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pServer);
         
        $this->pServer = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
        $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
    }
    public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
        $bRes = stream_set_blocking($p_pConnetction, $p_nMode);
    }
    public function __destruct(){
        if($this->pServer){
            $this->close();
        }
    }
}
class SelectServerStreamSocket extends ServerStreamSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pServer);
        while(true){
            $arrRead = $arrClient;
            if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            if(in_array($this->pServer, $arrRead)){
                $this->_connect();
     
                $arrClient[] = $this->pClient;
     
                $nKey = array_search($this->pServer, $arrRead);
                unset($arrRead[$nKey]);
            }
            foreach($arrRead as $pConnetcion){
                $bRes = $this->_reply();
                if($bRes === false){
                    $nKey = array_search($this->pClient, $arrClient);
                    unset($arrClient[$nKey]);;
                    continue;
                }
            }
        }
        //usleep(100);
    }
}
class EvServerStreamSocket extends ServerStreamSocket{
    protected function _onConnect(){
        $this->_connect();
         
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            pcntl_wait($status);
        } else {
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pServer = new EvServerStreamSocket($strAddress);
 
$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?

PHP RPC开发之Thrift

Apache Thrift是一个跨语言的服务部署框架,通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(支持C++,Java,Python,PHP, GO,Javascript,Ruby,Erlang,Perl, Haskell, C#等),并由生成的代码负责RPC协议层和传输层的实现。

在CentOS 6.5上安装Thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
sudo yum -y update
sudo yum -y groupinstall "Development Tools"
 
#升级autoconf,必须2.65以上
wget http://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
tar xvf autoconf-2.69.tar.gz
cd autoconf-2.69
./configure --prefix=/usr
make
sudo make install
cd ..
 
#升级automake必须1.14以上
wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
tar xvf automake-1.14.tar.gz
cd automake-1.14
./configure --prefix=/usr
make
sudo make install
cd ..
 
#升级bsion
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gz
tar xvf bison-2.5.1.tar.gz
cd bison-2.5.1
./configure --prefix=/usr
make
sudo make install
cd ..
 
#安装boost
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz
tar xvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
sudo ./b2 install
cd ..
 
#安装thrift,编译会比较久,内存最好1024M以上
git clone https://git-wip-us.apache.org/repos/asf/thrift.git
cd thrift
./bootstrap.sh
./configure
make
sudo make install
cd ..
 
#查看版本
thrift -version
 
#安装thrift_protocol扩展,仅支持二进制读写
cd thrift/lib/php/src/ext/thrift_protocol
phpize
./configure
sudo make
sudo make install
#这里不需要更改php.ini,已自动在/etc/php.d/thrift_protocol.ini里面添加
php -m | grep thrift

Thrift的PHP类库位于thrift/lib/php/lib/Thrift目录下面,Thrift对于数据传输格式、数据传输方式,服务器模型均做了定义,方便自行扩展。

数据传输格式(protocol)是定义的了传输内容,对Thrift Type的打包解包,包括

  • TBinaryProtocol,二进制格式,TBinaryProtocolAccelerated则是依赖于thrift_protocol扩展的快速打包解包。
  • TCompactProtocol,压缩格式
  • TJSONProtocol,JSON格式
  • TMultiplexedProtocol,利用前三种数据格式与支持多路复用协议的服务端(同时提供多个服务,TMultiplexedProcessor)交互

数据传输方式(transport),定义了如何发送(write)和接收(read)数据,包括

  • TBufferedTransport,缓存传输,写入数据并不立即开始传输,直到刷新缓存。
  • TSocket,使用socket传输
  • TFramedTransport,采用分块方式进行传输,具体传输实现依赖其他传输方式,比如TSocket
  • TCurlClient,使用curl与服务端交互
  • THttpClient,采用stream方式与HTTP服务端交互
  • TMemoryBuffer,使用内存方式交换数据
  • TPhpStream,使用PHP标准输入输出流进行传输
  • TNullTransport,关闭数据传输
  • TSocketPool在TSocket基础支持多个服务端管理(需要APC支持),自动剔除无效的服务器
  • TNonblockingSocket,非官方实现非阻塞socket

服务模型,定义了当PHP作为服务端如何监听端口处理请求

  • TForkingServer,采用子进程处理请求
  • TSimpleServer,在TServerSocket基础上处理请求
  • TNonblockingServer,基于libevent的非官方实现非阻塞服务端,与TNonblockingServerSocket,TNonblockingSocket配合使用

另外还定义了一些工厂,以便在Server模式下对数据传输格式和传输方式进行绑定

  • TProtocolFactory,数据传输格式工厂类,对protocol的工厂化生产,包括TBinaryProtocolFactory,TCompactProtocolFactory,TJSONProtocolFactory
  • TTransportFactory,数据传输方式工厂类,对transport的工厂化生产,作为server时,需要自行实现
  • TStringFuncFactory,字符串处理工厂类

其他文件便是异常,字符串处理,自动加载器的定义等等。

现在开始编写一个简单接IDL文件HelloWorld.thrift

1
2
3
4
5
namespace php Services.HelloWorld
service HelloWorld
{
    string sayHello(1:string name);
}

然后通过生成器生成PHP文件

1
2
#不指明:server不生成processor。。
thrift --gen php:server HelloWorld.thrift

生成文件在gen-php目录下面的Services/HelloWord/HelloWorld.php(目录与namesapce定义一致),这是个公共文件,服务端和客户端都需要包括它。其中客户端调用的代码(HelloWorldClient )已经生成好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//服务端需要继承该接口
interface HelloWorldIf {
  /**
   * @param string $name
   * @return string
   */
  public function sayHello($name);
}
//提供给客户端调用的方法
class HelloWorldClient implements \Services\HelloWorld\HelloWorldIf {
  public function sayHello($name)
  {
    $this->send_sayHello($name);
    return $this->recv_sayHello();
  }
  public function send_sayHello($name)
  {
  }
  public function recv_sayHello()
  {
  }
}
//HelloWord类sayHello方法参数读取
class HelloWorld_sayHello_args {
}
//HelloWord类sayHello方法结果写入
class HelloWorld_sayHello_result {
}
//作为服务端才会生成
class HelloWorldProcessor {
}

而服务端的服务实现代码则需要继承HelloWorldIf 实现代码HelloWorldHandler.php

1
2
3
4
5
6
7
8
9
<?php
namespace Services\HelloWorld;
 
class HelloWorldHandler implements HelloWorldIf {
  public function sayHello($name)
  {
      return "Hello $name";
  }
}

编写服务端代码Server.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?php
namespace Services\HelloWorld;
 
error_reporting(E_ALL);
 
define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';
 
use Thrift\ClassLoader\ThriftClassLoader;
 
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();
 
use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TBufferedTransportFactory;
 
use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;
 
//use Thrift\Server\TNonblockingServerSocket;
//use Thrift\Server\TNonblockingServer;
 
//use Thrift\Protocol\TBinaryProtocol;
//use Thrift\Transport\TPhpStream;
//use Thrift\Transport\TBufferedTransport;
 
 
try {
    require_once 'HelloWorldHandler.php';
    $handler = new \Services\HelloWorld\HelloWorldHandler();
    $processor = new \Services\HelloWorld\HelloWorldProcessor($handler);
     
    $transportFactory = new TBufferedTransportFactory();
    $protocolFactory = new TBinaryProtocolFactory(true, true);
     
    //作为cli方式运行,监听端口,官方实现
    $transport = new TServerSocket('localhost', 9090);
    $server = new TSimpleServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
    $server->serve();
     
    //作为cli方式运行,非阻塞方式监听,基于libevent实现,非官方实现
    //$transport = new TNonblockingServerSocket('localhost', 9090);
    //$server = new TNonblockingServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
    //$server->serve();
 
    //客户端和服务端在同一个输入输出流上
    //使用方式
    //1) cli 方式:php Client.php | php Server.php
    //2) cgi 方式:利用Apache或nginx监听http请求,调用php-fpm处理,将请求转换为PHP标准输入输出流
    //$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
    //$protocol = new TBinaryProtocol($transport, true, true);
    //$transport->open();
    //$processor->process($protocol, $protocol);
    //$transport->close();
     
} catch (TException $tx) {
    print 'TException: '.$tx->getMessage()."\n";
}

服务端创建的步骤:

  • 首先初始化服务提供者handler
  • 然后利用该handler初始化自动生成的processor
  • 初始化数据传输方式transport
  • 利用该传输方式初始化数据传输格式protocol
  • 开始服务

编写客户端代码Client.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php
namespace Services\HelloWorld;
 
error_reporting(E_ALL);
 
define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';
 
use Thrift\ClassLoader\ThriftClassLoader;
 
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();
 
//use Thrift\Transport\TPhpStream;
 
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;
 
try {
    //仅在与服务端处于同一输出输出流有用
    //使用方式:php Client.php | php Server.php
    //$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
     
    //socket方式连接服务端
    //数据传输格式和数据传输方式与服务端一一对应
    //如果服务端以http方式提供服务,可以使用THttpClient/TCurlClient数据传输方式
    $transport = new TBufferedTransport(new TSocket('localhost', 9090));
    $protocol = new TBinaryProtocol($transport);
    $client = new \Services\HelloWorld\HelloWorldClient($protocol);
 
    $transport->open();
     
    //同步方式进行交互
    $recv = $client->sayHello('Courages');
    echo "\n sayHello11dd:".$recv." \n";
     
    //异步方式进行交互
    $client->send_sayHello('Us');
    echo "\n send_sayHello \n";
    $recv = $client->recv_sayHello();
    echo "\n recv_sayHello:".$recv." \n";
     
    $transport->close();
} catch (TException $tx) {
    print 'TException: '.$tx->getMessage()."\n";
}
    

客户端调用的步骤:

  • 初始化数据传输方式transport,与服务端对应
  • 利用该传输方式初始化数据传输格式protocol,与服务端对应
  • 实例化自动生成的Client对象
  • 开始调用

在终端上运行

1
2
3
4
5
6
7
8
9
#以cli方式运行TPhpStream
#php Client.php | php Server.php
 
#先运行Server.php
#要不然会报错:TException: TSocket: Could not connect to localhost:9090 (Connection refused [111])
php Server.php
 
#在另外一个终端运行
php Client.ph

官方给的例子,PHP作为服务端是以web方式进行提供的,在cli方式下并不能运行。

Thrift作为一个跨语言的服务框架,方便不同语言、模块之间互相调用,解耦服务逻辑代码,拓展了PHP的处理能力(如与Hbase交互),使得WEB架构更具弹性。与基于 SOAP 消息格式的 Web Service和基于 JSON 消息格式的 RESTful 服务不同,Thrif数据传输格式默认采用二进制传格式,对 XML 和 JSON 体积更小,但对于服务端的CPU占用比JSON、XML要高。PHP虽然有thrift_protocol扩展,但仅仅作为二进制数据传输格式化使用,其他文件的加载仍然为PHP,需要更多的开销。

如果由PHP来做为Thrift的服务端,仅仅这样子做仍然是不够的,Thrift仅仅实现的数据定义和传输,未实现RPC架构

  • 需要避免重复加载各类文件,是否做成PHP扩展
  • 数据传输格式和方式是否适需要自行扩展
  • 客户端要能够自动连可使用的服务端,剔除失效的服务器
  • 服务端需要处理客户端并发情况,是否多进程/异步处理
  • 服务端需要监控服务是否正常

workerman-thrift-rpc对这些问题进行了解决,基于thrift提供了一个可靠性的RPC框架。对客户端和服务端的调用做了封装,提供统一入口,利用workerman做socket中转,当客户端发出请求时,将给socket转给服务端使用,提供服务。workerman-json-rpc与workerman-thrift-rpc类似,采用异步(分步)收发,但简单多了,更像是一种约定。数据格式,发送时仅发送class,function,parameters三个参数,接收时,仅code,msg,data三个返回值,在格式约束及跨语言上,需要自行处理;不需要thrift那样依赖于生成器所生成的文件,客户端完全独立于服务端。

注:以上示例使用修改过的代码,附上代码:thrift

参考链接:
Apache Thrift – 可伸缩的跨语言服务开发框架
Thirft框架介绍
Apache Thrift
Building Apache Thrift on CentOS 6.5
PHP Tutorial
Creating a public API with Apache Thrift
hadoop + Hbase + thrift + php 安裝設定與程式設計
php实现的thrift socket server
Our own “Hello World!”