ZooKeeper(四)-- 第三方客户端 ZkClient的使用

前言

zkClient主要做了两件事情:

  • 一件是在session loss和session expire时自动创建新的ZooKeeper实例进行重连。
  • 另一件是将一次性watcher包装为持久watcher。后者的具体做法是简单的在watcher回调中,重新读取数据的同时再注册相同的watcher实例。

zkClient目前已经运用到了很多项目中,知名的有Dubbo、Kafka、Helix。

zkClient jar包下载,或者直接添加maven依赖: http://mvnrepository.com/artifact/com.101tec/zkclient

正文

直接上代码:

1.测试类  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.xbq.demo;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.IZkDataListener;

/**
* @ClassName: ZkClientDemo
* @Description: TODO(zkClient 测试)
* @author xbq
* @date 2017-3-26 上午11:49:39
*/
public class ZkClientDemo {

// 此demo使用的集群,所以有多个ip和端口
private static String CONNECT_SERVER = "192.168.99.138:2181,192.168.99.138:2182,192.168.99.138:2183";
private static int SESSION_TIMEOUT = 3000;
private static int CONNECTION_TIMEOUT = 3000;
private static ZkClient zkClient ;

static {
zkClient = new ZkClient(CONNECT_SERVER, SESSION_TIMEOUT,CONNECTION_TIMEOUT,new MyZkSerializer());
}

public static void main(String[] args) {

add(zkClient);
// update(zkClient);
// delete(zkClient);

// addDiGui(zkClient);
// deleteDiGui(zkClient);

// subscribe(zkClient);
}

/**
* @Title: add
* @Description: TODO(增加一个指定节点)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void add(ZkClient zkClient){
// 如果不存在节点,就新建一个节点
if(!zkClient.exists("/config")){
zkClient.createPersistent("/config","javaCoder");
}
// 查询一下,看是否增加成功
String value = zkClient.readData("/config");
System.out.println("value===" + value);
}

/**
* @Title: addSequential
* @Description: TODO(递归创建节点)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void addDiGui(ZkClient zkClient){
// 递归创建节点
zkClient.createPersistent("/xbq/java/coder", true);
if(zkClient.exists("/xbq/java/coder")){
System.out.println("增加成功!");
}else {
System.out.println("增加失败!");
}
}

/**
* @Title: delete
* @Description: TODO(删除指定节点)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void delete(ZkClient zkClient){
// 存在节点才进行删除
if(zkClient.exists("/config")){
boolean flag = zkClient.delete("/config");
System.out.println("删除" + (flag == true ? "成功!" : "失败!"));
}
}

/**
* @Title: deleteDiGui
* @Description: TODO(递归删除)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void deleteDiGui(ZkClient zkClient){
// 存在节点才进行删除
if(zkClient.exists("/xbq")){
// 递归删除的时候 只传入 父节点就可以,如果传入 全部的节点,虽然返回的是true,但是依然是没有删除的,
// 因为zkClient将异常封装好了,进入catch的时候,会返回true,这是一个坑
boolean flag = zkClient.deleteRecursive("/xbq");
System.out.println("删除" + (flag == true ? "成功!" : "失败!"));
}
}

/**
* @Title: update
* @Description: TODO(修改节点的值)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void update(ZkClient zkClient){
if(zkClient.exists("/config")){
zkClient.writeData("/config", "testUpdate");
// 查询一下,看是否修改成功
String value = zkClient.readData("/config");
System.out.println("value===" + value);
}
}

/**
* @Title: subscribe
* @Description: TODO(事件订阅, 可用于配置管理)
* 先订阅,再 操作增删改。(可多个 客户端订阅)
* @param @param zkClient 设定文件
* @return void 返回类型
* @throws
*/
public static void subscribe(ZkClient zkClient){
zkClient.subscribeDataChanges("/config/userName", new IZkDataListener() {
@Override
public void handleDataDeleted(String arg0) throws Exception {
System.out.println("触发了删除事件:" + arg0);
}

@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
System.out.println("触发了改变事件:" + arg0 + "-->" + arg1);
}
});

try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}

2.自定义序列化类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.xbq.demo;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

/**
* @ClassName: MyZkSerializer
* @Description: TODO(实现序列化接口,转为UTF-8编码)
* @author xbq
* @date 2017-3-26 上午11:56:22
*/
public class MyZkSerializer implements ZkSerializer{
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
try {
return String.valueOf(data).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}

@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}

源码下载

点击阅读原文下载源码哦

坚持原创技术分享,您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------
分享到:
0%