分类目录归档:php

PHP 进程间通信

上一篇介绍了PHP的多进程开发,进程间通过信号进行交互。这里介绍一下PHP的进程间通信(IPC)方法,包括基于System V IPC通信,如信号量、进程间消息、共享内存和基于socket的IPC通信。

信号量主要作为不同进程以及同一进程不同线程之间的同步手段。信号量是一个特殊的变量,程序对其访问都是原子操作,且只允许对它进行等待和发送信息操作。如果信号量是一个任意的整数,通常被称为计数信号量,或一般信号量;如果信号量只有二进制的0或1,称为二进制信号量。在linux系中,二进制信号量又称Mutex,互斥锁。以下例子采用信号量来协调进程对资源的访问

<?php
$key = ftok ( __FILE__, 's' );
// 同时最多只能有一个进程进入临界区
$sem_id = sem_get ( $key, 1 );
echo "This is a room,can only stay one people!\n\r";
// 派生子进程
$pid = pcntl_fork ();
if ($pid == - 1) {
	exit ( 'fork failed!' );
} else if ($pid > 0) {
	$name = 'parent';
} else {
	$name = 'child';
}
echo "{$name} want to enter the room \n";
sem_acquire ( $sem_id );
// 原子操作开始
echo "{$name} in the room , other people can't enter!\n";
sleep ( 3 );
echo "{$name} leave the room\n";
// 原子操作结束
sem_release ( $sem_id );
if ($pid > 0) {
	pcntl_waitpid ( $pid, $status );
	sem_remove ( $sem_id ); // 移除信号量
}

sem_get和sem_remove分别为创建和销毁信号量。当前进程(父进程)通过sem_acquire获取到信号量后其他进程(子进程)将会一直阻塞直到获取到信号量;在sem_acquire和sem_release之间操作都将是原子性的;当前进程通过sem_release释放所请求的信号量,其他进程便使用,从而实现对资源的有序访问。sem_acquire是阻塞操作,即之后的程序都需要等待获取到信号量后才能继续执行。使用多个信号量控制,需要注意是否会造成死锁。

消息队列提供了一种从一个进程向另一个进程异步发送一个数据块的方法,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。以下是PHP子进程使用消息队列与父进程进行通信

// 生成key
$message_queue_key = ftok ( __FILE__, 'a' );
// 根据生成的key新建队列,也可自定,如123456
$message_queue = msg_get_queue ( $message_queue_key, 0666 );

$pids = array ();
for($i = 0; $i < 5; $i ++) {
	// 创建子进程
	$pids [$i] = pcntl_fork ();
	
	if ($pids [$i]) {
		echo "No.$i child process was created, the pid is $pids[$i]\r\n";
		pcntl_wait ( $status ); // 非阻塞的线程等待,防止僵尸进程的出现
	} elseif ($pids [$i] == 0) {
		$pid = posix_getpid ();
		echo "process.$pid is writing now\r\n";
		// 写队列
		msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
		posix_kill ( $pid, SIGTERM );
	}
}

do {
	// 读队列
	msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );
	echo $message;
	// 获取队列内消息数
	$a = msg_stat_queue ( $message_queue );
	if ($a ['msg_qnum'] == 0) {
		break;
	}
} while ( true );

消息队列存在消息大小及队列长度的限制,一旦超过将写不进去。

共享内存使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。一个进程在内存创建了一个共享区域,其他进程也可以对这块内存区域进行访问

<?php
//Check the command line arguments
if(sizeof($argv) < 2) {
     echo  "Usage: php shared_memory.php <send|get|delete> <integer identifier> <value>\n";
     exit;
}
 
//Define shared memory segment properties.
$key = "987654";
$permissions = 0666;
$size = 1024;
 
//Create or open the shared memory segment.
$segment = shm_attach($key, $size, $permissions);
 
//Handle operations for the segment.
switch($argv[1]) {
     case "send":
          shm_put_var($segment, $argv[2], $argv[3]);
          echo "Message sent to shared memory segment.\n";
          break;
     case "get":
          $data = shm_get_var($segment, $argv[2]);
          echo "Received data: {$data}\n";
          break;
     case "delete":
          shm_remove($segment);
          echo "Shared memory segment released.\n";
          break;
}

共享内存并未提供同步机制,往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。PHP还有另外一个共享内存扩展:shmop。注意,共享内存存在内存限制。

Sockets IPC提供了进程间双向的点对点通信。通过socket_create_pair创建一对socket,作为上行和下行,父进程和子进程分别使用其中一个进行读写通信

<?php
$sockets = array();
$strone = 'Message From Parent.';
$strtwo = 'Message From Child.';

if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false) {
    echo "socket_create_pair() failed. Reason: ".socket_strerror(socket_last_error());
}
$pid = pcntl_fork();
if ($pid == -1) {
    echo 'Could not fork Process.';
} elseif ($pid) {
    /*parent*/
    socket_close($sockets[0]);
    if (socket_write($sockets[1], $strone, strlen($strone)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[1]));
    }
    if (socket_read($sockets[1], strlen($strtwo), PHP_BINARY_READ) == $strtwo) {
        echo "Recieved $strtwo\n";
    }
    socket_close($sockets[1]);
} else {
    /*child*/
    socket_close($sockets[1]);
    if (socket_write($sockets[0], $strtwo, strlen($strtwo)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[0]));
    }
    if (socket_read($sockets[0], strlen($strone), PHP_BINARY_READ) == $strone) {
        echo "Recieved $strone\n";
    }
    socket_close($sockets[0]);
}

