标签归档:zookeeper

基于Swoole和Zookeeper的分布式ID生成器

大部分业务都需要一个唯一标识ID,比如订单ID、消息ID,通常使用的ID就是数据库的自增ID,比如MySQL的AUTO_INCREMENT;有时候这个ID还需要在不同系统里面传递、保存,又要保证唯一性。单机MySQL在高并发请求下面又可能存在锁/性能问题,于是Flicker使用两台MySQL来生成ID,一台从0开始,一台从1开始,步长为2,这样两台生成的ID不会互相重复,这个方案也可以扩展成N台,自增步长为N即可。
作为一个分布式ID应当避免在不同节点同步ID信息,通常都是基于时间戳和机器信息来生成。比如MongoDB的ObjectId是提前生成的为12字节=4字节UNIX时间戳+3字节机器码+2字节进程ID+2字节计数序列。
如果不需要访问数据库即能生成ID,性能可以更高。比如UUID V1,基于时间戳和网卡,采用128位,可以生成范围非常广的ID,但是生成的16进制值的36位字符串不好排序。在MySQL里面可以通过调整机器码(MAC)和时间戳位置顺序,并采用binary来存储以提高性能
Twitter开源的Snowflake则生成64位数字ID,包括41时间戳,10位机器码/节点码,12位计数序列,另外1位保留。采用基于时间戳的数字ID的好处是这个ID可以当作主键,并且已经粗略按时间排好序,可以直接分页读取,省去在时间字段上建立索引。

分布式ID通常需要用到机器信息(节点ID或MAC),一个机器通常只运行一个服务进程,所以通常不采用Nginx/Apache + PHP。参考这里实现一个基于Swoole和Zookpeer的64位ID生成器。基于Swoole可以快速开发一个Web/Socket server,不同于Apache/Nginx,它的PHP进程启动后是常驻运行的,资源初始化后可以重复使用,使用Zookpeer来获取当前进程的节点ID,一旦PHP进程退出后便会销毁对应的节点ID。
首先是生成ID

<?php

declare(strict_types=1);

namespace Dig\Ticket;

use Dig\Ticket\Exception\IllegalTimeException;

class Number
{
    public const TOTAL_BIT = 64;
    public const EPOCH_BIT = 42;
    public const NODE_BIT = 10;
    public const SEQUENCE_BIT = 12;

    public const MAX_NODE_ID = 2 ** self::NODE_BIT - 1;
    public const MAX_SEQUENCE_NUMBER = 2 ** self::SEQUENCE_BIT - 1;
    public const CUSTOM_EPOCH = 1262332800000;

    private $lastTimestamp = 0;
    private $sequence = 0;

    private $nodeId = 0;

    public function __construct(int $nodeId)
    {
        $this->nodeId = $nodeId;
    }

    public function getNodeId(): int
    {
        return $this->nodeId;
    }

    public function getTimestamp(): int
    {
        return (int) (\microtime(true) * 1000) - self::CUSTOM_EPOCH;
    }

    public function generate(): int
    {
        $current = $this->getTimestamp();
        if ($current < $this->lastTimestamp) {
            throw new IllegalTimeException('current timestamp cannot less than before');
        }
        if ($current === $this->lastTimestamp) {
            $this->sequence = ($this->sequence + 1) & self::MAX_SEQUENCE_NUMBER;
            if (0 === $this->sequence) {
                $current = $this->_waitNextTimestamp($current);
            }
        } else {
            $this->sequence = 0;
        }
        $this->lastTimestamp = $current;
        $id = $current << (self::TOTAL_BIT - self::EPOCH_BIT);
        $id = $id | ($this->getNodeId() << (self::TOTAL_BIT - self::EPOCH_BIT - self::NODE_BIT));
        $id = $id | $this->sequence;

        return $id;
    }

    private function _waitNextTimestamp($current)
    {
        while ($current === $this->lastTimestamp) {
            $current = $this->getTimestamp();
        }

        return $current;
    }
}

这里只涉及到ID生成,包括时间戳,序列号获取,而节点ID由其他对象生成并传入。2的42次方减1等于4398046511103,大概就是2109年5月15日,可以使用(2^42-1)/(365*24*60*60*1000)≈139年,距离现在还有90年可以用,仍然是一个非常大的可使用范围。CUSTOM_EPOC是自定义的时间戳偏移量,以便选取合适的ID生成下限和上限。距离现在节点生成的接口定义

<?php

declare(strict_types=1);

namespace Dig\Ticket;

interface NodeInterface
{
    public const MAX_NODE_ID = 2 ** Number::NODE_BIT - 1;

    public function getId(): int;
}

