Curator典型应用场景之-Master选举

Curator典型应用场景之-Master选举

分布式锁和Master选举相似点
分布式锁和 Master选举有几种相似点,实际上其实现机制也相近:

同一时刻只有一个获取锁 / 只能有一个leader
对于分布式排他锁来说,任意时刻,只能有一个进程(对于单进程内的锁是单线程)可以获得锁。
对于领导选举来说,任意时刻,只能有一个成功当选为leader。否则就会出现脑裂。

锁重入 / 确认自己是leader
对于分布式锁,需要保证获得锁的进程在释放锁之前可再次获得锁,即锁的可重入性。
对于领导选举,Leader需要能够确认自己已经获得领导权,即确认自己是Leader。

释放锁 / 放弃领导权
锁的获得者应该能够正确释放已经获得的锁,并且当获得锁的进程宕机时,锁应该自动释放,从而使得其它竞争方可以获得该锁,从而避免出现死锁的状态。
领导应该可以主动放弃领导权,并且当领导所在进程宕机时,领导权应该自动释放,从而使得其它参与者可重新竞争领导而避免进入无主状态。

感知锁释放 / 感知领导权释放
当获得锁的一方释放锁时,其它对于锁的竞争方需要能够感知到锁的释放,并再次尝试获取锁。
原来的Leader放弃领导权时,其它参与方应该能够感知该事件,并重新发起选举流程。

Curator中选举分为两种:

Leader Latch和Leader Election

Leader Latch
LeaderLatch方式就是以一种抢占方式来决定选主,是一种非公平的领导选举,谁抢到就是谁,会随机从候选者中选择一台作为leader, 选中后除非leader自己 调用close()释放leadership,否则其他的候选者不能成为leader。

选主过程
假设现在有三个zookeeper的客户端,如下图所示,同时竞争leader。这三个客户端同时向zookeeper集群注册Ephemeral且Non-sequence类型的节点,路径都为/zkroot/leader。

www.zeeklog.com  - Curator典型应用场景之-Master选举

如上图所示,由于是Non-sequence节点,这三个客户端只会有一个创建成功,其它节点均创建失败。此时,创建成功的客户端(即上图中的Client 1)即成功竞选为 Leader 。其它客户端(即上图中的Client 2和Client 3)此时匀为 Follower。

放弃领导权
如果Leader打算主动放弃领导权,直接删除/zkroot/leader节点即可。
如果Leader进程意外宕机,其与Zookeeper间的Session也结束,该节点由于是Ephemeral类型的节点,因此也会自动被删除。
此时/zkroot/leader节点不复存在,对于其它参与竞选的客户端而言,之前的Leader已经放弃了领导权。

感知领导权的放弃
由上图可见,创建节点失败的节点,除了成为 Follower 以外,还会向/zkroot/leader注册一个 Watch ,一旦 Leader 放弃领导权,也即该节点被删除,所有的 Follower 会收到通知。

重新选举
感知到旧 Leader 放弃领导权后,所有的 Follower 可以再次发起新一轮的领导选举,如下图所示。

www.zeeklog.com  - Curator典型应用场景之-Master选举

从上图中可见
新一轮的领导选举方法与最初的领导选举方法完全一样,都是发起节点创建请求,创建成功即为Leader,否则为Follower,且Follower会Watch该节点。
新一轮的选举结果,无法预测,与它们在第一轮选举中的顺序无关。这也是该方案被称为非公平模式的原因。

Leader Latch模式总结

1. Leader Latch实现很简单,每一轮的选举算法都一样。
2. 非公平模式,每一次选举都是随机,谁抢到就是谁的,假如是第二次选举,每个 Follower 通过 Watch 感知到节点被删除的时间不完全一样,只要有一个 Follower 得到通知即发起竞选。
3. 给zookeeper造成的负载大,假如有上万个客户端都参与竞选,意味着同时会有上万个写请求发送给 Zookeper。同时一旦 Leader 放弃领导权,Zookeeper 需要同时通知上万个 Follower,负载较大。

使用过程

相关的类
LeaderLatch
构造LeaderLatch ,构造方法如下:

public LeaderLatch(CuratorFramework client, String latchPath);
public LeaderLatch(CuratorFramework client, String latchPath, String id);

启动
通过start()方法启动之后,再等待几秒钟后,Curator会自动从中选举出Leader。

public void start() throws Exception;

可以调用实例的hasLeadership()判断该实例是否为leader。

public boolean hasLeadership();

尝试获取leadership
调用await()方法会使线程一直阻塞到获得leadership为止。

