标签归档:libev

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

<?php
class ServerSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	protected $pSocket = null;
	protected $pClient = null;

	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()&&$this->_bind()){
			$this->_listen();
		}
	}
	protected function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	protected function _bind(){
		$bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	protected function _listen(){
		$bRes  = socket_listen($this->pSocket, 10) ;
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function accept(){
		$this->pClient = socket_accept($this->pSocket);
		if(!$this->pClient){
			$this->_log();
		}
		return $this->pClient;
	}
	protected function _connect(){
		$this->accept();
			
		if(socket_getpeername($this->pClient, $address, $port)){
			echo "Client $address : $port is now connected to us. \n";
		}
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if ($mxData == false) {
			socket_close($this->pClient);

			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client: ".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function read(){
		$mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
		if($mxMessage === false){
			$this->_log();
		}
		return $mxMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	protected function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

<?php
class ClientSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	private $pSocket = null;
	public $strErrorCode = "";
	public $strErrorMsg  = "";

	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()){
			$this->_connect();
		}
	}
	private function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	private function _connect(){
		$pSocket = $this->_create();
		$bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function read(){
		$strMessage = "";
		$strBuffer = "";
		while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
			$strMessage .= $strBuffer;
		}
		return $strMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
	}
	public function send($p_strMessage){
		$bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
		if(!$bRes){
			$this->_log();
		}
		return true;
	}
	public function recv(){
		$strMessage = "";
		$strBuffer = "";
		$bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
		if(!$bRes){
			$this->_log();
		}
		$strMessage .=$strBuffer;
		return $strMessage;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	private function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$pClient = new ClientSocket($strHost, $nPort);

var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());

/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

php serversocket.php

在另一个终端里运行

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

class SelectServerSocket extends ServerSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
	
		while(true){
			$arrRead = $arrClient;
			if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			foreach ($arrRead as $pSocket){
				if($pSocket == $this->pSocket){
					$this->_connect();
						
					$arrClient[] = $this->pClient;
				}
				else{
					$bRes = $this->_reply();
					if($bRes === false){
						$nKey = array_search($this->pClient, $arrClient);
						unset($arrClient[$nKey]);
						continue;
					}
				}
	
			}
		}
		//usleep(100);
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

	public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}
	public function setBlock($p_bType = true){
		if($p_bType){
			socket_set_block($this->pSocket);
		}
		else{
			socket_set_nonblock($this->pSocket);
		}
	}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

$pClient = new ClientSocket($strHost, $nPort);

$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);

//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

	protected function _setNoBlock($p_pSocket){
		socket_set_nonblock($p_pSocket);
	}
	protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
		$this->_setNoBlock($this->pSocket);
	
		while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

