标签归档:php

php

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install

#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib

sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);


$arrRead = $arrWrite = array();
while(true){
	$nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
		foreach($arrRead as $pSocket){
			if($pSocket === $pFrontend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
			else if ($pSocket === $pBackend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
		}
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");

while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");

while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);

$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");

$pPush->send("Hello Client 1");

客户端Pull.php

<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);

//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");

var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
	$message = $pServer->recv();
	echo $message . PHP_EOL;
	$pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");


$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());

var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

<?php
class ServerSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	protected $pSocket = null;
	protected $pClient = null;

	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()&&$this->_bind()){
			$this->_listen();
		}
	}
	protected function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	protected function _bind(){
		$bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	protected function _listen(){
		$bRes  = socket_listen($this->pSocket, 10) ;
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function accept(){
		$this->pClient = socket_accept($this->pSocket);
		if(!$this->pClient){
			$this->_log();
		}
		return $this->pClient;
	}
	protected function _connect(){
		$this->accept();
			
		if(socket_getpeername($this->pClient, $address, $port)){
			echo "Client $address : $port is now connected to us. \n";
		}
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if ($mxData == false) {
			socket_close($this->pClient);

			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client: ".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function read(){
		$mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
		if($mxMessage === false){
			$this->_log();
		}
		return $mxMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	protected function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

<?php
class ClientSocket{
	protected $strHost = "127.0.0.1";
	protected $nPort = 2015;
	protected $nProtocol = SOL_TCP;
	private $pSocket = null;
	public $strErrorCode = "";
	public $strErrorMsg  = "";

	public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
		//参数验证
		$this->strHost = $p_strHost;
		$this->nPort = $p_nPort;
		$this->nProtocol = $p_nProtocol;
		if($this->_create()){
			$this->_connect();
		}
	}
	private function _create(){
		$this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
		if(!$this->pSocket){
			$this->_log();
		}
		return $this->pSocket;
	}
	private function _connect(){
		$pSocket = $this->_create();
		$bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
		if(!$bRes){
			$this->_log();
		}
		return $bRes;
	}
	public function read(){
		$strMessage = "";
		$strBuffer = "";
		while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
			$strMessage .= $strBuffer;
		}
		return $strMessage;
	}
	public function write($p_strMessage){
		$bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
		if(!$bRes){
			$this->_log();
		}
	}
	public function send($p_strMessage){
		$bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
		if(!$bRes){
			$this->_log();
		}
		return true;
	}
	public function recv(){
		$strMessage = "";
		$strBuffer = "";
		$bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
		if(!$bRes){
			$this->_log();
		}
		$strMessage .=$strBuffer;
		return $strMessage;
	}
	public function close(){
		$bRes = socket_close($this->pSocket);

		$this->pSocket = null;
	}
	private function _log(){
		$this->strErrorCode = socket_last_error();
		$this->strErrorMsg = socket_strerror($this->strErrorCode);
		//throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
	}
	public function __destruct(){
		if($this->pSocket){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$pClient = new ClientSocket($strHost, $nPort);

var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());

/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

php serversocket.php

在另一个终端里运行

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

class SelectServerSocket extends ServerSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
	
		while(true){
			$arrRead = $arrClient;
			if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			foreach ($arrRead as $pSocket){
				if($pSocket == $this->pSocket){
					$this->_connect();
						
					$arrClient[] = $this->pClient;
				}
				else{
					$bRes = $this->_reply();
					if($bRes === false){
						$nKey = array_search($this->pClient, $arrClient);
						unset($arrClient[$nKey]);
						continue;
					}
				}
	
			}
		}
		//usleep(100);
	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

	public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}
	public function setBlock($p_bType = true){
		if($p_bType){
			socket_set_block($this->pSocket);
		}
		else{
			socket_set_nonblock($this->pSocket);
		}
	}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

$pClient = new ClientSocket($strHost, $nPort);

$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);

//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

	protected function _setNoBlock($p_pSocket){
		socket_set_nonblock($p_pSocket);
	}
	protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
		$nSend = (int)$p_nSendTimeOut;
		$nRecv = (int)$p_nRecvTimeOut;

		$arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
		$arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));

		socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
		socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
	}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pSocket);
		$this->_setNoBlock($this->pSocket);
	
		while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

class EvServerSocket extends ServerSocket{
	protected function _onConnect(){
		$this->_connect();
	
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。


class MulProcessServerSocket extends EvServerSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		//父进程和子进程都会执行下面代码
		if ($pid == -1) {
			//错误处理:创建子进程失败时返回-1.
			die('could not fork');
		} else if ($pid) {
			//父进程会得到子进程号,所以这里是父进程执行的逻辑
			pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
		} else {
			//子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

<?php
class ClientStreamSocket{
	private $pConnetion = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nTimeOut   = 3;
	protected $nFlag      = STREAM_CLIENT_CONNECT;
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	const BLOCK   = 1;
	const NOBLOCK = 0;
	public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
		$this->strAddress = $p_strAddress;
		$this->nTimeOut   = $p_nTimeOut;
		$this->nFlag      = $p_nFlag;
		$this->_connect();
	}
	private function _connect(){
		$this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
		if(!$this->pConnetion){
			throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pConnetion;
	}
	public function write($p_strMessage){
		if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pConnetion);
		//指定长度读取
		//$strMessage = fread($this->pConnetion, 1024);
		$strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
		//$strMessage = stream_get_contents($this->pConnetion);

		return $strMessage;
	}
	public function close(){
		fclose($this->pConnetion);
		$this->pConnetion = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_nTimeOut = 1){
		$bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
	}
	public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
		$bRes = stream_set_blocking($this->pConnetion, $p_nMode);
	}
	public function __destruct(){
		if($this->pConnetion){
			$this->close();
		}
	}
}

$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

<?php
class ServerStreamSocket{
	protected $pServer = null;
	protected $pClient = null;
	protected $strAddress = "tcp://127.0.0.1:2016";
	protected $nFlag      = STREAM_SERVER_LISTEN;
	
	const BLOCK   = 1;
	const NOBLOCK = 0;
	
	public $strErrorCode = "";
	public $strErrorMsg  = "";
	public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
		$this->strAddress = $p_strAddress;
		$this->nFlag = $p_nFlag;
		$this->_create();
	}
	protected function _create(){
		$this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
		if(!$this->pServer ){
			throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
		}
		return $this->pServer ;
	}
	public function accept(){
		$this->pClient = stream_socket_accept($this->pServer);
		if(!$this->pClient ){
			return false;
		}
		return $this->pClient ;
	}
	protected function _connect(){
		$this->accept();
		echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
		$this->write("hello world from server\n");
	}
	protected function _reply(){
		$mxData = $this->read();
		var_dump($mxData);
		if($mxData == false){
			fclose($this->pClient);
				
			echo "client disconnected.\n";
			return false;
		}
		else{
			$strMessage = "Client:".trim($mxData)."\n";
			$this->write($strMessage);
			return true;
		}
	}
	public function run(){
		$this->_connect();
		$this->_reply();
	}
	public function write($p_strMessage){
		//$nLen = fwrite($this->pClient, $p_strMessage);
		$nLen = stream_socket_sendto($this->pClient, $p_strMessage);
		if($nLen !== strlen($p_strMessage))
		{
			throw new Exception('Can not send data');
		}
		return true;
	}
	public function read(){
		//接收一行,阻塞至\n结束
		//$strMessage = fgets($this->pClient);
		//指定长度读取
		//$strMessage = fread($this->pClient, 1024);
		$strMessage = stream_socket_recvfrom($this->pClient, 1024);
		//$strMessage = stream_get_contents($this->pClient);

		return $strMessage;
	}
	public function close(){
		fclose($this->pServer);
		
		$this->pServer = null;
	}
	public function setContext(){

	}
	public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
		$bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
	}
	public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
		$bRes = stream_set_blocking($p_pConnetction, $p_nMode);
	}
	public function __destruct(){
		if($this->pServer){
			$this->close();
		}
	}
}
class SelectServerStreamSocket extends ServerStreamSocket{
	public function run(){
		$this->loop();
	}
	public function loop(){
		$arrRead = array();
		$arrWrite = $arrExp = null;
		$arrClient = array($this->pServer);
		while(true){
			$arrRead = $arrClient;
			if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
				continue;
			}
			if(in_array($this->pServer, $arrRead)){
				$this->_connect();
	
				$arrClient[] = $this->pClient;
	
				$nKey = array_search($this->pServer, $arrRead);
				unset($arrRead[$nKey]);
			}
			foreach($arrRead as $pConnetcion){
				$bRes = $this->_reply();
				if($bRes === false){
					$nKey = array_search($this->pClient, $arrClient);
					unset($arrClient[$nKey]);;
					continue;
				}
			}
		}
		//usleep(100);
	}
}
class EvServerStreamSocket extends ServerStreamSocket{
	protected function _onConnect(){
		$this->_connect();
		
		$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
			$this->_reply();
		});
		Ev::run();
	}
	public function run(){
		$pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
			$this->_onConnect();
		});
		Ev::run();
	}
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
	protected function _execute(){
		if(!$this->_reply()){
			//子进程执行完毕,通知父进程
			exit();
		}
	}
	protected function _onConnect(){
		$pid = pcntl_fork();
		if ($pid == -1) {
			die('could not fork');
		} else if ($pid) {
			pcntl_wait($status);
		} else {
			$this->_connect();

			$pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
				$this->_execute();
			});
			Ev::run();
		}

	}
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);

