标签归档:libevent

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install

#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib

sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);


$arrRead = $arrWrite = array();
while(true){
	$nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
		foreach($arrRead as $pSocket){
			if($pSocket === $pFrontend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
			else if ($pSocket === $pBackend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
		}
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");

while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");

while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);

$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");

$pPush->send("Hello Client 1");

客户端Pull.php

<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);

//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");

var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
	$message = $pServer->recv();
	echo $message . PHP_EOL;
	$pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");


$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());

var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP RPC开发之Thrift

Apache Thrift是一个跨语言的服务部署框架,通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(支持C++,Java,Python,PHP, GO,Javascript,Ruby,Erlang,Perl, Haskell, C#等),并由生成的代码负责RPC协议层和传输层的实现。

在CentOS 6.5上安装Thrift

sudo yum -y update
sudo yum -y groupinstall "Development Tools"

#升级autoconf,必须2.65以上
wget http://ftp.gnu.org/gnu/autoconf/autoconf-2.69.tar.gz
tar xvf autoconf-2.69.tar.gz
cd autoconf-2.69
./configure --prefix=/usr
make
sudo make install
cd ..

#升级automake必须1.14以上
wget http://ftp.gnu.org/gnu/automake/automake-1.14.tar.gz
tar xvf automake-1.14.tar.gz
cd automake-1.14
./configure --prefix=/usr
make
sudo make install
cd ..

#升级bsion
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gz
tar xvf bison-2.5.1.tar.gz
cd bison-2.5.1
./configure --prefix=/usr
make
sudo make install
cd ..

#安装boost
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz
tar xvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
sudo ./b2 install
cd ..

#安装thrift,编译会比较久,内存最好1024M以上
git clone https://git-wip-us.apache.org/repos/asf/thrift.git
cd thrift
./bootstrap.sh
./configure
make
sudo make install
cd ..

#查看版本
thrift -version

#安装thrift_protocol扩展,仅支持二进制读写
cd thrift/lib/php/src/ext/thrift_protocol
phpize
./configure
sudo make
sudo make install
#这里不需要更改php.ini,已自动在/etc/php.d/thrift_protocol.ini里面添加
php -m | grep thrift

Thrift的PHP类库位于thrift/lib/php/lib/Thrift目录下面,Thrift对于数据传输格式、数据传输方式,服务器模型均做了定义,方便自行扩展。

数据传输格式(protocol)是定义的了传输内容,对Thrift Type的打包解包,包括

  • TBinaryProtocol,二进制格式,TBinaryProtocolAccelerated则是依赖于thrift_protocol扩展的快速打包解包。
  • TCompactProtocol,压缩格式
  • TJSONProtocol,JSON格式
  • TMultiplexedProtocol,利用前三种数据格式与支持多路复用协议的服务端(同时提供多个服务,TMultiplexedProcessor)交互

数据传输方式(transport),定义了如何发送(write)和接收(read)数据,包括

  • TBufferedTransport,缓存传输,写入数据并不立即开始传输,直到刷新缓存。
  • TSocket,使用socket传输
  • TFramedTransport,采用分块方式进行传输,具体传输实现依赖其他传输方式,比如TSocket
  • TCurlClient,使用curl与服务端交互
  • THttpClient,采用stream方式与HTTP服务端交互
  • TMemoryBuffer,使用内存方式交换数据
  • TPhpStream,使用PHP标准输入输出流进行传输
  • TNullTransport,关闭数据传输
  • TSocketPool在TSocket基础支持多个服务端管理(需要APC支持),自动剔除无效的服务器
  • TNonblockingSocket,非官方实现非阻塞socket

服务模型,定义了当PHP作为服务端如何监听端口处理请求

  • TForkingServer,采用子进程处理请求
  • TSimpleServer,在TServerSocket基础上处理请求
  • TNonblockingServer,基于libevent的非官方实现非阻塞服务端,与TNonblockingServerSocket,TNonblockingSocket配合使用

另外还定义了一些工厂,以便在Server模式下对数据传输格式和传输方式进行绑定

  • TProtocolFactory,数据传输格式工厂类,对protocol的工厂化生产,包括TBinaryProtocolFactory,TCompactProtocolFactory,TJSONProtocolFactory
  • TTransportFactory,数据传输方式工厂类,对transport的工厂化生产,作为server时,需要自行实现
  • TStringFuncFactory,字符串处理工厂类

其他文件便是异常,字符串处理,自动加载器的定义等等。

现在开始编写一个简单接IDL文件HelloWorld.thrift

namespace php Services.HelloWorld
service HelloWorld
{
    string sayHello(1:string name);
}

然后通过生成器生成PHP文件

#不指明:server不生成processor。。
thrift --gen php:server HelloWorld.thrift

生成文件在gen-php目录下面的Services/HelloWord/HelloWorld.php(目录与namesapce定义一致),这是个公共文件,服务端和客户端都需要包括它。其中客户端调用的代码(HelloWorldClient )已经生成好了

//服务端需要继承该接口
interface HelloWorldIf {
  /**
   * @param string $name
   * @return string
   */
  public function sayHello($name);
}
//提供给客户端调用的方法
class HelloWorldClient implements \Services\HelloWorld\HelloWorldIf {
  public function sayHello($name)
  {
    $this->send_sayHello($name);
    return $this->recv_sayHello();
  }
  public function send_sayHello($name)
  {
  }
  public function recv_sayHello()
  {
  }
}
//HelloWord类sayHello方法参数读取
class HelloWorld_sayHello_args {
}
//HelloWord类sayHello方法结果写入
class HelloWorld_sayHello_result {
}
//作为服务端才会生成
class HelloWorldProcessor {
}

而服务端的服务实现代码则需要继承HelloWorldIf 实现代码HelloWorldHandler.php

<?php
namespace Services\HelloWorld;

class HelloWorldHandler implements HelloWorldIf {
  public function sayHello($name)
  {
      return "Hello $name";
  }
}

编写服务端代码Server.php

<?php
namespace Services\HelloWorld;

error_reporting(E_ALL);

define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();

use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TBufferedTransportFactory;

use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;

//use Thrift\Server\TNonblockingServerSocket;
//use Thrift\Server\TNonblockingServer;

//use Thrift\Protocol\TBinaryProtocol;
//use Thrift\Transport\TPhpStream;
//use Thrift\Transport\TBufferedTransport;


try {
	require_once 'HelloWorldHandler.php';
	$handler = new \Services\HelloWorld\HelloWorldHandler();
	$processor = new \Services\HelloWorld\HelloWorldProcessor($handler);
	
	$transportFactory = new TBufferedTransportFactory();
	$protocolFactory = new TBinaryProtocolFactory(true, true);
	
	//作为cli方式运行,监听端口,官方实现
	$transport = new TServerSocket('localhost', 9090);
	$server = new TSimpleServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
	$server->serve();
	
	//作为cli方式运行,非阻塞方式监听,基于libevent实现,非官方实现
	//$transport = new TNonblockingServerSocket('localhost', 9090);
	//$server = new TNonblockingServer($processor, $transport, $transportFactory, $transportFactory, $protocolFactory, $protocolFactory);
	//$server->serve();

	//客户端和服务端在同一个输入输出流上
	//使用方式
	//1) cli 方式:php Client.php | php Server.php 
	//2) cgi 方式:利用Apache或nginx监听http请求,调用php-fpm处理,将请求转换为PHP标准输入输出流
	//$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
	//$protocol = new TBinaryProtocol($transport, true, true);
	//$transport->open();
	//$processor->process($protocol, $protocol);
	//$transport->close();
	
} catch (TException $tx) {
	print 'TException: '.$tx->getMessage()."\n";
}

服务端创建的步骤:

  • 首先初始化服务提供者handler
  • 然后利用该handler初始化自动生成的processor
  • 初始化数据传输方式transport
  • 利用该传输方式初始化数据传输格式protocol
  • 开始服务

编写客户端代码Client.php

<?php
namespace Services\HelloWorld;

error_reporting(E_ALL);

define('THRIFT_ROOT', __DIR__.'/../../../');
require_once  THRIFT_ROOT.'Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift',  THRIFT_ROOT);
$loader->registerDefinition('Service',  THRIFT_ROOT.'/gen-php');
$loader->register();

