Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。
相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:
- 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
- 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
- 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
- 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick);
一个任务如果没有被删除,那么它就可以被重新获取。下面是大多数任务的生命周期:
put with delay release with delay ----------------> [DELAYED] <------------. | | | (time passes) | | | put v reserve | delete -----------------> [READY] ---------> [RESERVED] --------> *poof* ^ ^ | | | \ release | | | `-------------' | | | | kick | | | | bury | [BURIED] <---------------' | | delete `--------> *poof*
CentOS下安装Beanstalkd
sudo yum install beanstalkd #启动beanstalk sudo service beanstalkd start #beanstalkd -l 192.168.33.14 -p 11300
PHP下面有个C扩展beanstalk库可以使用,基于libbeanstalkclient
git clone https://github.com/bergundy/libbeanstalkclient.git cd libbeanstalkclient mkdir m4 #开始编译 sudo ./autogen.sh #创建libbeanstalkclient.conf,内容为/usr/lib sudo vim /etc/ld.so.conf.d/libbeanstalkclient.conf #使配置生效 sudo ldconfig git clone https://github.com/nil-zhang/php-beanstalk.git cd php-beanstalk phpize ./configure sudo make sudo make install sudo vim /etc/php.ini
编辑php.ini增加以下内容
[beanstalk] extension = "beanstalk.so"
查看是否加载成功
php -m #加载成功则重启php-fpm sudo service php-fpm restart
PHP测试代码
<?php $bsc = new Beanstalk(); $bsc->addserver("192.168.33.14", 11300); $bsc->addserver("192.168.33.12", 11300); $tubes = $bsc->list_tubes(); print_r($tubes); for($i = 0; $i < 10; $i++) { $key = "key".$i; $value = "value".$i; $bsc->use($key); $bsc->put($key, $value); echo "$key\t$value\n"; $bsc->watch($key); $job = $bsc->reserve($key); print_r($job); if($bsc->bury($job['id'], $key)) echo "bury ok\n"; else echo "bury failed\n"; $bsc->kick(100, $key); if($bsc->delete($job['id'], $key)) echo "delete ok\n"; else echo "delete failed \n"; $bsc->ignore($key); echo "\n"; } echo "done\n";
注意由于Beanstalk服务端实现的比较简单,协议特性需要客户端支持,不同的实现可能效果不一样,这个客户端并没有实现延时发送(delay),超时重试(TTR)。需要这些特性建议使用这个库:PHP Beanstalkd。前台生产者创建任务:
<?php include 'lib/Beanstalk.php'; $bean = Beanstalk::init(); $bean->addServer('192.168.33.14', 11300); $bean->addServer('192.168.33.12', 11300); $bean->useTube('my-tube'); $bean->put('Hello World!', 1024); $bean->put('Hello World!2', 1023); $bean->put(json_encode(array('what','how')), 1000, 1, 1);
后台消费者处理任务
include 'lib/Beanstalk.php'; $bean = Beanstalk::init(); $bean->addServer('192.168.33.12', 11300); $bean->addServer('192.168.33.14', 11300); $bean->watchTube('my-tube'); while (true) { try { $job = $bean->reserve($timeout = 10); /* process job ... */ var_dump($job); //var_dump($job->getMessage()); $job->delete(); } catch (BeanstalkException $e) { switch ($e->getCode()) { case BeanstalkException::TIMED_OUT: echo "Timed out waiting for a job. Retrying in 1 second."; sleep(1); continue; break; default: throw $e; break; } } }
注意:客户端获取任务(reverse)是阻塞的(blocking),直到超时;同一个队列(tube)的任务按FIFO进行处理(除非指定优先级);任务内容长度不能超过65536;作为内存队列需要注意是否会内存超出,可以快速处理到Mysql。
使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。
参考链接:
Scalable Work Queues with Beanstalk
Beanstalk Protocol
Frequently Asked Questions for beanstalkd
Getting Started with Beanstalkd
Queue your work
Asynchronous Processing in Web Applications, Part 2: Developers Need to Understand Message Queues