标签归档:消息队列

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install

#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib

sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);


$arrRead = $arrWrite = array();
while(true){
	$nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
		foreach($arrRead as $pSocket){
			if($pSocket === $pFrontend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
			else if ($pSocket === $pBackend){
				while (true){
					$strMessage = $pSocket->recv();
					$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
					$pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
					if(!$nMore){
						break;
					}
				}
			}
		}
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
	$strMessage = $pServer->recv();
	echo $strMessage."\r\n";
	$pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");

$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");

while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");

while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);

$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");

$pPush->send("Hello Client 1");

客户端Pull.php

<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);

//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");

var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
	$message = $pServer->recv();
	echo $message . PHP_EOL;
	$pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");


$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());

var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP 进程间通信

上一篇介绍了PHP的多进程开发,进程间通过信号进行交互。这里介绍一下PHP的进程间通信(IPC)方法,包括基于System V IPC通信,如信号量、进程间消息、共享内存和基于socket的IPC通信。

信号量主要作为不同进程以及同一进程不同线程之间的同步手段。信号量是一个特殊的变量,程序对其访问都是原子操作,且只允许对它进行等待和发送信息操作。如果信号量是一个任意的整数,通常被称为计数信号量,或一般信号量;如果信号量只有二进制的0或1,称为二进制信号量。在linux系中,二进制信号量又称Mutex,互斥锁。以下例子采用信号量来协调进程对资源的访问

<?php
$key = ftok ( __FILE__, 's' );
// 同时最多只能有一个进程进入临界区
$sem_id = sem_get ( $key, 1 );
echo "This is a room,can only stay one people!\n\r";
// 派生子进程
$pid = pcntl_fork ();
if ($pid == - 1) {
	exit ( 'fork failed!' );
} else if ($pid > 0) {
	$name = 'parent';
} else {
	$name = 'child';
}
echo "{$name} want to enter the room \n";
sem_acquire ( $sem_id );
// 原子操作开始
echo "{$name} in the room , other people can't enter!\n";
sleep ( 3 );
echo "{$name} leave the room\n";
// 原子操作结束
sem_release ( $sem_id );
if ($pid > 0) {
	pcntl_waitpid ( $pid, $status );
	sem_remove ( $sem_id ); // 移除信号量
}

sem_get和sem_remove分别为创建和销毁信号量。当前进程(父进程)通过sem_acquire获取到信号量后其他进程(子进程)将会一直阻塞直到获取到信号量;在sem_acquire和sem_release之间操作都将是原子性的;当前进程通过sem_release释放所请求的信号量,其他进程便使用,从而实现对资源的有序访问。sem_acquire是阻塞操作,即之后的程序都需要等待获取到信号量后才能继续执行。使用多个信号量控制,需要注意是否会造成死锁。

消息队列提供了一种从一个进程向另一个进程异步发送一个数据块的方法,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。以下是PHP子进程使用消息队列与父进程进行通信

// 生成key
$message_queue_key = ftok ( __FILE__, 'a' );
// 根据生成的key新建队列,也可自定,如123456
$message_queue = msg_get_queue ( $message_queue_key, 0666 );

$pids = array ();
for($i = 0; $i < 5; $i ++) {
	// 创建子进程
	$pids [$i] = pcntl_fork ();
	
	if ($pids [$i]) {
		echo "No.$i child process was created, the pid is $pids[$i]\r\n";
		pcntl_wait ( $status ); // 非阻塞的线程等待,防止僵尸进程的出现
	} elseif ($pids [$i] == 0) {
		$pid = posix_getpid ();
		echo "process.$pid is writing now\r\n";
		// 写队列
		msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
		posix_kill ( $pid, SIGTERM );
	}
}

do {
	// 读队列
	msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );
	echo $message;
	// 获取队列内消息数
	$a = msg_stat_queue ( $message_queue );
	if ($a ['msg_qnum'] == 0) {
		break;
	}
} while ( true );