//use Thrift\Transport\TPhpStream;

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;

try {
	//仅在与服务端处于同一输出输出流有用
	//使用方式:php Client.php | php Server.php 
	//$transport = new TBufferedTransport(new TPhpStream(TPhpStream::MODE_R | TPhpStream::MODE_W));
	
	//socket方式连接服务端
	//数据传输格式和数据传输方式与服务端一一对应
	//如果服务端以http方式提供服务,可以使用THttpClient/TCurlClient数据传输方式
	$transport = new TBufferedTransport(new TSocket('localhost', 9090));
	$protocol = new TBinaryProtocol($transport);
	$client = new \Services\HelloWorld\HelloWorldClient($protocol);

	$transport->open();
	
	//同步方式进行交互
	$recv = $client->sayHello('Courages');
	echo "\n sayHello11dd:".$recv." \n";
	
	//异步方式进行交互
	$client->send_sayHello('Us');
	echo "\n send_sayHello \n";
	$recv = $client->recv_sayHello();
	echo "\n recv_sayHello:".$recv." \n";
	
	$transport->close();
} catch (TException $tx) {
	print 'TException: '.$tx->getMessage()."\n";
}
	

客户端调用的步骤:

  • 初始化数据传输方式transport,与服务端对应
  • 利用该传输方式初始化数据传输格式protocol,与服务端对应
  • 实例化自动生成的Client对象
  • 开始调用