$strAddress = $strProtocol."://".$strHost.":".$nPort;

$pServer = new EvServerStreamSocket($strAddress);

$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?

PHP RPC开发之Thrift

Apache Thrift是一个跨语言的服务部署框架,通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(支持C++,Java,Python,PHP, GO,Javascript,Ruby,Erlang,Perl, Haskell, C#等),并由生成的代码负责RPC协议层和传输层的实现。

在CentOS 6.5上安装Thrift

sudo yum -y update
sudo yum -y groupinstall "Development Tools"

#升级autoconf,必须2.65以上
wget http://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
tar xvf autoconf-2.69.tar.gz
cd autoconf-2.69
./configure --prefix=/usr
make
sudo make install
cd ..

#升级automake必须1.14以上
wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
tar xvf automake-1.14.tar.gz
cd automake-1.14
./configure --prefix=/usr
make
sudo make install
cd ..

#升级bsion
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gz
tar xvf bison-2.5.1.tar.gz
cd bison-2.5.1
./configure --prefix=/usr
make
sudo make install
cd ..

#安装boost
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz
tar xvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
sudo ./b2 install
cd ..

#安装thrift,编译会比较久,内存最好1024M以上
git clone https://git-wip-us.apache.org/repos/asf/thrift.git
cd thrift
./bootstrap.sh
./configure
make
sudo make install
cd ..

#查看版本
thrift -version

#安装thrift_protocol扩展,仅支持二进制读写
cd thrift/lib/php/src/ext/thrift_protocol
phpize
./configure
sudo make
sudo make install
#这里不需要更改php.ini,已自动在/etc/php.d/thrift_protocol.ini里面添加
php -m | grep thrift

Thrift的PHP类库位于thrift/lib/php/lib/Thrift目录下面,Thrift对于数据传输格式、数据传输方式,服务器模型均做了定义,方便自行扩展。

数据传输格式(protocol)是定义的了传输内容,对Thrift Type的打包解包,包括

  • TBinaryProtocol,二进制格式,TBinaryProtocolAccelerated则是依赖于thrift_protocol扩展的快速打包解包。
  • TCompactProtocol,压缩格式
  • TJSONProtocol,JSON格式
  • TMultiplexedProtocol,利用前三种数据格式与支持多路复用协议的服务端(同时提供多个服务,TMultiplexedProcessor)交互

数据传输方式(transport),定义了如何发送(write)和接收(read)数据,包括

  • TBufferedTransport,缓存传输,写入数据并不立即开始传输,直到刷新缓存。
  • TSocket,使用socket传输
  • TFramedTransport,采用分块方式进行传输,具体传输实现依赖其他传输方式,比如TSocket
  • TCurlClient,使用curl与服务端交互
  • THttpClient,采用stream方式与HTTP服务端交互
  • TMemoryBuffer,使用内存方式交换数据
  • TPhpStream,使用PHP标准输入输出流进行传输
  • TNullTransport,关闭数据传输
  • TSocketPool在TSocket基础支持多个服务端管理(需要APC支持),自动剔除无效的服务器
  • TNonblockingSocket,非官方实现非阻塞socket

服务模型,定义了当PHP作为服务端如何监听端口处理请求

  • TForkingServer,采用子进程处理请求
  • TSimpleServer,在TServerSocket基础上处理请求
  • TNonblockingServer,基于libevent的非官方实现非阻塞服务端,与TNonblockingServerSocket,TNonblockingSocket配合使用

另外还定义了一些工厂,以便在Server模式下对数据传输格式和传输方式进行绑定

  • TProtocolFactory,数据传输格式工厂类,对protocol的工厂化生产,包括TBinaryProtocolFactory,TCompactProtocolFactory,TJSONProtocolFactory
  • TTransportFactory,数据传输方式工厂类,对transport的工厂化生产,作为server时,需要自行实现
  • TStringFuncFactory,字符串处理工厂类

其他文件便是异常,字符串处理,自动加载器的定义等等。

现在开始编写一个简单接IDL文件HelloWorld.thrift

namespace php Services.HelloWorld
service HelloWorld
{
    string sayHello(1:string name);
}

然后通过生成器生成PHP文件

#不指明:server不生成processor。。
thrift --gen php:server HelloWorld.thrift

生成文件在gen-php目录下面的Services/HelloWord/HelloWorld.php(目录与namesapce定义一致),这是个公共文件,服务端和客户端都需要包括它。其中客户端调用的代码(HelloWorldClient )已经生成好了

//服务端需要继承该接口
interface HelloWorldIf {
  /**
   * @param string $name
   * @return string
   */
  public function sayHello($name);
}
//提供给客户端调用的方法
class HelloWorldClient implements \Services\HelloWorld\HelloWorldIf {
  public function sayHello($name)
  {
    $this->send_sayHello($name);
    return $this->recv_sayHello();
  }
  public function send_sayHello($name)
  {
  }
  public function recv_sayHello()
  {
  }
}
//HelloWord类sayHello方法参数读取
class HelloWorld_sayHello_args {
}
//HelloWord类sayHello方法结果写入
class HelloWorld_sayHello_result {
}
//作为服务端才会生成
class HelloWorldProcessor {
}

而服务端的服务实现代码则需要继承HelloWorldIf 实现代码HelloWorldHandler.php

<?php
namespace Services\HelloWorld;

class HelloWorldHandler implements HelloWorldIf {
  public function sayHello($name)
  {
      return "Hello $name";
  }
}

编写服务端代码Server.php

<?php
namespace Services\HelloWorld;

error_reporting(E_ALL);

define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();

use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TBufferedTransportFactory;

use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;

//use Thrift\Server\TNonblockingServerSocket;
//use Thrift\Server\TNonblockingServer;

//use Thrift\Protocol\TBinaryProtocol;
//use Thrift\Transport\TPhpStream;
//use Thrift\Transport\TBufferedTransport;


try {
	require_once 'HelloWorldHandler.php';
	$handler = new \Services\HelloWorld\HelloWorldHandler();
	$processor = new \Services\HelloWorld\HelloWorldProcessor($handler);
	
	$transportFactory = new TBufferedTransportFactory();
	$protocolFactory = new TBinaryProtocolFactory(true, true);
	
	//作为cli方式运行,监听端口,官方实现
	$transport = new TServerSocket('localhost', 9090);
	$server = new TSimpleServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
	$server->serve();
	
	//作为cli方式运行,非阻塞方式监听,基于libevent实现,非官方实现
	//$transport = new TNonblockingServerSocket('localhost', 9090);
	//$server = new TNonblockingServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
	//$server->serve();

	//客户端和服务端在同一个输入输出流上
	//使用方式
	//1) cli 方式:php Client.php | php Server.php 
	//2) cgi 方式:利用Apache或nginx监听http请求,调用php-fpm处理,将请求转换为PHP标准输入输出流
	//$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
	//$protocol = new TBinaryProtocol($transport, true, true);
	//$transport->open();
	//$processor->process($protocol, $protocol);
	//$transport->close();
	
} catch (TException $tx) {
	print 'TException: '.$tx->getMessage()."\n";
}

服务端创建的步骤:

  • 首先初始化服务提供者handler
  • 然后利用该handler初始化自动生成的processor
  • 初始化数据传输方式transport
  • 利用该传输方式初始化数据传输格式protocol
  • 开始服务

编写客户端代码Client.php

<?php
namespace Services\HelloWorld;

error_reporting(E_ALL);

define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();

//use Thrift\Transport\TPhpStream;

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;

try {
	//仅在与服务端处于同一输出输出流有用
	//使用方式:php Client.php | php Server.php 
	//$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
	
	//socket方式连接服务端
	//数据传输格式和数据传输方式与服务端一一对应
	//如果服务端以http方式提供服务,可以使用THttpClient/TCurlClient数据传输方式
	$transport = new TBufferedTransport(new TSocket('localhost', 9090));
	$protocol = new TBinaryProtocol($transport);
	$client = new \Services\HelloWorld\HelloWorldClient($protocol);

	$transport->open();
	
	//同步方式进行交互
	$recv = $client->sayHello('Courages');
	echo "\n sayHello11dd:".$recv." \n";
	
	//异步方式进行交互
	$client->send_sayHello('Us');
	echo "\n send_sayHello \n";
	$recv = $client->recv_sayHello();
	echo "\n recv_sayHello:".$recv." \n";
	
	$transport->close();
} catch (TException $tx) {
	print 'TException: '.$tx->getMessage()."\n";
}
	

客户端调用的步骤:

  • 初始化数据传输方式transport,与服务端对应
  • 利用该传输方式初始化数据传输格式protocol,与服务端对应
  • 实例化自动生成的Client对象
  • 开始调用

在终端上运行

#以cli方式运行TPhpStream 
#php Client.php | php Server.php

#先运行Server.php
#要不然会报错:TException: TSocket: Could not connect to localhost:9090 (Connection refused [111])
php Server.php

#在另外一个终端运行
php Client.ph

官方给的例子,PHP作为服务端是以web方式进行提供的,在cli方式下并不能运行。

Thrift作为一个跨语言的服务框架,方便不同语言、模块之间互相调用,解耦服务逻辑代码,拓展了PHP的处理能力(如与Hbase交互),使得WEB架构更具弹性。与基于 SOAP 消息格式的 Web Service和基于 JSON 消息格式的 RESTful 服务不同,Thrif数据传输格式默认采用二进制传格式,对 XML 和 JSON 体积更小,但对于服务端的CPU占用比JSON、XML要高。PHP虽然有thrift_protocol扩展,但仅仅作为二进制数据传输格式化使用,其他文件的加载仍然为PHP,需要更多的开销。

如果由PHP来做为Thrift的服务端,仅仅这样子做仍然是不够的,Thrift仅仅实现的数据定义和传输,未实现RPC架构

  • 需要避免重复加载各类文件,是否做成PHP扩展
  • 数据传输格式和方式是否适需要自行扩展
  • 客户端要能够自动连可使用的服务端,剔除失效的服务器
  • 服务端需要处理客户端并发情况,是否多进程/异步处理
  • 服务端需要监控服务是否正常

workerman-thrift-rpc对这些问题进行了解决,基于thrift提供了一个可靠性的RPC框架。对客户端和服务端的调用做了封装,提供统一入口,利用workerman做socket中转,当客户端发出请求时,将给socket转给服务端使用,提供服务。workerman-json-rpc与workerman-thrift-rpc类似,采用异步(分步)收发,但简单多了,更像是一种约定。数据格式,发送时仅发送class,function,parameters三个参数,接收时,仅code,msg,data三个返回值,在格式约束及跨语言上,需要自行处理;不需要thrift那样依赖于生成器所生成的文件,客户端完全独立于服务端。

注:以上示例使用修改过的代码,附上代码:thrift

参考链接:
Apache Thrift – 可伸缩的跨语言服务开发框架
Thirft框架介绍
Apache Thrift
Building Apache Thrift on CentOS 6.5
PHP Tutorial
Creating a public API with Apache Thrift
hadoop + Hbase + thrift + php 安裝設定與程式設計
php实现的thrift socket server
Our own “Hello World!”

PHP 事件驱动开发

最近在学习PHP的系统事件驱动(event-base)开发,发现PHP有好几个event扩展,根据底层库依赖分为两类:libeventlibev。libevent可以为文件描述符、信号、超时设定等事件提供了监听回调,支持poll/kqueue/event port/select/epoll。libevent 库的其他组件提供其他功能,包括缓冲的事件系统(用于缓冲发送到客户端/从客户端接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。libev提供了各种监听器,包括子进程监听,超时设定,定时器,IO监听,信号监听,文件监视等,支持epoll/kqueue/event ports/inotify/eventfd/signalfd,更快的时钟管理,时间变化检测和修正。PHP依赖libevent扩展有libeventevent,PHP依赖libev扩展则有Evlibev

libevent在PHP事件驱动开发上应用广泛,比如workermanphpDaemonReactPHPKellner。CentOS上PHP 5.4安装libevent扩展

sudo yum install libevent-devel

wget https://pecl.php.net/get/libevent-0.1.0.tgz
tar -zxvf libevent-0.1.0.tgz
cd libevent-0.1.0
phpize 
./configure
sudo make
sudo make install

#增加libevent.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep libevent

前面介绍过使用ticks和pcntl_signal来做定时器,然而tick运行机制是PHP解释器每执行 N 条可计时的低级语句就会发生的事件,如果tick值设置小了,会产生频繁的系统调用,设置大了又不能保证及时。使用libevent来设置一个定时器

<?php
function print_dot(){
	echo ".";
}

class Timer{
	protected  $pEventBase;
	protected $pEvent;
	public $nInterval = 1;
	public function __construct(){
		$this->pEventBase = event_base_new();
	}
	public function addEvent($p_pFunc, $p_mxArgs = null){
		$this->pEvent = event_new();
		event_set($this->pEvent, 0, EV_TIMEOUT, $p_pFunc, $p_mxArgs);
		event_base_set($this->pEvent, $this->pEventBase);
	}
	public function loop(){
		event_add($this->pEvent, $this->nInterval*1000000);
		event_base_loop($this->pEventBase);
	}
}

$pTimer = new Timer();
$pTimer->addEvent("print_dot");
while(1){
	$pTimer->loop();
}

libevent使用也很简单:

  • 使用event_base_new和event_new分别创建event_base和event
  • 使用event_set为event设置要监听文件描述符fd,比如文件、socke、信号,超时则fd为0,事件类型和回调函数
  • 使用event_base_set关联event_base和event
  • 使用event_add将设置好的event加入事件监听器
  • 调用event_base_loop开始处理事件

官网上有个例子用来做socket监听处理

<?php
$socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
stream_set_blocking($socket, 0);
$base = event_base_new();
$event = event_new();
event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);

$GLOBALS['connections'] = array();
$GLOBALS['buffers'] = array();

function ev_accept($socket, $flag, $base) {
    static $id = 0;
    
    $connection = stream_socket_accept($socket);
    stream_set_blocking($connection, 0);
    
    $id += 1;
    
    $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $id);
    event_buffer_base_set($buffer, $base);
    event_buffer_timeout_set($buffer, 30, 30);
    event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
    event_buffer_priority_set($buffer, 10);
    event_buffer_enable($buffer, EV_READ | EV_PERSIST);
    
    // we need to save both buffer and connection outside
    $GLOBALS['connections'][$id] = $connection;
    $GLOBALS['buffers'][$id] = $buffer;
}