以下例子是github上一个利用PHP Sockets IP开发的多进程任务处理

<?php
namespace Lifo\IPC;

declare(ticks = 1);

interface ProcessInterface
{
	public function run($parent);
}

class ProcessPoolException extends \Exception
{
	
}

class ProcessPool
{
    /** @var Integer Maximum workers allowed at once */
    protected $max;
    /** @var boolean If true workers will fork. Otherwise they will run synchronously */
    protected $fork;
    /** @var Integer Total results collected */
    protected $count;
    /** @var array Pending processes that have not been started yet */
    protected $pending;
    /** @var array Processes that have been started */
    protected $workers;
    /** @var array Results that have been collected */
    protected $results;
    /** @var \Closure Function to call every time a child is forked */
    protected $createCallback;
    /** @var array children PID's that died prematurely */
    private   $caught;
    /** @var boolean Is the signal handler initialized? */
    private   $initialized;
    private static $instance = array();
    public function __construct($max = 1, $fork = true)
    {
        //$pid = getmypid();
        //if (isset(self::$instance[$pid])) {
        //    $caller = debug_backtrace();
        //    throw new ProcessPoolException("Cannot instantiate more than 1 ProcessPool in the same process in {$caller[0]['file']} line {$caller[0]['line']}");
        //}
        //self::$instance[$pid] = $this;
        $this->count = 0;
        $this->max = $max;
        $this->fork = $fork;
        $this->results = array();
        $this->workers = array();
        $this->pending = array();
        $this->caught = array();
        $this->initialized = false;
    }
    public function __destruct()
    {
        // make sure signal handler is removed
        $this->uninit();
        //unset(self::$instance[getmygid()]);
    }
    /**
     * Initialize the signal handler.
     *
     * Note: This will replace any current handler for SIGCHLD.
     *
     * @param boolean $force Force initialization even if already initialized
     */
    private function init($force = false)
    {
        if ($this->initialized and !$force) {
            return;
        }
        $this->initialized = true;
        pcntl_signal(SIGCHLD, array($this, 'signalHandler'));
    }
    private function uninit()
    {
        if (!$this->initialized) {
            return;
        }
        $this->initialized = false;
        pcntl_signal(SIGCHLD, SIG_DFL);
    }
    public function signalHandler($signo)
    {
        switch ($signo) {
            case SIGCHLD:
                $this->reaper();
                break;
        }
    }
    /**
     * Reap any dead children
     */
    public function reaper($pid = null, $status = null)
    {
        if ($pid === null) {
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
        while ($pid > 0) {
            if (isset($this->workers[$pid])) {
                // @todo does the socket really need to be closed?
                //@socket_close($this->workers[$pid]['socket']);
                unset($this->workers[$pid]);
            } else {
                // the child died before the parent could initialize the $worker
                // queue. So we track it temporarily so we can handle it in
                // self::create().
                $this->caught[$pid] = $status;
            }
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
    }
    /**
     * Wait for any child to be ready
     *
     * @param integer $timeout Timeout to wait (fractional seconds)
     * @return array|null Returns array of sockets ready to be READ or null
     */
    public function wait($timeout = null)
    {
        $x = null;                      // trash var needed for socket_select
        $startTime = microtime(true);
        while (true) {
            $this->apply();                         // maintain worker queue
            // check each child socket pair for a new result
            $read = array_map(function($w){ return $w['socket']; }, $this->workers);
            // it's possible for no workers/sockets to be present due to REAPING
            if (!empty($read)) {
                $ok = @socket_select($read, $x, $x, $timeout);
                if ($ok !== false and $ok > 0) {
                    return $read;
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                return null;
            }
            // no sense in waiting if we have no workers and no more pending
            if (empty($this->workers) and empty($this->pending)) {
                return null;
            }
        }
    }
    /**
     * Return the next available result.
     *
     * Blocks unless a $timeout is specified.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return mixed Returns next child response or null on timeout
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function get($timeout = null, $nullOnTimeout = false)
    {
        $startTime = microtime(true);
        while ($this->getPending()) {
            // return the next result
            if ($this->hasResult()) {
                return $this->getResult();
            }
            // wait for the next result
            $ready = $this->wait($timeout);
            if (is_array($ready)) {
                foreach ($ready as $socket) {
                    $res = self::socket_fetch($socket);
                    if ($res !== null) {
                        $this->results[] = $res;
                        $this->count++;
                    }
                }
                if ($this->hasResult()) {
                    return $this->getResult();
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
    }
    /**
     * Return results from all workers.
     *
     * Does not return until all pending workers are complete or the $timeout
     * is reached.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return array Returns an array of results
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function getAll($timeout = null, $nullOnTimeout = false)
    {
        $results = array();
        $startTime = microtime(true);
        while ($this->getPending()) {
            try {
                $res = $this->get($timeout);
                if ($res !== null) {
                    $results[] = $res;
                }
            } catch (ProcessPoolException $e) {
                // timed out
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
        return $results;
    }
    public function hasResult()
    {
        return !empty($this->results);
    }
    /**
     * Return the next available result or null if none are available.
     *
     * This does not wait or manage the worker queue.
     */
    public function getResult()
    {
        if (empty($this->results)) {
            return null;
        }
        return array_shift($this->results);
    }
    /**
     * Apply a worker to the working or pending queue
     *
     * @param Callable $func Callback function to fork into.
     * @return ProcessPool
     */
    public function apply($func = null)
    {
        // add new function to pending queue
        if ($func !== null) {
            if ($func instanceof \Closure or $func instanceof ProcessInterface or is_callable($func)) {
                $this->pending[] = func_get_args();
            } else {
                throw new \UnexpectedValueException("Parameter 1 in ProcessPool#apply must be a Closure or callable");
            }
        }
        // start a new worker if our current worker queue is low
        if (!empty($this->pending) and count($this->workers) < $this->max) {
            call_user_func_array(array($this, 'create'), array_shift($this->pending));
        }
        return $this;
    }
    /**
     * Create a new worker.
     *
     * If forking is disabled this will BLOCK.
     *
     * @param Closure $func Callback function.
     * @param mixed Any extra parameters are passed to the callback function.
     * @throws \RuntimeException if the child can not be forked.
     */
    protected function create($func /*, ...*/)
    {
        // create a socket pair before forking so our child process can write to the PARENT.
        $sockets = array();
        $domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX;
        if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) {
            throw new \RuntimeException("socket_create_pair failed: " . socket_strerror(socket_last_error()));
        }
        list($child, $parent) = $sockets; // just to make the code below more readable
        unset($sockets);
        $args = array_merge(array($parent), array_slice(func_get_args(), 1));
        $this->init();                  // make sure signal handler is installed
        if ($this->fork) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                throw new \RuntimeException("Could not fork");
            }
            if ($pid > 0) {
                // PARENT PROCESS; Just track the child and return
                socket_close($parent);
                $this->workers[$pid] = array(
                    'pid' => $pid,
                    'socket' => $child,
                );
                // don't pass $parent to callback
                $this->doOnCreate(array_slice($args, 1));
                // If a SIGCHLD was already caught at this point we need to
                // manually handle it to avoid a defunct process.
                if (isset($this->caught[$pid])) {
                    $this->reaper($pid, $this->caught[$pid]);
                    unset($this->caught[$pid]);
                }
            } else {
                // CHILD PROCESS; execute the callback function and wait for response
                socket_close($child);
                try {
                    if ($func instanceof ProcessInterface) {
                        $result = call_user_func_array(array($func, 'run'), $args);
                    } else {
                        $result = call_user_func_array($func, $args);
                    }
                    if ($result !== null) {
                        self::socket_send($parent, $result);
                    }
                } catch (\Exception $e) {
                    // this is kind of useless in a forking context but at
                    // least the developer can see the exception if it occurs.
                    throw $e;
                }
                exit(0);
            }
        } else {
            // forking is disabled so we simply run the child worker and wait
            // synchronously for response.
            try {
                if ($func instanceof ProcessInterface) {
                    $result = call_user_func_array(array($func, 'run'), $args);
                } else {
                    $result = call_user_func_array($func, $args);
                }
                if ($result !== null) {
                    //$this->results[] = $result;
                    self::socket_send($parent, $result);
                }
                // read anything pending from the worker if they chose to write
                // to the socket instead of just returning a value.
                $x = null;
                do {
                    $read = array($child);
                    $ok = socket_select($read, $x, $x, 0);
                    if ($ok !== false and $ok > 0) {
                        $res = self::socket_fetch($read[0]);
                        if ($res !== null) {
                            $this->results[] = $res;
                        }
                    }
                } while ($ok);
            } catch (\Exception $e) {
                // nop; we didn't fork so let the caller handle it
                throw $e;
            }
        }
    }
    /**
     * Clear all pending workers from the queue.
     */
    public function clear()
    {
        $this->pending = array();
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to the PID given
     */
    public function kill($pid, $signo = SIGTERM)
    {
        posix_kill($pid, $signo);
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to all current workers
     */
    public function killAll($signo = SIGTERM)
    {
        foreach ($this->workers as $w) {
            $this->kill($w['pid'], $signo);
        }
        return $this;
    }
    /**
     * Set a callback when a new forked process is created. This will allow the
     * parent to perform some sort of cleanup after every child is created.
     *
     * This is useful to reinitialize certain resources like DB connections
     * since children will inherit the parent resources.
     *
     * @param \Closure $callback Function to callback after every forked child.
     */
    public function setOnCreate(\Closure $callback = null)
    {
        $this->createCallback = $callback;
    }
    protected function doOnCreate($args = array())
    {
        if ($this->createCallback) {
            call_user_func_array($this->createCallback, $args);
        }
    }
    /**
     * Return the total jobs that have NOT completed yet.
     */
    public function getPending($pendingOnly = false)
    {
        if ($pendingOnly) {
            return count($this->pending);
        }
        return count($this->pending) + count($this->workers) + count($this->results);
    }
    public function getWorkers()
    {
        return count($this->workers);
    }
    public function getActive()
    {
        return count($this->pending) + count($this->workers);
    }
    public function getCompleted()
    {
        return $this->count;
    }
    public function setForking($fork)
    {
        $this->fork = $fork;
        return $this;
    }
    public function setMax($max)
    {
        if (!is_numeric($max) or $max < 1) {
            throw new \InvalidArgumentException("Max value must be > 0");
        }
        $this->max = $max;
        return $this;
    }
    public function getMax()
    {
        return $this->max;
    }
    /**
     * Write the data to the socket in a predetermined format
     */
    public static function socket_send($socket, $data)
    {
        $serialized = serialize($data);
        $hdr = pack('N', strlen($serialized));    // 4 byte length
        $buffer = $hdr . $serialized;
        $total = strlen($buffer);
        while (true) {
            $sent = socket_write($socket, $buffer);
            if ($sent === false) {
                // @todo handle error?
                //$error = socket_strerror(socket_last_error());
                break;
            }
            if ($sent >= $total) {
                break;
            }
            $total -= $sent;
            $buffer = substr($buffer, $sent);
        }
    }
    /**
     * Read a data packet from the socket in a predetermined format.
     *
     * Blocking.
     *
     */
    public static function socket_fetch($socket)
    {
        // read 4 byte length first
        $hdr = '';
        do {
            $read = socket_read($socket, 4 - strlen($hdr));
            if ($read === false or $read === '') {
                return null;
            }
            $hdr .= $read;
        } while (strlen($hdr) < 4);
        list($len) = array_values(unpack("N", $hdr));
        // read the full buffer
        $buffer = '';
        do {
            $read = socket_read($socket, $len - strlen($buffer));
            if ($read === false or $read == '') {
                return null;
            }
            $buffer .= $read;
        } while (strlen($buffer) < $len);
        $data = unserialize($buffer);
        return $data;
    }
}


$pool = new ProcessPool(16);
for ($i=0; $i<100; $i++) {
	$pool->apply(function($parent) use ($i) {
		echo "$i running...\n";
		mt_srand(); // must re-seed for each child
		$rand = mt_rand(1000000, 2000000);
		usleep($rand);
		return $i . ' : slept for ' . ($rand / 1000000) . ' seconds';
        });
}
while ($pool->getPending()) {
	try {
		$result = $pool->get(1);    // timeout in 1 second
		echo "GOT: ", $result, "\n";
	} catch (ProcessPoolException $e) {
			// timeout
	}
}

当前进程(父进程)添加任务时交由其他进程(子进程)处理,不阻塞当前进程;其他进程运行结束后,通过socket返回结果给父进程。

当然进程间通信也可以通过文件(FIFO)或者类似的中介角色如异步消息队列,Mysql,Redis等等进行交互。

参考链接:
Semaphore, Shared Memory and IPC
深刻理解Linux进程间通信(IPC)
PHP IPC with Daemon Service using Message Queues, Shared Memory and Semaphores
关于PHP你可能不知道的-PHP的事件驱动化设计
Store datasets directly in shared memory with PHP
PHP Dark Arts: Shared Memory Segments (IPC)
Something Like Threading – PHP Process Forking and Interprocess Communication
Mimicking Threading in PHP
基于System V Message queue的PHP消息队列封装
PHP进程间通信System V消息队列
PHP进程间通信System V信号量
we Do web sockets on PHP from null. A part 2. IPC
proc_open

PHP 进程控制PCNTL

PHP的进程控制PCNTL支持实现了Unix方式的进程创建, 程序执行,信号处理以及进程的中断。 PCNTL只支持linux平台下cli模式,不支持Windows平台,也不能被应用在Web服务器环境(cgi等),当其被用于Web服务环境时可能会带来意外的结果。通常,PCNTL会结合另外一个扩展来使用POSIX来开发(也不支持Windows平台)。

pcntl_fork可以创建一个子进程,父进程和子进程 都从fork的位置开始向下继续执行。创建成功时,父进程得到的返回值是子进程号而子进程得到的返回值是0;创建失败时,父进程得到返回值是-1,不会创建子进程,并触发一个PHP错误。

<?php

$pid = pcntl_fork();
//父进程和子进程都会执行下面代码
if ($pid == -1) {
    //错误处理:创建子进程失败时返回-1.
     die('could not fork');
} else if ($pid) {
     //父进程会得到子进程号,所以这里是父进程执行的逻辑
     pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
} else {
     //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
}

在对应的父进程结束执行后,子进程就会变成孤儿进程,但之后会立即由init进程(进程ID为1)“收养”为其子进程。

某一子进程终止执行后,若其父进程未提前调用wait,则内核会持续保留子进程的退出状态等信息,以使父进程可以wait获取之[2] 。而因为在这种情况下,子进程虽已终止,但仍在消耗系统资源,所以其亦称僵尸进程。wait常于SIGCHLD信号的处理函数中调用。

为避免产生僵尸进程,一般采取的方式是:将父进程中对SIGCHLD信号的处理函数设为SIG_IGN(忽略信号);fork两次并杀死一级子进程,令二级子进程成为孤儿进程而被init所“收养”、清理。

采用二次创建子进程的方式

<?php
	$pid = pcntl_fork();
	if($pid) {
		//创建成功,在父进程中执行
		echo "run in parent process";//pcntl_wait($status);
	} else if($pid == -1) {
		//创建失败,在父进程中处理
		echo "Couldn't create child process.";
	} else {
		//创建成功,在子进程中执行
		//再次创建子进程,即孙进程
		$pid = pcntl_fork();
		if($pid == 0) {
			//在孙进程中执行
			if(-1 == posix_setsid())
	        {
	            // 出错退出
	            exit("Setsid fail");
	        }
			echo "run in grandchild process";
		} else if($pid == -1) {
			echo "Couldn’t create child process.";
		} else {
			//在子进程中处理
			echo "run in child process.";//posix_kill(posix_getpid(), SIGUSR1);
			exit;
		}
	}

通常还会把子进程的pid收集以来,以便监控、回收,如workerman。二次创建子进程通常应用在PHP多进程,守护进程上,比如

<?php
defined('DEAMON_LOCK_FILE') ||
define('DEAMON_LOCK_FILE', 'run/deamon.pid');

if($_SERVER['argc'] >= 2 && $_SERVER['argv'][1] == 'kill')
{
	$fh = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'r');
	$pid = fread($fh, 8);

	if( $pid )
		posix_kill($pid, SIGTERM);

	exit;
}

global $DEAMON_LOCK_HANDLER;

function daemonize($signalHandler = false ) {
	global $DEAMON_LOCK_HANDLER;

	if( ! deamon_file_lock() ) {
		printf("Deamon is already running...\n");
		exit();
	}

	umask(0);

	$pid = pcntl_fork();

	if( $pid < 0 ) {
		printf("Can't fork\n");
		exit;
	}
	else if( $pid ) {
		exit;
	}

	$sid = posix_setsid();

	if( $sid < 0 ) {
		printf("Can't set session leader\n");
		exit;
	}

	deamon_bind_signals($signalHandler);

	$pid = pcntl_fork();

	if( $pid < 0 || $pid ) {
		exit;
	}

	ftruncate($DEAMON_LOCK_HANDLER, 0);
	fwrite($DEAMON_LOCK_HANDLER, posix_getpid());

	chdir('/');

	fclose( STDIN );
	fclose( STDOUT );
	fclose( STDERR );
}

function deamon_bind_signals($signalHandler = false) {
	$signalHandler = !$signalHandler ? "deamon_signal_handler" : $signalHandler;

	pcntl_signal(SIGTERM, $signalHandler);
	pcntl_signal(SIGHUP,  $signalHandler);
	pcntl_signal(SIGUSR1, $signalHandler);
	pcntl_signal(SIGINT, $signalHandler);
}

function deamon_file_lock() {
	global $DEAMON_LOCK_HANDLER;
	$DEAMON_LOCK_HANDLER = fopen(realpath(__DIR__) . '/' . DEAMON_LOCK_FILE, 'c');

	if( ! $DEAMON_LOCK_HANDLER ) {
		printf("Can't open lock file\n");
		die();
	}
	if( !flock( $DEAMON_LOCK_HANDLER, LOCK_EX | LOCK_NB ) ) {
		return false;
	}
	return true;
}

function deamon_signal_handler($signo) {
	switch( $signo ) {
		case SIGTERM:
		case SIGHUP:
		case SIGUSR1:
			break;
	}
}

function sighandler($sig) {
        //do something
	if( $sig == SIGTERM ) {
		global $DEAMON_LOCK_HANDLER;
		fclose( $DEAMON_LOCK_HANDLER );
		exit;
	}
}
daemonize("sighandler");

while( 1 ) {
	pcntl_signal_dispatch();
	// do something here
	sleep( 1 );
}

可以通过ps -ef | grep php查看过程中的php进程产生情况,CentOS下安装PHP5.4的Posix扩展为:sudo yum instal php54w-process。

pcntl_signal可以注册信号处理函数,捕获信号后交给对应回调函数处理,实现信号通信,例如当某一子进程结束、中断或恢复执行时,内核会发送SIGCHLD信号予其父进程

<?php
declare(ticks = 1);

pcntl_signal(SIGCHLD, "signal_handler");

function signal_handler($signal) {
	switch($signal) {
		case SIGCHLD:
			while (pcntl_waitpid(0, $status) != -1) {
				$status = pcntl_wexitstatus($status);
				echo "Child $status completed\n";
			}

			exit;
	}
}

for ($i = 1; $i <= 5; ++$i) {
	$pid = pcntl_fork();

	if (!$pid) {
		sleep(1);
		print "In child $i\n";
		exit($i);
	}
}

while(1) {
	// parent does processing here...
}

pcntl_alarm创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号,结合pcntl_signal和pcntl_alarm可以做一个秒级的定时器(注意:pcntl_alarm是一次性消耗,需要再次设置)

declare(ticks = 1);

function signal_handler($signal) {
	//do your work here
	print "Caught SIGALRM\n";
	pcntl_alarm(3);
}

pcntl_signal(SIGALRM, "signal_handler", true);
pcntl_alarm(3);

while(1) {
}

利用PHP的进程控制便可以实现守护进程监控,如socke端口监听;多进程处理,如socke请求事件处理、任务并行、异步处理,提升PHP程序性能。

参考链接:
PHP 进程控制
Getting into multiprocessing
Timing your signals
PHP Deamon
PHP中利用pcntl进行多进程并发控制
PHP高级编程之守护进程
PHP多进程编程一,PHP多进程编程二。
PHP的ticks机制
PHP如何将进程作为守护进程
Daemonising a PHP cli script on a posix system
异步毫秒定时器
The declare() function and ticks
子进程

PHP 扩展开发之C

前面介绍了使用Zephir来开发PHP扩展,将PHP代码转为扩展,以提升性能,保护代码。目前更多的扩展都是采用C/C++开发的,最近在项目开发中,需要在这些已有的PHP扩展上开发,也只能用C/C++来开发了。

首先去PHP官网下载对应版本的PHP源码,解压并进入对应的目录。
创建扩展courages:

[vagrant@vagrant-centos64 ext]$ ./ext_skel
./ext_skel --extname=module [--proto=file] [--stubs=file] [--xml[=file]]
           [--skel=dir] [--full-xml] [--no-help]