在终端上运行

#以cli方式运行TPhpStream 
#php Client.php | php Server.php

#先运行Server.php
#要不然会报错:TException: TSocket: Could not connect to localhost:9090 (Connection refused [111])
php Server.php

#在另外一个终端运行
php Client.ph

官方给的例子,PHP作为服务端是以web方式进行提供的,在cli方式下并不能运行。

Thrift作为一个跨语言的服务框架,方便不同语言、模块之间互相调用,解耦服务逻辑代码,拓展了PHP的处理能力(如与Hbase交互),使得WEB架构更具弹性。与基于 SOAP 消息格式的 Web Service和基于 JSON 消息格式的 RESTful 服务不同,Thrif数据传输格式默认采用二进制传格式,对 XML 和 JSON 体积更小,但对于服务端的CPU占用比JSON、XML要高。PHP虽然有thrift_protocol扩展,但仅仅作为二进制数据传输格式化使用,其他文件的加载仍然为PHP,需要更多的开销。

如果由PHP来做为Thrift的服务端,仅仅这样子做仍然是不够的,Thrift仅仅实现的数据定义和传输,未实现RPC架构

  • 需要避免重复加载各类文件,是否做成PHP扩展
  • 数据传输格式和方式是否适需要自行扩展
  • 客户端要能够自动连可使用的服务端,剔除失效的服务器
  • 服务端需要处理客户端并发情况,是否多进程/异步处理
  • 服务端需要监控服务是否正常

workerman-thrift-rpc对这些问题进行了解决,基于thrift提供了一个可靠性的RPC框架。对客户端和服务端的调用做了封装,提供统一入口,利用workerman做socket中转,当客户端发出请求时,将给socket转给服务端使用,提供服务。workerman-json-rpc与workerman-thrift-rpc类似,采用异步(分步)收发,但简单多了,更像是一种约定。数据格式,发送时仅发送class,function,parameters三个参数,接收时,仅code,msg,data三个返回值,在格式约束及跨语言上,需要自行处理;不需要thrift那样依赖于生成器所生成的文件,客户端完全独立于服务端。

注:以上示例使用修改过的代码,附上代码:thrift

参考链接:
Apache Thrift – 可伸缩的跨语言服务开发框架
Thirft框架介绍
Apache Thrift
Building Apache Thrift on CentOS 6.5
PHP Tutorial
Creating a public API with Apache Thrift
hadoop + Hbase + thrift + php 安裝設定與程式設計
php实现的thrift socket server
Our own “Hello World!”

PHP 事件驱动开发

最近在学习PHP的系统事件驱动(event-base)开发,发现PHP有好几个event扩展,根据底层库依赖分为两类:libeventlibev。libevent可以为文件描述符、信号、超时设定等事件提供了监听回调,支持poll/kqueue/event port/select/epoll。libevent 库的其他组件提供其他功能,包括缓冲的事件系统(用于缓冲发送到客户端/从客户端接收的数据)以及 HTTP、DNS 和 RPC 系统的核心实现。libev提供了各种监听器,包括子进程监听,超时设定,定时器,IO监听,信号监听,文件监视等,支持epoll/kqueue/event ports/inotify/eventfd/signalfd,更快的时钟管理,时间变化检测和修正。PHP依赖libevent扩展有libeventevent,PHP依赖libev扩展则有Evlibev

libevent在PHP事件驱动开发上应用广泛,比如workermanphpDaemonReactPHPKellner。CentOS上PHP 5.4安装libevent扩展