public void await() throws InterruptedException, EOFException;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

释放leadership
只能通过close()释放leadership, 只有leader将leadership释放时,其他的候选者才有机会被选为leader

public void close() throws IOException;
public synchronized void close(CloseMode closeMode) throws IOException;

示例代码

public class TestLeaderLatch {

  private static final String PATH = "/demo/leader";
  /** 5个客户端 */
  private static final Integer CLIENT_COUNT = 5;

  public static void main(String[] args) throws Exception {
    //5个线程,5个客户端
    ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
    for (int i = 0; i < CLIENT_COUNT ; i++) {
      final int index = i;
      service.submit(new Runnable() {
        @Override
        public void run() {
          try {
            new TestLeaderLatch().schedule(index);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }
    //休眠50秒之后结束main方法
    Thread.sleep(30 * 1000);
    service.shutdownNow();
  }

  private void schedule(int thread) throws Exception {

    //获取一个client
    CuratorFramework client = this.getClient(thread);
    //获取一个latch
    LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));

    //给latch添加监听,在
    latch.addListener(new LeaderLatchListener() {

      @Override
      public void notLeader() {
        //如果不是leader
        System.out.println("Client [" + thread + "] I am the follower !");
      }

      @Override
      public void isLeader() {
        //如果是leader
        System.out.println("Client [" + thread + "] I am the leader !");
      }
    });

    //开始选取 leader
    latch.start();

    //每个线程 休眠时间不一样,但是最大不能超过 main方法中的那个休眠时间,那个是50秒 到时候main方法结束 会中断休眠时间
    Thread.sleep(2 * (thread + 5) * 1000);
    if (latch != null) {
      //释放leadership
      //CloseMode.NOTIFY_LEADER 节点状态改变时,通知LeaderLatchListener
      latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
    }
    if (client != null) {
      client.close();
    }
    System.out.println("Client [" + latch.getId() + "] Server closed...");
  }

  private CuratorFramework getClient(final int thread) {
    RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
    // Fluent风格创建
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("192.168.58.42:2181")
        .sessionTimeoutMs(1000000)
        .connectionTimeoutMs(3000)
        .retryPolicy(rp)
        .build();
    client.start();
    System.out.println("Client [" + thread + "] Server connected...");
    return client;
  }

}

程序运行,输出以下结果:

Client [3] Server connected…
Client [2] Server connected…
Client [4] Server connected…
Client [0] Server connected…
Client [1] Server connected…
Client [1] I am the leader !
Client [0] Server closed…
Client [1] I am the follower !
Client [1] Server closed…
Client [2] I am the leader !
Client [2] I am the follower !
Client [2] Server closed…
Client [4] I am the leader !
Client [3] Server closed…
Client [4] I am the follower !
Client [4] Server closed…

在上面的程序中,启动了5个zookeeper客户端,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。有可能优先close的并不是leader节点,但是当leader节点close的时候,可以继续在已有的节点中重新选举leader节点。

LeaderElection
上面讲了怎么使用LeaderLatch方式进行master选举,Curator提供了两种选举,一种是LeaderLatch,提供的另一种Leader选举策略是Leader Election。

跟LeaderLatch选举策略相比,LeaderElection选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。

另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。LeaderElection属于公平的选举方式,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。

选主过程
如下图所示,LeaderElection选举中,各客户端均创建/zkroot/leader节点,且其类型为Ephemeral与Sequence。

由于是Sequence类型节点,故上图中三个客户端均创建成功,只是序号不一样。此时,每个客户端都会判断自己创建成功的节点的序号是不是当前最小的。如果是,则该客户端为 Leader,否则即为 Follower。

www.zeeklog.com  - Curator典型应用场景之-Master选举

在上图中,Client1 创建的节点序号为1 ,Client2创建的节点序号为2,Client3创建的节点序号为3。由于最小序号为 1 ,且该节点由Client1创建,故Client 1为 Leader 。

放弃领导权
Leader 如果主动放弃领导权,直接删除其创建的节点即可。
如果 Leader 所在进程意外宕机,其与 Zookeeper 间的 Session 结束,由于其创建的节点为Ephemeral类型,故该节点自动被删除。

感知领导权的放弃
与LeaderLatch方式不同,每个 Follower 并非都 Watch 由 Leader 创建出来的节点,而是 Watch 序号刚好比自己序号小的节点。

在上图中,总共有 1、2、3 共三个节点,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序号应该是10位数字,而非一位数字,序号最大为int最大值)。
一旦Leader弃权或者宕机,/zkroot/leader1被删除,Client2可得到通知。此时Client3由于 Watch 的是/zkroot/leader2,故不会得到通知。

重新选举
Client2得到/zkroot/leader1被删除的通知后,不会立即成为新的 Leader 。而是先判断自己的序号2是不是当前最小的序号。在该场景下,其序号确为最小。因此Client 2成为新的 Leader 。

www.zeeklog.com  - Curator典型应用场景之-Master选举

这里要注意,如果在Client1放弃领导权之前,Client2就宕机了,Client3会收到通知。此时Client3不会立即成为Leader,而是要先判断自己的序号3是否为当前最小序号。很显然,由于Client1创建的/zkroot/leader1还在,因此Client 3不会成为新的 Leader ,并向Client2序号2 前面的序号,也即 1 创建 Watch。该过程如下图所示。

www.zeeklog.com  - Curator典型应用场景之-Master选举

LeaderElection模式总结

扩展性好,每个客户端都只Watch 一个节点且每次节点被删除只须通知一个客户端
旧 Leader 放弃领导权时,其它客户端根据竞选的先后顺序(也即节点序号)成为新 Leader,这也是公平模式的由来。
延迟相对非公平模式要高,因为它必须等待特定节点得到通知才能选出新的 Leader。

使用过程

相关的类
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException

使用方法 创建 LeaderSelector

public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);

