前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。
PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。
首先写一个socket服务端
<?php
class ServerSocket{
protected $strHost = "127.0.0.1";
protected $nPort = 2015;
protected $nProtocol = SOL_TCP;
protected $pSocket = null;
protected $pClient = null;
public $strErrorCode = "";
public $strErrorMsg = "";
public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
//参数验证
$this->strHost = $p_strHost;
$this->nPort = $p_nPort;
$this->nProtocol = $p_nProtocol;
if($this->_create()&&$this->_bind()){
$this->_listen();
}
}
protected function _create(){
$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
if(!$this->pSocket){
$this->_log();
}
return $this->pSocket;
}
protected function _bind(){
$bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
if(!$bRes){
$this->_log();
}
return $bRes;
}
protected function _listen(){
$bRes = socket_listen($this->pSocket, 10) ;
if(!$bRes){
$this->_log();
}
return $bRes;
}
public function accept(){
$this->pClient = socket_accept($this->pSocket);
if(!$this->pClient){
$this->_log();
}
return $this->pClient;
}
protected function _connect(){
$this->accept();
if(socket_getpeername($this->pClient, $address, $port)){
echo "Client $address : $port is now connected to us. \n";
}
$this->write("hello world from server\n");
}
protected function _reply(){
$mxData = $this->read();
var_dump($mxData);
if ($mxData == false) {
socket_close($this->pClient);
echo "client disconnected.\n";
return false;
}
else{
$strMessage = "Client: ".trim($mxData)."\n";
$this->write($strMessage);
return true;
}
}
public function run(){
$this->_connect();
$this->_reply();
}
public function read(){
$mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
if($mxMessage === false){
$this->_log();
}
return $mxMessage;
}
public function write($p_strMessage){
$bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
if(!$bRes){
$this->_log();
}
return $bRes;
}
public function close(){
$bRes = socket_close($this->pSocket);
$this->pSocket = null;
}
protected function _log(){
$this->strErrorCode = socket_last_error();
$this->strErrorMsg = socket_strerror($this->strErrorCode);
//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
}
public function __destruct(){
if($this->pSocket){
$this->close();
}
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();
这里对socket_*系列函数进行了包装,创建socket服务端的步骤
- 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。
- 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
- 使用socket_listen监听该套接字上的连接
- 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
- 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
- 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字
这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端
<?php
class ClientSocket{
protected $strHost = "127.0.0.1";
protected $nPort = 2015;
protected $nProtocol = SOL_TCP;
private $pSocket = null;
public $strErrorCode = "";
public $strErrorMsg = "";
public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
//参数验证
$this->strHost = $p_strHost;
$this->nPort = $p_nPort;
$this->nProtocol = $p_nProtocol;
if($this->_create()){
$this->_connect();
}
}
private function _create(){
$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
if(!$this->pSocket){
$this->_log();
}
return $this->pSocket;
}
private function _connect(){
$pSocket = $this->_create();
$bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
if(!$bRes){
$this->_log();
}
return $bRes;
}
public function read(){
$strMessage = "";
$strBuffer = "";
while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
$strMessage .= $strBuffer;
}
return $strMessage;
}
public function write($p_strMessage){
$bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
if(!$bRes){
$this->_log();
}
}
public function send($p_strMessage){
$bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
if(!$bRes){
$this->_log();
}
return true;
}
public function recv(){
$strMessage = "";
$strBuffer = "";
$bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
if(!$bRes){
$this->_log();
}
$strMessage .=$strBuffer;
return $strMessage;
}
public function close(){
$bRes = socket_close($this->pSocket);
$this->pSocket = null;
}
private function _log(){
$this->strErrorCode = socket_last_error();
$this->strErrorMsg = socket_strerror($this->strErrorCode);
//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
}
public function __destruct(){
if($this->pSocket){
$this->close();
}
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
//$nProtocol = getprotobyname($strProtocol);
$pClient = new ClientSocket($strHost, $nPort);
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
/*
var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();
客户端的创建步骤:
- 使用socket_create创建socket套接字,与服务端对应
- 使用socket_connect连接到服务端的地址或UNIX socket
- 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
- 使用socket_close关闭连接
运行服务端程序
php serversocket.php
在另一个终端里运行
[root@vagrant socket]# netstat -apn | grep 25003
tcp 0 0 127.0.0.1:25003 0.0.0.0:* LISTEN 12139/php
如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用
[root@vagrant socket]# netstat -apn | grep 25003
tcp 0 0 127.0.0.1:25003 127.0.0.1:36618 TIME_WAIT -
该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》。
如果服务端已经运行成功,在另一个终端里运行客户端程序
php clientsocket.php
这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端
class SelectServerSocket extends ServerSocket{
public function run(){
$this->loop();
}
public function loop(){
$arrRead = array();
$arrWrite = $arrExp = null;
$arrClient = array($this->pSocket);
while(true){
$arrRead = $arrClient;
if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
continue;
}
foreach ($arrRead as $pSocket){
if($pSocket == $this->pSocket){
$this->_connect();
$arrClient[] = $this->pClient;
}
else{
$bRes = $this->_reply();
if($bRes === false){
$nKey = array_search($this->pClient, $arrClient);
unset($arrClient[$nKey]);
continue;
}
}
}
}
//usleep(100);
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();
在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。
这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束
在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
$nSend = (int)$p_nSendTimeOut;
$nRecv = (int)$p_nRecvTimeOut;
$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
if($p_bType){
socket_set_block($this->pSocket);
}
else{
socket_set_nonblock($this->pSocket);
}
}
客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了
$pClient = new ClientSocket($strHost, $nPort);
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
//do request here
同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法
protected function _setNoBlock($p_pSocket){
socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
$nSend = (int)$p_nSendTimeOut;
$nRecv = (int)$p_nRecvTimeOut;
$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
将SelectServerSocket的socket_accept后产生的连接设置为非阻塞
public function loop(){
$arrRead = array();
$arrWrite = $arrExp = null;
$arrClient = array($this->pSocket);
$this->_setNoBlock($this->pSocket);
while(true){
再次运行服务端,客户端也不会阻塞了。
在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听
class EvServerSocket extends ServerSocket{
protected function _onConnect(){
$this->_connect();
$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
$this->_reply();
});
Ev::run();
}
public function run(){
$pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
$this->_onConnect();
});
Ev::run();
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();
代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。
通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。
class MulProcessServerSocket extends EvServerSocket{
protected function _execute(){
if(!$this->_reply()){
//子进程执行完毕,通知父进程
exit();
}
}
protected function _onConnect(){
$pid = pcntl_fork();
//父进程和子进程都会执行下面代码
if ($pid == -1) {
//错误处理:创建子进程失败时返回-1.
die('could not fork');
} else if ($pid) {
//父进程会得到子进程号,所以这里是父进程执行的逻辑
pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
} else {
//子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
$this->_connect();
$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
$this->_execute();
});
Ev::run();
}
}
}
还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互
<?php
class ClientStreamSocket{
private $pConnetion = null;
protected $strAddress = "tcp://127.0.0.1:2016";
protected $nTimeOut = 3;
protected $nFlag = STREAM_CLIENT_CONNECT;
public $strErrorCode = "";
public $strErrorMsg = "";
const BLOCK = 1;
const NOBLOCK = 0;
public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
$this->strAddress = $p_strAddress;
$this->nTimeOut = $p_nTimeOut;
$this->nFlag = $p_nFlag;
$this->_connect();
}
private function _connect(){
$this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
if(!$this->pConnetion){
throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
}
return $this->pConnetion;
}
public function write($p_strMessage){
if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
{
throw new Exception('Can not send data');
}
return true;
}
public function read(){
//接收一行,阻塞至\n结束
//$strMessage = fgets($this->pConnetion);
//指定长度读取
//$strMessage = fread($this->pConnetion, 1024);
$strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
//$strMessage = stream_get_contents($this->pConnetion);
return $strMessage;
}
public function close(){
fclose($this->pConnetion);
$this->pConnetion = null;
}
public function setContext(){
}
public function setTimeOut($p_nTimeOut = 1){
$bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
}
public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
$bRes = stream_set_blocking($this->pConnetion, $p_nMode);
}
public function __destruct(){
if($this->pConnetion){
$this->close();
}
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
//$nProtocol = getprotobyname($strProtocol);
$strAddress = $strProtocol."://".$strHost.":".$nPort;
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();
使用stream_socket_*系列函数创建客户端要简单不少
- 首先使用stream_socket_client创建一个socket操作流(stream)
- 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
- 使用fclose或stream_socket_shutdown关闭连接
使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似
<?php
class ServerStreamSocket{
protected $pServer = null;
protected $pClient = null;
protected $strAddress = "tcp://127.0.0.1:2016";
protected $nFlag = STREAM_SERVER_LISTEN;
const BLOCK = 1;
const NOBLOCK = 0;
public $strErrorCode = "";
public $strErrorMsg = "";
public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
$this->strAddress = $p_strAddress;
$this->nFlag = $p_nFlag;
$this->_create();
}
protected function _create(){
$this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
if(!$this->pServer ){
throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
}
return $this->pServer ;
}
public function accept(){
$this->pClient = stream_socket_accept($this->pServer);
if(!$this->pClient ){
return false;
}
return $this->pClient ;
}
protected function _connect(){
$this->accept();
echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
$this->write("hello world from server\n");
}
protected function _reply(){
$mxData = $this->read();
var_dump($mxData);
if($mxData == false){
fclose($this->pClient);
echo "client disconnected.\n";
return false;
}
else{
$strMessage = "Client:".trim($mxData)."\n";
$this->write($strMessage);
return true;
}
}
public function run(){
$this->_connect();
$this->_reply();
}
public function write($p_strMessage){
//$nLen = fwrite($this->pClient, $p_strMessage);
$nLen = stream_socket_sendto($this->pClient, $p_strMessage);
if($nLen !== strlen($p_strMessage))
{
throw new Exception('Can not send data');
}
return true;
}
public function read(){
//接收一行,阻塞至\n结束
//$strMessage = fgets($this->pClient);
//指定长度读取
//$strMessage = fread($this->pClient, 1024);
$strMessage = stream_socket_recvfrom($this->pClient, 1024);
//$strMessage = stream_get_contents($this->pClient);
return $strMessage;
}
public function close(){
fclose($this->pServer);
$this->pServer = null;
}
public function setContext(){
}
public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
$bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
}
public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
$bRes = stream_set_blocking($p_pConnetction, $p_nMode);
}
public function __destruct(){
if($this->pServer){
$this->close();
}
}
}
class SelectServerStreamSocket extends ServerStreamSocket{
public function run(){
$this->loop();
}
public function loop(){
$arrRead = array();
$arrWrite = $arrExp = null;
$arrClient = array($this->pServer);
while(true){
$arrRead = $arrClient;
if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
continue;
}
if(in_array($this->pServer, $arrRead)){
$this->_connect();
$arrClient[] = $this->pClient;
$nKey = array_search($this->pServer, $arrRead);
unset($arrRead[$nKey]);
}
foreach($arrRead as $pConnetcion){
$bRes = $this->_reply();
if($bRes === false){
$nKey = array_search($this->pClient, $arrClient);
unset($arrClient[$nKey]);;
continue;
}
}
}
//usleep(100);
}
}
class EvServerStreamSocket extends ServerStreamSocket{
protected function _onConnect(){
$this->_connect();
$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
$this->_reply();
});
Ev::run();
}
public function run(){
$pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
$this->_onConnect();
});
Ev::run();
}
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
protected function _execute(){
if(!$this->_reply()){
//子进程执行完毕,通知父进程
exit();
}
}
protected function _onConnect(){
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
pcntl_wait($status);
} else {
$this->_connect();
$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
$this->_execute();
});
Ev::run();
}
}
}
$strHost = "127.0.0.1";
$nPort = 25003;
$strProtocol = "tcp";
//$nProtocol = getprotobyname($strProtocol);
$strAddress = $strProtocol."://".$strHost.":".$nPort;
$pServer = new EvServerStreamSocket($strAddress);
$pServer->run();
这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。
参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?