zookeeper应用
发布订阅
zk的实现方式是推拉结合:Client向服务端注册自己需要关注的节点,一旦节点的数据发生变更,Server就会向对应的客户端发送Watcher事件通知;客户端接收到这个通知后,需要主动到服务端获取最新的数据。
目前很多应用实现发布订阅时并不采用zk的这种方式,比较典型的是纯推模式和纯拉模式,这一点之前在记录Notify和MetaQ的比较时提到过,不是本篇的重点。本次主要是利用zookeeper来实现一下发布订阅这种功能。
搭建了一个zk环境,手动创建了一个节点/publish,客户端发布者代码如下:
package com.wpr.zk.pulish;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Publisher side of a publish/subscribe demo built on ZooKeeper.
 *
 * <p>Connects to the server, prints the current content of {@code /publish},
 * then overwrites the node with an increasing counter every five seconds.
 *
 * Created by peirong.wpr on 2017/4/5.
 */
public class Publish implements Watcher {

    /** Released by {@link #process(WatchedEvent)} once the session is connected. */
    private static final CountDownLatch latch = new CountDownLatch(1);

    private static final Stat stat = new Stat();

    private static ZooKeeper zk = null;

    /** Session timeout in milliseconds. */
    private final static Integer SESSION_TIMEOUT = 5000;

    public static void main(String[] args) {
        try {
            String path = "/publish";
            zk = new ZooKeeper("192.168.109.130:2181", SESSION_TIMEOUT, new Publish());
            // Block until the watcher reports SyncConnected.
            latch.await();
            System.out.println("zk connection");

            byte[] temp = zk.getData(path, true, stat);
            System.out.println("init data :pulish node data" + decode(temp));

            int i = 0;
            while (true) {
                System.out.println("publish new Data:" + i);
                // Version -1 means "match any version", so the write is unconditional.
                zk.setData(path, String.valueOf(i).getBytes(StandardCharsets.UTF_8), -1);
                Thread.sleep(5000L);
                i++;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeSession();
        }
    }

    /**
     * Handles session events delivered by the ZooKeeper client.
     *
     * @param event the event reported by the client library
     */
    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            System.out.println("receive watched event:" + event);
            System.out.println(event.getState());
            latch.countDown();
        }
    }

    /**
     * Decodes a znode payload as UTF-8 text.
     *
     * @param data raw node content; {@code null} when the node holds no data
     * @return the decoded text, or an empty string for {@code null} input
     */
    private static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /** Closes the ZooKeeper session if one was opened. */
    private static void closeSession() {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
订阅者代码如下:
package com.wpr.zk.pulish;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Subscriber side of a publish/subscribe demo built on ZooKeeper.
 *
 * <p>Connects to the server, reads {@code /publish} once with a watch set,
 * and from then on re-reads the node whenever a data-changed event arrives.
 *
 * Created by peirong.wpr on 2017/4/5.
 */
public class Subscribe implements Watcher {

    /** Released by {@link #process(WatchedEvent)} once the session is connected. */
    private static final CountDownLatch latch = new CountDownLatch(1);

    private static final Stat stat = new Stat();

    private static ZooKeeper zk = null;

    /** Session timeout in milliseconds. */
    private final static Integer SESSION_TIMEOUT = 5000;

    public static void main(String[] args) {
        try {
            String path = "/publish";
            zk = new ZooKeeper("192.168.109.130:2181", SESSION_TIMEOUT, new Subscribe());
            // Block until the watcher reports the initial connection event.
            latch.await();
            System.out.println("zk connection");

            // watch=true registers this class as the watcher for the next change.
            byte[] temp = zk.getData(path, true, stat);
            System.out.println("init data :pulish node data" + decode(temp));

            // Keep the main thread alive; notifications arrive via process().
            while (true) {
                Thread.sleep(Integer.MAX_VALUE);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeSession();
        }
    }

    /**
     * Handles the connection event and data-changed notifications.
     *
     * @param event the event reported by the client library
     */
    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected != event.getState()) {
            return;
        }
        if (Event.EventType.None == event.getType() && event.getPath() == null) {
            // Connection established: let main() proceed.
            latch.countDown();
        } else if (event.getType() == Event.EventType.NodeDataChanged) {
            // The notification carries no payload, so the client pulls the latest
            // data itself; watch=true re-registers the watch for the next change.
            try {
                byte[] newByte = zk.getData(event.getPath(), true, stat);
                System.out.println("path:" + event.getPath() + "\tdata has changed.\t new Data :" + decode(newByte));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Decodes a znode payload as UTF-8 text.
     *
     * @param data raw node content; {@code null} when the node holds no data
     * @return the decoded text, or an empty string for {@code null} input
     */
    private static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /** Closes the ZooKeeper session if one was opened. */
    private static void closeSession() {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}