深入理解与应用Zookeeper(三)Java客户端

深入理解与应用Zookeeper(三)Java客户端

1. zk原生客户端

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

image.png

image.png

image.png

image.png

代码示例

//创建会话
public class CreateSession {

    private static final String CONNECT_URL = "192.168.71.128:2181";
    //为了防止zookeeper未创建成功就执行操作 需要将创建过程阻塞 待创建连接成功后再进行操作
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_URL, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected){
                    System.out.println(event.getState());
                    countDownLatch.countDown();
                }
                //如果节点发生变化 触发watcher机制
                if (event.getType() == Event.EventType.NodeDataChanged){
                    System.out.println("节点发生变化了,路径为:" + event.getPath());
                }
            }
        });
        countDownLatch.await();
        System.out.println(zooKeeper.getState());
        //1.创建节点 节点路径,节点值,ACL,节点类型
        //zooKeeper.create("/amos2", "amos2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //2.获取数据
        //stat封装了事务的详细信息
        Stat stat = new Stat();
        //watch:true 是否启用监听 当数据发生改变时会通知客户端
        byte[] data = zooKeeper.getData("/amos1", true, stat);
        String result = new String(data);
        System.out.println(result);
        System.out.println(stat);

        //3.修改数据
        //开启watch监听 当有数据改变时会调用process方法
        zooKeeper.getData("/amos1", true, null);
        //-1表示不进行版本控制
        zooKeeper.setData("/amos1", "hello world2".getBytes(), -1);
        //4.获取子节点的信息
        List<String> children = zooKeeper.getChildren("/amos1", true);
        System.out.println(children);
    }

}

2. zkClient

<dependency>
     <groupId>com.101tec</groupId>
     <artifactId>zkclient</artifactId>
     <version>0.11</version>
</dependency>

image.png

image.png

image.png

代码示例

public class SessionDemo {

    private static final String CONNECT_URL = "192.168.71.128:2181";

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(CONNECT_URL, 5000);
        System.out.println(zkClient + "- > success");
    }
}
public class ZkClientDemo {

    private static final String CONNECT_URL = "192.168.71.128:2181";

    public static ZkClient getConn(){
        return new ZkClient(CONNECT_URL, 5000);
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = getConn();
        //1.递归创建父节点的功能 true表示如果父节点不存在 则创建父节点
        zkClient.createPersistent("/test01/test001/test001", true);
        //2.递归删除节点
        zkClient.deleteRecursive("/test01");
        //3.获取子节点
        List<String> children = zkClient.getChildren("/amos1");
        System.out.println(children);
        //4.watcher监听节点数据变化
        zkClient.subscribeDataChanges("/amos1", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("节点数据发生了变化,路径为:{" + dataPath + "}");
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {

            }
        });
        //修改数据 测试watcher是否能监听成功
        zkClient.writeData("/amos1", "hahhaha");
        Thread.sleep(2000);
        //再次修改 测试watcher是否能重复监听
        zkClient.writeData("/amos1", "hahahahha2");
        //5.监听子节点变化(增加删除子节点等操作)
        zkClient.subscribeChildChanges("/node11", new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println("节点名称:"+s+"->"+"当前的节点列表:"+list);
            }
        });

        zkClient.deleteRecursive("/amos1/test1");;
        TimeUnit.SECONDS.sleep(2);
    }
}

3. curator

image.png

image.png

image.png

image.png

 <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.2.0</version>
</dependency>
public class CuratorClientUtils {

    private static CuratorFramework curatorFramework;
    private static final String CONNECT_URL = "192.168.71.128:2181";


    public static CuratorFramework getInstance(){
        curatorFramework= CuratorFrameworkFactory.
                newClient(CONNECT_URL,5000,5000,
                        new ExponentialBackoffRetry(1000,3));
        curatorFramework.start();
        return curatorFramework;
    }
}
public class CuratorEventDemo {

    /**
     * 三种watcher来做节点的监听
     * pathcache   监视一个路径下子节点的创建、删除、节点数据更新
     * NodeCache   监视一个节点的创建、更新、删除
     * TreeCache   pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
     * 缓存路径下的所有子节点的数据
     */

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework=CuratorClientUtils.getInstance();

        /**
         * 节点变化NodeCache
         */
       /* NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
        cache.start(true);

        cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
                ":"+new String(cache.getCurrentData().getData())));

        curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/


        /**
         * PatchChildrenCache
         */

        PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        // Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT

        cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
            switch (pathChildrenCacheEvent.getType()){
                case CHILD_ADDED:
                    System.out.println("增加子节点");
                    break;
                case CHILD_REMOVED:
                    System.out.println("删除子节点");
                    break;
                case CHILD_UPDATED:
                    System.out.println("更新子节点");
                    break;
                default:break;
            }
        });

        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
        TimeUnit.SECONDS.sleep(1);
        System.out.println("1");
        curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
        TimeUnit.SECONDS.sleep(1);
        System.out.println("2");

        curatorFramework.setData().forPath("/event/event1","222".getBytes());
        TimeUnit.SECONDS.sleep(1);
        System.out.println("3");

        curatorFramework.delete().forPath("/event/event1");
        System.out.println("4");

        System.in.read();

    }
}


public class CuratorOperatorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
        System.out.println("连接成功.........");

        //fluent风格

        /**
         * 创建节点
         */
        curatorFramework.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/curator/test01", "curator01".getBytes());
        System.out.println("success");

        /**
         * 删除节点
         */
        //默认情况下,version为-1
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator");

        /**
         * 查询
         */

        Stat stat = new Stat();
        try {
            byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/node11");
            System.out.println(new String(bytes) + "-->stat:" + stat);
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 更新
         */
        try {
            stat = curatorFramework.setData().forPath("/node11", "123".getBytes());
            System.out.println(stat);
        } catch (Exception e) {
            e.printStackTrace();
        }


        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(1);
        //创建节点
        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                //当节点创建成功的时候 在后台使用service这个线程异步调用processResult方法
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "resultCode:" + curatorEvent.getResultCode() + "type:" + curatorEvent.getType());
                        countDownLatch.countDown();
                    }
                }, service).forPath("/amosw", "amosw".getBytes());
        countDownLatch.await();
        //关闭service线程
        service.shutdown();
        /**
         * 异步操作
         */
        ExecutorService service= Executors.newFixedThreadPool(1);
        CountDownLatch countDownLatch=new CountDownLatch(1);
        try {
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
                    inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
                            +curatorEvent.getType());
                            countDownLatch.countDown();
                        }
                    },service).forPath("/enjoy","deer".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
        countDownLatch.await();
        service.shutdown();

        /**
         * 事务操作(curator独有的)
         */
        try {
            //先创建节点 再设置值 两个操作放入到同一个事物当中
            Collection<CuratorTransactionResult> resultCollections = curatorFramework.inTransaction().create().forPath("/demo1", "111".getBytes()).and().
                    setData().forPath("/demo1", "111".getBytes()).and().commit();
            for (CuratorTransactionResult result : resultCollections) {
                System.out.println(result.getForPath() + "->" + result.getType());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class SessionDemo {

    private static final String CONNECT_URL = "192.168.71.128:2181";

    public static void main(String[] args) {
        //创建会话的两种方式 normal
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_URL, 5000, 5000,
                //第一次1s后重试 一共重试三次 每次时间递增
                new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        //fluent风格
        CuratorFramework curatorFramework1 = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_URL)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
    }
}



评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×