博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用zookeeper实现发布订阅模式
阅读量:5277 次
发布时间:2019-06-14

本文共 4107 字,大约阅读时间需要 13 分钟。

zookeeper应用

发布订阅

zk实现的方式是推拉结合,Client想服务端注册自己需要关注的节点,一旦节点的数据发生变更,那么Server会向对应的客户端发送Watcher事件通知,客户端接收到这个消息后,需要主动到服务端获取最新的数据。

目前很多应用使用发布订阅都不是用zk的这种方式,比较典型的纯的推模式和拉模式,这个之前有记录过Notify和MetaQ的比较,不是本篇的重点。本次主要是利用zookeeper来实现以下发布订阅这种功能。

搭建了一个zk环境,手动创建了一个节点/publish,客户端发布者代码如下:

package com.wpr.zk.pulish;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.concurrent.CountDownLatch;/** * 利用zk来模拟发布订阅模式 * Created by peirong.wpr on 2017/4/5. */public class Publish implements Watcher{    private static CountDownLatch latch =  new CountDownLatch(1);    private static Stat stat = new Stat();    private static ZooKeeper zk =null;    private final static Integer  SESSION_TIMEOUT = 5000;    public static void main(String[] args) {        try {            String path  ="/publish";             zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Publish());            latch.await();            System.out.println("zk connection");            byte[]  temp = zk.getData(path,true,stat);            System.out.println("init data :pulish node data"+new String(temp));            int i=0;            while(true){                System.out.println( "publish new Data:"+i);                zk.setData(path,String.valueOf(i).getBytes(),-1);                Thread.sleep(5000L);                i++;            }        } catch (InterruptedException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }    public void process(WatchedEvent event) {        if(Event.KeeperState.SyncConnected == event.getState()){            System.out.println("receive watched event:"+event);            System.out.println(event.getState());            latch.countDown();        }    }}

订阅者代码如下:

package com.wpr.zk.pulish;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.concurrent.CountDownLatch;/** * Created by peirong.wpr on 2017/4/5. */public class Subscribe  implements Watcher {    private static CountDownLatch latch =  new CountDownLatch(1);    private static Stat stat = new Stat();    private static ZooKeeper zk =null;    private final static Integer  SESSION_TIMEOUT = 5000;    public static void main(String[] args) {        try {            String path  ="/publish";            zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Subscribe());            latch.await();            System.out.println("zk connection");            byte[]  temp = zk.getData(path,true,stat);            System.out.println("init data :pulish node data"+new String(temp));            int i=0;            while(true){                Thread.sleep(Integer.MAX_VALUE);            }        } catch (InterruptedException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }    public void process(WatchedEvent event) {        if(Event.KeeperState.SyncConnected == event.getState()){            if(Event.EventType.None == event.getType() && event.getPath() == null){                latch.countDown();            }else if(event.getType()  == Event.EventType.NodeDataChanged){                //Clinet需要去拉取最新的数据信息                try {                    byte[] newByte = zk.getData(event.getPath(),true,stat);                    System.out.println("path:"+event.getPath()+"\tdata has changed.\t new Data :"+ new String(newByte));                } catch (KeeperException e) {                    e.printStackTrace();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }}

转载于:https://www.cnblogs.com/kakaxisir/p/6667279.html

你可能感兴趣的文章
关于vue的npm run dev和npm run build
查看>>
Hive架构
查看>>
EJBCA安装教程+postgresql+wildfly10
查看>>
(五十四)涂鸦的实现和截图的保存
查看>>
关于微信暴力加很申请
查看>>
06享元、责任链
查看>>
ubuntu如何部署tftp服务
查看>>
【Alpha版本】冲刺阶段——Day 8
查看>>
解决CentOS6.x或RedHat Linux 6.x版本不能通过System eth0以固定IP访问外网的问题
查看>>
(转)Expression Tree不完全入门
查看>>
Struts2的工作原理
查看>>
配置EditPlus使其可以编译运行java程序
查看>>
我眼中的Android IDE
查看>>
C++默认参数值函数
查看>>
java中的占位符\t\n\r\f
查看>>
7.14
查看>>
SDN2017 第一次作业
查看>>
MySQL通过frm 和 ibd 恢复数据过程
查看>>
AngularJs 学习笔记(2)
查看>>
关于元素优先级
查看>>