ZooKeeper(八)-- Curator实现分布式锁

1.pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>

2.JAVA代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.xbq.zookeeper.curator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

/**
* 使用Curator来实现分布式锁
* @author xbq
*/
public class LockByCurator {

// 此demo使用的集群,所以有多个ip和端口
private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
// session过期时间
private static int SESSION_TIMEOUT = 3000;
// 连接超时时间
private static int CONNECTION_TIMEOUT = 3000;

// 锁节点
private static final String CURATOR_LOCK = "/curatorLock";

/**
* 获取锁操作
* @param cf
*/
public static void doLock(CuratorFramework cf){
System.out.println(Thread.currentThread().getName() + " 尝试获取锁!");
// 实例化 zk分布式锁
InterProcessMutex mutex = new InterProcessMutex(cf, CURATOR_LOCK);
try {
// 判断是否获取到了zk分布式锁
if(mutex.acquire(5, TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getName() + " 获取到了锁!-------");
// 业务操作
Thread.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 测试
* @param args
*/
public static void main(String[] args) {
// 定义线程池
ExecutorService service = Executors.newCachedThreadPool();
// 定义信号灯,只能允许10个线程并发操作
final Semaphore semaphore = new Semaphore(10);
// 模拟10个客户端
for(int i=0; i < 10 ;i++){
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
// 连接 ZooKeeper
CuratorFramework framework = CuratorFrameworkFactory.
newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new RetryNTimes(10,5000));
// 启动
framework.start();
doLock(framework);

semaphore.release();
} catch (Exception e) {

}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------
分享到:
0%