Curator典型应用场景之-事件监听

Curator典型应用场景之-事件监听

Zookeeper原生就支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。Curator引入了Cache来实现对zookeeper服务端事件的监听,Cache是Curator中对事件的包装,其对事件的监听其实可以近似的看做是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生API开发的繁琐过程。Cache分为NodeCache,PathChildrenCache和TreeCache。

NodeCache
NodeCache用于监听指定ZooKeeper数据节点本身的变化,其构造方法有如下两个:

public NodeCache(CuratorFramework client, String path);
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

NodeCache构造方法参数说明如下:

client:Curator客户端实例
path:数据节点的节点路径
dataIsCompressed:是否进行数据压缩

同时NodeCache定义了事件处理的回调接口NodeCacheListener如下:

public interface NodeCacheListener{
    /**
     * Called when a change has occurred
     */
    public void nodeChanged() throws Exception;
}

当ZNode的内容发生变化时,就会回调该方法。下面通过一个实际的例子来看如何在代码中使用NodeCache。

public class NodeCache_Sample {

    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path,"init".getBytes());
        final NodeCache cache = new NodeCache(client,path,false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = cache.getCurrentData();
                String data = currentData == null ? "" : new String(currentData.getData());
                System.out.println("=====> Node data update, new Data: "+data);
            }
        });

        client.setData().forPath(path,"i love you".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }

}

程序运行输出以下内容:

=====> Node data update, new Data: i love you
=====> Node data update, new Data:

NodeCache使用start()方法开启缓存,使用完后调用close()方法关闭它。在上面的例子中,首先构造了一个NodeCache实例,然后再调用start方法,该方法有一个Boolean类型的参数,默认是false,如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取该节点的数据内容,并保存在Cache中。

NodeCache不仅可以用于监听数据节点的内容变更,也可能监听指定节点是否存在。如果原节点不存在,那么Cache就会在节点被创建后触发NodeCacheListenter。节点的删除操作也会触发NodeCacheListenter有很多文章说删除不会触发其实是错误的,我测试的时候是可以触发的。

PathChildrenCache
PathChildrenCache是用来监听指定节点 的子节点变化情况,共有以下几种构造方法(不包括过时的方法):

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService);

PathChildrenCache构造方法参数说明:

client: Curator客户端实例
path:数据节点路径
cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
dataIsCompressed:是否进行数据压缩
threadFactory:利于这个参数,开发者可以通过构造一个专门的线程池,来处理事件通知
executorService:自定义线程池

同时PathChildrenCache定义了事件处理的回调接口PathChildrenCacheListener,其定义如下:

public interface PathChildrenCacheListener{
    /**
     * Called when a change has occurred
     *
     * @param client the client
     * @param event describes the change
     * @throws Exception errors
     */
    public void     childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}

当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheListener类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)、和子节点的删除(CHILD_REMOVED)三类。
下面通过一个实际例子来看看如何在代码中使用PathChildrenCache。

public class PathChildrenCache_Sample {
    static String path = "/zk-book2";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client,path,true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("=====> CHILD_ADDED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        System.out.println("=====> CHILD_REMOVED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        System.out.println("=====> CHILD_UPDATED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    default:
                        break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
        Thread.sleep(1000);
        client.setData().forPath(path+"/c1","I love you".getBytes());
        Thread.sleep(1000);
        client.delete().forPath(path+"/c1");
        Thread.sleep(1000);
        client.delete().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }
}

程序运行输出以下内容:

=====> CHILD_ADDED : /zk-book2/c1 数据:127.0.0.1
=====> CHILD_UPDATED : /zk-book2/c1 数据:I love you
=====> CHILD_REMOVED : /zk-book2/c1 数据:I love you

在上面的示例程序中,对/zk-book2节点进行了子节点变更事件的监听,一旦该节点新增/删除子节点或者子节点的数据发生改变时,就会回调PathChildrenCacheListener,并根据对应的事件类型进行相关的处理。同时,我们看到,对于节点/zk-book2本身的变更,并没有通知到客户端。

另外,和其他ZooKeeper客户端产品一样,Curator也无法对二级子节点进行事件监听。也就是说,如果使用PathChildrenCacheListener对/zk-book2进行监听,那么当/zk-book/c1/c2节点被创建或者删除的时候,是无法触发子节点变更事件的。

TreeCache
结合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听。共有以下几种构造方法:

public TreeCache(CuratorFramework client, String path);
TreeCache只有一个构造方法,其内部还有一个默认访问修饰符修饰的构造方法,如下:
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector);

TreeCache构造方法参数说明:

client: Curator客户端实例
path:数据节点路径
cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
dataIsCompressed:是否进行数据压缩
maxDepth:比如当前监听节点/t1,目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全监听,即监听到t4,如果为2,则监听到t3,对t3的子节点操作不再触发,默认maxDepth最大值2147483647
executorService:自定义线程池
createParentNodes:如果父节点不存在,是否需要创建父节点,默认情况下,不会自动创建path的父节点
TreeCacheSelector:用于区分哪些节点作为缓存使用

下面通过一个实际例子来看看如何在代码中使用TreeCache。

public class TreeCache_Sample {

    static String path = "/zk-book3";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        TreeCache cache = new TreeCache(client,path);

        cache.start();

        //添加错误监听器
        cache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            @Override
            public void unhandledError(String s, Throwable throwable) {
                System.out.println("======>  错误原因:" + throwable.getMessage());
            }
        });

        //节点变化的监听器
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                switch (treeCacheEvent.getType()) {
                    case INITIALIZED:
                        System.out.println("=====> INITIALIZED :  初始化");
                        break;
                    case NODE_ADDED:
                        System.out.println("=====> NODE_ADDED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("=====> NODE_REMOVED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        if("/zk-book3".equals(treeCacheEvent.getData().getPath())){
                            throw new RuntimeException("=====> 测试异常监听UnhandledErrorListener");
                        }
                        break;
                    case NODE_UPDATED:
                        System.out.println("=====> NODE_UPDATED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        break;
                    default:
                        System.out.println("=====> treeCache Type:" + treeCacheEvent.getType());
                        break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
        Thread.sleep(3000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
        Thread.sleep(3000);
        client.setData().forPath(path+"/c1","I love you".getBytes());
        Thread.sleep(3000);
        client.delete().forPath(path+"/c1");
        Thread.sleep(3000);
        client.delete().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }
}

输出以下结果:

=====> NODE_ADDED : /zk-book3 数据:init
=====> INITIALIZED : 初始化
=====> NODE_ADDED : /zk-book3/c1 数据:127.0.0.1
=====> NODE_UPDATED : /zk-book3/c1 数据:I love you
=====> NODE_REMOVED : /zk-book3/c1 数据:I love you
=====> NODE_REMOVED : /zk-book3 数据:init
=> 错误原因:=> 测试异常监听UnhandledErrorListener

可以看到创建/zk-book3的时候触发了NODE_ADDED,同时也会触发一个INITIALIZED事件,

中间的添加修改删除子节点就不说了,看最后一句,在删除/zk-book3节点的时候,我在程序中判断是删除该节点,就会抛出异常,这是专门用来测试UnhandledErrorListener的,当监听执行时,报错就会执行UnhandledErrorListener。