ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。
ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。
在CentOS安装ZeroMQ4
git clone https://github.com/zeromq/zeromq4-x.git cd zeromq4-x ./autogent.sh ./configure sudo make sudo make install #声明libzmq库的位置 sudo vim /etc/ld.so.conf.d/libzmq.conf #内容:/usr/local/lib sudo ldconfig
ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装
git clone https://github.com/mkoppanen/php-zmq.git cd php-zmq phpize ./configure sudo make sudo make install sudo vim /etc/php.ini
编辑PHP.ini增加扩展信息
[zeromq] extension = zmq.so
查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php
<?php $pContext = new ZMQContext(); $pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP); $pServer->bind("tcp://*:5559"); while (true){ $strMessage = $pServer->recv(); echo $strMessage."\r\n"; $pServer->send("From Server1:".$strMessage); }
然后是客户端request.php
<?php $pContext = new ZMQContext(); $pClient = new ZMQSocket($pContext, ZMQ::SOCKET_REQ); $pClient->connect("tcp://localhost:5559"); $pClient->send("Hello From Client:".uniqid()); var_dump($pClient->recv());
分别在不同终端预先一下程序
#请求者可以先启动 php request.php #另一个终端 php reply.php
使用ZeroMQ进行通信的步骤
- 使用ZMQContext创建一个上下文
- 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
- PUB,SUB
- REQ,REP
- REQ,ROUTER (take care, REQ inserts an extra null frame)
- DEALER,REP (take care, REP assumes a null frame)
- DEALER,ROUTER
- DEALER,DEALER
- ROUTER,ROUTER
- PUSH,PULL
- PAIR,PAIR
分类包括
- 轮询,REQ,PUSH,DEALER
- 多播,PUB
- 公平排队,REP,SUB,PULL,DEALER
- 明确寻址,ROUTER
- 单播,PAIR
- 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
- 进程内部通信,inproc://
- 进程间通信,ipc://
- 网络间通信,tcp://
- 多播,pgm://
- 使用send/recv发送/接收消息
使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。
ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。
再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。
<?php $pContext = new ZMQContext(); $pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER); $pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER); $pFrontend->bind("tcp://*:5559"); $pBackend->bind("tcp://*:5560"); $pPoll = new ZMQPoll(); $pPoll->add($pFrontend, ZMQ::POLL_IN); $pPoll->add($pBackend, ZMQ::POLL_IN); $arrRead = $arrWrite = array(); while(true){ $nEvent = $pPoll->poll($arrRead, $arrWrite); if ($nEvent > 0) { foreach($arrRead as $pSocket){ if($pSocket === $pFrontend){ while (true){ $strMessage = $pSocket->recv(); $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE); $pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null); if(!$nMore){ break; } } } else if ($pSocket === $pBackend){ while (true){ $strMessage = $pSocket->recv(); $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE); $pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null); if(!$nMore){ break; } } } } } }
然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上
<?php $pContext = new ZMQContext(); $pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP); //$pServer->bind("tcp://*:5555"); $pServer->connect("tcp://localhost:5560"); while (true){ $strMessage = $pServer->recv(); echo $strMessage."\r\n"; $pServer->send("From Server1:".$strMessage); }
这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来
<?php $pContext = new ZMQContext(); $pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER); $pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER); $pFrontend->bind("tcp://*:5559"); $pBackend->bind("tcp://*:5560"); $pDevice = new ZMQDevice($pFrontend, $pBackend); $pDevice->run();
ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者
<?php $pContext = new ZMQContext(); $pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB); $pPublisher->bind("tcp://*:5563"); while (true) { $pPublisher->send("A", ZMQ::MODE_SNDMORE); $pPublisher->send("1:We don't want to see this"); $pPublisher->send("B", ZMQ::MODE_SNDMORE); $pPublisher->send("1:We would like to see this"); sleep (1); }
然后是订阅者
$pContext = new ZMQContext(); $pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB); $pSubscriber->connect("tcp://localhost:5563"); #可以连接多个发布者 $pSubscriber->connect("tcp://localhost:5564"); $pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B"); while (true) { // Read envelope with address $address = $pSubscriber->recv(); // Read message contents $contents = $pSubscriber->recv(); printf ("[%s] %s%s", $address, $contents, PHP_EOL); }
Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。
ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php
<?php $pContext = new ZMQContext(); $pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH); $pPush->bind("tcp://*:5558"); //$pPush->connect("tcp://localhost:5558"); //$pPush->connect("tcp://localhost:5559"); $pPush->send("Hello Client 1");
客户端Pull.php
<?php $pContext = new ZMQContext(); $pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL); //$pPull->bind("tcp://*:5558"); $pPull->connect("tcp://localhost:5558"); $pPull->connect("tcp://localhost:5559"); var_dump($pPull->recv());
如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接
<?php $pContext = new ZMQContext(); $pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP); $pServer->bind("tcp://*:5556"); $pServer->bind("ipc:///tmp/req.ipc"); while(true){ $message = $pServer->recv(); echo $message . PHP_EOL; $pServer->send("Hello from server1:".$message); }
客户端可以选择走TCP或者IPC进行消息通信
<?php $pContext = new ZMQContext(); $pClient = new ZMQSocket($pContext, ZMQ::SOCKET_REQ); $pClient->connect("ipc:///tmp/req.ipc"); //$pClient->connect("tcp://localhost:5556"); $pClient->send("Hello From Client1:".uniqid()); $strMessage = $pClient->recv(); echo $strMessage,PHP_EOL;
使用ZeroMQ的进程内部消息通信也很简单
$pServer = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP); $pServer->bind("inproc://reply"); $pClient = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ); $pClient->connect("inproc://reply");; $pClient->send("Hello From Client1:".uniqid()); var_dump($pServer->recv());
ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。
更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展。
参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)