sudo yum install libevent-devel

wget https://pecl.php.net/get/libevent-0.1.0.tgz
tar -zxvf libevent-0.1.0.tgz
cd libevent-0.1.0
phpize 
./configure
sudo make
sudo make install

#增加libevent.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep libevent

前面介绍过使用ticks和pcntl_signal来做定时器,然而tick运行机制是PHP解释器每执行 N 条可计时的低级语句就会发生的事件,如果tick值设置小了,会产生频繁的系统调用,设置大了又不能保证及时。使用libevent来设置一个定时器

<?php
function print_dot(){
	echo ".";
}

class Timer{
	protected  $pEventBase;
	protected $pEvent;
	public $nInterval = 1;
	public function __construct(){
		$this->pEventBase = event_base_new();
	}
	public function addEvent($p_pFunc, $p_mxArgs = null){
		$this->pEvent = event_new();
		event_set($this->pEvent, 0, EV_TIMEOUT, $p_pFunc, $p_mxArgs);
		event_base_set($this->pEvent, $this->pEventBase);
	}
	public function loop(){
		event_add($this->pEvent, $this->nInterval*1000000);
		event_base_loop($this->pEventBase);
	}
}

$pTimer = new Timer();
$pTimer->addEvent("print_dot");
while(1){
	$pTimer->loop();
}

libevent使用也很简单:

  • 使用event_base_new和event_new分别创建event_base和event
  • 使用event_set为event设置要监听文件描述符fd,比如文件、socke、信号,超时则fd为0,事件类型和回调函数
  • 使用event_base_set关联event_base和event
  • 使用event_add将设置好的event加入事件监听器
  • 调用event_base_loop开始处理事件

官网上有个例子用来做socket监听处理

<?php
$socket = stream_socket_server ('tcp://0.0.0.0:2000', $errno, $errstr);
stream_set_blocking($socket, 0);
$base = event_base_new();
$event = event_new();
event_set($event, $socket, EV_READ | EV_PERSIST, 'ev_accept', $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);

$GLOBALS['connections'] = array();
$GLOBALS['buffers'] = array();

function ev_accept($socket, $flag, $base) {
    static $id = 0;
    
    $connection = stream_socket_accept($socket);
    stream_set_blocking($connection, 0);
    
    $id += 1;
    
    $buffer = event_buffer_new($connection, 'ev_read', NULL, 'ev_error', $id);
    event_buffer_base_set($buffer, $base);
    event_buffer_timeout_set($buffer, 30, 30);
    event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
    event_buffer_priority_set($buffer, 10);
    event_buffer_enable($buffer, EV_READ | EV_PERSIST);
    
    // we need to save both buffer and connection outside
    $GLOBALS['connections'][$id] = $connection;
    $GLOBALS['buffers'][$id] = $buffer;
}

function ev_error($buffer, $error, $id) {
    event_buffer_disable($GLOBALS['buffers'][$id], EV_READ | EV_WRITE);
    event_buffer_free($GLOBALS['buffers'][$id]);
    fclose($GLOBALS['connections'][$id]);
    unset($GLOBALS['buffers'][$id], $GLOBALS['connections'][$id]);
}

function ev_read($buffer, $id) {
    while ($read = event_buffer_read($buffer, 256)) {
        var_dump($read);
    }
}

相比libevent,event扩展提供了面向对象的方法,支持libevent 2+ 的特性,对HTTP,DNS,OpenSSL等协议操作进行封装。Kellner框架比较有意思,在PHP的libevent扩展基础上将http请求处理封装成了扩展,使用cli模式处理http请求,并给出了基于Zend Framework 2的示例。

libev自称libevent的替代者,克服了libevent的一些不利影响,开销更小,Node JS便是利用它来做事件驱动。相比基于libeventd的扩展,基于libev的ev扩展更新比较积极,支持设置各种的监听器,为感兴趣的事件注册回调,比如文件变化,超时。CentOS上PHP 5.4安装ev扩展

wget https://pecl.php.net/get/ev-0.2.15.tgz
tar -zxvf ev-0.2.15
cd ev-0.2.15
phpize 
./configure
sudo make
sudo make install

#增加ev.so
sudo vim /etc/php.ini

#是否安装成功
php -m | grep ev

libev封装了各种监视器,操作也比较简单。

<?php
/**
 * 延迟1秒后执行,不重复
 */
