Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。
相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:
- 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
- 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
- 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
- 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick);
一个任务如果没有被删除,那么它就可以被重新获取。下面是大多数任务的生命周期:
put with delay release with delay
----------------> [DELAYED] <------------.
| |
| (time passes) |
| |
put v reserve | delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
^ ^ | |
| \ release | |
| `-------------' |
| |
| kick |
| |
| bury |
[BURIED] <---------------'
|
| delete
`--------> *poof*
CentOS下安装Beanstalkd
sudo yum install beanstalkd #启动beanstalk sudo service beanstalkd start #beanstalkd -l 192.168.33.14 -p 11300
PHP下面有个C扩展beanstalk库可以使用,基于libbeanstalkclient
git clone https://github.com/bergundy/libbeanstalkclient.git cd libbeanstalkclient mkdir m4 #开始编译 sudo ./autogen.sh #创建libbeanstalkclient.conf,内容为/usr/lib sudo vim /etc/ld.so.conf.d/libbeanstalkclient.conf #使配置生效 sudo ldconfig git clone https://github.com/nil-zhang/php-beanstalk.git cd php-beanstalk phpize ./configure sudo make sudo make install sudo vim /etc/php.ini
编辑php.ini增加以下内容
[beanstalk] extension = "beanstalk.so"
查看是否加载成功
php -m #加载成功则重启php-fpm sudo service php-fpm restart
PHP测试代码
<?php
$bsc = new Beanstalk();
$bsc->addserver("192.168.33.14", 11300);
$bsc->addserver("192.168.33.12", 11300);
$tubes = $bsc->list_tubes();
print_r($tubes);
for($i = 0; $i < 10; $i++)
{
$key = "key".$i;
$value = "value".$i;
$bsc->use($key);
$bsc->put($key, $value);
echo "$key\t$value\n";
$bsc->watch($key);
$job = $bsc->reserve($key);
print_r($job);
if($bsc->bury($job['id'], $key))
echo "bury ok\n";
else
echo "bury failed\n";
$bsc->kick(100, $key);
if($bsc->delete($job['id'], $key))
echo "delete ok\n";
else
echo "delete failed \n";
$bsc->ignore($key);
echo "\n";
}
echo "done\n";
注意由于Beanstalk服务端实现的比较简单,协议特性需要客户端支持,不同的实现可能效果不一样,这个客户端并没有实现延时发送(delay),超时重试(TTR)。需要这些特性建议使用这个库:PHP Beanstalkd。前台生产者创建任务:
<?php
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.14', 11300);
$bean->addServer('192.168.33.12', 11300);
$bean->useTube('my-tube');
$bean->put('Hello World!', 1024);
$bean->put('Hello World!2', 1023);
$bean->put(json_encode(array('what','how')), 1000, 1, 1);
后台消费者处理任务
include 'lib/Beanstalk.php';
$bean = Beanstalk::init();
$bean->addServer('192.168.33.12', 11300);
$bean->addServer('192.168.33.14', 11300);
$bean->watchTube('my-tube');
while (true)
{
try
{
$job = $bean->reserve($timeout = 10);
/* process job ... */
var_dump($job);
//var_dump($job->getMessage());
$job->delete();
}
catch (BeanstalkException $e)
{
switch ($e->getCode())
{
case BeanstalkException::TIMED_OUT:
echo "Timed out waiting for a job. Retrying in 1 second.";
sleep(1);
continue;
break;
default:
throw $e;
break;
}
}
}
注意:客户端获取任务(reverse)是阻塞的(blocking),直到超时;同一个队列(tube)的任务按FIFO进行处理(除非指定优先级);任务内容长度不能超过65536;作为内存队列需要注意是否会内存超出,可以快速处理到Mysql。
使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。
参考链接:
Scalable Work Queues with Beanstalk
Beanstalk Protocol
Frequently Asked Questions for beanstalkd
Getting Started with Beanstalkd
Queue your work
Asynchronous Processing in Web Applications, Part 2: Developers Need to Understand Message Queues