消息队列存在消息大小及队列长度的限制,一旦超过将写不进去。

共享内存使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。一个进程在内存创建了一个共享区域,其他进程也可以对这块内存区域进行访问

<?php
//Check the command line arguments
if(sizeof($argv) < 2) {
     echo  "Usage: php shared_memory.php <send|get|delete> <integer identifier> <value>\n";
     exit;
}
 
//Define shared memory segment properties.
$key = "987654";
$permissions = 0666;
$size = 1024;
 
//Create or open the shared memory segment.
$segment = shm_attach($key, $size, $permissions);
 
//Handle operations for the segment.
switch($argv[1]) {
     case "send":
          shm_put_var($segment, $argv[2], $argv[3]);
          echo "Message sent to shared memory segment.\n";
          break;
     case "get":
          $data = shm_get_var($segment, $argv[2]);
          echo "Received data: {$data}\n";
          break;
     case "delete":
          shm_remove($segment);
          echo "Shared memory segment released.\n";
          break;
}

共享内存并未提供同步机制,往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。PHP还有另外一个共享内存扩展:shmop。注意,共享内存存在内存限制。

Sockets IPC提供了进程间双向的点对点通信。通过socket_create_pair创建一对socket,作为上行和下行,父进程和子进程分别使用其中一个进行读写通信

<?php
$sockets = array();
$strone = 'Message From Parent.';
$strtwo = 'Message From Child.';

if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false) {
    echo "socket_create_pair() failed. Reason: ".socket_strerror(socket_last_error());
}
$pid = pcntl_fork();
if ($pid == -1) {
    echo 'Could not fork Process.';
} elseif ($pid) {
    /*parent*/
    socket_close($sockets[0]);
    if (socket_write($sockets[1], $strone, strlen($strone)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[1]));
    }
    if (socket_read($sockets[1], strlen($strtwo), PHP_BINARY_READ) == $strtwo) {
        echo "Recieved $strtwo\n";
    }
    socket_close($sockets[1]);
} else {
    /*child*/
    socket_close($sockets[1]);
    if (socket_write($sockets[0], $strtwo, strlen($strtwo)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[0]));
    }
    if (socket_read($sockets[0], strlen($strone), PHP_BINARY_READ) == $strone) {
        echo "Recieved $strone\n";
    }
    socket_close($sockets[0]);
}

以下例子是github上一个利用PHP Sockets IP开发的多进程任务处理

<?php
namespace Lifo\IPC;

declare(ticks = 1);

interface ProcessInterface
{
	public function run($parent);
}

class ProcessPoolException extends \Exception
{
	
}