function ev_error($buffer, $error, $id) {
    event_buffer_disable($GLOBALS['buffers'][$id], EV_READ | EV_WRITE);
    event_buffer_free($GLOBALS['buffers'][$id]);
    fclose($GLOBALS['connections'][$id]);
    unset($GLOBALS['buffers'][$id], $GLOBALS['connections'][$id]);
}

function ev_read($buffer, $id) {
    while ($read = event_buffer_read($buffer, 256)) {
        var_dump($read);
    }
}

相比libevent,event扩展提供了面向对象的方法,支持libevent 2+ 的特性,对HTTP,DNS,OpenSSL等协议操作进行封装。Kellner框架比较有意思,在PHP的libevent扩展基础上将http请求处理封装成了扩展,使用cli模式处理http请求,并给出了基于Zend Framework 2的示例。

libev自称libevent的替代者,克服了libevent的一些不利影响,开销更小,Node JS便是利用它来做事件驱动。相比基于libeventd的扩展,基于libev的ev扩展更新比较积极,支持设置各种的监听器,为感兴趣的事件注册回调,比如文件变化,超时。CentOS上PHP 5.4安装ev扩展

wget https://pecl.php.net/get/ev-0.2.15.tgz
tar -zxvf ev-0.2.15
cd ev-0.2.15
phpize 
./configure
sudo make
sudo make install

#增加ev.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep ev

libev封装了各种监视器,操作也比较简单。

<?php
/**
 * 延迟1秒后执行,不重复
 */
$pDelay = new EvTimer(1, 0, function () {
	echo "1 delay \n";
});
/**
 * 每隔一秒执行一次的定时器,0秒后执行
 */
$pTimer = new EvTimer(0, 1, function () {
	echo "1 seconds \n";
});
/**
 * 如果没有其他更高等级的监视器,那么就执行EvIdle,处于低优先级则不执行
 */
$pIdle = new EvIdle(function(){
	sleep(1);
	echo "idle timer \n";
},0,2);
/**
 * 每一次loop开始都会执行
 */
$pPrepare = new EvPrepare(function(){
	echo "before timer \n";
},0);
/**
 * 每一次loop都会执行,可以通过优先级调整执行顺序
 */
$c = new EvCheck(function(){
	echo "after timer \n";
},0,-1);
/**
 * 定时器,每隔1.5秒后执行一次,0秒后开始
 */
$pPeriod = new EvPeriodic(0., 1.5, NULL, function ($w, $revents) {
	echo time(), PHP_EOL;
});
/**
 * IO输入事件监听,可以拿去监听socket的Ev::WRITE和Ev::READ事件
 */
$pReadWatcher = new EvIo(STDIN, Ev::READ, function ($watcher, $revents) {
	echo "STDIN is readable\n";
});

/**
 * 注册监听感兴趣的信号
 */
$pSignal = new EvSignal(SIGTERM, function ($watcher) {
	echo "SIGTERM received\n";
	$watcher->stop();
});
/**
 * 文件变化监听器,10秒监测一次
 */
$pStatWatcher = new EvStat("/var/log/messages", 10, function ($w) {
	echo "/var/log/messages changed\n";
	
	$attr = $pStatWatcher->attr();
	
	if ($attr['nlink']) {
		printf("Current size: %ld\n", $attr['size']);
		printf("Current atime: %ld\n", $attr['atime']);
		printf("Current mtime: %ld\n", $attr['mtime']);
	} else {
		fprintf(STDERR, "`messages` file is not there!");
		$pStatWatcher->stop();
	}
});
	
/**
 * 开始执行Ev::RUN_ONCE则立即执行Ev::RUN_NOWAIT则非阻塞执行
 */
Ev::run();

也可以监听子进程

$pid = pcntl_fork();

if ($pid == -1) {
    fprintf(STDERR, "pcntl_fork failed\n");
} elseif ($pid) {
    $w = new EvChild($pid, FALSE, function ($w, $revents) {
        $w->stop();

        printf("Process %d exited with status %d\n", $w->rpid, $w->rstatus);
    });

    Ev::run();

    // Protect against Zombies
    pcntl_wait($status);
} else {
    //Forked child
    exit(2);
}

php的libev扩展也实现了libev的所有监视器,提供类似的用法,但比较久没更新了。