  --extname=module   module is the name of your extension
  --proto=file       file contains prototypes of functions to create
  --stubs=file       generate only function stubs in file
  --xml              generate xml documentation to be added to phpdoc-cvs
  --skel=dir         path to the skeleton directory
  --full-xml         generate xml documentation for a self-contained extension
                     (not yet implemented)
  --no-help          don't try to be nice and create comments in the code
                     and helper functions to test if the module compiled
[vagrant@vagrant-centos64 ext]$ ./ext_skel --extname=courages
Creating directory courages
Creating basic files: config.m4 config.w32 .svnignore courages.c php_courages.h CREDITS EXPERIMENTAL tests/001.phpt courages.php [done].

To use your new extension, you will have to execute the following steps:

1.  $ cd ..
2.  $ vi ext/courages/config.m4
3.  $ ./buildconf
4.  $ ./configure --[with|enable]-courages
5.  $ make
6.  $ ./sapi/cli/php -f ext/courages/courages.php
7.  $ vi ext/courages/courages.c
8.  $ make

Repeat steps 3-6 until you are satisfied with ext/courages/config.m4 and
step 6 confirms that your module is compiled into PHP. Then, start writing
code and repeat the last two steps as often as necessary.

这里的步骤说的很清楚,但这一次,步骤3被phpize代替了。

按部就班,编辑config.m4,PHP_ARG_WITH是采用动态库方式加载(PHP_ARG_ENABLE则是编译内核中,configure是–enable-extension使用),将

dnl PHP_ARG_WITH(courages, whether to enable courages support,
dnl Make sure that the comment is aligned:
dnl [  --with-courages             Include courages support])

更改为

PHP_ARG_WITH(courages, for courages support,
[  --with-courages             Include courages support])

然后,在php_courages.h增加函数声明

PHP_FUNCTION(confirm_courages_compiled);	/* For testing, remove later. */
PHP_FUNCTION(courages_helloworld);

接着,编辑courages.c,在function_entry中增加函数注册

const zend_function_entry courages_functions[] = {
	PHP_FE(confirm_courages_compiled,	NULL)		/* For testing, remove later. */
 	PHP_FE(courages_helloworld,  NULL)
	PHP_FE_END	/* Must be the last line in courages_functions[] */
};

然后是courages_helloworld函数实现

PHP_FUNCTION(courages_helloworld)
{
        char *arg = NULL;
	int arg_len, len;
	char *strg;
	if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &arg, &arg_len) == FAILURE) {
		return;
	}
	len = spprintf(&strg, 0, "Your input string: %s/n", arg);
	php_printf(strg);
	return SUCCESS;
}

