ZooKeeper(五)-- Curator使用

前言

Curator是Netflix开源的一套ZooKeeper客户端框架:

  • 1.封装ZooKeeper client与ZooKeeper server之间的连接处理;
  • 提供了一套Fluent风格的操作API;
  • 提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装。

Curator几个组成部分:

  • Client:是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法
  • Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
  • Recipes:实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
  • Utilities:各种ZooKeeper的工具类
  • Errors: 异常处理, 连接, 恢复等
  • Extensions: recipe扩展

Curator内部实现的几种重试策略:

  • ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
  • RetryNTimes:指定最大重试次数的重试策略
  • RetryOneTime:仅重试一次
  • RetryUntilElapsed:一直重试直到达到规定的时间

正文

1.项目使用maven工程,在pom.xml中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>

2.下面代码从增删改查、事务、事件订阅/监听器来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package om.xbq.demo;

import java.util.Collection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

public class CuratorDemo {

// 此demo使用的集群,所以有多个ip和端口
private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
private static int SESSION_TIMEOUT = 3000;
private static int CONNECTION_TIMEOUT = 3000;

public static void main(String[] args) {
// 连接 ZooKeeper
CuratorFramework framework = CuratorFrameworkFactory.
newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new ExponentialBackoffRetry(1000,10));
// 启动
framework.start();

Stat stat = ifExists(framework);

if(stat != null){
// update(framework);
// delete(framework);
// query(framework);

// 监听事件,只监听一次,不推荐
// listener1(framework);
}else {
// add(framework);
}

// 事务
// transaction(framework);

// 持久监听,推荐使用
listener2(framework);
}

/**
* 判断节点是否存在
* @param cf
* @return
*/
public static Stat ifExists(CuratorFramework cf){
Stat stat = null;
try {
stat = cf.checkExists().forPath("/node_curator/test");;
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}

/**
* @Title: add
* @Description: TODO(增加节点 , 可以增加 多级节点)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void add(CuratorFramework cf){
try {
String rs = cf.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath("/node_curator/test","xbq".getBytes());
System.out.println(rs);
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: update
* @Description: TODO(修改指定节点的值)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void update(CuratorFramework cf){
try {
Stat stat = cf.setData().forPath("/node_curator/test", "javaCoder".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: delete
* @Description: TODO(删除节点或者删除包括子节点在内的父节点)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void delete(CuratorFramework cf){
try {
// 递归删除的话,则输入父节点
cf.delete().deletingChildrenIfNeeded().forPath("/node_curator");
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: query
* @Description: TODO(查询节点的值)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void query(CuratorFramework cf){
try {
byte[] value = cf.getData().forPath("/node_curator/test");
System.out.println(new String(value));
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: transaction
* @Description: TODO(一组crud操作同生同灭)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void transaction(CuratorFramework cf){
try {
// 事务处理, 事务会自动回滚
Collection<CuratorTransactionResult> results = cf.inTransaction()
.create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq1").and()
.create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq2").and().commit();
// 遍历
for(CuratorTransactionResult result:results){
System.out.println(result.getResultStat() + "->" + result.getForPath() + "->" + result.getType());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: listener1
* @Description: TODO(监听 事件 -- 通过 usingWatcher 方法)
* 注意:通过CuratorWatcher 去监听指定节点的事件, 只监听一次
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void listener1(CuratorFramework cf){
try {
cf.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("触发事件:" + event.getType());
}
}).forPath("/javaCoder");

System.in.read(); // 挂起,在控制台上输入 才停止
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}

/**
* @Title: listener2
* @Description: TODO(监听 子节点的事件,不监听 自己 -- 通过 PathChildrenCacheListener 方法,推荐使用)
* @param @param cf 设定文件
* @return void 返回类型
* @throws
*/
public static void listener2(CuratorFramework cf) {
// 节点node_xbq不存在 会新增
PathChildrenCache cache = new PathChildrenCache(cf, "/node_xbq", true);
try {
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("触发事件:" + event.getType());
}
});
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
cf.close();
}
}
}

源码下载

点击阅读原文下载源码哦

坚持原创技术分享,您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------
分享到:
0%