在网络编程中,使用事件驱动模型监听感兴趣的事件,结合异步处理,能够大大提高服务器性能。传统服务器模型如Apache为每一个请求生成一个子进程。当用户连接到服务器的一个子进程就产生,并处理连接。每个连接获得一个单独的线程和子进程。当用户请求数据返回时,子进程开始等待数据库操作返回。如果此时另一个用户也请求返回数据,这时就产生了阻塞。以下引用自《使用事件驱动模型实现高效稳定的网络服务器程序》

简单网络编程模型里面,服务器与客户端都是一应一答,大部分的 socket 接口都是阻塞型的。在面对多个客户端的请求时候,最简单的解决方式是在服务器端使用多线程(或多进程)。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。
于是便有了“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。
但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用 IO 接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。对付可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。

于是便有了基于事件驱动的非阻塞型服务器,比如Nginx,Node.js。Nginx采用事件驱动,使用epoll事件模型,充分使用异步逻辑,削减了上下文调度开销,并发服务能力更强。Node.js 的异步机制是基于事件的,所有的磁盘 I/O、网络通信、数据库查询都以非阻塞的方式请求,返回的结果由事件循环来处理。Node.js 在执行的过程中会维护一个事件队列,程序在执行时进入事件循环等待下一个事件到来,每个异步式 I/O 请求完成后会被推送到事件队列,等待程序进程进行处理。

参考链接:
libev – a high performance full-featured event loop written in C
Working with events
使用 libevent 和 libev 提高网络应用性能
为什么事件驱动服务器这么火
Asynchronous PHP and Real-time Messaging
react.php 中的异步实现

PHP ZooKeeper分布式应用开发

ZooKeeper是一个中心化服务,用于分布式应用下的配置同步和协调,提供统一配置服务,统一命名服务,分布式同步,集群管理等。Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。ZooKeeper的应用场景包括:统一命名服务;配置管理;集群管理;队列管理等。

ZooKeeper作为一个Java应用程序,有大神开发了PHP的扩展:php-zookeeper。利用ZooKeeper,我们可以让分布式的PHP应用程序协调产生leader,为woker分配任务,当leader崩溃时,自动选举产生leader;也可以作分布式的锁和队列。

ZooKeeper本身是一个集群,至少需要表示3台,只要超过半数节点正常就可以工作,避免单点故障。首先需要安装JDK环境

yum search java | grep 'java-'
sudo yum install java-1.8.0-openjdk-devel

然后安装ZooKeeper,从官网下载

tar zxfv zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6/src/c
./configure --prefix=/usr/
make
sudo make install

#创建libzookeeper.conf,内容为/usr/lib,以便编译扩展使用
sudo vim /etc/ld.so.conf.d/libzookeeper.conf
#使配置生效
sudo ldconfig

然后安装PHP的扩展

cd
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make
sudo make install

更改php.ini配置,增加以下内容

[zookeeper]
extension = zookeeper.so

查看是否加载成功

php -m | grep zookeeper

更改ZooKeeper配置,可以改变里面的DataDir熟悉,默认在/tmp下面

cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg

然后终端A里面运行ZooKeeper,通过shell进行交互

cd zookeeper-3.4.6/bin
./zkServer.sh start
./zkCli.sh -server 127.0.0.1:2181
create /test hello
;Created /test
ls /
;[test, zookeeper]

这时便已成功连到了ZooKeeper,并创建了一个名为“/test”的znode。ZooKeeper以树形结构保存数据。这很类似于文件系统,但“文件夹”又和文件很像。znode是ZooKeeper保存的实体。

新建一个PHP脚本来测试一下

<?php
 
class ZookeeperDemo extends Zookeeper {
 
  public function watcher( $i, $type, $key ) {
    echo "Insider Watcher\n";
 
    // Watcher gets consumed so we need to set a new one
    $this->get( '/test', array($this, 'watcher' ) );
  }
 
}
 
$zoo = new ZookeeperDemo('127.0.0.1:2181');
$zoo->get( '/test', array($zoo, 'watcher' ) );
 
while( true ) {
  echo '.';
  sleep(2);
}

在新的终端B里面运行这个脚本

$ php zookeeperdemo1.php

返回刚才的那个终端A里面,改变节点“/test”存储的数据

set /test world

这时候在终端B里面变化打印“Insider Watcher”。注意:这里注册的回到函数仅支持对象的方法,不支持普通的函数。

前面说过,ZooKeeper是一个基于观察者模式设计的分布式服务管理框架。Zookeeper提供了绑定在znode上的监听器,一旦监听到znode数据发生变化,便会通知所有注册的客户端。所以也可以应用于发布订阅模式。

这篇文章还举例,如何让多个PHP脚本自动选举leader,分配工作。

<?php
 
class Worker extends Zookeeper {
 
  const CONTAINER = '/cluster';
 
  protected $acl = array(
                    array(
                      'perms' => Zookeeper::PERM_ALL,
                      'scheme' => 'world',
                      'id' => 'anyone' ) );
 
  private $isLeader = false;
 
  private $znode;
 
  public function __construct( $host = '', $watcher_cb = null, $recv_timeout = 10000 ) {
    parent::__construct( $host, $watcher_cb, $recv_timeout );
  }
 
  public function register() {
    if( ! $this->exists( self::CONTAINER ) ) {
      $this->create( self::CONTAINER, null, $this->acl );
    }
 
    $this->znode = $this->create( self::CONTAINER . '/w-',
                                  null,
                                  $this->acl,
                                  Zookeeper::<span class="KSFIND_CLASS" id="0KSFindDIV">EPHEMER</span>AL | Zookeeper::SEQUENCE );
 
    $this->znode = str_replace( self::CONTAINER .'/', '', $this->znode );
 
    printf( "I'm registred as: %s\n", $this->znode );
 
    $watching = $this->watchPrevious();
 
    if( $watching == $this->znode ) {
      printf( "Nobody here, I'm the leader\n" );
      $this->setLeader( true );
    }
    else {
      printf( "I'm watching %s\n", $watching );
    }
  }
 
  public function watchPrevious() {
    $workers = $this->getChildren( self::CONTAINER );
    sort( $workers );
    $size = sizeof( $workers );
    for( $i = 0 ; $i < $size ; $i++ ) {
      if( $this->znode == $workers[ $i ] ) {
        if( $i > 0 ) {
          $this->get( self::CONTAINER . '/' . $workers[ $i - 1 ], array( $this, 'watchNode' ) );
          return $workers[ $i - 1 ];
        }
 
        return $workers[ $i ];
      }
    }
 
    throw new Exception(  sprintf( "Something went very wrong! I can't find myself: %s/%s",
                          self::CONTAINER,
                          $this->znode ) );
  }
 
  public function watchNode( $i, $type, $name ) {
    $watching = $this->watchPrevious();
    if( $watching == $this->znode ) {
      printf( "I'm the new leader!\n" );
      $this->setLeader( true );
    }
    else {
      printf( "Now I'm watching %s\n", $watching );
    }
  }
 
  public function isLeader() {
    return $this->isLeader;
  }
 
  public function setLeader($flag) {
    $this->isLeader = $flag;
  }
 
  public function run() {
    $this->register();
 
    while( true ) {
      if( $this->isLeader() ) {
        $this->doLeaderJob();
    }
    else {
      $this->doWorkerJob();
    }
 
      sleep( 2 );
    }
  }
 
  public function doLeaderJob() {
    echo "Leading\n";
  }
 
  public function doWorkerJob() {
    echo "Working\n";
  }
 
}
 
$worker = new Worker( '127.0.0.1:2181' );
$worker->run();

打开多个终端运行这个脚本。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。

除此之外,利用这个扩展还可以实现一下其他的应用场景,比如排他锁和共享锁:php-zookeeper-recipes

参考链接:
Distributed application in PHP with Apache Zookeeper
分布式服务框架 Zookeeper — 管理分布式环境中的数据
使用Apache Zookeeper分布式部署PHP应用程序
分布式服务框架:Zookeeper

PHP 进程间通信

上一篇介绍了PHP的多进程开发,进程间通过信号进行交互。这里介绍一下PHP的进程间通信(IPC)方法,包括基于System V IPC通信,如信号量、进程间消息、共享内存和基于socket的IPC通信。

信号量主要作为不同进程以及同一进程不同线程之间的同步手段。信号量是一个特殊的变量,程序对其访问都是原子操作,且只允许对它进行等待和发送信息操作。如果信号量是一个任意的整数,通常被称为计数信号量,或一般信号量;如果信号量只有二进制的0或1,称为二进制信号量。在linux系中,二进制信号量又称Mutex,互斥锁。以下例子采用信号量来协调进程对资源的访问

<?php
$key = ftok ( __FILE__, 's' );
// 同时最多只能有一个进程进入临界区
$sem_id = sem_get ( $key, 1 );
echo "This is a room,can only stay one people!\n\r";
// 派生子进程
$pid = pcntl_fork ();
if ($pid == - 1) {
	exit ( 'fork failed!' );
} else if ($pid > 0) {
	$name = 'parent';
} else {
	$name = 'child';
}
echo "{$name} want to enter the room \n";
sem_acquire ( $sem_id );
// 原子操作开始
echo "{$name} in the room , other people can't enter!\n";
sleep ( 3 );
echo "{$name} leave the room\n";
// 原子操作结束
sem_release ( $sem_id );
if ($pid > 0) {
	pcntl_waitpid ( $pid, $status );
	sem_remove ( $sem_id ); // 移除信号量
}

