前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。
PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。
<?php class ServerSocket{ protected $strHost = ""; protected $nPort = 2015; protected $nProtocol = SOL_TCP; protected $pSocket = null; protected $pClient = null; public $strErrorCode = ""; public $strErrorMsg = ""; public function __construct($p_strHost = "", $p_nPort =2015, $p_nProtocol = SOL_TCP){ //参数验证 $this->strHost = $p_strHost; $this->nPort = $p_nPort; $this->nProtocol = $p_nProtocol; if($this->_create()&&$this->_bind()){ $this->_listen(); } } protected function _create(){ $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol); if(!$this->pSocket){ $this->_log(); } return $this->pSocket; } protected function _bind(){ $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort); if(!$bRes){ $this->_log(); } return $bRes; } protected function _listen(){ $bRes = socket_listen($this->pSocket, 10) ; if(!$bRes){ $this->_log(); } return $bRes; } public function accept(){ $this->pClient = socket_accept($this->pSocket); if(!$this->pClient){ $this->_log(); } return $this->pClient; } protected function _connect(){ $this->accept(); if(socket_getpeername($this->pClient, $address, $port)){ echo "Client $address : $port is now connected to us. \n"; } $this->write("hello world from server\n"); } protected function _reply(){ $mxData = $this->read(); var_dump($mxData); if ($mxData == false) { socket_close($this->pClient); echo "client disconnected.\n"; return false; } else{ $strMessage = "Client: ".trim($mxData)."\n"; $this->write($strMessage); return true; } } public function run(){ $this->_connect(); $this->_reply(); } public function read(){ $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ); if($mxMessage === false){ $this->_log(); } return $mxMessage; } public function write($p_strMessage){ $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage)); if(!$bRes){ $this->_log(); } return $bRes; } public function close(){ $bRes = socket_close($this->pSocket); $this->pSocket = null; } protected function _log(){ $this->strErrorCode = socket_last_error(); $this->strErrorMsg = socket_strerror($this->strErrorCode); //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode); } public function __destruct(){ if($this->pSocket){ $this->close(); } } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; $pServer = new ServerSocket($strHost, $nPort); $pServer->run();
- 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
- 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
- 使用socket_listen监听该套接字上的连接
- 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
- 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
- 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字
<?php class ClientSocket{ protected $strHost = ""; protected $nPort = 2015; protected $nProtocol = SOL_TCP; private $pSocket = null; public $strErrorCode = ""; public $strErrorMsg = ""; public function __construct($p_strHost = "", $p_nPort =2015, $p_nProtocol = SOL_TCP){ //参数验证 $this->strHost = $p_strHost; $this->nPort = $p_nPort; $this->nProtocol = $p_nProtocol; if($this->_create()){ $this->_connect(); } } private function _create(){ $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol); if(!$this->pSocket){ $this->_log(); } return $this->pSocket; } private function _connect(){ $pSocket = $this->_create(); $bRes = socket_connect($pSocket, $this->strHost, $this->nPort); if(!$bRes){ $this->_log(); } return $bRes; } public function read(){ $strMessage = ""; $strBuffer = ""; while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) { $strMessage .= $strBuffer; } return $strMessage; } public function write($p_strMessage){ $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage)); if(!$bRes){ $this->_log(); } } public function send($p_strMessage){ $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0); if(!$bRes){ $this->_log(); } return true; } public function recv(){ $strMessage = ""; $strBuffer = ""; $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL); if(!$bRes){ $this->_log(); } $strMessage .=$strBuffer; return $strMessage; } public function close(){ $bRes = socket_close($this->pSocket); $this->pSocket = null; } private function _log(){ $this->strErrorCode = socket_last_error(); $this->strErrorMsg = socket_strerror($this->strErrorCode); //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode); } public function __destruct(){ if($this->pSocket){ $this->close(); } } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; //$nProtocol = getprotobyname($strProtocol); $pClient = new ClientSocket($strHost, $nPort); var_dump($pClient->read()); $strMessage = 'Some Thing :'.uniqid(); var_dump($strMessage); $pClient->write($strMessage); var_dump($pClient->read()); /* var_dump($pClient->recv()); $pClient->send('hello'); var_dump($pClient->recv()); */ $pClient->close();
- 使用socket_create创建socket套接字,与服务端对应
- 使用socket_connect连接到服务端的地址或UNIX socket
- 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
- 使用socket_close关闭连接
php serversocket.php
[root@vagrant socket]# netstat -apn | grep 25003 tcp 0 0* LISTEN 12139/php
如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用
[root@vagrant socket]# netstat -apn | grep 25003 tcp 0 0 TIME_WAIT -
php clientsocket.php
class SelectServerSocket extends ServerSocket{ public function run(){ $this->loop(); } public function loop(){ $arrRead = array(); $arrWrite = $arrExp = null; $arrClient = array($this->pSocket); while(true){ $arrRead = $arrClient; if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){ continue; } foreach ($arrRead as $pSocket){ if($pSocket == $this->pSocket){ $this->_connect(); $arrClient[] = $this->pClient; } else{ $bRes = $this->_reply(); if($bRes === false){ $nKey = array_search($this->pClient, $arrClient); unset($arrClient[$nKey]); continue; } } } } //usleep(100); } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; $pServer = new SelectServerSocket($strHost, $nPort); $pServer->run();
这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){ $nSend = (int)$p_nSendTimeOut; $nRecv = (int)$p_nRecvTimeOut; $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000)); $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000)); socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend); socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv); } public function setBlock($p_bType = true){ if($p_bType){ socket_set_block($this->pSocket); } else{ socket_set_nonblock($this->pSocket); } }
$pClient = new ClientSocket($strHost, $nPort); $pClient->setTimeOut(1, 1); //$pClient->setBlock(false); //do request here
protected function _setNoBlock($p_pSocket){ socket_set_nonblock($p_pSocket); } protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){ $nSend = (int)$p_nSendTimeOut; $nRecv = (int)$p_nRecvTimeOut; $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000)); $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000)); socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend); socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv); }
public function loop(){ $arrRead = array(); $arrWrite = $arrExp = null; $arrClient = array($this->pSocket); $this->_setNoBlock($this->pSocket); while(true){
在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听
class EvServerSocket extends ServerSocket{ protected function _onConnect(){ $this->_connect(); $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) { $this->_reply(); }); Ev::run(); } public function run(){ $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) { $this->_onConnect(); }); Ev::run(); } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; $pServer = new EvServerSocket($strHost, $nPort); $pServer->run();
代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。
class MulProcessServerSocket extends EvServerSocket{ protected function _execute(){ if(!$this->_reply()){ //子进程执行完毕,通知父进程 exit(); } } protected function _onConnect(){ $pid = pcntl_fork(); //父进程和子进程都会执行下面代码 if ($pid == -1) { //错误处理:创建子进程失败时返回-1. die('could not fork'); } else if ($pid) { //父进程会得到子进程号,所以这里是父进程执行的逻辑 pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。 } else { //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。 $this->_connect(); $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) { $this->_execute(); }); Ev::run(); } } }
<?php class ClientStreamSocket{ private $pConnetion = null; protected $strAddress = "tcp://"; protected $nTimeOut = 3; protected $nFlag = STREAM_CLIENT_CONNECT; public $strErrorCode = ""; public $strErrorMsg = ""; const BLOCK = 1; const NOBLOCK = 0; public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){ $this->strAddress = $p_strAddress; $this->nTimeOut = $p_nTimeOut; $this->nFlag = $p_nFlag; $this->_connect(); } private function _connect(){ $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag); if(!$this->pConnetion){ throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode); } return $this->pConnetion; } public function write($p_strMessage){ if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage)) { throw new Exception('Can not send data'); } return true; } public function read(){ //接收一行,阻塞至\n结束 //$strMessage = fgets($this->pConnetion); //指定长度读取 //$strMessage = fread($this->pConnetion, 1024); $strMessage = stream_socket_recvfrom($this->pConnetion, 1024); //$strMessage = stream_get_contents($this->pConnetion); return $strMessage; } public function close(){ fclose($this->pConnetion); $this->pConnetion = null; } public function setContext(){ } public function setTimeOut($p_nTimeOut = 1){ $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut); } public function setBlock($p_nMode = ClientStreamSocket::BLOCK){ $bRes = stream_set_blocking($this->pConnetion, $p_nMode); } public function __destruct(){ if($this->pConnetion){ $this->close(); } } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; //$nProtocol = getprotobyname($strProtocol); $strAddress = $strProtocol."://".$strHost.":".$nPort; $pStream = new ClientStreamSocket($strAddress); //$pStream->setBlock(ClientStreamSocket::NOBLOCK); //$pStream->setTimeOut(1); var_dump($pStream->read()); $pStream->write("hello from client\n"); var_dump($pStream->read()); $pStream->close();
- 首先使用stream_socket_client创建一个socket操作流(stream)
- 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
- 使用fclose或stream_socket_shutdown关闭连接
<?php class ServerStreamSocket{ protected $pServer = null; protected $pClient = null; protected $strAddress = "tcp://"; protected $nFlag = STREAM_SERVER_LISTEN; const BLOCK = 1; const NOBLOCK = 0; public $strErrorCode = ""; public $strErrorMsg = ""; public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){ $this->strAddress = $p_strAddress; $this->nFlag = $p_nFlag; $this->_create(); } protected function _create(){ $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg); if(!$this->pServer ){ throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode); } return $this->pServer ; } public function accept(){ $this->pClient = stream_socket_accept($this->pServer); if(!$this->pClient ){ return false; } return $this->pClient ; } protected function _connect(){ $this->accept(); echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n"; $this->write("hello world from server\n"); } protected function _reply(){ $mxData = $this->read(); var_dump($mxData); if($mxData == false){ fclose($this->pClient); echo "client disconnected.\n"; return false; } else{ $strMessage = "Client:".trim($mxData)."\n"; $this->write($strMessage); return true; } } public function run(){ $this->_connect(); $this->_reply(); } public function write($p_strMessage){ //$nLen = fwrite($this->pClient, $p_strMessage); $nLen = stream_socket_sendto($this->pClient, $p_strMessage); if($nLen !== strlen($p_strMessage)) { throw new Exception('Can not send data'); } return true; } public function read(){ //接收一行,阻塞至\n结束 //$strMessage = fgets($this->pClient); //指定长度读取 //$strMessage = fread($this->pClient, 1024); $strMessage = stream_socket_recvfrom($this->pClient, 1024); //$strMessage = stream_get_contents($this->pClient); return $strMessage; } public function close(){ fclose($this->pServer); $this->pServer = null; } public function setContext(){ } public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){ $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut); } public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){ $bRes = stream_set_blocking($p_pConnetction, $p_nMode); } public function __destruct(){ if($this->pServer){ $this->close(); } } } class SelectServerStreamSocket extends ServerStreamSocket{ public function run(){ $this->loop(); } public function loop(){ $arrRead = array(); $arrWrite = $arrExp = null; $arrClient = array($this->pServer); while(true){ $arrRead = $arrClient; if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){ continue; } if(in_array($this->pServer, $arrRead)){ $this->_connect(); $arrClient[] = $this->pClient; $nKey = array_search($this->pServer, $arrRead); unset($arrRead[$nKey]); } foreach($arrRead as $pConnetcion){ $bRes = $this->_reply(); if($bRes === false){ $nKey = array_search($this->pClient, $arrClient); unset($arrClient[$nKey]);; continue; } } } //usleep(100); } } class EvServerStreamSocket extends ServerStreamSocket{ protected function _onConnect(){ $this->_connect(); $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) { $this->_reply(); }); Ev::run(); } public function run(){ $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) { $this->_onConnect(); }); Ev::run(); } } class MulProcessServerStreamSocket extends EvServerStreamSocket{ protected function _execute(){ if(!$this->_reply()){ //子进程执行完毕,通知父进程 exit(); } } protected function _onConnect(){ $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { pcntl_wait($status); } else { $this->_connect(); $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) { $this->_execute(); }); Ev::run(); } } } $strHost = ""; $nPort = 25003; $strProtocol = "tcp"; //$nProtocol = getprotobyname($strProtocol); $strAddress = $strProtocol."://".$strHost.":".$nPort; $pServer = new EvServerStreamSocket($strAddress); $pServer->run();
这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?