class ProcessPool
{
    /** @var Integer Maximum workers allowed at once */
    protected $max;
    /** @var boolean If true workers will fork. Otherwise they will run synchronously */
    protected $fork;
    /** @var Integer Total results collected */
    protected $count;
    /** @var array Pending processes that have not been started yet */
    protected $pending;
    /** @var array Processes that have been started */
    protected $workers;
    /** @var array Results that have been collected */
    protected $results;
    /** @var \Closure Function to call every time a child is forked */
    protected $createCallback;
    /** @var array children PID's that died prematurely */
    private   $caught;
    /** @var boolean Is the signal handler initialized? */
    private   $initialized;
    private static $instance = array();
    public function __construct($max = 1, $fork = true)
    {
        //$pid = getmypid();
        //if (isset(self::$instance[$pid])) {
        //    $caller = debug_backtrace();
        //    throw new ProcessPoolException("Cannot instantiate more than 1 ProcessPool in the same process in {$caller[0]['file']} line {$caller[0]['line']}");
        //}
        //self::$instance[$pid] = $this;
        $this->count = 0;
        $this->max = $max;
        $this->fork = $fork;
        $this->results = array();
        $this->workers = array();
        $this->pending = array();
        $this->caught = array();
        $this->initialized = false;
    }
    public function __destruct()
    {
        // make sure signal handler is removed
        $this->uninit();
        //unset(self::$instance[getmygid()]);
    }
    /**
     * Initialize the signal handler.
     *
     * Note: This will replace any current handler for SIGCHLD.
     *
     * @param boolean $force Force initialization even if already initialized
     */
    private function init($force = false)
    {
        if ($this->initialized and !$force) {
            return;
        }
        $this->initialized = true;
        pcntl_signal(SIGCHLD, array($this, 'signalHandler'));
    }
    private function uninit()
    {
        if (!$this->initialized) {
            return;
        }
        $this->initialized = false;
        pcntl_signal(SIGCHLD, SIG_DFL);
    }
    public function signalHandler($signo)
    {
        switch ($signo) {
            case SIGCHLD:
                $this->reaper();
                break;
        }
    }
    /**
     * Reap any dead children
     */
    public function reaper($pid = null, $status = null)
    {
        if ($pid === null) {
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
        while ($pid > 0) {
            if (isset($this->workers[$pid])) {
                // @todo does the socket really need to be closed?
                //@socket_close($this->workers[$pid]['socket']);
                unset($this->workers[$pid]);
            } else {
                // the child died before the parent could initialize the $worker
                // queue. So we track it temporarily so we can handle it in
                // self::create().
                $this->caught[$pid] = $status;
            }
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
    }
    /**
     * Wait for any child to be ready
     *
     * @param integer $timeout Timeout to wait (fractional seconds)
     * @return array|null Returns array of sockets ready to be READ or null
     */
    public function wait($timeout = null)
    {
        $x = null;                      // trash var needed for socket_select
        $startTime = microtime(true);
        while (true) {
            $this->apply();                         // maintain worker queue
            // check each child socket pair for a new result
            $read = array_map(function($w){ return $w['socket']; }, $this->workers);
            // it's possible for no workers/sockets to be present due to REAPING
            if (!empty($read)) {
                $ok = @socket_select($read, $x, $x, $timeout);
                if ($ok !== false and $ok > 0) {
                    return $read;
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                return null;
            }
            // no sense in waiting if we have no workers and no more pending
            if (empty($this->workers) and empty($this->pending)) {
                return null;
            }
        }
    }
    /**
     * Return the next available result.
     *
     * Blocks unless a $timeout is specified.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return mixed Returns next child response or null on timeout
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function get($timeout = null, $nullOnTimeout = false)
    {
        $startTime = microtime(true);
        while ($this->getPending()) {
            // return the next result
            if ($this->hasResult()) {
                return $this->getResult();
            }
            // wait for the next result
            $ready = $this->wait($timeout);
            if (is_array($ready)) {
                foreach ($ready as $socket) {
                    $res = self::socket_fetch($socket);
                    if ($res !== null) {
                        $this->results[] = $res;
                        $this->count++;
                    }
                }
                if ($this->hasResult()) {
                    return $this->getResult();
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
    }
    /**
     * Return results from all workers.
     *
     * Does not return until all pending workers are complete or the $timeout
     * is reached.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return array Returns an array of results
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function getAll($timeout = null, $nullOnTimeout = false)
    {
        $results = array();
        $startTime = microtime(true);
        while ($this->getPending()) {
            try {
                $res = $this->get($timeout);
                if ($res !== null) {
                    $results[] = $res;
                }
            } catch (ProcessPoolException $e) {
                // timed out
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
        return $results;
    }
    public function hasResult()
    {
        return !empty($this->results);
    }
    /**
     * Return the next available result or null if none are available.
     *
     * This does not wait or manage the worker queue.
     */
    public function getResult()
    {
        if (empty($this->results)) {
            return null;
        }
        return array_shift($this->results);
    }
    /**
     * Apply a worker to the working or pending queue
     *
     * @param Callable $func Callback function to fork into.
     * @return ProcessPool
     */
    public function apply($func = null)
    {
        // add new function to pending queue
        if ($func !== null) {
            if ($func instanceof \Closure or $func instanceof ProcessInterface or is_callable($func)) {
                $this->pending[] = func_get_args();
            } else {
                throw new \UnexpectedValueException("Parameter 1 in ProcessPool#apply must be a Closure or callable");
            }
        }
        // start a new worker if our current worker queue is low
        if (!empty($this->pending) and count($this->workers) < $this->max) {
            call_user_func_array(array($this, 'create'), array_shift($this->pending));
        }
        return $this;
    }
    /**
     * Create a new worker.
     *
     * If forking is disabled this will BLOCK.
     *
     * @param Closure $func Callback function.
     * @param mixed Any extra parameters are passed to the callback function.
     * @throws \RuntimeException if the child can not be forked.
     */
    protected function create($func /*, ...*/)
    {
        // create a socket pair before forking so our child process can write to the PARENT.
        $sockets = array();
        $domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX;
        if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) {
            throw new \RuntimeException("socket_create_pair failed: " . socket_strerror(socket_last_error()));
        }
        list($child, $parent) = $sockets; // just to make the code below more readable
        unset($sockets);
        $args = array_merge(array($parent), array_slice(func_get_args(), 1));
        $this->init();                  // make sure signal handler is installed
        if ($this->fork) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                throw new \RuntimeException("Could not fork");
            }
            if ($pid > 0) {
                // PARENT PROCESS; Just track the child and return
                socket_close($parent);
                $this->workers[$pid] = array(
                    'pid' => $pid,
                    'socket' => $child,
                );
                // don't pass $parent to callback
                $this->doOnCreate(array_slice($args, 1));
                // If a SIGCHLD was already caught at this point we need to
                // manually handle it to avoid a defunct process.
                if (isset($this->caught[$pid])) {
                    $this->reaper($pid, $this->caught[$pid]);
                    unset($this->caught[$pid]);
                }
            } else {
                // CHILD PROCESS; execute the callback function and wait for response
                socket_close($child);
                try {
                    if ($func instanceof ProcessInterface) {
                        $result = call_user_func_array(array($func, 'run'), $args);
                    } else {
                        $result = call_user_func_array($func, $args);
                    }
                    if ($result !== null) {
                        self::socket_send($parent, $result);
                    }
                } catch (\Exception $e) {
                    // this is kind of useless in a forking context but at
                    // least the developer can see the exception if it occurs.
                    throw $e;
                }
                exit(0);
            }
        } else {
            // forking is disabled so we simply run the child worker and wait
            // synchronously for response.
            try {
                if ($func instanceof ProcessInterface) {
                    $result = call_user_func_array(array($func, 'run'), $args);
                } else {
                    $result = call_user_func_array($func, $args);
                }
                if ($result !== null) {
                    //$this->results[] = $result;
                    self::socket_send($parent, $result);
                }
                // read anything pending from the worker if they chose to write
                // to the socket instead of just returning a value.
                $x = null;
                do {
                    $read = array($child);
                    $ok = socket_select($read, $x, $x, 0);
                    if ($ok !== false and $ok > 0) {
                        $res = self::socket_fetch($read[0]);
                        if ($res !== null) {
                            $this->results[] = $res;
                        }
                    }
                } while ($ok);
            } catch (\Exception $e) {
                // nop; we didn't fork so let the caller handle it
                throw $e;
            }
        }
    }
    /**
     * Clear all pending workers from the queue.
     */
    public function clear()
    {
        $this->pending = array();
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to the PID given
     */
    public function kill($pid, $signo = SIGTERM)
    {
        posix_kill($pid, $signo);
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to all current workers
     */
    public function killAll($signo = SIGTERM)
    {
        foreach ($this->workers as $w) {
            $this->kill($w['pid'], $signo);
        }
        return $this;
    }
    /**
     * Set a callback when a new forked process is created. This will allow the
     * parent to perform some sort of cleanup after every child is created.
     *
     * This is useful to reinitialize certain resources like DB connections
     * since children will inherit the parent resources.
     *
     * @param \Closure $callback Function to callback after every forked child.
     */
    public function setOnCreate(\Closure $callback = null)
    {
        $this->createCallback = $callback;
    }
    protected function doOnCreate($args = array())
    {
        if ($this->createCallback) {
            call_user_func_array($this->createCallback, $args);
        }
    }
    /**
     * Return the total jobs that have NOT completed yet.
     */
    public function getPending($pendingOnly = false)
    {
        if ($pendingOnly) {
            return count($this->pending);
        }
        return count($this->pending) + count($this->workers) + count($this->results);
    }
    public function getWorkers()
    {
        return count($this->workers);
    }
    public function getActive()
    {
        return count($this->pending) + count($this->workers);
    }
    public function getCompleted()
    {
        return $this->count;
    }
    public function setForking($fork)
    {
        $this->fork = $fork;
        return $this;
    }
    public function setMax($max)
    {
        if (!is_numeric($max) or $max < 1) {
            throw new \InvalidArgumentException("Max value must be > 0");
        }
        $this->max = $max;
        return $this;
    }
    public function getMax()
    {
        return $this->max;
    }
    /**
     * Write the data to the socket in a predetermined format
     */
    public static function socket_send($socket, $data)
    {
        $serialized = serialize($data);
        $hdr = pack('N', strlen($serialized));    // 4 byte length
        $buffer = $hdr . $serialized;
        $total = strlen($buffer);
        while (true) {
            $sent = socket_write($socket, $buffer);
            if ($sent === false) {
                // @todo handle error?
                //$error = socket_strerror(socket_last_error());
                break;
            }
            if ($sent >= $total) {
                break;
            }
            $total -= $sent;
            $buffer = substr($buffer, $sent);
        }
    }
    /**
     * Read a data packet from the socket in a predetermined format.
     *
     * Blocking.
     *
     */
    public static function socket_fetch($socket)
    {
        // read 4 byte length first
        $hdr = '';
        do {
            $read = socket_read($socket, 4 - strlen($hdr));
            if ($read === false or $read === '') {
                return null;
            }
            $hdr .= $read;
        } while (strlen($hdr) < 4);
        list($len) = array_values(unpack("N", $hdr));
        // read the full buffer
        $buffer = '';
        do {
            $read = socket_read($socket, $len - strlen($buffer));
            if ($read === false or $read == '') {
                return null;
            }
            $buffer .= $read;
        } while (strlen($buffer) < $len);
        $data = unserialize($buffer);
        return $data;
    }
}


$pool = new ProcessPool(16);
for ($i=0; $i<100; $i++) {
	$pool->apply(function($parent) use ($i) {
		echo "$i running...\n";
		mt_srand(); // must re-seed for each child
		$rand = mt_rand(1000000, 2000000);
		usleep($rand);
		return $i . ' : slept for ' . ($rand / 1000000) . ' seconds';
        });
}
while ($pool->getPending()) {
	try {
		$result = $pool->get(1);    // timeout in 1 second
		echo "GOT: ", $result, "\n";
	} catch (ProcessPoolException $e) {
			// timeout
	}
}

当前进程(父进程)添加任务时交由其他进程(子进程)处理,不阻塞当前进程;其他进程运行结束后,通过socket返回结果给父进程。

当然进程间通信也可以通过文件(FIFO)或者类似的中介角色如异步消息队列,Mysql,Redis等等进行交互。

参考链接:
Semaphore, Shared Memory and IPC
深刻理解Linux进程间通信(IPC)
PHP IPC with Daemon Service using Message Queues, Shared Memory and Semaphores
关于PHP你可能不知道的-PHP的事件驱动化设计
Store datasets directly in shared memory with PHP
PHP Dark Arts: Shared Memory Segments (IPC)
Something Like Threading – PHP Process Forking and Interprocess Communication
Mimicking Threading in PHP
基于System V Message queue的PHP消息队列封装
PHP进程间通信System V消息队列
PHP进程间通信System V信号量
we Do web sockets on PHP from null. A part 2. IPC
proc_open