Curator典型应用场景之-事件监听
Zookeeper原生就支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。Curator引入了Cache来实现对zookeeper服务端事件的监听,Cache是Curator中对事件的包装,其对事件的监听其实可以近似的看做是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生API开发的繁琐过程。Cache分为NodeCache,PathChildrenCache和TreeCache。
NodeCache
NodeCache用于监听指定ZooKeeper数据节点本身的变化,其构造方法有如下两个:
public NodeCache(CuratorFramework client, String path);
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
NodeCache构造方法参数说明如下:
client:Curator客户端实例
path:数据节点的节点路径
dataIsCompressed:是否进行数据压缩
同时NodeCache定义了事件处理的回调接口NodeCacheListener如下:
public interface NodeCacheListener{
/**
* Called when a change has occurred
*/
public void nodeChanged() throws Exception;
}
当ZNode的内容发生变化时,就会回调该方法。下面通过一个实际的例子来看如何在代码中使用NodeCache。
public class NodeCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
.sessionTimeoutMs(60000)
.connectionTimeoutMs(15000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path,"init".getBytes());
final NodeCache cache = new NodeCache(client,path,false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = cache.getCurrentData();
String data = currentData == null ? "" : new String(currentData.getData());
System.out.println("=====> Node data update, new Data: "+data);
}
});
client.setData().forPath(path,"i love you".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(10000);
cache.close();
client.close();
}
}
程序运行输出以下内容:
=====> Node data update, new Data: i love you
=====> Node data update, new Data:
NodeCache使用start()方法开启缓存,使用完后调用close()方法关闭它。在上面的例子中,首先构造了一个NodeCache实例,然后再调用start方法,该方法有一个Boolean类型的参数,默认是false,如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取该节点的数据内容,并保存在Cache中。
NodeCache不仅可以用于监听数据节点的内容变更,也可能监听指定节点是否存在。如果原节点不存在,那么Cache就会在节点被创建后触发NodeCacheListenter。节点的删除操作也会触发NodeCacheListenter有很多文章说删除不会触发其实是错误的,我测试的时候是可以触发的。
PathChildrenCache
PathChildrenCache是用来监听指定节点 的子节点变化情况,共有以下几种构造方法(不包括过时的方法):
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService);
PathChildrenCache构造方法参数说明:
client: Curator客户端实例
path:数据节点路径
cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
dataIsCompressed:是否进行数据压缩
threadFactory:利于这个参数,开发者可以通过构造一个专门的线程池,来处理事件通知
executorService:自定义线程池
同时PathChildrenCache定义了事件处理的回调接口PathChildrenCacheListener,其定义如下:
public interface PathChildrenCacheListener{
/**
* Called when a change has occurred
*
* @param client the client
* @param event describes the change
* @throws Exception errors
*/
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheListener类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)、和子节点的删除(CHILD_REMOVED)三类。
下面通过一个实际例子来看看如何在代码中使用PathChildrenCache。
public class PathChildrenCache_Sample {
static String path = "/zk-book2";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
.sessionTimeoutMs(60000)
.connectionTimeoutMs(15000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
PathChildrenCache cache = new PathChildrenCache(client,path,true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("=====> CHILD_ADDED : "+ event.getData().getPath() +" 数据:"+ new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("=====> CHILD_REMOVED : "+ event.getData().getPath() +" 数据:"+ new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("=====> CHILD_UPDATED : "+ event.getData().getPath() +" 数据:"+ new String(event.getData().getData()));
break;
default:
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
Thread.sleep(1000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
Thread.sleep(1000);
client.setData().forPath(path+"/c1","I love you".getBytes());
Thread.sleep(1000);
client.delete().forPath(path+"/c1");
Thread.sleep(1000);
client.delete().forPath(path);
Thread.sleep(10000);
cache.close();
client.close();
}
}
程序运行输出以下内容:
=====> CHILD_ADDED : /zk-book2/c1 数据:127.0.0.1
=====> CHILD_UPDATED : /zk-book2/c1 数据:I love you
=====> CHILD_REMOVED : /zk-book2/c1 数据:I love you
在上面的示例程序中,对/zk-book2节点进行了子节点变更事件的监听,一旦该节点新增/删除子节点或者子节点的数据发生改变时,就会回调PathChildrenCacheListener,并根据对应的事件类型进行相关的处理。同时,我们看到,对于节点/zk-book2本身的变更,并没有通知到客户端。
另外,和其他ZooKeeper客户端产品一样,Curator也无法对二级子节点进行事件监听。也就是说,如果使用PathChildrenCacheListener对/zk-book2进行监听,那么当/zk-book/c1/c2节点被创建或者删除的时候,是无法触发子节点变更事件的。
TreeCache
结合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听。共有以下几种构造方法:
public TreeCache(CuratorFramework client, String path);
TreeCache只有一个构造方法,其内部还有一个默认访问修饰符修饰的构造方法,如下:
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector);
TreeCache构造方法参数说明:
client: Curator客户端实例
path:数据节点路径
cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
dataIsCompressed:是否进行数据压缩
maxDepth:比如当前监听节点/t1,目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全监听,即监听到t4,如果为2,则监听到t3,对t3的子节点操作不再触发,默认maxDepth最大值2147483647
executorService:自定义线程池
createParentNodes:如果父节点不存在,是否需要创建父节点,默认情况下,不会自动创建path的父节点
TreeCacheSelector:用于区分哪些节点作为缓存使用
下面通过一个实际例子来看看如何在代码中使用TreeCache。
public class TreeCache_Sample {
static String path = "/zk-book3";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
.sessionTimeoutMs(60000)
.connectionTimeoutMs(15000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
TreeCache cache = new TreeCache(client,path);
cache.start();
//添加错误监听器
cache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
@Override
public void unhandledError(String s, Throwable throwable) {
System.out.println("======> 错误原因:" + throwable.getMessage());
}
});
//节点变化的监听器
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
switch (treeCacheEvent.getType()) {
case INITIALIZED:
System.out.println("=====> INITIALIZED : 初始化");
break;
case NODE_ADDED:
System.out.println("=====> NODE_ADDED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ new String(treeCacheEvent.getData().getData()));
break;
case NODE_REMOVED:
System.out.println("=====> NODE_REMOVED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ new String(treeCacheEvent.getData().getData()));
if("/zk-book3".equals(treeCacheEvent.getData().getPath())){
throw new RuntimeException("=====> 测试异常监听UnhandledErrorListener");
}
break;
case NODE_UPDATED:
System.out.println("=====> NODE_UPDATED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ new String(treeCacheEvent.getData().getData()));
break;
default:
System.out.println("=====> treeCache Type:" + treeCacheEvent.getType());
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
Thread.sleep(3000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
Thread.sleep(3000);
client.setData().forPath(path+"/c1","I love you".getBytes());
Thread.sleep(3000);
client.delete().forPath(path+"/c1");
Thread.sleep(3000);
client.delete().forPath(path);
Thread.sleep(10000);
cache.close();
client.close();
}
}
输出以下结果:
=====> NODE_ADDED : /zk-book3 数据:init
=====> INITIALIZED : 初始化
=====> NODE_ADDED : /zk-book3/c1 数据:127.0.0.1
=====> NODE_UPDATED : /zk-book3/c1 数据:I love you
=====> NODE_REMOVED : /zk-book3/c1 数据:I love you
=====> NODE_REMOVED : /zk-book3 数据:init
=> 错误原因:=> 测试异常监听UnhandledErrorListener
可以看到创建/zk-book3的时候触发了NODE_ADDED,同时也会触发一个INITIALIZED事件,
中间的添加修改删除子节点就不说了,看最后一句,在删除/zk-book3节点的时候,我在程序中判断是删除该节点,就会抛出异常,这是专门用来测试UnhandledErrorListener的,当监听执行时,报错就会执行UnhandledErrorListener。