最后就是编译

phpize
./configure
sudo make
sudo make install

sudo vim /etc/php.ini

在php.ini中增加扩展courages.so

[courages]
extension = courages.so

测试一下

[vagrant@vagrant-centos64 courages]$ php -m | grep 'courages'
courages

[vagrant@vagrant-centos64 courages]$ php courages.php
Functions available in the test extension:
confirm_courages_compiled
courages_helloworld

Your input string: hellow world
Congratulations! You have successfully modified ext/courages/config.m4. Module courages is now compiled into PHP.

到这里一个扩展的开发流程就结束了。

这里分享一些小技巧。
首先是如何在PHP扩展中获取PHP全局数组$_SERVER($_POST/GET)变量中的值:

static char *get_server_var_by_name(char *str){
	// This code makes sure $_SERVER has been initialized
	if (!zend_hash_exists(&EG(symbol_table), "_SERVER", 8)) {
		zend_auto_global* auto_global;
		if (zend_hash_find(CG(auto_globals), "_SERVER", 8, (void **)&auto_global) != FAILURE) {
			auto_global->armed = auto_global->auto_global_callback(auto_global->name, auto_global->name_len TSRMLS_CC);
		}
	}

	// This fetches $_SERVER['PHP_SELF']
	zval** arr;
	char* script_name;
	if (zend_hash_find(&EG(symbol_table), "_SERVER", 8, (void**)&arr) != FAILURE) {
		HashTable* ht = Z_ARRVAL_P(*arr);
		zval** val;
		if (zend_hash_find(ht, str, strlen(str)+1, (void**)&val) != FAILURE) {
			script_name = Z_STRVAL_PP(val);
		}
	}
	return script_name;
}

