Curator典型应用场景之-Master选举
分布式锁和Master选举相似点
分布式锁和 Master选举有几种相似点,实际上其实现机制也相近:
同一时刻只有一个获取锁 / 只能有一个leader
对于分布式排他锁来说,任意时刻,只能有一个进程(对于单进程内的锁是单线程)可以获得锁。
对于领导选举来说,任意时刻,只能有一个成功当选为leader。否则就会出现脑裂。
锁重入 / 确认自己是leader
对于分布式锁,需要保证获得锁的进程在释放锁之前可再次获得锁,即锁的可重入性。
对于领导选举,Leader需要能够确认自己已经获得领导权,即确认自己是Leader。
释放锁 / 放弃领导权
锁的获得者应该能够正确释放已经获得的锁,并且当获得锁的进程宕机时,锁应该自动释放,从而使得其它竞争方可以获得该锁,从而避免出现死锁的状态。
领导应该可以主动放弃领导权,并且当领导所在进程宕机时,领导权应该自动释放,从而使得其它参与者可重新竞争领导而避免进入无主状态。
感知锁释放 / 感知领导权释放
当获得锁的一方释放锁时,其它对于锁的竞争方需要能够感知到锁的释放,并再次尝试获取锁。
原来的Leader放弃领导权时,其它参与方应该能够感知该事件,并重新发起选举流程。
Curator中选举分为两种:
Leader Latch和Leader Election
Leader Latch
LeaderLatch方式就是以一种抢占方式来决定选主,是一种非公平的领导选举,谁抢到就是谁,会随机从候选者中选择一台作为leader, 选中后除非leader自己 调用close()释放leadership,否则其他的候选者不能成为leader。
选主过程
假设现在有三个zookeeper的客户端,如下图所示,同时竞争leader。这三个客户端同时向zookeeper集群注册Ephemeral且Non-sequence类型的节点,路径都为/zkroot/leader。
如上图所示,由于是Non-sequence节点,这三个客户端只会有一个创建成功,其它节点均创建失败。此时,创建成功的客户端(即上图中的Client 1)即成功竞选为 Leader 。其它客户端(即上图中的Client 2和Client 3)此时匀为 Follower。
放弃领导权
如果Leader打算主动放弃领导权,直接删除/zkroot/leader节点即可。
如果Leader进程意外宕机,其与Zookeeper间的Session也结束,该节点由于是Ephemeral类型的节点,因此也会自动被删除。
此时/zkroot/leader节点不复存在,对于其它参与竞选的客户端而言,之前的Leader已经放弃了领导权。
感知领导权的放弃
由上图可见,创建节点失败的节点,除了成为 Follower 以外,还会向/zkroot/leader注册一个 Watch ,一旦 Leader 放弃领导权,也即该节点被删除,所有的 Follower 会收到通知。
重新选举
感知到旧 Leader 放弃领导权后,所有的 Follower 可以再次发起新一轮的领导选举,如下图所示。
从上图中可见
新一轮的领导选举方法与最初的领导选举方法完全一样,都是发起节点创建请求,创建成功即为Leader,否则为Follower,且Follower会Watch该节点。
新一轮的选举结果,无法预测,与它们在第一轮选举中的顺序无关。这也是该方案被称为非公平模式的原因。
Leader Latch模式总结
1. Leader Latch实现很简单,每一轮的选举算法都一样。
2. 非公平模式,每一次选举都是随机,谁抢到就是谁的,假如是第二次选举,每个 Follower 通过 Watch 感知到节点被删除的时间不完全一样,只要有一个 Follower 得到通知即发起竞选。
3. 给zookeeper造成的负载大,假如有上万个客户端都参与竞选,意味着同时会有上万个写请求发送给 Zookeper。同时一旦 Leader 放弃领导权,Zookeeper 需要同时通知上万个 Follower,负载较大。
使用过程
相关的类
LeaderLatch
构造LeaderLatch ,构造方法如下:
public LeaderLatch(CuratorFramework client, String latchPath);
public LeaderLatch(CuratorFramework client, String latchPath, String id);
启动
通过start()方法启动之后,再等待几秒钟后,Curator会自动从中选举出Leader。
public void start() throws Exception;
可以调用实例的hasLeadership()判断该实例是否为leader。
public boolean hasLeadership();
尝试获取leadership
调用await()方法会使线程一直阻塞到获得leadership为止。
public void await() throws InterruptedException, EOFException;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
释放leadership
只能通过close()释放leadership, 只有leader将leadership释放时,其他的候选者才有机会被选为leader
public void close() throws IOException;
public synchronized void close(CloseMode closeMode) throws IOException;
示例代码
public class TestLeaderLatch {
private static final String PATH = "/demo/leader";
/** 5个客户端 */
private static final Integer CLIENT_COUNT = 5;
public static void main(String[] args) throws Exception {
//5个线程,5个客户端
ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
for (int i = 0; i < CLIENT_COUNT ; i++) {
final int index = i;
service.submit(new Runnable() {
@Override
public void run() {
try {
new TestLeaderLatch().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//休眠50秒之后结束main方法
Thread.sleep(30 * 1000);
service.shutdownNow();
}
private void schedule(int thread) throws Exception {
//获取一个client
CuratorFramework client = this.getClient(thread);
//获取一个latch
LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));
//给latch添加监听,在
latch.addListener(new LeaderLatchListener() {
@Override
public void notLeader() {
//如果不是leader
System.out.println("Client [" + thread + "] I am the follower !");
}
@Override
public void isLeader() {
//如果是leader
System.out.println("Client [" + thread + "] I am the leader !");
}
});
//开始选取 leader
latch.start();
//每个线程 休眠时间不一样,但是最大不能超过 main方法中的那个休眠时间,那个是50秒 到时候main方法结束 会中断休眠时间
Thread.sleep(2 * (thread + 5) * 1000);
if (latch != null) {
//释放leadership
//CloseMode.NOTIFY_LEADER 节点状态改变时,通知LeaderLatchListener
latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
}
if (client != null) {
client.close();
}
System.out.println("Client [" + latch.getId() + "] Server closed...");
}
private CuratorFramework getClient(final int thread) {
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
// Fluent风格创建
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.42:2181")
.sessionTimeoutMs(1000000)
.connectionTimeoutMs(3000)
.retryPolicy(rp)
.build();
client.start();
System.out.println("Client [" + thread + "] Server connected...");
return client;
}
}
程序运行,输出以下结果:
Client [3] Server connected…
Client [2] Server connected…
Client [4] Server connected…
Client [0] Server connected…
Client [1] Server connected…
Client [1] I am the leader !
Client [0] Server closed…
Client [1] I am the follower !
Client [1] Server closed…
Client [2] I am the leader !
Client [2] I am the follower !
Client [2] Server closed…
Client [4] I am the leader !
Client [3] Server closed…
Client [4] I am the follower !
Client [4] Server closed…
在上面的程序中,启动了5个zookeeper客户端,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。有可能优先close的并不是leader节点,但是当leader节点close的时候,可以继续在已有的节点中重新选举leader节点。
LeaderElection
上面讲了怎么使用LeaderLatch方式进行master选举,Curator提供了两种选举,一种是LeaderLatch,提供的另一种Leader选举策略是Leader Election。
跟LeaderLatch选举策略相比,LeaderElection选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。
另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。LeaderElection属于公平的选举方式,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。
选主过程
如下图所示,LeaderElection选举中,各客户端均创建/zkroot/leader节点,且其类型为Ephemeral与Sequence。
由于是Sequence类型节点,故上图中三个客户端均创建成功,只是序号不一样。此时,每个客户端都会判断自己创建成功的节点的序号是不是当前最小的。如果是,则该客户端为 Leader,否则即为 Follower。
在上图中,Client1 创建的节点序号为1 ,Client2创建的节点序号为2,Client3创建的节点序号为3。由于最小序号为 1 ,且该节点由Client1创建,故Client 1为 Leader 。
放弃领导权
Leader 如果主动放弃领导权,直接删除其创建的节点即可。
如果 Leader 所在进程意外宕机,其与 Zookeeper 间的 Session 结束,由于其创建的节点为Ephemeral类型,故该节点自动被删除。
感知领导权的放弃
与LeaderLatch方式不同,每个 Follower 并非都 Watch 由 Leader 创建出来的节点,而是 Watch 序号刚好比自己序号小的节点。
在上图中,总共有 1、2、3 共三个节点,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序号应该是10位数字,而非一位数字,序号最大为int最大值)。
一旦Leader弃权或者宕机,/zkroot/leader1被删除,Client2可得到通知。此时Client3由于 Watch 的是/zkroot/leader2,故不会得到通知。
重新选举
Client2得到/zkroot/leader1被删除的通知后,不会立即成为新的 Leader 。而是先判断自己的序号2是不是当前最小的序号。在该场景下,其序号确为最小。因此Client 2成为新的 Leader 。
这里要注意,如果在Client1放弃领导权之前,Client2就宕机了,Client3会收到通知。此时Client3不会立即成为Leader,而是要先判断自己的序号3是否为当前最小序号。很显然,由于Client1创建的/zkroot/leader1还在,因此Client 3不会成为新的 Leader ,并向Client2序号2 前面的序号,也即 1 创建 Watch。该过程如下图所示。
LeaderElection模式总结
扩展性好,每个客户端都只Watch 一个节点且每次节点被删除只须通知一个客户端
旧 Leader 放弃领导权时,其它客户端根据竞选的先后顺序(也即节点序号)成为新 Leader,这也是公平模式的由来。
延迟相对非公平模式要高,因为它必须等待特定节点得到通知才能选出新的 Leader。
使用过程
相关的类
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
使用方法 创建 LeaderSelector
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
启动
leaderSelector.start();
一旦启动,如果获取了leadership的话,takeLeadership()会被调用,只有当leader释放了leadership的时候,takeLeadership()才会返回。
释放
调用close()释放 leadership
leaderSelector.close();
示例代码
LeaderSelectorListener的实现类
实现LeaderSelectorListener 或者 继承LeaderSelectorListenerAdapter,用于定义获取领导权后的业务逻辑:
public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {
/** 客户端名称 */
private String name;
/** leaderSelector */
private LeaderSelector leaderSelector;
/** 原子性的 用来记录获取 leader的次数 */
public AtomicInteger leaderCount = new AtomicInteger(1);
public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
this.name = name;
this.leaderSelector = new LeaderSelector(client, path, this);
/**
* 自动重新排队
* 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
*/
leaderSelector.autoRequeue();
}
/**
* 启动 调用leaderSelector.start()
* @throws IOException
*/
public void start() throws IOException {
leaderSelector.start();
}
/**
* 获取领导权之后执行的业务逻辑,执行完自动放弃领导权
* @param client
* @throws Exception
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = 2;
System.out.println(name + "成为当前leader" + " 共成为leader的次数:" + leaderCount.getAndIncrement() + "次");
try{
//模拟业务逻辑执行2秒
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
}catch ( InterruptedException e ){
System.err.println(name + "已被中断");
Thread.currentThread().interrupt();
}finally{
System.out.println(name + "放弃领导权");
}
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
}
多个客户端测试
public class TestLeaderElection {
private static final String PATH = "/demo/leader";
/** 3个客户端 */
private static final Integer CLIENT_COUNT = 3;
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
for (int i = 0; i < CLIENT_COUNT; i++) {
final int index = i;
service.submit(new Runnable() {
@Override
public void run() {
try {
new TestLeaderElection().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(30 * 1000);
service.shutdownNow();
}
private void schedule(final int thread) throws Exception {
CuratorFramework client = this.getClient(thread);
CustomLeaderSelectorListenerAdapter leaderSelectorListener =
new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
leaderSelectorListener.start();
}
private CuratorFramework getClient(final int thread) {
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
// Fluent风格创建
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.42:2181")
.sessionTimeoutMs(1000000)
.connectionTimeoutMs(3000)
.retryPolicy(rp)
.build();
client.start();
System.out.println("Client [" + thread + "] Server connected...");
return client;
}
}
运行程序,输出以下内容:
Client [0] Server connected…
Client [1] Server connected…
Client [2] Server connected…
Client #2成为当前leader 共成为leader的次数:1次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:1次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:1次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:2次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:2次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:2次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:3次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:3次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:3次
Client #1放弃领导权
Client #2成为当前leader 共成为leader的次数:4次
Client #2放弃领导权
Client #0成为当前leader 共成为leader的次数:4次
Client #0放弃领导权
Client #1成为当前leader 共成为leader的次数:4次
Client #1放弃领导权
上面只是简单测试代码,并没有关闭client等操作,每个实例在获取领导权后,如果 takeLeadership(CuratorFramework client) 方法执行结束,将会释放其领导权。而且获取领导权 也是按照 Client #2, Client #0 ,Client #1 顺序来的,正好验证了它的公平性。
LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。
推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。
建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。