1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| package com.atguigu.zookeeper;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test;
import java.io.IOException; import java.util.List;
public class ZkClient { private ZooKeeper zkCli; private static final String CONNECT_STRING = "hadoop101:2181,hadoop102:2181,hadoop103:2181"; private static final int SESSION_TIMEOUT = 2000;
@Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("默认回调函数"); } }); }
@Test public void ls() throws KeeperException, InterruptedException { //List<String> children = zkCli.getChildren("/", true); List<String> children = zkCli.getChildren("/", event -> { System.out.println("自定义回调函数"); }); System.out.println("===================================="); for (String child : children) { System.out.println(child); } System.out.println("======================================="); Thread.sleep(Long.MAX_VALUE); }
@Test public void create() throws KeeperException, InterruptedException { String s = zkCli.create("/Idea", "Idea2018".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(s); Thread.sleep(Long.MAX_VALUE); }
@Test public void get() throws KeeperException, InterruptedException { byte[] data = zkCli.getData("/zxx0000000007", true, new Stat()); String string = new String(data); System.out.println(string); }
@Test public void set() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/zxx0000000007", false); if(exists != null) { Stat stat = zkCli.setData("/zxx0000000007", "hello,world".getBytes(), exists.getVersion()); //这里的版本号是为了防止并发写带来的误删的数据安全问题 System.out.println(stat.getDataLength()); } }
@Test public void stat() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/hello", false); if(exists == null) System.out.println("节点不存在"); else System.out.println(exists.getDataLength()); }
@Test public void delete() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/zxx0000000007", false); if(exists != null){ zkCli.delete("/zxx0000000007",exists.getVersion()); } }
public void register() throws KeeperException, InterruptedException {//循环注册 byte[] data = zkCli.getData("/a", new Watcher() { @Override public void process(WatchedEvent event) { try { register();//自定义Watcher循环调用 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, null); System.out.println(new String(data)); }
@Test public void testRegister(){ try { register(); Thread.sleep(Long.MAX_VALUE); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
}
|