然后是如何在PHP扩展调用PHP函数:

/*调用无参数函数*/
static char *get_sapi_name(){
    zval *function_name;
    zval *retval;
	char *sapi_name;

    MAKE_STD_ZVAL(function_name);
    ZVAL_STRING(function_name , "php_sapi_name", 1);


    if (call_user_function_ex(EG(function_table), NULL, function_name, &retval, 0, NULL, 0, EG(active_symbol_table) TSRMLS_CC) != SUCCESS)
    {
        zend_error(E_ERROR, "Function call failed");
    }

    if (retval != NULL && Z_TYPE_P(retval) != IS_STRING) {
        convert_to_string(retval);
        sapi_name = Z_STRVAL_P(retval);
    }
    else{
	sapi_name = "cli";
    }

    return sapi_name;
}
/*调用有参数函数*/
static int _ck_dir(char *dir TSRMLS_DC)
{
    zval *function_name;
    zval *retval;
    zval *str;
    zval **param[1];

    MAKE_STD_ZVAL(function_name);
    ZVAL_STRING(function_name , "is_dir", 1);

    MAKE_STD_ZVAL(str);
    ZVAL_STRING(str, dir, 1);
    param[0] = &str;

    if (call_user_function_ex(EG(function_table), NULL, function_name, &retval, 1, param, 0, EG(active_symbol_table) TSRMLS_CC) != SUCCESS)
    {
        zend_error(E_ERROR, "Function call failed");
    }

    if (retval != NULL && zval_is_true(retval)) {
        return SUCCESS;
    }

    return FAILURE;
}