sem_get和sem_remove分别为创建和销毁信号量。当前进程(父进程)通过sem_acquire获取到信号量后其他进程(子进程)将会一直阻塞直到获取到信号量;在sem_acquire和sem_release之间操作都将是原子性的;当前进程通过sem_release释放所请求的信号量,其他进程便使用,从而实现对资源的有序访问。sem_acquire是阻塞操作,即之后的程序都需要等待获取到信号量后才能继续执行。使用多个信号量控制,需要注意是否会造成死锁。

消息队列提供了一种从一个进程向另一个进程异步发送一个数据块的方法,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。以下是PHP子进程使用消息队列与父进程进行通信

// 生成key
$message_queue_key = ftok ( __FILE__, 'a' );
// 根据生成的key新建队列,也可自定,如123456
$message_queue = msg_get_queue ( $message_queue_key, 0666 );

$pids = array ();
for($i = 0; $i < 5; $i ++) {
	// 创建子进程
	$pids [$i] = pcntl_fork ();
	
	if ($pids [$i]) {
		echo "No.$i child process was created, the pid is $pids[$i]\r\n";
		pcntl_wait ( $status ); // 非阻塞的线程等待,防止僵尸进程的出现
	} elseif ($pids [$i] == 0) {
		$pid = posix_getpid ();
		echo "process.$pid is writing now\r\n";
		// 写队列
		msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
		posix_kill ( $pid, SIGTERM );
	}
}

do {
	// 读队列
	msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );
	echo $message;
	// 获取队列内消息数
	$a = msg_stat_queue ( $message_queue );
	if ($a ['msg_qnum'] == 0) {
		break;
	}
} while ( true );

消息队列存在消息大小及队列长度的限制,一旦超过将写不进去。

共享内存使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。一个进程在内存创建了一个共享区域,其他进程也可以对这块内存区域进行访问

<?php
//Check the command line arguments
if(sizeof($argv) < 2) {
     echo  "Usage: php shared_memory.php <send|get|delete> <integer identifier> <value>\n";
     exit;
}
 
//Define shared memory segment properties.
$key = "987654";
$permissions = 0666;
$size = 1024;
 
//Create or open the shared memory segment.
$segment = shm_attach($key, $size, $permissions);
 
//Handle operations for the segment.
switch($argv[1]) {
     case "send":
          shm_put_var($segment, $argv[2], $argv[3]);
          echo "Message sent to shared memory segment.\n";
          break;
     case "get":
          $data = shm_get_var($segment, $argv[2]);
          echo "Received data: {$data}\n";
          break;
     case "delete":
          shm_remove($segment);
          echo "Shared memory segment released.\n";
          break;
}

共享内存并未提供同步机制,往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。PHP还有另外一个共享内存扩展:shmop。注意,共享内存存在内存限制。

Sockets IPC提供了进程间双向的点对点通信。通过socket_create_pair创建一对socket,作为上行和下行,父进程和子进程分别使用其中一个进行读写通信

<?php
$sockets = array();
$strone = 'Message From Parent.';
$strtwo = 'Message From Child.';

if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false) {
    echo "socket_create_pair() failed. Reason: ".socket_strerror(socket_last_error());
}
$pid = pcntl_fork();
if ($pid == -1) {
    echo 'Could not fork Process.';
} elseif ($pid) {
    /*parent*/
    socket_close($sockets[0]);
    if (socket_write($sockets[1], $strone, strlen($strone)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[1]));
    }
    if (socket_read($sockets[1], strlen($strtwo), PHP_BINARY_READ) == $strtwo) {
        echo "Recieved $strtwo\n";
    }
    socket_close($sockets[1]);
} else {
    /*child*/
    socket_close($sockets[1]);
    if (socket_write($sockets[0], $strtwo, strlen($strtwo)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[0]));
    }
    if (socket_read($sockets[0], strlen($strone), PHP_BINARY_READ) == $strone) {
        echo "Recieved $strone\n";
    }
    socket_close($sockets[0]);
}

以下例子是github上一个利用PHP Sockets IP开发的多进程任务处理

<?php
namespace Lifo\IPC;

declare(ticks = 1);

interface ProcessInterface
{
	public function run($parent);
}

class ProcessPoolException extends \Exception
{
	
}