启动

leaderSelector.start();

一旦启动,如果获取了leadership的话,takeLeadership()会被调用,只有当leader释放了leadership的时候,takeLeadership()才会返回。

释放
调用close()释放 leadership

leaderSelector.close();

示例代码
LeaderSelectorListener的实现类
实现LeaderSelectorListener 或者 继承LeaderSelectorListenerAdapter,用于定义获取领导权后的业务逻辑:

public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {

    /** 客户端名称 */
    private String name;
    /** leaderSelector */
    private LeaderSelector leaderSelector;
    /** 原子性的 用来记录获取 leader的次数 */
    public AtomicInteger leaderCount = new AtomicInteger(1);

    public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);

        /**
         * 自动重新排队
         * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
         */
        leaderSelector.autoRequeue();
    }

    /**
     * 启动  调用leaderSelector.start()
     * @throws IOException
     */
    public void start() throws IOException {
        leaderSelector.start();
    }

    /**
     * 获取领导权之后执行的业务逻辑,执行完自动放弃领导权
     * @param client
     * @throws Exception
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = 2;
        System.out.println(name + "成为当前leader" + " 共成为leader的次数:" + leaderCount.getAndIncrement() + "次");
        try{
            //模拟业务逻辑执行2秒
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        }catch ( InterruptedException e ){
            System.err.println(name + "已被中断");
            Thread.currentThread().interrupt();
        }finally{
            System.out.println(name + "放弃领导权");
        }
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }
}

多个客户端测试

public class TestLeaderElection {

  private static final String PATH = "/demo/leader";
  /** 3个客户端 */
  private static final Integer CLIENT_COUNT = 3;

  public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);

    for (int i = 0; i < CLIENT_COUNT; i++) {
      final int index = i;
      service.submit(new Runnable() {
        @Override
        public void run() {
          try {
            new TestLeaderElection().schedule(index);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }

    Thread.sleep(30 * 1000);
    service.shutdownNow();
  }

  private void schedule(final int thread) throws Exception {
    CuratorFramework client = this.getClient(thread);
    CustomLeaderSelectorListenerAdapter leaderSelectorListener =
        new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
    leaderSelectorListener.start();
  }

  private CuratorFramework getClient(final int thread) {
    RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
    // Fluent风格创建
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("192.168.58.42:2181")
        .sessionTimeoutMs(1000000)
        .connectionTimeoutMs(3000)
        .retryPolicy(rp)
        .build();
    client.start();
    System.out.println("Client [" + thread + "] Server connected...");
    return client;
  }

}

运行程序,输出以下内容:

Client [0] Server connected…
Client [1] Server connected…
Client [2] Server connected…
Client #2成为当前leader 共成为leader的次数:1次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:1次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:1次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:2次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:2次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:2次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:3次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:3次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:3次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:4次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:4次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:4次
Client #1放弃领导权

上面只是简单测试代码,并没有关闭client等操作,每个实例在获取领导权后,如果 takeLeadership(CuratorFramework client) 方法执行结束,将会释放其领导权。而且获取领导权 也是按照 Client #2, Client #0 ,Client #1 顺序来的,正好验证了它的公平性。

LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。

推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。
建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。