更高级的一些技巧可以参考《PHP扩展开发及内核应用》和阅读别人的扩展开发代码。

参考链接:
php扩展实战 —— 获得ip的来源地址
如何编写一个PHP的C扩展
[原创]快速开发一个PHP扩展
用C/C++扩展你的PHP
Get the name of running script from a PHP extension
Build PHP extension and use call_user_function
Programming PHP
与 UNIX 构建系统交互: config.m4
call_user_function_ex() documentation
PHP Extensions Made Eldrich: PHP Variables
Convert Zval to char*
PHP扩展编写第一步:PHP和Zend介绍
PHP扩展开发:简单类实现
自己写PHP扩展之创建一个类[原创]
如何在扩展里调用PHP函数呢?

PHP队列开发之Beanstalk

Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。

相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:

  • 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
  • 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
  • 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
  • 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick);

一个任务如果没有被删除,那么它就可以被重新获取。下面是大多数任务的生命周期:

   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

CentOS下安装Beanstalkd

sudo yum install beanstalkd
#启动beanstalk
sudo service beanstalkd start
#beanstalkd -l 192.168.33.14 -p 11300

PHP下面有个C扩展beanstalk库可以使用,基于libbeanstalkclient

git clone https://github.com/bergundy/libbeanstalkclient.git
cd libbeanstalkclient
mkdir m4
#开始编译
sudo ./autogen.sh