class ProcessPool
{
    /** @var Integer Maximum workers allowed at once */
    protected $max;
    /** @var boolean If true workers will fork. Otherwise they will run synchronously */
    protected $fork;
    /** @var Integer Total results collected */
    protected $count;
    /** @var array Pending processes that have not been started yet */
    protected $pending;
    /** @var array Processes that have been started */
    protected $workers;
    /** @var array Results that have been collected */
    protected $results;
    /** @var \Closure Function to call every time a child is forked */
    protected $createCallback;
    /** @var array children PID's that died prematurely */
    private   $caught;
    /** @var boolean Is the signal handler initialized? */
    private   $initialized;
    private static $instance = array();
    public function __construct($max = 1, $fork = true)
    {
        //$pid = getmypid();
        //if (isset(self::$instance[$pid])) {
        //    $caller = debug_backtrace();
        //    throw new ProcessPoolException("Cannot instantiate more than 1 ProcessPool in the same process in {$caller[0]['file']} line {$caller[0]['line']}");
        //}
        //self::$instance[$pid] = $this;
        $this->count = 0;
        $this->max = $max;
        $this->fork = $fork;
        $this->results = array();
        $this->workers = array();
        $this->pending = array();
        $this->caught = array();
        $this->initialized = false;
    }
    public function __destruct()
    {
        // make sure signal handler is removed
        $this->uninit();
        //unset(self::$instance[getmygid()]);
    }
    /**
     * Initialize the signal handler.
     *
     * Note: This will replace any current handler for SIGCHLD.
     *
     * @param boolean $force Force initialization even if already initialized
     */
    private function init($force = false)
    {
        if ($this->initialized and !$force) {
            return;
        }
        $this->initialized = true;
        pcntl_signal(SIGCHLD, array($this, 'signalHandler'));
    }
    private function uninit()
    {
        if (!$this->initialized) {
            return;
        }
        $this->initialized = false;
        pcntl_signal(SIGCHLD, SIG_DFL);
    }
    public function signalHandler($signo)
    {
        switch ($signo) {
            case SIGCHLD:
                $this->reaper();
                break;
        }
    }
    /**
     * Reap any dead children
     */
    public function reaper($pid = null, $status = null)
    {
        if ($pid === null) {
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
        while ($pid > 0) {
            if (isset($this->workers[$pid])) {
                // @todo does the socket really need to be closed?
                //@socket_close($this->workers[$pid]['socket']);
                unset($this->workers[$pid]);
            } else {
                // the child died before the parent could initialize the $worker
                // queue. So we track it temporarily so we can handle it in
                // self::create().
                $this->caught[$pid] = $status;
            }
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
    }
    /**
     * Wait for any child to be ready
     *
     * @param integer $timeout Timeout to wait (fractional seconds)
     * @return array|null Returns array of sockets ready to be READ or null
     */
    public function wait($timeout = null)
    {
        $x = null;                      // trash var needed for socket_select
        $startTime = microtime(true);
        while (true) {
            $this->apply();                         // maintain worker queue
            // check each child socket pair for a new result
            $read = array_map(function($w){ return $w['socket']; }, $this->workers);
            // it's possible for no workers/sockets to be present due to REAPING
            if (!empty($read)) {
                $ok = @socket_select($read, $x, $x, $timeout);
                if ($ok !== false and $ok > 0) {
                    return $read;
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                return null;
            }
            // no sense in waiting if we have no workers and no more pending
            if (empty($this->workers) and empty($this->pending)) {
                return null;
            }
        }
    }
    /**
     * Return the next available result.
     *
     * Blocks unless a $timeout is specified.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return mixed Returns next child response or null on timeout
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function get($timeout = null, $nullOnTimeout = false)
    {
        $startTime = microtime(true);
        while ($this->getPending()) {
            // return the next result
            if ($this->hasResult()) {
                return $this->getResult();
            }
            // wait for the next result
            $ready = $this->wait($timeout);
            if (is_array($ready)) {
                foreach ($ready as $socket) {
                    $res = self::socket_fetch($socket);
                    if ($res !== null) {
                        $this->results[] = $res;
                        $this->count++;
                    }
                }
                if ($this->hasResult()) {
                    return $this->getResult();
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
    }
    /**
     * Return results from all workers.
     *
     * Does not return until all pending workers are complete or the $timeout
     * is reached.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return array Returns an array of results
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function getAll($timeout = null, $nullOnTimeout = false)
    {
        $results = array();
        $startTime = microtime(true);
        while ($this->getPending()) {
            try {
                $res = $this->get($timeout);
                if ($res !== null) {
                    $results[] = $res;
                }
            } catch (ProcessPoolException $e) {
                // timed out
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
        return $results;
    }
    public function hasResult()
    {
        return !empty($this->results);
    }
    /**
     * Return the next available result or null if none are available.
     *
     * This does not wait or manage the worker queue.
     */
    public function getResult()
    {
        if (empty($this->results)) {
            return null;
        }
        return array_shift($this->results);
    }
    /**
     * Apply a worker to the working or pending queue
     *
     * @param Callable $func Callback function to fork into.
     * @return ProcessPool
     */
    public function apply($func = null)
    {
        // add new function to pending queue
        if ($func !== null) {
            if ($func instanceof \Closure or $func instanceof ProcessInterface or is_callable($func)) {
                $this->pending[] = func_get_args();
            } else {
                throw new \UnexpectedValueException("Parameter 1 in ProcessPool#apply must be a Closure or callable");
            }
        }
        // start a new worker if our current worker queue is low
        if (!empty($this->pending) and count($this->workers) < $this->max) {
            call_user_func_array(array($this, 'create'), array_shift($this->pending));
        }
        return $this;
    }
    /**
     * Create a new worker.
     *
     * If forking is disabled this will BLOCK.
     *
     * @param Closure $func Callback function.
     * @param mixed Any extra parameters are passed to the callback function.
     * @throws \RuntimeException if the child can not be forked.
     */
    protected function create($func /*, ...*/)
    {
        // create a socket pair before forking so our child process can write to the PARENT.
        $sockets = array();
        $domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX;
        if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) {
            throw new \RuntimeException("socket_create_pair failed: " . socket_strerror(socket_last_error()));
        }
        list($child, $parent) = $sockets; // just to make the code below more readable
        unset($sockets);
        $args = array_merge(array($parent), array_slice(func_get_args(), 1));
        $this->init();                  // make sure signal handler is installed
        if ($this->fork) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                throw new \RuntimeException("Could not fork");
            }
            if ($pid > 0) {
                // PARENT PROCESS; Just track the child and return
                socket_close($parent);
                $this->workers[$pid] = array(
                    'pid' => $pid,
                    'socket' => $child,
                );
                // don't pass $parent to callback
                $this->doOnCreate(array_slice($args, 1));
                // If a SIGCHLD was already caught at this point we need to
                // manually handle it to avoid a defunct process.
                if (isset($this->caught[$pid])) {
                    $this->reaper($pid, $this->caught[$pid]);
                    unset($this->caught[$pid]);
                }
            } else {
                // CHILD PROCESS; execute the callback function and wait for response
                socket_close($child);
                try {
                    if ($func instanceof ProcessInterface) {
                        $result = call_user_func_array(array($func, 'run'), $args);
                    } else {
                        $result = call_user_func_array($func, $args);
                    }
                    if ($result !== null) {
                        self::socket_send($parent, $result);
                    }
                } catch (\Exception $e) {
                    // this is kind of useless in a forking context but at
                    // least the developer can see the exception if it occurs.
                    throw $e;
                }
                exit(0);
            }
        } else {
            // forking is disabled so we simply run the child worker and wait
            // synchronously for response.
            try {
                if ($func instanceof ProcessInterface) {
                    $result = call_user_func_array(array($func, 'run'), $args);
                } else {
                    $result = call_user_func_array($func, $args);
                }
                if ($result !== null) {
                    //$this->results[] = $result;
                    self::socket_send($parent, $result);
                }
                // read anything pending from the worker if they chose to write
                // to the socket instead of just returning a value.
                $x = null;
                do {
                    $read = array($child);
                    $ok = socket_select($read, $x, $x, 0);
                    if ($ok !== false and $ok > 0) {
                        $res = self::socket_fetch($read[0]);
                        if ($res !== null) {
                            $this->results[] = $res;
                        }
                    }
                } while ($ok);
            } catch (\Exception $e) {
                // nop; we didn't fork so let the caller handle it
                throw $e;
            }
        }
    }
    /**
     * Clear all pending workers from the queue.
     */
    public function clear()
    {
        $this->pending = array();
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to the PID given
     */
    public function kill($pid, $signo = SIGTERM)
    {
        posix_kill($pid, $signo);
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to all current workers
     */
    public function killAll($signo = SIGTERM)
    {
        foreach ($this->workers as $w) {
            $this->kill($w['pid'], $signo);
        }
        return $this;
    }
    /**
     * Set a callback when a new forked process is created. This will allow the
     * parent to perform some sort of cleanup after every child is created.
     *
     * This is useful to reinitialize certain resources like DB connections
     * since children will inherit the parent resources.
     *
     * @param \Closure $callback Function to callback after every forked child.
     */
    public function setOnCreate(\Closure $callback = null)
    {
        $this->createCallback = $callback;
    }
    protected function doOnCreate($args = array())
    {
        if ($this->createCallback) {
            call_user_func_array($this->createCallback, $args);
        }
    }
    /**
     * Return the total jobs that have NOT completed yet.
     */
    public function getPending($pendingOnly = false)
    {
        if ($pendingOnly) {
            return count($this->pending);
        }
        return count($this->pending) + count($this->workers) + count($this->results);
    }
    public function getWorkers()
    {
        return count($this->workers);
    }
    public function getActive()
    {
        return count($this->pending) + count($this->workers);
    }
    public function getCompleted()
    {
        return $this->count;
    }
    public function setForking($fork)
    {
        $this->fork = $fork;
        return $this;
    }
    public function setMax($max)
    {
        if (!is_numeric($max) or $max < 1) {
            throw new \InvalidArgumentException("Max value must be > 0");
        }
        $this->max = $max;
        return $this;
    }
    public function getMax()
    {
        return $this->max;
    }
    /**
     * Write the data to the socket in a predetermined format
     */
    public static function socket_send($socket, $data)
    {
        $serialized = serialize($data);
        $hdr = pack('N', strlen($serialized));    // 4 byte length
        $buffer = $hdr . $serialized;
        $total = strlen($buffer);
        while (true) {
            $sent = socket_write($socket, $buffer);
            if ($sent === false) {
                // @todo handle error?
                //$error = socket_strerror(socket_last_error());
                break;
            }
            if ($sent >= $total) {
                break;
            }
            $total -= $sent;
            $buffer = substr($buffer, $sent);
        }
    }
    /**
     * Read a data packet from the socket in a predetermined format.
     *
     * Blocking.
     *
     */
    public static function socket_fetch($socket)
    {
        // read 4 byte length first
        $hdr = '';
        do {
            $read = socket_read($socket, 4 - strlen($hdr));
            if ($read === false or $read === '') {
                return null;
            }
            $hdr .= $read;
        } while (strlen($hdr) < 4);
        list($len) = array_values(unpack("N", $hdr));
        // read the full buffer
        $buffer = '';
        do {
            $read = socket_read($socket, $len - strlen($buffer));
            if ($read === false or $read == '') {
                return null;
            }
            $buffer .= $read;
        } while (strlen($buffer) < $len);
        $data = unserialize($buffer);
        return $data;
    }
}


$pool = new ProcessPool(16);
for ($i=0; $i<100; $i++) {
	$pool->apply(function($parent) use ($i) {
		echo "$i running...\n";
		mt_srand(); // must re-seed for each child
		$rand = mt_rand(1000000, 2000000);
		usleep($rand);
		return $i . ' : slept for ' . ($rand / 1000000) . ' seconds';
        });
}
while ($pool->getPending()) {
	try {
		$result = $pool->get(1);    // timeout in 1 second
		echo "GOT: ", $result, "\n";
	} catch (ProcessPoolException $e) {
			// timeout
	}
}

当前进程(父进程)添加任务时交由其他进程(子进程)处理,不阻塞当前进程;其他进程运行结束后,通过socket返回结果给父进程。

当然进程间通信也可以通过文件(FIFO)或者类似的中介角色如异步消息队列,Mysql,Redis等等进行交互。

参考链接:
Semaphore, Shared Memory and IPC
深刻理解Linux进程间通信(IPC)
PHP IPC with Daemon Service using Message Queues, Shared Memory and Semaphores
关于PHP你可能不知道的-PHP的事件驱动化设计
Store datasets directly in shared memory with PHP
PHP Dark Arts: Shared Memory Segments (IPC)
Something Like Threading – PHP Process Forking and Interprocess Communication
Mimicking Threading in PHP
基于System V Message queue的PHP消息队列封装
PHP进程间通信System V消息队列
PHP进程间通信System V信号量
we Do web sockets on PHP from null. A part 2. IPC
proc_open

PHP 进程控制PCNTL

PHP的进程控制PCNTL支持实现了Unix方式的进程创建, 程序执行,信号处理以及进程的中断。 PCNTL只支持linux平台下cli模式,不支持Windows平台,也不能被应用在Web服务器环境(cgi等),当其被用于Web服务环境时可能会带来意外的结果。通常,PCNTL会结合另外一个扩展来使用POSIX来开发(也不支持Windows平台)。

pcntl_fork可以创建一个子进程,父进程和子进程 都从fork的位置开始向下继续执行。创建成功时,父进程得到的返回值是子进程号而子进程得到的返回值是0;创建失败时,父进程得到返回值是-1,不会创建子进程,并触发一个PHP错误。

<?php

$pid = pcntl_fork();
//父进程和子进程都会执行下面代码
if ($pid == -1) {
    //错误处理:创建子进程失败时返回-1.
     die('could not fork');
} else if ($pid) {
     //父进程会得到子进程号,所以这里是父进程执行的逻辑
     pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
} else {
     //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
}

在对应的父进程结束执行后,子进程就会变成孤儿进程,但之后会立即由init进程(进程ID为1)“收养”为其子进程。

某一子进程终止执行后,若其父进程未提前调用wait,则内核会持续保留子进程的退出状态等信息,以使父进程可以wait获取之[2] 。而因为在这种情况下,子进程虽已终止,但仍在消耗系统资源,所以其亦称僵尸进程。wait常于SIGCHLD信号的处理函数中调用。

为避免产生僵尸进程,一般采取的方式是:将父进程中对SIGCHLD信号的处理函数设为SIG_IGN(忽略信号);fork两次并杀死一级子进程,令二级子进程成为孤儿进程而被init所“收养”、清理。

采用二次创建子进程的方式

<?php
	$pid = pcntl_fork();
	if($pid) {
		//创建成功,在父进程中执行
		echo "run in parent process";//pcntl_wait($status);
	} else if($pid == -1) {
		//创建失败,在父进程中处理
		echo "Couldn't create child process.";
	} else {
		//创建成功,在子进程中执行
		//再次创建子进程,即孙进程
		$pid = pcntl_fork();
		if($pid == 0) {
			//在孙进程中执行
			if(-1 == posix_setsid())
	        {
	            // 出错退出
	            exit("Setsid fail");
	        }
			echo "run in grandchild process";
		} else if($pid == -1) {
			echo "Couldn’t create child process.";
		} else {
			//在子进程中处理
			echo "run in child process.";//posix_kill(posix_getpid(), SIGUSR1);
			exit;
		}
	}

通常还会把子进程的pid收集以来,以便监控、回收,如workerman。二次创建子进程通常应用在PHP多进程,守护进程上,比如

<?php
defined('DEAMON_LOCK_FILE') ||
define('DEAMON_LOCK_FILE', 'run/deamon.pid');

if($_SERVER['argc'] >= 2 && $_SERVER['argv'][1] == 'kill')
{
	$fh = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'r');
	$pid = fread($fh, 8);

	if( $pid )
		posix_kill($pid, SIGTERM);

	exit;
}

global $DEAMON_LOCK_HANDLER;

function daemonize($signalHandler = false ) {
	global $DEAMON_LOCK_HANDLER;

	if( ! deamon_file_lock() ) {
		printf("Deamon is already running...\n");
		exit();
	}

	umask(0);

	$pid = pcntl_fork();

	if( $pid < 0 ) {
		printf("Can't fork\n");
		exit;
	}
	else if( $pid ) {
		exit;
	}

	$sid = posix_setsid();

	if( $sid < 0 ) {
		printf("Can't set session leader\n");
		exit;
	}

	deamon_bind_signals($signalHandler);

	$pid = pcntl_fork();

	if( $pid < 0 || $pid ) {
		exit;
	}

	ftruncate($DEAMON_LOCK_HANDLER, 0);
	fwrite($DEAMON_LOCK_HANDLER, posix_getpid());

	chdir('/');

	fclose( STDIN );
	fclose( STDOUT );
	fclose( STDERR );
}

function deamon_bind_signals($signalHandler = false) {
	$signalHandler = !$signalHandler ? "deamon_signal_handler" : $signalHandler;

	pcntl_signal(SIGTERM, $signalHandler);
	pcntl_signal(SIGHUP,  $signalHandler);
	pcntl_signal(SIGUSR1, $signalHandler);
	pcntl_signal(SIGINT, $signalHandler);
}

function deamon_file_lock() {
	global $DEAMON_LOCK_HANDLER;
	$DEAMON_LOCK_HANDLER = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'c');

