借助zookeeper的java api浅析zookeeper的典型应用场景

张庭  |  2017. 11. 20   |  阅读 790 次

   平时写业务不免要用到一些分布式的中间件,例如dubbo、Kafka等,它们都有一个共同点,那就是都以zookeeper作为注册中心。由此可见在zookeeper在处理分布式应用中的地位。既然如此,有必要对zookeeper进行一番学习。本人初学zookeeper,刚好看了zookeeper的一些典型应用场景,所以就在此利用zookeeper的java api简单地实现一番。当然要声明的是在zk的正式应用中远比这些demo要做的复杂、考虑的全面的多。但是就像标题所写,这里是“浅析”,这些demo至少能够在凸显原理的基础上具有“浅析”之用,大家看看就好。

首先将下面要用到的一些java api简单介绍一下(相信大家都已经了解了):

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException :创建zookeeper实例,对zookeeper的所有操作都将依赖于该实例。
• connectString--- zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接. • sessionTimeout--- 指定连接的超时时间. • watcher--- 事件回调接口.

String create(String path, byte[] data, List acl, CreateMode createMode) : 创建节点
• path--- znode的路径. • data--- 与znode关联的数据. • acl--- 指定权限信息, 如果不想指定权限, 可以传入Ids.OPENACLUNSAFE. • createMode--- 指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可. 常见有以下几种模式 : PERSISTENT(创建持久化节点) PERSISTENTSEQUENTIAL(创建有序的持久化节点) EPHEMERAL(创建临时节点) EPHEMERALSEQUENTIAL(创建有序的临时节点)。所谓临时节点是指创建该节点的客户端一旦失去连接,则节点自动删除

List getChildren(String path, boolean watch):获取指定节点的所有子节点
• watch--- 参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件.

Stat exists(String path, boolean watch):检测指定节点的状态,可以绑定监控事件
• watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null.

byte[] getData(String path, boolean watch, Stat stat):获取指定节点上的数据
• watch--- 参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常. • stat--- 参数是个传出参数, getData方法会将path node的状态信息设置到该参数中.

Stat setData(final String path, byte data[], int version):更新指定节点上存储的数据
• data--- 为待更新的数据. • version--- 参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查.

void delete(final String path, int version):删除指定节点,可以指定删除指定版本

下面以借助这些api来浅析zookeeper的一些典型应用

• 动态配置 由于zookeeper的节点上能够存储数据,如果客户端持续在节点上监听,也能实时地检测到节点上数据的变化。借助这些特性可以利用zookeeper实现简单的远程动态配置管理。所以实现思路就很简单了:设置某个znode作为特定配置的存储节点,所有需要该配置的应用持续监听该znode的状态,一旦数据发生变化则通过watcher回调触发应用内配置同步。简单的关键代码如下:

//获取应用内配置文件的路径
    public static String getFilePath() {
        return System.getProperty("user.dir") + "/src/main/resources/" + FILENAME;

    }


    public static void initPath(String path) {
        try {
            ZooKeeper zooKeeper = getZookeeperClient();
            Stat stat = zooKeeper.exists(path, false);
            if (stat == null) {
                zooKeeper.create(path, path.substring(1).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            zooKeeper.close();
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        }

    }

//处理zookeeper上配置文件对应节点的更改
    public void handleUpdate() {
        FileWriter fileWriter = null;
        ZooKeeper zk = null;
        try {
            zk = getZookeeperClient();
            byte[] data = zk.getData(PROPERTIES_PATH, false, null);
            String jsonStr = new String(data);
            Map<String, String> map = JSON.parseObject(jsonStr, Map.class);
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                sb.append(entry.getKey() + "=" + entry.getValue()).append("\n");
            }
            fileWriter = new FileWriter(getFilePath());
            fileWriter.write(sb.toString());
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        } finally {
            try {
                if (fileWriter != null) {
                    fileWriter.close();
                }
            } catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
        }
    }

    //借助zookeeper实现简单的动态配置
    public void testAutoSetPropertiesFile() {

        initPath(PROPERTIES_PATH);
        ZooKeeper zooKeeper = null;
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        try {
            zooKeeper = getZookeeperClient(new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() != Event.EventType.None) {
                        LOGGER.info("配置文件发生了改变,开始同步到本地...");
                        handleUpdate();
                        try {
                            queue.put(1);
                        } catch (InterruptedException e) {
                            LOGGER.error(e.getMessage());
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
            File file = new File(getFilePath());
            if (!file.exists()) {
                handleUpdate();
            }
            while (zooKeeper.exists(PROPERTIES_PATH, true) != null) {   //此处会持续监听远程配置文件的改动
                queue.take();
            }

        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        }

    }

可以看到,最重要的是在初始化zk时绑定了用于同步配置的Watcher(),然后调用exists(path,isWatch)时设isWatch为true即可使Watcher生效。需要注意的是Watcher一旦被调用一次后即失效,所以在这里循环调用exists(path,isWatch)使Watcher重新生效以达到持续监听的效果。使用了SynchronousQueue用于在远程配置无变化时就阻塞住线程,避免无限循环。 附上测试代码:

@Test
    public void testAutoProperties(){

        final ZookUtil zookUtil = new ZookUtil();
        new Thread(new Runnable() {
            public void run() {
                zookUtil.testAutoSetPropertiesFile();  //开启一个线程用于监听远程配置文件的变化并同步
            }
        }).start();
        new Thread(new Runnable() {   //开启一个线程模拟改变远程配置文件
            public void run() {
                try {
                    System.out.println("开始模拟改变zookeeper中存储的配置文件...");
                    Thread.sleep(10000);
                    ZooKeeper zk = ZookUtil.getZookeeperClient();
                    zk.setData("/props","{\"name\":\"kobe Bryant\"}".getBytes(),-1);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

其实不必像这里一样专门开一个线程用于修改zk上的配置文件,直接用zk的客户端在命令行上直接修改是比较快捷的测试办法。

• 集群管理 由于zk除了能够监听znode本身的状态外也能监控其子节点的状态,所以利用这一点可以很方便地构建出一个集群模型。利用一台主机持续对集群服务的公用注册节点下的子节点进行监控,这样在公用节点下注册的服务的变动都将被主机检测到。这里为了方便地检测到服务的上下线所以服务都已临时节点的方式注册,这样只要服务一断开则相应节点被删除就可被监控到,新服务的上线同理。关键代码如下:

//用于集群管理的watcher
    public class ClusterWatcher implements Watcher {

        ZooKeeper zooKeeper;
        SynchronousQueue<Integer> lock = new SynchronousQueue<Integer>();

        public ClusterWatcher() {
            try {
                zooKeeper = getZookeeperClient();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                    lock.put(1);
                    synchronized (zooKeeper) {
                        int chidren_actual = zooKeeper.getChildren(CLUSTER_PATH, false).size();
                        int children_before = Integer.valueOf(new String(zooKeeper.getData(
                                CLUSTER_PATH, false, null)));
                        zooKeeper.setData(CLUSTER_PATH, String.valueOf(chidren_actual).getBytes(), -1);
                        if (chidren_actual > children_before) {
                            LOGGER.info("集群中有新服务上线...");
                        } else {
                            LOGGER.info("集群中有服务下线...");
                        }
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void await() throws InterruptedException {
            lock.take();
        }
    }
//zookeeper模拟集群管理
    public void simulateCluster() {

        initPath(CLUSTER_PATH);
        LOGGER.info("模拟集群已初始化完毕,持续监听注册的客户端状态...");
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        try {
            ClusterWatcher clusterWatcher = new ClusterWatcher();
            ZooKeeper zk = getZookeeperClient(clusterWatcher);
            int serverCounts = zk.getChildren(CLUSTER_PATH, false).size();
            zk.setData(CLUSTER_PATH, String.valueOf(serverCounts).getBytes(), -1);
            while (zk.getChildren(CLUSTER_PATH, true) != null) {
                clusterWatcher.await();
            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        }

    }

关键点还是持续监控上,由于每次父节点下的子节点变动触发watcher后就会导致watcher失效。所以还是要在watcher每次被调用后立即重新让它生效,所以上面循环调用zk.getChildren(CLUSTER_PATH,true)。而且上面对服务的上下线的判断依赖于父节点始终存储着子节点变动前的子节点总数,子节点一旦变化则在watcher中将其与实时的子节点总数作对比。其实这里只是浅层次地实现了对集群中服务的在线状态实行监控,对集群的管理远不止这些,包括集群中leader的动态选择,这些大家可以自己去尝试,也比较简单。测试代码就不贴了,就是异步添加和删除多个临时节点的过程。

• 分布式锁 利用zk实现分布式锁也是依赖于znode,加锁解锁往往依赖于节点的创建删除。具体思路如下:

  • 建立一个父节点base_node
  • 每当有应用要获取一个锁时则先到base_node下创建一个有序的临时节点
  • 获取base_node 所有子节点集合并排序选出其中最小的节点,将其与2中创建的节点进行对比,判断最小的节点是不是2创建的节点
  • 如果3判断最小的节点即是2创建的节点则获取锁成功,可以继续去处理加锁的业务;否则获取锁失败,可以阻塞住等待最小的节点离线,即其它应用解锁。
  • 处理完业务后通过删除节点达到解锁的效果。 分析上面的思路可以发现,一旦一个刚创建的节点是最小的节点则加锁成功,这时如果外部有新的节点建立,由于是有序节点则新建立的节点肯定比当前存在的锁节点大,只能等待锁节点的删除才能成为最小的节点。

关键代码如下:

//zookeeper实现分布式锁
    public void simulateLock() {

        initPath(LOCK_PATH);
        try {
            ZooKeeper zk = getZookeeperClient();
            long currentTime = System.currentTimeMillis();
            String newNode = zk.create(LOCK_PATH + "/" + currentTime, String.valueOf(currentTime).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  //创建锁节点

            List<String> list = zk.getChildren(LOCK_PATH, false);
            String[] nodes = list.toArray(new String[list.size()]);
            Arrays.sort(nodes);
            if (newNode.equals(LOCK_PATH + "/" + nodes[0])) {  //与zk中最小的锁节点比较,相同则获取锁成功
                LOGGER.info("获取锁成功");
                Thread.sleep(5000);
                zk.close();    //由于创建的锁节点是临时节点,所以客户端退出即删除相应节点
                lock_wait.put(1);
            } else {
                LOGGER.info("获取锁失败,持续等待");
                lock_wait.take();
                zk.close();   //退出客户端以删除获取锁失败时创建的节点
                simulateLock();  //尝试重新获取锁
            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

需要注意的是lockwait是已经声明的一个静态的SynchronousQueue,一旦线程获取锁失败则通过lockwait.take()阻塞住,一旦线程释放锁完成则通过lock.wait.put()唤醒其它获取锁陷入阻塞的线程,之后这些线程可以重试锁。 测试代码如下:

@Test
    public void testLock(){
        final ZookUtil zookUtil = new ZookUtil();
        new Thread(new Runnable() {    //开启一个获取锁的线程
            public void run() {
                zookUtil.simulateLock();
            }
        }).start();

        new Thread(new Runnable() {    //开启一个获取锁的线程
            public void run() {
                zookUtil.simulateLock();
            }
        }).start();

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

• 分布式阻塞队列  这里实现一个分布式的生产者-消费者模型的阻塞队列。原理比较简单,建立一个队列的父节点,队列中的元素都将存储于其子节点中,入队则是新建子节点,出队则是删除子节点。创建的子节点都是有序节点,这样保证队列中元素的有序性。当入队时先检测当前父节点下的子节点数是否已经超出队列的最大容量,如果是则阻塞住等待元素的出队。出队时先检测父节点的子节点集是否为空,如果是则阻塞住等待元素的入队。关键代码如下:

//用于分布式队列的watcher
    public class QueueWatcher implements Watcher {

        CountDownLatch latch;

        public QueueWatcher() {
            latch = new CountDownLatch(1);
        }

        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                LOGGER.info("队列成员发生变更...");
                latch.countDown();
            }
        }

        public void await() throws InterruptedException {
            latch.await();
        }
    }

 //zookeeper实现阻塞队列的生产者
    public void simulateProducer() {

        initPath(QUEUE_PATH);
        try {
            ZooKeeper zk = getZookeeperClient();
            while (zk.exists(QUEUE_PATH, false) != null) {
                QueueWatcher watcher = new QueueWatcher();
                if (zk.getChildren(QUEUE_PATH, watcher).size() >= queueSize) {
                    LOGGER.info("由于队列已满,进入阻塞状态...");
                    watcher.await();
                }
                zk.create(QUEUE_PATH + "/elem-", String.valueOf(System.currentTimeMillis()).getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        }

    }

//zookeeper实现阻塞队列的消费者
    public void simulateCustomer() {

        initPath(QUEUE_PATH);
        try {
            ZooKeeper zk = getZookeeperClient();
            while (zk.exists(QUEUE_PATH, false) != null) {
                QueueWatcher watcher = new QueueWatcher();
                List<String> nodes = zk.getChildren(QUEUE_PATH, watcher);
                if (nodes.isEmpty()) {
                    LOGGER.info("由于队列已空,消费者线程进入阻塞状态...");
                    watcher.await();
                    continue;
                } else {
                    String[] products = nodes.toArray(new String[nodes.size()]);
                    Arrays.sort(products);
                    String path = QUEUE_PATH + "/" + products[0];
                    LOGGER.info("模拟处理队列{}中的{}元素对应的数据", QUEUE_PATH, path);
                    Thread.sleep(5000);
                    zk.delete(path, -1);
                    LOGGER.info("处理完后从队列{}移除元素{}", QUEUE_PATH, path);
                }

            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage());
        } catch (KeeperException e) {
            LOGGER.error(e.getMessage());
        }

    }

需要注意的是消费者每次在出队操作时总是选取父节点下最小的子节点,同时每次发生入队或出队操作都将触发父节点上设置的watcher以唤醒阻塞的生产者或消费者线程。测试代码如下:

@Test
    public void testDistributedBlockingQueue(){
        ZookUtil zookUtil = new ZookUtil();
        zookUtil.simulateProducer();   //开启producer线程
        zookUtil.simulateCustomer();   //开启customer线程
    }

分享到

   
快速了解pwa