$pDelay = new EvTimer(1, 0, function () {
	echo "1 delay \n";
});
/**
 * 每隔一秒执行一次的定时器,0秒后执行
 */
$pTimer = new EvTimer(0, 1, function () {
	echo "1 seconds \n";
});
/**
 * 如果没有其他更高等级的监视器,那么就执行EvIdle,处于低优先级则不执行
 */
$pIdle = new EvIdle(function(){
	sleep(1);
	echo "idle timer \n";
},0,2);
/**
 * 每一次loop开始都会执行
 */
$pPrepare = new EvPrepare(function(){
	echo "before timer \n";
},0);
/**
 * 每一次loop都会执行,可以通过优先级调整执行顺序
 */
$c = new EvCheck(function(){
	echo "after timer \n";
},0,-1);
/**
 * 定时器,每隔1.5秒后执行一次,0秒后开始
 */
$pPeriod = new EvPeriodic(0., 1.5, NULL, function ($w, $revents) {
	echo time(), PHP_EOL;
});
/**
 * IO输入事件监听,可以拿去监听socket的Ev::WRITE和Ev::READ事件
 */
$pReadWatcher = new EvIo(STDIN, Ev::READ, function ($watcher, $revents) {
	echo "STDIN is readable\n";
});

/**
 * 注册监听感兴趣的信号
 */
$pSignal = new EvSignal(SIGTERM, function ($watcher) {
	echo "SIGTERM received\n";
	$watcher->stop();
});
/**
 * 文件变化监听器,10秒监测一次
 */
$pStatWatcher = new EvStat("/var/log/messages", 10, function ($w) {
	echo "/var/log/messages changed\n";
	
	$attr = $pStatWatcher->attr();
	
	if ($attr['nlink']) {
		printf("Current size: %ld\n", $attr['size']);
		printf("Current atime: %ld\n", $attr['atime']);
		printf("Current mtime: %ld\n", $attr['mtime']);
	} else {
		fprintf(STDERR, "`messages` file is not there!");
		$pStatWatcher->stop();
	}
});
	
/**
 * 开始执行Ev::RUN_ONCE则立即执行Ev::RUN_NOWAIT则非阻塞执行
 */
Ev::run();

也可以监听子进程

$pid = pcntl_fork();

if ($pid == -1) {
    fprintf(STDERR, "pcntl_fork failed\n");
} elseif ($pid) {
    $w = new EvChild($pid, FALSE, function ($w, $revents) {
        $w->stop();

        printf("Process %d exited with status %d\n", $w->rpid, $w->rstatus);
    });

    Ev::run();

    // Protect against Zombies
    pcntl_wait($status);
} else {
    //Forked child
    exit(2);
}

php的libev扩展也实现了libev的所有监视器,提供类似的用法,但比较久没更新了。

在网络编程中,使用事件驱动模型监听感兴趣的事件,结合异步处理,能够大大提高服务器性能。传统服务器模型如Apache为每一个请求生成一个子进程。当用户连接到服务器的一个子进程就产生,并处理连接。每个连接获得一个单独的线程和子进程。当用户请求数据返回时,子进程开始等待数据库操作返回。如果此时另一个用户也请求返回数据,这时就产生了阻塞。以下引用自《使用事件驱动模型实现高效稳定的网络服务器程序》

简单网络编程模型里面,服务器与客户端都是一应一答,大部分的 socket 接口都是阻塞型的。在面对多个客户端的请求时候,最简单的解决方式是在服务器端使用多线程(或多进程)。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。
于是便有了“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。
但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用 IO 接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。对付可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。

于是便有了基于事件驱动的非阻塞型服务器,比如Nginx,Node.js。Nginx采用事件驱动,使用epoll事件模型,充分使用异步逻辑,削减了上下文调度开销,并发服务能力更强。Node.js 的异步机制是基于事件的,所有的磁盘 I/O、网络通信、数据库查询都以非阻塞的方式请求,返回的结果由事件循环来处理。Node.js 在执行的过程中会维护一个事件队列,程序在执行时进入事件循环等待下一个事件到来,每个异步式 I/O 请求完成后会被推送到事件队列,等待程序进程进行处理。

参考链接:
libev – a high performance full-featured event loop written in C
Working with events
使用 libevent 和 libev 提高网络应用性能
为什么事件驱动服务器这么火
Asynchronous PHP and Real-time Messaging
react.php 中的异步实现