#创建libbeanstalkclient.conf,内容为/usr/lib
sudo vim /etc/ld.so.conf.d/libbeanstalkclient.conf
#使配置生效
sudo ldconfig

git clone https://github.com/nil-zhang/php-beanstalk.git
cd php-beanstalk
phpize
./configure
sudo make
sudo make install
sudo vim /etc/php.ini

编辑php.ini增加以下内容

[beanstalk]
extension = "beanstalk.so"

查看是否加载成功

php -m
#加载成功则重启php-fpm
sudo service php-fpm restart

PHP测试代码

<?php
    $bsc = new Beanstalk();

    $bsc->addserver("192.168.33.14", 11300);
    $bsc->addserver("192.168.33.12", 11300);

    $tubes = $bsc->list_tubes();
    print_r($tubes);

    for($i = 0; $i < 10; $i++)
    {
        $key = "key".$i;
        $value = "value".$i;

        $bsc->use($key);
        $bsc->put($key, $value);
        echo "$key\t$value\n";

        $bsc->watch($key);
        $job = $bsc->reserve($key);
        print_r($job);

        if($bsc->bury($job['id'], $key))
            echo "bury ok\n";
        else
            echo "bury failed\n";

        $bsc->kick(100, $key);
        if($bsc->delete($job['id'], $key))
            echo "delete ok\n";
        else
            echo "delete failed \n";

        $bsc->ignore($key);
        echo "\n";
    }

    echo "done\n";