class EvServerSocket extends ServerSocket{
	protected function _onConnect(){
		$this->_connect();
	
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。


class MulProcessServerSocket extends EvServerSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		//父进程和子进程都会执行下面代码
		if ($pid == -1) {
			//错误处理:创建子进程失败时返回-1.
			die('could not fork');
		} else if ($pid) {
			//父进程会得到子进程号,所以这里是父进程执行的逻辑
			pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
		} else {
			//子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

<?php
class ClientStreamSocket{
	private $pConnetion = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nTimeOut   = 3;
	protected $nFlag      = STREAM_CLIENT_CONNECT;
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	const BLOCK   = 1;
	const NOBLOCK = 0;
	public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
		$this->strAddress = $p_strAddress;
		$this->nTimeOut   = $p_nTimeOut;
		$this->nFlag      = $p_nFlag;
		$this->_connect();
	}
	private function _connect(){
		$this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
		if(!$this->pConnetion){
			throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pConnetion;
	}
	public function write($p_strMessage){
		if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pConnetion);
		//指定长度读取
		//$strMessage = fread($this->pConnetion, 1024);
		$strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
		//$strMessage = stream_get_contents($this->pConnetion);

		return $strMessage;
	}
	public function close(){
		fclose($this->pConnetion);
		$this->pConnetion = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_nTimeOut = 1){
		$bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
	}
	public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
		$bRes = stream_set_blocking($this->pConnetion, $p_nMode);
	}
	public function __destruct(){
		if($this->pConnetion){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

<?php
class ServerStreamSocket{
	protected $pServer = null;
	protected $pClient = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nFlag      = STREAM_SERVER_LISTEN;
	
	const BLOCK   = 1;
	const NOBLOCK = 0;
	
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
		$this->strAddress = $p_strAddress;
		$this->nFlag = $p_nFlag;
		$this->_create();
	}
	protected function _create(){
		$this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
		if(!$this->pServer ){
			throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pServer ;
	}
	public function accept(){
		$this->pClient = stream_socket_accept($this->pServer);
		if(!$this->pClient ){
			return false;
		}
		return $this->pClient ;
	}
	protected function _connect(){
		$this->accept();
		echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if($mxData == false){
			fclose($this->pClient);
				
			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client:".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function write($p_strMessage){
		//$nLen = fwrite($this->pClient, $p_strMessage);
		$nLen = stream_socket_sendto($this->pClient, $p_strMessage);
		if($nLen !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pClient);
		//指定长度读取
		//$strMessage = fread($this->pClient, 1024);
		$strMessage = stream_socket_recvfrom($this->pClient, 1024);
		//$strMessage = stream_get_contents($this->pClient);

		return $strMessage;
	}
	public function close(){
		fclose($this->pServer);
		
		$this->pServer = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
		$bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
	}
	public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
		$bRes = stream_set_blocking($p_pConnetction, $p_nMode);
	}
	public function __destruct(){
		if($this->pServer){
			$this->close();
		}
	}
}
class SelectServerStreamSocket extends ServerStreamSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pServer);
		while(true){
			$arrRead = $arrClient;
			if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			if(in_array($this->pServer, $arrRead)){
				$this->_connect();
	
				$arrClient[] = $this->pClient;
	
				$nKey = array_search($this->pServer, $arrRead);
				unset($arrRead[$nKey]);
			}
			foreach($arrRead as $pConnetcion){
				$bRes = $this->_reply();
				if($bRes === false){
					$nKey = array_search($this->pClient, $arrClient);
					unset($arrClient[$nKey]);;
					continue;
				}
			}
		}
		//usleep(100);
	}
}
class EvServerStreamSocket extends ServerStreamSocket{
	protected function _onConnect(){
		$this->_connect();
		
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		if ($pid == -1) {
			die('could not fork');
		} else if ($pid) {
			pcntl_wait($status);
		} else {
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pServer = new EvServerStreamSocket($strAddress);

$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?

PHP 事件驱动开发

最近在学习PHP的系统事件驱动(event-base)开发,发现PHP有好几个event扩展,根据底层库依赖分为两类:libeventlibev。libevent可以为文件描述符、信号、超时设定等事件提供了监听回调,支持poll/kqueue/event port/select/epoll。libevent 库的其他组件提供其他功能,包括缓冲的事件系统(用于缓冲发送到客户端/从客户端接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。libev提供了各种监听器,包括子进程监听,超时设定,定时器,IO监听,信号监听,文件监视等,支持epoll/kqueue/event ports/inotify/eventfd/signalfd,更快的时钟管理,时间变化检测和修正。PHP依赖libevent扩展有libeventevent,PHP依赖libev扩展则有Evlibev

libevent在PHP事件驱动开发上应用广泛,比如workermanphpDaemonReactPHPKellner。CentOS上PHP 5.4安装libevent扩展

sudo yum install libevent-devel

wget https://pecl.php.net/get/libevent-0.1.0.tgz
tar -zxvf libevent-0.1.0.tgz
cd libevent-0.1.0
phpize 
./configure
sudo make
sudo make install

#增加libevent.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep libevent

前面介绍过使用ticks和pcntl_signal来做定时器,然而tick运行机制是PHP解释器每执行 N 条可计时的低级语句就会发生的事件,如果tick值设置小了,会产生频繁的系统调用,设置大了又不能保证及时。使用libevent来设置一个定时器

<?php
function print_dot(){
	echo ".";
}

class Timer{
	protected  $pEventBase;
	protected $pEvent;
	public $nInterval = 1;
	public function __construct(){
		$this->pEventBase = event_base_new();
	}
	public function addEvent($p_pFunc, $p_mxArgs = null){
		$this->pEvent = event_new();
		event_set($this->pEvent, 0, EV_TIMEOUT, $p_pFunc, $p_mxArgs);
		event_base_set($this->pEvent, $this->pEventBase);
	}
	public function loop(){
		event_add($this->pEvent, $this->nInterval*1000000);
		event_base_loop($this->pEventBase);
	}
}

$pTimer = new Timer();
$pTimer->addEvent("print_dot");
while(1){
	$pTimer->loop();
}

libevent使用也很简单:

  • 使用event_base_new和event_new分别创建event_base和event
  • 使用event_set为event设置要监听文件描述符fd,比如文件、socke、信号,超时则fd为0,事件类型和回调函数
  • 使用event_base_set关联event_base和event
  • 使用event_add将设置好的event加入事件监听器
  • 调用event_base_loop开始处理事件

官网上有个例子用来做socket监听处理

<?php
$socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
stream_set_blocking($socket, 0);
$base = event_base_new();
$event = event_new();
event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);

$GLOBALS['connections'] = array();
$GLOBALS['buffers'] = array();

function ev_accept($socket, $flag, $base) {
    static $id = 0;
    
    $connection = stream_socket_accept($socket);
    stream_set_blocking($connection, 0);
    
    $id += 1;
    
    $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $id);
    event_buffer_base_set($buffer, $base);
    event_buffer_timeout_set($buffer, 30, 30);
    event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
    event_buffer_priority_set($buffer, 10);
    event_buffer_enable($buffer, EV_READ | EV_PERSIST);
    
    // we need to save both buffer and connection outside
    $GLOBALS['connections'][$id] = $connection;
    $GLOBALS['buffers'][$id] = $buffer;
}

function ev_error($buffer, $error, $id) {
    event_buffer_disable($GLOBALS['buffers'][$id], EV_READ | EV_WRITE);
    event_buffer_free($GLOBALS['buffers'][$id]);
    fclose($GLOBALS['connections'][$id]);
    unset($GLOBALS['buffers'][$id], $GLOBALS['connections'][$id]);
}

function ev_read($buffer, $id) {
    while ($read = event_buffer_read($buffer, 256)) {
        var_dump($read);
    }
}

相比libevent,event扩展提供了面向对象的方法,支持libevent 2+ 的特性,对HTTP,DNS,OpenSSL等协议操作进行封装。Kellner框架比较有意思,在PHP的libevent扩展基础上将http请求处理封装成了扩展,使用cli模式处理http请求,并给出了基于Zend Framework 2的示例。

libev自称libevent的替代者,克服了libevent的一些不利影响,开销更小,Node JS便是利用它来做事件驱动。相比基于libeventd的扩展,基于libev的ev扩展更新比较积极,支持设置各种的监听器,为感兴趣的事件注册回调,比如文件变化,超时。CentOS上PHP 5.4安装ev扩展

wget https://pecl.php.net/get/ev-0.2.15.tgz
tar -zxvf ev-0.2.15
cd ev-0.2.15
phpize 
./configure
sudo make
sudo make install

#增加ev.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep ev

libev封装了各种监视器,操作也比较简单。

<?php
/**
 * 延迟1秒后执行,不重复
 */
$pDelay = new EvTimer(1, 0, function () {
	echo "1 delay \n";
});
/**
 * 每隔一秒执行一次的定时器,0秒后执行
 */
$pTimer = new EvTimer(0, 1, function () {
	echo "1 seconds \n";
});
/**
 * 如果没有其他更高等级的监视器,那么就执行EvIdle,处于低优先级则不执行
 */
$pIdle = new EvIdle(function(){
	sleep(1);
	echo "idle timer \n";
},0,2);
/**
 * 每一次loop开始都会执行
 */
$pPrepare = new EvPrepare(function(){
	echo "before timer \n";
},0);
/**
 * 每一次loop都会执行,可以通过优先级调整执行顺序
 */
$c = new EvCheck(function(){
	echo "after timer \n";
},0,-1);
/**
 * 定时器,每隔1.5秒后执行一次,0秒后开始
 */
$pPeriod = new EvPeriodic(0., 1.5, NULL, function ($w, $revents) {
	echo time(), PHP_EOL;
});
/**
 * IO输入事件监听,可以拿去监听socket的Ev::WRITE和Ev::READ事件
 */
$pReadWatcher = new EvIo(STDIN, Ev::READ, function ($watcher, $revents) {
	echo "STDIN is readable\n";
});

/**
 * 注册监听感兴趣的信号
 */
$pSignal = new EvSignal(SIGTERM, function ($watcher) {
	echo "SIGTERM received\n";
	$watcher->stop();
});
/**
 * 文件变化监听器,10秒监测一次
 */
$pStatWatcher = new EvStat("/var/log/messages", 10, function ($w) {
	echo "/var/log/messages changed\n";
	
	$attr = $pStatWatcher->attr();
	
	if ($attr['nlink']) {
		printf("Current size: %ld\n", $attr['size']);
		printf("Current atime: %ld\n", $attr['atime']);
		printf("Current mtime: %ld\n", $attr['mtime']);
	} else {
		fprintf(STDERR, "`messages` file is not there!");
		$pStatWatcher->stop();
	}
});
	
/**
 * 开始执行Ev::RUN_ONCE则立即执行Ev::RUN_NOWAIT则非阻塞执行
 */
Ev::run();

也可以监听子进程

$pid = pcntl_fork();

if ($pid == -1) {
    fprintf(STDERR, "pcntl_fork failed\n");
} elseif ($pid) {
    $w = new EvChild($pid, FALSE, function ($w, $revents) {
        $w->stop();

        printf("Process %d exited with status %d\n", $w->rpid, $w->rstatus);
    });

    Ev::run();

    // Protect against Zombies
    pcntl_wait($status);
} else {
    //Forked child
    exit(2);
}

php的libev扩展也实现了libev的所有监视器,提供类似的用法,但比较久没更新了。

在网络编程中,使用事件驱动模型监听感兴趣的事件,结合异步处理,能够大大提高服务器性能。传统服务器模型如Apache为每一个请求生成一个子进程。当用户连接到服务器的一个子进程就产生,并处理连接。每个连接获得一个单独的线程和子进程。当用户请求数据返回时,子进程开始等待数据库操作返回。如果此时另一个用户也请求返回数据,这时就产生了阻塞。以下引用自《使用事件驱动模型实现高效稳定的网络服务器程序》

简单网络编程模型里面,服务器与客户端都是一应一答,大部分的 socket 接口都是阻塞型的。在面对多个客户端的请求时候,最简单的解决方式是在服务器端使用多线程(或多进程)。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。
于是便有了“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。
但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用 IO 接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。对付可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。

于是便有了基于事件驱动的非阻塞型服务器,比如Nginx,Node.js。Nginx采用事件驱动,使用epoll事件模型,充分使用异步逻辑,削减了上下文调度开销,并发服务能力更强。Node.js 的异步机制是基于事件的,所有的磁盘 I/O、网络通信、数据库查询都以非阻塞的方式请求,返回的结果由事件循环来处理。Node.js 在执行的过程中会维护一个事件队列,程序在执行时进入事件循环等待下一个事件到来,每个异步式 I/O 请求完成后会被推送到事件队列,等待程序进程进行处理。

参考链接:
libev – a high performance full-featured event loop written in C
Working with events
使用 libevent 和 libev 提高网络应用性能
为什么事件驱动服务器这么火
Asynchronous PHP and Real-time Messaging
react.php 中的异步实现