	if( ! $DEAMON_LOCK_HANDLER ) {
		printf("Can't open lock file\n");
		die();
	}
	if( !flock( $DEAMON_LOCK_HANDLER, LOCK_EX | LOCK_NB ) ) {
		return false;
	}
	return true;
}

function deamon_signal_handler($signo) {
	switch( $signo ) {
		case SIGTERM:
		case SIGHUP:
		case SIGUSR1:
			break;
	}
}

function sighandler($sig) {
        //do something
	if( $sig == SIGTERM ) {
		global $DEAMON_LOCK_HANDLER;
		fclose( $DEAMON_LOCK_HANDLER );
		exit;
	}
}
daemonize("sighandler");

while( 1 ) {
	pcntl_signal_dispatch();
	// do something here
	sleep( 1 );
}

可以通过ps -ef | grep php查看过程中的php进程产生情况,CentOS下安装PHP5.4的Posix扩展为:sudo yum instal php54w-process。

pcntl_signal可以注册信号处理函数,捕获信号后交给对应回调函数处理,实现信号通信,例如当某一子进程结束、中断或恢复执行时,内核会发送SIGCHLD信号予其父进程

<?php
declare(ticks = 1);

pcntl_signal(SIGCHLD, "signal_handler");

function signal_handler($signal) {
	switch($signal) {
		case SIGCHLD:
			while (pcntl_waitpid(0, $status) != -1) {
				$status = pcntl_wexitstatus($status);
				echo "Child $status completed\n";
			}

			exit;
	}
}

for ($i = 1; $i <= 5; ++$i) {
	$pid = pcntl_fork();

	if (!$pid) {
		sleep(1);
		print "In child $i\n";
		exit($i);
	}
}

while(1) {
	// parent does processing here...
}

pcntl_alarm创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号,结合pcntl_signal和pcntl_alarm可以做一个秒级的定时器(注意:pcntl_alarm是一次性消耗,需要再次设置)

declare(ticks = 1);

function signal_handler($signal) {
	//do your work here
	print "Caught SIGALRM\n";
	pcntl_alarm(3);
}

pcntl_signal(SIGALRM, "signal_handler", true);
pcntl_alarm(3);

while(1) {
}

利用PHP的进程控制便可以实现守护进程监控,如socke端口监听;多进程处理,如socke请求事件处理、任务并行、异步处理,提升PHP程序性能。

参考链接:
PHP 进程控制
Getting into multiprocessing
Timing your signals
PHP Deamon
PHP中利用pcntl进行多进程并发控制
PHP高级编程之守护进程
PHP多进程编程一,PHP多进程编程二。
PHP的ticks机制
PHP如何将进程作为守护进程
Daemonising a PHP cli script on a posix system
异步毫秒定时器
The declare() function and ticks
子进程

PHP 扩展开发之C

前面介绍了使用Zephir来开发PHP扩展,将PHP代码转为扩展,以提升性能,保护代码。目前更多的扩展都是采用C/C++开发的,最近在项目开发中,需要在这些已有的PHP扩展上开发,也只能用C/C++来开发了。

首先去PHP官网下载对应版本的PHP源码,解压并进入对应的目录。
创建扩展courages:

[vagrant@vagrant-centos64 ext]$ ./ext_skel
./ext_skel --extname=module [--proto=file] [--stubs=file] [--xml[=file]]
           [--skel=dir] [--full-xml] [--no-help]

  --extname=module   module is the name of your extension
  --proto=file       file contains prototypes of functions to create
  --stubs=file       generate only function stubs in file
  --xml              generate xml documentation to be added to phpdoc-cvs
  --skel=dir         path to the skeleton directory
  --full-xml         generate xml documentation for a self-contained extension
                     (not yet implemented)
  --no-help          don't try to be nice and create comments in the code
                     and helper functions to test if the module compiled
[vagrant@vagrant-centos64 ext]$ ./ext_skel --extname=courages
Creating directory courages
Creating basic files: config.m4 config.w32 .svnignore courages.c php_courages.h CREDITS EXPERIMENTAL tests/001.phpt courages.php [done].

To use your new extension, you will have to execute the following steps:

1.  $ cd ..
2.  $ vi ext/courages/config.m4
3.  $ ./buildconf
4.  $ ./configure --[with|enable]-courages
5.  $ make
6.  $ ./sapi/cli/php -f ext/courages/courages.php
7.  $ vi ext/courages/courages.c
8.  $ make

Repeat steps 3-6 until you are satisfied with ext/courages/config.m4 and
step 6 confirms that your module is compiled into PHP. Then, start writing
code and repeat the last two steps as often as necessary.

这里的步骤说的很清楚,但这一次,步骤3被phpize代替了。

按部就班,编辑config.m4,PHP_ARG_WITH是采用动态库方式加载(PHP_ARG_ENABLE则是编译内核中,configure是–enable-extension使用),将

dnl PHP_ARG_WITH(courages, whether to enable courages support,
dnl Make sure that the comment is aligned:
dnl [  --with-courages             Include courages support])

更改为

PHP_ARG_WITH(courages, for courages support,
[  --with-courages             Include courages support])

然后,在php_courages.h增加函数声明

PHP_FUNCTION(confirm_courages_compiled);	/* For testing, remove later. */
PHP_FUNCTION(courages_helloworld);

接着,编辑courages.c,在function_entry中增加函数注册

const zend_function_entry courages_functions[] = {
	PHP_FE(confirm_courages_compiled,	NULL)		/* For testing, remove later. */
 	PHP_FE(courages_helloworld,  NULL)
	PHP_FE_END	/* Must be the last line in courages_functions[] */
};

然后是courages_helloworld函数实现

PHP_FUNCTION(courages_helloworld)
{
        char *arg = NULL;
	int arg_len, len;
	char *strg;
	if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &arg, &arg_len) == FAILURE) {
		return;
	}
	len = spprintf(&strg, 0, "Your input string: %s/n", arg);
	php_printf(strg);
	return SUCCESS;
}

最后就是编译

phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

在php.ini中增加扩展courages.so

[courages]
extension = courages.so

测试一下

[vagrant@vagrant-centos64 courages]$ php -m | grep 'courages'
courages

[vagrant@vagrant-centos64 courages]$ php courages.php
Functions available in the test extension:
confirm_courages_compiled
courages_helloworld

Your input string: hellow world
Congratulations! You have successfully modified ext/courages/config.m4. Module courages is now compiled into PHP.