这里定义了最大节点序号不能超过1023,可以依据自己的需求更改范围。节点ID的实现可以是基于网卡/进程ID/文件配置等等实现,但是不同机器或多进程之间需要不一样的ID或者需要锁保证上面的generate函数。

<?php

declare(strict_types=1);

namespace Dig\Zookeeper;

class Client extends \Zookeeper
{
    public function makePath(string $path, string $value = ''): bool
    {
        $arrPath = \explode('/', $path);
        if (!empty($arrPath)) {
            $arrPath = \array_filter($arrPath);
            $subpath = '';
            $flag = true;
            foreach ($arrPath as $p) {
                $subpath .= '/'.$p;
                if (!$this->exists($subpath)) {
                    if (!$this->makeNode($subpath, $value)) {
                        $flag = false;
                        break;
                    }
                }
            }

            return $flag;
        }

        return false;
    }

    public function makeNode(string $path, string $value, array $acls = [], int $flag = 0): bool
    {
        if (empty($acls)) {
            $acls = [
                [
                    'perms' => \Zookeeper::PERM_ALL,
                    'scheme' => 'world',
                    'id' => 'anyone',
                ],
            ];
        }
        if ($this->create($path, $value, $acls, $flag)) {
            return true;
        }

        return false;
    }

    public function deletePath(string $path): bool
    {
        $children = $this->getChildren($path);
        if (!empty($children)) {
            foreach ($children as $child) {
                $subpath = $path.'/'.$child;
                $this->deletePath($subpath);
            }
        }

        return $this->delete($path);
    }
}

这里使用Zookeeper实现

<?php

declare(strict_types=1);

namespace Dig\Ticket\Node;

use Dig\Ticket\Exception\UnavailableNodeIdException;
use Dig\Ticket\NodeInterface;
use Dig\Zookeeper\Client;

class Zookeeper implements NodeInterface
{
    private $zk;
    private $dsn;
    private $pool;
    private $basePath = '/dig/ticket';
    private $acls = [
        [
            'perms' => \Zookeeper::PERM_ALL,
            'scheme' => 'world',
            'id' => 'anyone',
        ],
    ];
    private $id;

    public function __construct(string $dsn, string $path = '/sim/ticket')
    {
        $this->dsn = $dsn;
        $this->pool = new \SplQueue();
        if (!empty($path)) {
            $this->basePath = $path;
        }
    }

    public function getZookeeper(): Client
    {
        if (null === $this->zk) {
            $this->zk = new Client($this->dsn);
        }

        return $this->zk;
    }

    public function getId(): int
    {
        if (null === $this->id) {
            if (!$this->getZookeeper()->exists($this->basePath)) {
                $this->getZookeeper()->makePath($this->basePath);
            }
            $i = 1;
            $length = \mb_strlen((string) self::MAX_NODE_ID);
            $nodeId = \sprintf('%0'.$length.'d', $i);
            $children = $this->getZookeeper()->getChildren($this->basePath);
            $children = empty($children) ? [] : $children;
            for (; $i <= self::MAX_NODE_ID; ++$i) {
                $nodeId = \sprintf('%0'.$length.'d', $i);
                if (!\in_array($nodeId, $children)) {
                    $path = $this->basePath.'/'.$nodeId;
                    if ($this->getZookeeper()->exists($path)) {
                        //throw new UnavailableNodeIdException('node already exist: '.$path);
                        continue;
                    }
                    try {
                        $this->getZookeeper()->makeNode($path, $nodeId, $this->acls, \Zookeeper::EPHEMERAL);
                        break;
                    } catch (\ZookeeperException $e) {
                        //throw new UnavailableNodeIdException('cannot create node in zookeeper: '.$e->getMessage());
                        continue;
                    }
                }
            }
            if (self::MAX_NODE_ID === $i) {
                throw new UnavailableNodeIdException('cannot create node in zookeeper: reach max node limit '.self::MAX_NODE_ID);
            }
            $this->id = $i;
        }

        return $this->id;
    }
}

这里遍历查询1-1023之间的节点是否都已在Zookeeper上注册,如果没有则注册,Zookeeper会保证只有一个客户端注册成功。注册的节点类型位Zookeeper::EPHEMERAL,在客户端退出时,该节点会被自动删除,方便其他机器/进程申请。在这篇文章里面我们也使用Zookeeper::EPHEMERAL配合Zookeeper::EPHEMERAL,生成序列号,用来确定进程的master/slave。
初始化并运行Swoole Web server,需要传入Zookeeper的连接字符串,可以使用docker快速部署

<?php
include __DIR__.'/../vendor/autoload.php';
use Dig\Ticket\Number;
use Dig\Ticket\Node\Zookeeper as ZookeeperNode;


