上一篇介绍了PHP的多进程开发,进程间通过信号进行交互。这里介绍一下PHP的进程间通信(IPC)方法,包括基于System V IPC通信,如信号量、进程间消息、共享内存和基于socket的IPC通信。
信号量主要作为不同进程以及同一进程不同线程之间的同步手段。信号量是一个特殊的变量,程序对其访问都是原子操作,且只允许对它进行等待和发送信息操作。如果信号量是一个任意的整数,通常被称为计数信号量,或一般信号量;如果信号量只有二进制的0或1,称为二进制信号量。在linux系中,二进制信号量又称Mutex,互斥锁。以下例子采用信号量来协调进程对资源的访问
<?php
$key = ftok ( __FILE__, 's' );
// 同时最多只能有一个进程进入临界区
$sem_id = sem_get ( $key, 1 );
echo "This is a room,can only stay one people!\n\r";
// 派生子进程
$pid = pcntl_fork ();
if ($pid == - 1) {
exit ( 'fork failed!' );
} else if ($pid > 0) {
$name = 'parent';
} else {
$name = 'child';
}
echo "{$name} want to enter the room \n";
sem_acquire ( $sem_id );
// 原子操作开始
echo "{$name} in the room , other people can't enter!\n";
sleep ( 3 );
echo "{$name} leave the room\n";
// 原子操作结束
sem_release ( $sem_id );
if ($pid > 0) {
pcntl_waitpid ( $pid, $status );
sem_remove ( $sem_id ); // 移除信号量
}
sem_get和sem_remove分别为创建和销毁信号量。当前进程(父进程)通过sem_acquire获取到信号量后其他进程(子进程)将会一直阻塞直到获取到信号量;在sem_acquire和sem_release之间操作都将是原子性的;当前进程通过sem_release释放所请求的信号量,其他进程便使用,从而实现对资源的有序访问。sem_acquire是阻塞操作,即之后的程序都需要等待获取到信号量后才能继续执行。使用多个信号量控制,需要注意是否会造成死锁。
消息队列提供了一种从一个进程向另一个进程异步发送一个数据块的方法,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。以下是PHP子进程使用消息队列与父进程进行通信
// 生成key
$message_queue_key = ftok ( __FILE__, 'a' );
// 根据生成的key新建队列,也可自定,如123456
$message_queue = msg_get_queue ( $message_queue_key, 0666 );
$pids = array ();
for($i = 0; $i < 5; $i ++) {
// 创建子进程
$pids [$i] = pcntl_fork ();
if ($pids [$i]) {
echo "No.$i child process was created, the pid is $pids[$i]\r\n";
pcntl_wait ( $status ); // 非阻塞的线程等待,防止僵尸进程的出现
} elseif ($pids [$i] == 0) {
$pid = posix_getpid ();
echo "process.$pid is writing now\r\n";
// 写队列
msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
posix_kill ( $pid, SIGTERM );
}
}
do {
// 读队列
msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );
echo $message;
// 获取队列内消息数
$a = msg_stat_queue ( $message_queue );
if ($a ['msg_qnum'] == 0) {
break;
}
} while ( true );
消息队列存在消息大小及队列长度的限制,一旦超过将写不进去。
共享内存使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。一个进程在内存创建了一个共享区域,其他进程也可以对这块内存区域进行访问
<?php
//Check the command line arguments
if(sizeof($argv) < 2) {
echo "Usage: php shared_memory.php <send|get|delete> <integer identifier> <value>\n";
exit;
}
//Define shared memory segment properties.
$key = "987654";
$permissions = 0666;
$size = 1024;
//Create or open the shared memory segment.
$segment = shm_attach($key, $size, $permissions);
//Handle operations for the segment.
switch($argv[1]) {
case "send":
shm_put_var($segment, $argv[2], $argv[3]);
echo "Message sent to shared memory segment.\n";
break;
case "get":
$data = shm_get_var($segment, $argv[2]);
echo "Received data: {$data}\n";
break;
case "delete":
shm_remove($segment);
echo "Shared memory segment released.\n";
break;
}
共享内存并未提供同步机制,往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。PHP还有另外一个共享内存扩展:shmop。注意,共享内存存在内存限制。
Sockets IPC提供了进程间双向的点对点通信。通过socket_create_pair创建一对socket,作为上行和下行,父进程和子进程分别使用其中一个进行读写通信
<?php
$sockets = array();
$strone = 'Message From Parent.';
$strtwo = 'Message From Child.';
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false) {
echo "socket_create_pair() failed. Reason: ".socket_strerror(socket_last_error());
}
$pid = pcntl_fork();
if ($pid == -1) {
echo 'Could not fork Process.';
} elseif ($pid) {
/*parent*/
socket_close($sockets[0]);
if (socket_write($sockets[1], $strone, strlen($strone)) === false) {
echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[1]));
}
if (socket_read($sockets[1], strlen($strtwo), PHP_BINARY_READ) == $strtwo) {
echo "Recieved $strtwo\n";
}
socket_close($sockets[1]);
} else {
/*child*/
socket_close($sockets[1]);
if (socket_write($sockets[0], $strtwo, strlen($strtwo)) === false) {
echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[0]));
}
if (socket_read($sockets[0], strlen($strone), PHP_BINARY_READ) == $strone) {
echo "Recieved $strone\n";
}
socket_close($sockets[0]);
}
以下例子是github上一个利用PHP Sockets IP开发的多进程任务处理
<?php
namespace Lifo\IPC;
declare(ticks = 1);
interface ProcessInterface
{
public function run($parent);
}
class ProcessPoolException extends \Exception
{
}
class ProcessPool
{
/** @var Integer Maximum workers allowed at once */
protected $max;
/** @var boolean If true workers will fork. Otherwise they will run synchronously */
protected $fork;
/** @var Integer Total results collected */
protected $count;
/** @var array Pending processes that have not been started yet */
protected $pending;
/** @var array Processes that have been started */
protected $workers;
/** @var array Results that have been collected */
protected $results;
/** @var \Closure Function to call every time a child is forked */
protected $createCallback;
/** @var array children PID's that died prematurely */
private $caught;
/** @var boolean Is the signal handler initialized? */
private $initialized;
private static $instance = array();
public function __construct($max = 1, $fork = true)
{
//$pid = getmypid();
//if (isset(self::$instance[$pid])) {
// $caller = debug_backtrace();
// throw new ProcessPoolException("Cannot instantiate more than 1 ProcessPool in the same process in {$caller[0]['file']} line {$caller[0]['line']}");
//}
//self::$instance[$pid] = $this;
$this->count = 0;
$this->max = $max;
$this->fork = $fork;
$this->results = array();
$this->workers = array();
$this->pending = array();
$this->caught = array();
$this->initialized = false;
}
public function __destruct()
{
// make sure signal handler is removed
$this->uninit();
//unset(self::$instance[getmygid()]);
}
/**
* Initialize the signal handler.
*
* Note: This will replace any current handler for SIGCHLD.
*
* @param boolean $force Force initialization even if already initialized
*/
private function init($force = false)
{
if ($this->initialized and !$force) {
return;
}
$this->initialized = true;
pcntl_signal(SIGCHLD, array($this, 'signalHandler'));
}
private function uninit()
{
if (!$this->initialized) {
return;
}
$this->initialized = false;
pcntl_signal(SIGCHLD, SIG_DFL);
}
public function signalHandler($signo)
{
switch ($signo) {
case SIGCHLD:
$this->reaper();
break;
}
}
/**
* Reap any dead children
*/
public function reaper($pid = null, $status = null)
{
if ($pid === null) {
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
while ($pid > 0) {
if (isset($this->workers[$pid])) {
// @todo does the socket really need to be closed?
//@socket_close($this->workers[$pid]['socket']);
unset($this->workers[$pid]);
} else {
// the child died before the parent could initialize the $worker
// queue. So we track it temporarily so we can handle it in
// self::create().
$this->caught[$pid] = $status;
}
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
}
/**
* Wait for any child to be ready
*
* @param integer $timeout Timeout to wait (fractional seconds)
* @return array|null Returns array of sockets ready to be READ or null
*/
public function wait($timeout = null)
{
$x = null; // trash var needed for socket_select
$startTime = microtime(true);
while (true) {
$this->apply(); // maintain worker queue
// check each child socket pair for a new result
$read = array_map(function($w){ return $w['socket']; }, $this->workers);
// it's possible for no workers/sockets to be present due to REAPING
if (!empty($read)) {
$ok = @socket_select($read, $x, $x, $timeout);
if ($ok !== false and $ok > 0) {
return $read;
}
}
// timed out?
if ($timeout and microtime(true) - $startTime > $timeout) {
return null;
}
// no sense in waiting if we have no workers and no more pending
if (empty($this->workers) and empty($this->pending)) {
return null;
}
}
}
/**
* Return the next available result.
*
* Blocks unless a $timeout is specified.
*
* @param integer $timeout Timeout in fractional seconds if no results are available.
* @return mixed Returns next child response or null on timeout
* @throws ProcessPoolException On timeout if $nullOnTimeout is false
*/
public function get($timeout = null, $nullOnTimeout = false)
{
$startTime = microtime(true);
while ($this->getPending()) {
// return the next result
if ($this->hasResult()) {
return $this->getResult();
}
// wait for the next result
$ready = $this->wait($timeout);
if (is_array($ready)) {
foreach ($ready as $socket) {
$res = self::socket_fetch($socket);
if ($res !== null) {
$this->results[] = $res;
$this->count++;
}
}
if ($this->hasResult()) {
return $this->getResult();
}
}
// timed out?
if ($timeout and microtime(true) - $startTime > $timeout) {
if ($nullOnTimeout) {
return null;
}
throw new ProcessPoolException("Timeout");
}
}
}
/**
* Return results from all workers.
*
* Does not return until all pending workers are complete or the $timeout
* is reached.
*
* @param integer $timeout Timeout in fractional seconds if no results are available.
* @return array Returns an array of results
* @throws ProcessPoolException On timeout if $nullOnTimeout is false
*/
public function getAll($timeout = null, $nullOnTimeout = false)
{
$results = array();
$startTime = microtime(true);
while ($this->getPending()) {
try {
$res = $this->get($timeout);
if ($res !== null) {
$results[] = $res;
}
} catch (ProcessPoolException $e) {
// timed out
}
// timed out?
if ($timeout and microtime(true) - $startTime > $timeout) {
if ($nullOnTimeout) {
return null;
}
throw new ProcessPoolException("Timeout");
}
}
return $results;
}
public function hasResult()
{
return !empty($this->results);
}
/**
* Return the next available result or null if none are available.
*
* This does not wait or manage the worker queue.
*/
public function getResult()
{
if (empty($this->results)) {
return null;
}
return array_shift($this->results);
}
/**
* Apply a worker to the working or pending queue
*
* @param Callable $func Callback function to fork into.
* @return ProcessPool
*/
public function apply($func = null)
{
// add new function to pending queue
if ($func !== null) {
if ($func instanceof \Closure or $func instanceof ProcessInterface or is_callable($func)) {
$this->pending[] = func_get_args();
} else {
throw new \UnexpectedValueException("Parameter 1 in ProcessPool#apply must be a Closure or callable");
}
}
// start a new worker if our current worker queue is low
if (!empty($this->pending) and count($this->workers) < $this->max) {
call_user_func_array(array($this, 'create'), array_shift($this->pending));
}
return $this;
}
/**
* Create a new worker.
*
* If forking is disabled this will BLOCK.
*
* @param Closure $func Callback function.
* @param mixed Any extra parameters are passed to the callback function.
* @throws \RuntimeException if the child can not be forked.
*/
protected function create($func /*, ...*/)
{
// create a socket pair before forking so our child process can write to the PARENT.
$sockets = array();
$domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX;
if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) {
throw new \RuntimeException("socket_create_pair failed: " . socket_strerror(socket_last_error()));
}
list($child, $parent) = $sockets; // just to make the code below more readable
unset($sockets);
$args = array_merge(array($parent), array_slice(func_get_args(), 1));
$this->init(); // make sure signal handler is installed
if ($this->fork) {
$pid = pcntl_fork();
if ($pid == -1) {
throw new \RuntimeException("Could not fork");
}
if ($pid > 0) {
// PARENT PROCESS; Just track the child and return
socket_close($parent);
$this->workers[$pid] = array(
'pid' => $pid,
'socket' => $child,
);
// don't pass $parent to callback
$this->doOnCreate(array_slice($args, 1));
// If a SIGCHLD was already caught at this point we need to
// manually handle it to avoid a defunct process.
if (isset($this->caught[$pid])) {
$this->reaper($pid, $this->caught[$pid]);
unset($this->caught[$pid]);
}
} else {
// CHILD PROCESS; execute the callback function and wait for response
socket_close($child);
try {
if ($func instanceof ProcessInterface) {
$result = call_user_func_array(array($func, 'run'), $args);
} else {
$result = call_user_func_array($func, $args);
}
if ($result !== null) {
self::socket_send($parent, $result);
}
} catch (\Exception $e) {
// this is kind of useless in a forking context but at
// least the developer can see the exception if it occurs.
throw $e;
}
exit(0);
}
} else {
// forking is disabled so we simply run the child worker and wait
// synchronously for response.
try {
if ($func instanceof ProcessInterface) {
$result = call_user_func_array(array($func, 'run'), $args);
} else {
$result = call_user_func_array($func, $args);
}
if ($result !== null) {
//$this->results[] = $result;
self::socket_send($parent, $result);
}
// read anything pending from the worker if they chose to write
// to the socket instead of just returning a value.
$x = null;
do {
$read = array($child);
$ok = socket_select($read, $x, $x, 0);
if ($ok !== false and $ok > 0) {
$res = self::socket_fetch($read[0]);
if ($res !== null) {
$this->results[] = $res;
}
}
} while ($ok);
} catch (\Exception $e) {
// nop; we didn't fork so let the caller handle it
throw $e;
}
}
}
/**
* Clear all pending workers from the queue.
*/
public function clear()
{
$this->pending = array();
return $this;
}
/**
* Send a SIGTERM (or other) signal to the PID given
*/
public function kill($pid, $signo = SIGTERM)
{
posix_kill($pid, $signo);
return $this;
}
/**
* Send a SIGTERM (or other) signal to all current workers
*/
public function killAll($signo = SIGTERM)
{
foreach ($this->workers as $w) {
$this->kill($w['pid'], $signo);
}
return $this;
}
/**
* Set a callback when a new forked process is created. This will allow the
* parent to perform some sort of cleanup after every child is created.
*
* This is useful to reinitialize certain resources like DB connections
* since children will inherit the parent resources.
*
* @param \Closure $callback Function to callback after every forked child.
*/
public function setOnCreate(\Closure $callback = null)
{
$this->createCallback = $callback;
}
protected function doOnCreate($args = array())
{
if ($this->createCallback) {
call_user_func_array($this->createCallback, $args);
}
}
/**
* Return the total jobs that have NOT completed yet.
*/
public function getPending($pendingOnly = false)
{
if ($pendingOnly) {
return count($this->pending);
}
return count($this->pending) + count($this->workers) + count($this->results);
}
public function getWorkers()
{
return count($this->workers);
}
public function getActive()
{
return count($this->pending) + count($this->workers);
}
public function getCompleted()
{
return $this->count;
}
public function setForking($fork)
{
$this->fork = $fork;
return $this;
}
public function setMax($max)
{
if (!is_numeric($max) or $max < 1) {
throw new \InvalidArgumentException("Max value must be > 0");
}
$this->max = $max;
return $this;
}
public function getMax()
{
return $this->max;
}
/**
* Write the data to the socket in a predetermined format
*/
public static function socket_send($socket, $data)
{
$serialized = serialize($data);
$hdr = pack('N', strlen($serialized)); // 4 byte length
$buffer = $hdr . $serialized;
$total = strlen($buffer);
while (true) {
$sent = socket_write($socket, $buffer);
if ($sent === false) {
// @todo handle error?
//$error = socket_strerror(socket_last_error());
break;
}
if ($sent >= $total) {
break;
}
$total -= $sent;
$buffer = substr($buffer, $sent);
}
}
/**
* Read a data packet from the socket in a predetermined format.
*
* Blocking.
*
*/
public static function socket_fetch($socket)
{
// read 4 byte length first
$hdr = '';
do {
$read = socket_read($socket, 4 - strlen($hdr));
if ($read === false or $read === '') {
return null;
}
$hdr .= $read;
} while (strlen($hdr) < 4);
list($len) = array_values(unpack("N", $hdr));
// read the full buffer
$buffer = '';
do {
$read = socket_read($socket, $len - strlen($buffer));
if ($read === false or $read == '') {
return null;
}
$buffer .= $read;
} while (strlen($buffer) < $len);
$data = unserialize($buffer);
return $data;
}
}
$pool = new ProcessPool(16);
for ($i=0; $i<100; $i++) {
$pool->apply(function($parent) use ($i) {
echo "$i running...\n";
mt_srand(); // must re-seed for each child
$rand = mt_rand(1000000, 2000000);
usleep($rand);
return $i . ' : slept for ' . ($rand / 1000000) . ' seconds';
});
}
while ($pool->getPending()) {
try {
$result = $pool->get(1); // timeout in 1 second
echo "GOT: ", $result, "\n";
} catch (ProcessPoolException $e) {
// timeout
}
}
当前进程(父进程)添加任务时交由其他进程(子进程)处理,不阻塞当前进程;其他进程运行结束后,通过socket返回结果给父进程。
当然进程间通信也可以通过文件(FIFO)或者类似的中介角色如异步消息队列,Mysql,Redis等等进行交互。
参考链接:
Semaphore, Shared Memory and IPC
深刻理解Linux进程间通信(IPC)
PHP IPC with Daemon Service using Message Queues, Shared Memory and Semaphores
关于PHP你可能不知道的-PHP的事件驱动化设计
Store datasets directly in shared memory with PHP
PHP Dark Arts: Shared Memory Segments (IPC)
Something Like Threading – PHP Process Forking and Interprocess Communication
Mimicking Threading in PHP
基于System V Message queue的PHP消息队列封装
PHP进程间通信System V消息队列
PHP进程间通信System V信号量
we Do web sockets on PHP from null. A part 2. IPC
proc_open