ZooKeeper客户端操作

ls、create、get、set、stat、delete,以及通过循环注册 Watcher 实现对节点的持续监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.atguigu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class ZkClient {
private ZooKeeper zkCli;
private static final String CONNECT_STRING = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static final int SESSION_TIMEOUT = 2000;

@Before
public void before() throws IOException {
zkCli = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("默认回调函数");
}
});
}

@Test
public void ls() throws KeeperException, InterruptedException {
//List<String> children = zkCli.getChildren("/", true);
List<String> children = zkCli.getChildren("/", event -> {
System.out.println("自定义回调函数");
});
System.out.println("====================================");
for (String child : children) {
System.out.println(child);
}
System.out.println("=======================================");
Thread.sleep(Long.MAX_VALUE);
}

@Test
public void create() throws KeeperException, InterruptedException {
String s = zkCli.create("/Idea", "Idea2018".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(s);
Thread.sleep(Long.MAX_VALUE);
}

@Test
public void get() throws KeeperException, InterruptedException {
byte[] data = zkCli.getData("/zxx0000000007", true, new Stat());
String string = new String(data);
System.out.println(string);
}

@Test
public void set() throws KeeperException, InterruptedException {
Stat exists = zkCli.exists("/zxx0000000007", false);
if(exists != null) {
Stat stat = zkCli.setData("/zxx0000000007", "hello,world".getBytes(), exists.getVersion());
//这里的版本号是为了防止并发写带来的误删的数据安全问题
System.out.println(stat.getDataLength());
}
}

@Test
public void stat() throws KeeperException, InterruptedException {
Stat exists = zkCli.exists("/hello", false);
if(exists == null)
System.out.println("节点不存在");
else
System.out.println(exists.getDataLength());
}

@Test
public void delete() throws KeeperException, InterruptedException {
Stat exists = zkCli.exists("/zxx0000000007", false);
if(exists != null){
zkCli.delete("/zxx0000000007",exists.getVersion());
}
}

public void register() throws KeeperException, InterruptedException {//循环注册
byte[] data = zkCli.getData("/a", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
register();//自定义Watcher循环调用
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, null);
System.out.println(new String(data));
}

@Test
public void testRegister(){
try {
register();
Thread.sleep(Long.MAX_VALUE);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}