到这里一个扩展的开发流程就结束了。

这里分享一些小技巧。
首先是如何在PHP扩展中获取PHP全局数组$_SERVER($_POST/GET)变量中的值:

static char *get_server_var_by_name(char *str){
	// This code makes sure $_SERVER has been initialized
	if (!zend_hash_exists(&EG(symbol_table), "_SERVER", 8)) {
		zend_auto_global* auto_global;
		if (zend_hash_find(CG(auto_globals), "_SERVER", 8, (void **)&auto_global) != FAILURE) {
			auto_global->armed = auto_global->auto_global_callback(auto_global->name, auto_global->name_len TSRMLS_CC);
		}
	}

	// This fetches $_SERVER['PHP_SELF']
	zval** arr;
	char* script_name;
	if (zend_hash_find(&EG(symbol_table), "_SERVER", 8, (void**)&arr) != FAILURE) {
		HashTable* ht = Z_ARRVAL_P(*arr);
		zval** val;
		if (zend_hash_find(ht, str, strlen(str)+1, (void**)&val) != FAILURE) {
			script_name = Z_STRVAL_PP(val);
		}
	}
	return script_name;
}

然后是如何在PHP扩展调用PHP函数:

/*调用无参数函数*/
static char *get_sapi_name(){
    zval *function_name;
    zval *retval;
	char *sapi_name;

    MAKE_STD_ZVAL(function_name);
    ZVAL_STRING(function_name , "php_sapi_name", 1);


    if (call_user_function_ex(EG(function_table), NULL, function_name, &retval, 0, NULL, 0, EG(active_symbol_table) TSRMLS_CC) != SUCCESS)
    {
        zend_error(E_ERROR, "Function call failed");
    }

    if (retval != NULL && Z_TYPE_P(retval) != IS_STRING) {
        convert_to_string(retval);
        sapi_name = Z_STRVAL_P(retval);
    }
    else{
	sapi_name = "cli";
    }

    return sapi_name;
}
/*调用有参数函数*/
static int _ck_dir(char *dir TSRMLS_DC)
{
    zval *function_name;
    zval *retval;
    zval *str;
    zval **param[1];

    MAKE_STD_ZVAL(function_name);
    ZVAL_STRING(function_name , "is_dir", 1);

    MAKE_STD_ZVAL(str);
    ZVAL_STRING(str, dir, 1);
    param[0] = &str;

    if (call_user_function_ex(EG(function_table), NULL, function_name, &retval, 1, param, 0, EG(active_symbol_table) TSRMLS_CC) != SUCCESS)
    {
        zend_error(E_ERROR, "Function call failed");
    }

    if (retval != NULL && zval_is_true(retval)) {
        return SUCCESS;
    }

    return FAILURE;
}

更高级的一些技巧可以参考《PHP扩展开发及内核应用》和阅读别人的扩展开发代码。

参考链接:
php扩展实战 —— 获得ip的来源地址
如何编写一个PHP的C扩展
[原创]快速开发一个PHP扩展
用C/C++扩展你的PHP
Get the name of running script from a PHP extension
Build PHP extension and use call_user_function
Programming PHP
与 UNIX 构建系统交互: config.m4
call_user_function_ex() documentation
PHP Extensions Made Eldrich: PHP Variables
Convert Zval to char*
PHP扩展编写第一步:PHP和Zend介绍
PHP扩展开发:简单类实现
自己写PHP扩展之创建一个类[原创]
如何在扩展里调用PHP函数呢?

PHP队列开发之Beanstalk

Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。

相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:

  • 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
  • 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
  • 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
  • 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick);

一个任务如果没有被删除,那么它就可以被重新获取。下面是大多数任务的生命周期:

   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

CentOS下安装Beanstalkd

sudo yum install beanstalkd
#启动beanstalk
sudo service beanstalkd start
#beanstalkd -l 192.168.33.14 -p 11300

PHP下面有个C扩展beanstalk库可以使用,基于libbeanstalkclient

git clone https://github.com/bergundy/libbeanstalkclient.git
cd libbeanstalkclient
mkdir m4
#开始编译
sudo ./autogen.sh

#创建libbeanstalkclient.conf,内容为/usr/lib
sudo vim /etc/ld.so.conf.d/libbeanstalkclient.conf
#使配置生效
sudo ldconfig

git clone https://github.com/nil-zhang/php-beanstalk.git
cd php-beanstalk
phpize
./configure
sudo make
sudo make install
sudo vim /etc/php.ini

编辑php.ini增加以下内容

[beanstalk]
extension = "beanstalk.so"

查看是否加载成功

php -m
#加载成功则重启php-fpm
sudo service php-fpm restart

PHP测试代码

<?php
    $bsc = new Beanstalk();

    $bsc->addserver("192.168.33.14", 11300);
    $bsc->addserver("192.168.33.12", 11300);

    $tubes = $bsc->list_tubes();
    print_r($tubes);

    for($i = 0; $i < 10; $i++)
    {
        $key = "key".$i;
        $value = "value".$i;

        $bsc->use($key);
        $bsc->put($key, $value);
        echo "$key\t$value\n";

        $bsc->watch($key);
        $job = $bsc->reserve($key);
        print_r($job);

        if($bsc->bury($job['id'], $key))
            echo "bury ok\n";
        else
            echo "bury failed\n";

        $bsc->kick(100, $key);
        if($bsc->delete($job['id'], $key))
            echo "delete ok\n";
        else
            echo "delete failed \n";

        $bsc->ignore($key);
        echo "\n";
    }

    echo "done\n";

注意由于Beanstalk服务端实现的比较简单,协议特性需要客户端支持,不同的实现可能效果不一样,这个客户端并没有实现延时发送(delay),超时重试(TTR)。需要这些特性建议使用这个库:PHP Beanstalkd。前台生产者创建任务:

<?php
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.14', 11300);
$bean->addServer('192.168.33.12', 11300);
$bean->useTube('my-tube');
$bean->put('Hello World!', 1024);
$bean->put('Hello World!2', 1023);
$bean->put(json_encode(array('what','how')), 1000, 1, 1);

后台消费者处理任务

include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.12', 11300);
$bean->addServer('192.168.33.14', 11300);
$bean->watchTube('my-tube');

while (true)
{
	try
	{
		$job = $bean->reserve($timeout = 10);

		/* process job ... */
		var_dump($job);
		//var_dump($job->getMessage());

		$job->delete();
	}
	catch (BeanstalkException $e)
	{
		switch ($e->getCode())
		{
			case BeanstalkException::TIMED_OUT:
				echo "Timed out waiting for a job.  Retrying in 1 second.";
				sleep(1);
				continue;
				break;
			default:
				throw $e;
				break;
		}
	}
}

注意:客户端获取任务(reverse)是阻塞的(blocking),直到超时;同一个队列(tube)的任务按FIFO进行处理(除非指定优先级);任务内容长度不能超过65536;作为内存队列需要注意是否会内存超出,可以快速处理到Mysql。

使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。

参考链接:
Scalable Work Queues with Beanstalk
Beanstalk Protocol
Frequently Asked Questions for beanstalkd
Getting Started with Beanstalkd
Queue your work
Asynchronous Processing in Web Applications, Part 2: Developers Need to Understand Message Queues

PHP yield应用

PHP 5.5开始新增了神奇的关键字yield,能够从生成器(generators)中返回数据。yield有点像普通函数中的关键字return,但是不会彻底停止函数的执行(普通函数一旦return便不执行了),可以暂停循环并返回值,每一次调用便从中断处继续迭代。生成器可以用于替代循环迭代,每一次调用返回一个生成器对象(generator)

yield能够延迟执行,可以用于对大量数据进行迭代而不用预先在内存中生成数组。例如动态生成一个大数组:

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

利用yield简便、高效的生成fibonacci数列而不是循环或递归

<?php
function fibonacci($count) {
    $prev = 0;
    $current = 1;

    for ($i = 0; $i < $count; ++$i) {
        yield $prev;
        $next = $prev + $current;
        $prev = $current;
        $current = $next;
    }
}

foreach (fibonacci(48) as $i => $value) {
    echo $i , ' -> ' , $value, PHP_EOL;
}

利用yield来循环读取文件,而不需要像file函数那样一次性加载进来,节省内存

<?php
function file_lines($filename) {
    $file = fopen($filename, 'r'); 
    while (($line = fgets($file)) !== false) {
        yield $line; 
    } 
    fclose($file); 
}
 
foreach (file_lines('somefile') as $line) {
    // do some work here
}

yield除了能够返回值,用作变量时还可以接收值。

<?php
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar');

由于yield具有中断执行后再次调用又可以从中断处执行,外界又可以通过生成器对象(generator)的send方法进行交互,可以用于协程(coroutine),作多任务协作的流程控制。这里有个例子

<?php
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

有些任务是需要交互进行的,如socket的监听和回复;有些任务异步执行又需要回调,如A执行不阻塞程序,但B执行又取决于A是否执行完毕。这些都可以使用yield来进行封装,达到流程控制的目的。

参考链接:
Generators overview
What does yield mean in PHP?
What is the difference between a generator and an array?
Cooperative multitasking using coroutines (in PHP!)
What Generators Can Do For You
Generators and Coroutines in PHP
Generators in PHP
协程与yield
http://www.hitoy.org/coroutine-and-yield.html
Co-operative PHP Multitasking
Generator (computer programming)
异步处理在分布式系统中的优化作用