注意由于Beanstalk服务端实现的比较简单,协议特性需要客户端支持,不同的实现可能效果不一样,这个客户端并没有实现延时发送(delay),超时重试(TTR)。需要这些特性建议使用这个库:PHP Beanstalkd。前台生产者创建任务:

<?php
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.14', 11300);
$bean->addServer('192.168.33.12', 11300);
$bean->useTube('my-tube');
$bean->put('Hello World!', 1024);
$bean->put('Hello World!2', 1023);
$bean->put(json_encode(array('what','how')), 1000, 1, 1);

后台消费者处理任务

include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.12', 11300);
$bean->addServer('192.168.33.14', 11300);
$bean->watchTube('my-tube');

while (true)
{
	try
	{
		$job = $bean->reserve($timeout = 10);

		/* process job ... */
		var_dump($job);
		//var_dump($job->getMessage());

		$job->delete();
	}
	catch (BeanstalkException $e)
	{
		switch ($e->getCode())
		{
			case BeanstalkException::TIMED_OUT:
				echo "Timed out waiting for a job.  Retrying in 1 second.";
				sleep(1);
				continue;
				break;
			default:
				throw $e;
				break;
		}
	}
}

注意:客户端获取任务(reverse)是阻塞的(blocking),直到超时;同一个队列(tube)的任务按FIFO进行处理(除非指定优先级);任务内容长度不能超过65536;作为内存队列需要注意是否会内存超出,可以快速处理到Mysql。

使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。

参考链接:
Scalable Work Queues with Beanstalk
Beanstalk Protocol
Frequently Asked Questions for beanstalkd
Getting Started with Beanstalkd
Queue your work
Asynchronous Processing in Web Applications, Part 2: Developers Need to Understand Message Queues

PHPPHP 7

最近学习了PHP 7介绍的视频,看到了PHP每一天都在变好。PHP 将会带来许多新的特性:

  • 抽象语法树,在编译opcode时能够做更多的优化
  • INT64提升,支持大于2GB的字符串/文件上传
  • 标量类型声明,可以声明参数类型
  • 返回值声明,可以声明返回值类型
  • 新操作符,<=>操作符,大于返回1,等于返回0,小于返回-1
  • 统一变量语法;
  • 引擎异常改进,函数不存在的fata error转为异常可以捕获处理

等等。
除了新特性外,PHP 7使用了新引擎PHP NG,性能得到了巨大的提升(与HHVM对比)。
这里的性能优化比较有意思, 非常值得借鉴学习。为什么要进行重构:

  • PHP 5.4之后php引擎的性能提升已经不大,Zend VM已经高度优化
  • PHP JIT(Just In Time Compiler)项目在benchmakr上性能提升巨大(8倍),但是在实际业务提升不大(离开实际业务谈benchmark都是耍流氓)
  • 分析wordpress项目发现瓶颈并不在Zend VM上面,21% CPU时间花在了内存管理,12%时间花在了hashTable操作,30% CPU时间花在了内部调用,25% CUP时间花在了VM

首先应当分析实际业务的性能消耗占比,确定需要优化的地方。JIT仅仅优化了Zend VM,提升是有限的,需要对其他地方(内存操作,HashTable,函数调用)也进行优化,所以才进行的重构。

PHP NG在很多方面做了优化:

  • 标量类型不做引用计数,zval分配在堆上,增加Referance类型
  • 增加类型zend string,将字符串与值连续分配,避免CPU cache miss,避免二次读内存
  • 原本的HashTable变为zend array,数组值为zval,内存一次分配,提升data locality,避免CPU cache miss
  • 函数调用优化,减少重复入栈
  • 参数解析优化,将不确定转化为确定
  • zend_qsort排序优化
  • 内存管理优化,减少CPU cache miss
  • 常用函数优化
  • 字符串拼接优化,减少内存访问次数

通过这一系列的优化,改进了PHP的内存操作,改进了HashTable操作,改进了内部函数调用,降低了CPU 时间消耗,这时候再来做PHP JIT优化就有意义。PHP 7接下来仍然会继续优化,迁移常用扩展。
许多优化都是微小的,但是积累起来后却是巨大,特别是对于微博这样的网站,1%的性能提升都意义非凡。
优化是具有专向性的,对A场景的优化可能并不适合B场景,所以需要分析实际的业务中的瓶颈和调用频次,权衡优化的方向。
现在距离10月份发布已经不远了,大家也可以检查自己项目是否兼容PHP 7,性能提升多少。

参考链接:
PHP7 – New engine for good old train
PHP7 VS HHVM (WordPress)
PHPNG (next generation)
What to Expect When You’re Expecting: PHP 7, Part 1
PHP 7 Feature Freeze
HHVM
HHVM 是如何提升 PHP 性能的?
PHP Fights HHVM and Zephir with PHPNG
rlerdorf/php7dev
Building and testing the upcoming PHP7
Install PHP 7 on Ubuntu 14.04