/** 
 * swoole - zookeeper tick dispatch issue: https://github.com/php-zookeeper/php-zookeeper
*/
$host = getenv("ZOOKEEPER_CONNECTION");
$host = empty($host) ? "192.168.33.1:2181" : $host;
$node = new ZookeeperNode($host);

$http = new \Swoole\Http\Server("0.0.0.0", 9501);

$http->on("start", function ($server) {
    echo "Swoole http server is started at http://0.0.0.0:9501\n";
});

$http->on("WorkerStart", function ($server, $workerId) use($node) {
    // https://wiki.swoole.com/wiki/page/325.html
    // https://wiki.swoole.com/wiki/page/852.html
    // https://wiki.swoole.com/wiki/page/865.html
    // use lazy initial zk here, so that each worker can hold its own zk resource
    // if we only run swoole http server in 1 worker process (1 CPU), then no need to consider this
    $id = $node->getId();
    $server->nodeId = $id;
    $server->number = new Number($server->nodeId);
});

$http->on("request", function ($request, $response) use ($http) {
    $data = $http->number->generate();
    $response->end($data);
});

$http->start();

访问本机的9501端口即可以得到ID了,完整代码在这里。Swoole默认运行与CPU核数量相同的worker进程数,注意这里需要WorkerStart里初始化获取Node节点ID,如果只是运行一个Swoole worker进程,也可以在外面获取节点ID。可以将Swool\Htpp\Server替换成React\Http\Server或者Amp\Http\Server,它们在单个进程里面loop,每个进程分别持有自己的节点序号,可以保证生成的ID不冲突,性能方面Swoole > Amp > ReactPHP。
可以采用Swoole\Server + thrift/gRPC改造这些代码,提供RPC服务。
注意ID的生成是随时间递增的,依赖于时间戳,如果出现了时间回拨,将会抛出异常。一般解决方案包括:

  • 等待重试
  • 使用Int64原子自增量代替时间戳,跳过时间戳判断
  • 使用预留的节点ID
  • 关闭时钟同步
  • 使用备选自增量方案

生成的ID并不是严格递增的,只是千分一秒递增,对于微博、Twiter的Timeline够用;但也有好处,比如别人不能通过ID相减了解美团的订单量。

参考链接:
如何设计一个分布式ID生成器(Distributed ID Generator),并保证ID按时间粗略有序?
生成全局唯一ID的3个思路,来自一个资深架构师的总结
Distributed unique id generation
Unique ID generation in distributed systems
Optimised UUIDs in mysql
Storing UUID Values in MySQL Tables
Mysql 8.0: UUID support
How to store a 128 bit number in a single column in MySQL?
Generating unique IDs in a distributed environment at high scale
Leaf——美团点评分布式ID生成系统
分布式ID增强篇–优化时钟回拨问题
Ticket Servers: Distributed Unique Primary Keys on the Cheap
Sharding & IDs at Instagram

PHP ZooKeeper分布式应用开发

ZooKeeper是一个中心化服务,用于分布式应用下的配置同步和协调,提供统一配置服务,统一命名服务,分布式同步,集群管理等。Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。ZooKeeper的应用场景包括:统一命名服务;配置管理;集群管理;队列管理等。

ZooKeeper作为一个Java应用程序,有大神开发了PHP的扩展:php-zookeeper。利用ZooKeeper,我们可以让分布式的PHP应用程序协调产生leader,为woker分配任务,当leader崩溃时,自动选举产生leader;也可以作分布式的锁和队列。

ZooKeeper本身是一个集群,至少需要表示3台,只要超过半数节点正常就可以工作,避免单点故障。首先需要安装JDK环境

yum search java | grep 'java-'
sudo yum install java-1.8.0-openjdk-devel

然后安装ZooKeeper,从官网下载

tar zxfv zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6/src/c
./configure --prefix=/usr/
make
sudo make install

#创建libzookeeper.conf,内容为/usr/lib,以便编译扩展使用
sudo vim /etc/ld.so.conf.d/libzookeeper.conf
#使配置生效
sudo ldconfig

然后安装PHP的扩展

cd
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make
sudo make install

更改php.ini配置,增加以下内容

[zookeeper]
extension = zookeeper.so

查看是否加载成功

php -m | grep zookeeper

更改ZooKeeper配置,可以改变里面的DataDir熟悉,默认在/tmp下面

cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg

然后终端A里面运行ZooKeeper,通过shell进行交互

cd zookeeper-3.4.6/bin
./zkServer.sh start
./zkCli.sh -server 127.0.0.1:2181
create /test hello
;Created /test
ls /
;[test, zookeeper]

这时便已成功连到了ZooKeeper,并创建了一个名为“/test”的znode。ZooKeeper以树形结构保存数据。这很类似于文件系统,但“文件夹”又和文件很像。znode是ZooKeeper保存的实体。

新建一个PHP脚本来测试一下

<?php
 
class ZookeeperDemo extends Zookeeper {
 
  public function watcher( $i, $type, $key ) {
    echo "Insider Watcher\n";
 
    // Watcher gets consumed so we need to set a new one
    $this->get( '/test', array($this, 'watcher' ) );
  }
 
}
 
$zoo = new ZookeeperDemo('127.0.0.1:2181');
$zoo->get( '/test', array($zoo, 'watcher' ) );
 
while( true ) {
  echo '.';
  sleep(2);
}

在新的终端B里面运行这个脚本

$ php zookeeperdemo1.php

返回刚才的那个终端A里面,改变节点“/test”存储的数据

set /test world

这时候在终端B里面变化打印“Insider Watcher”。注意:这里注册的回到函数仅支持对象的方法,不支持普通的函数。

前面说过,ZooKeeper是一个基于观察者模式设计的分布式服务管理框架。Zookeeper提供了绑定在znode上的监听器,一旦监听到znode数据发生变化,便会通知所有注册的客户端。所以也可以应用于发布订阅模式。

这篇文章还举例,如何让多个PHP脚本自动选举leader,分配工作。

<?php
 
class Worker extends Zookeeper {
 
  const CONTAINER = '/cluster';
 
  protected $acl = array(
                    array(
                      'perms' => Zookeeper::PERM_ALL,
                      'scheme' => 'world',
                      'id' => 'anyone' ) );
 
  private $isLeader = false;
 
  private $znode;
 
  public function __construct( $host = '', $watcher_cb = null, $recv_timeout = 10000 ) {
    parent::__construct( $host, $watcher_cb, $recv_timeout );
  }
 
  public function register() {
    if( ! $this->exists( self::CONTAINER ) ) {
      $this->create( self::CONTAINER, null, $this->acl );
    }
 
    //Zookeeper::EPHEMERAL - auto remove if client session goes away
    //Zookeeper::EPHEMERAL - auto increasing sequence number
    $this->znode = $this->create( self::CONTAINER . '/w-',
                                  null,
                                  $this->acl,
                                  Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
 
    $this->znode = str_replace( self::CONTAINER .'/', '', $this->znode );
 
    printf( "I'm registred as: %s\n", $this->znode );
 
    $watching = $this->watchPrevious();
 
    if( $watching == $this->znode ) {
      printf( "Nobody here, I'm the leader\n" );
      $this->setLeader( true );
    }
    else {
      printf( "I'm watching %s\n", $watching );
    }
  }
 
  public function watchPrevious() {
    $workers = $this->getChildren( self::CONTAINER );
    sort( $workers );
    $size = sizeof( $workers );
    for( $i = 0 ; $i < $size ; $i++ ) {
      if( $this->znode == $workers[ $i ] ) {
        if( $i > 0 ) {
          //for node path change event
          $this->get( self::CONTAINER . '/' . $workers[ $i - 1 ], array( $this, 'watchNode' ) );
          //for node path exist event
          $this->exists( self::CONTAINER . '/' . $workers[ $i - 1 ], array( $this, 'watchNode' ) );
          return $workers[ $i - 1 ];
        }
 
        return $workers[ $i ];
      }
    }
 
    throw new Exception(  sprintf( "Something went very wrong! I can't find myself: %s/%s",
                          self::CONTAINER,
                          $this->znode ) );
  }
 
  public function watchNode( $i, $type, $name ) {
    $watching = $this->watchPrevious();
    if( $watching == $this->znode ) {
      printf( "I'm the new leader!\n" );
      $this->setLeader( true );
    }
    else {
      printf( "Now I'm watching %s\n", $watching );
    }
  }
 
  public function isLeader() {
    return $this->isLeader;
  }
 
  public function setLeader($flag) {
    $this->isLeader = $flag;
  }
 
  public function run() {
    $this->register();
 
    while( true ) {
      if( $this->isLeader() ) {
        $this->doLeaderJob();
    }
    else {
      $this->doWorkerJob();
    }
 
      sleep( 2 );
    }
  }
 
  public function doLeaderJob() {
    echo "Leading\n";
  }
 
  public function doWorkerJob() {
    echo "Working\n";
  }
 
}
//host can be multiple, e.g '127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183' 
$worker = new Worker( '127.0.0.1:2181' );
$worker->run();

打开多个终端运行这个脚本。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。

除此之外,利用这个扩展还可以实现一下其他的应用场景,比如排他锁和共享锁:php-zookeeper-recipes

参考链接:
Distributed application in PHP with Apache Zookeeper
分布式服务框架 Zookeeper — 管理分布式环境中的数据
使用Apache Zookeeper分布式部署PHP应用程序
分布式服务框架:Zookeeper