ZooKeeper(八)-- Curator实现分布式锁 发表于 2019-06-02 | 分类于 Zookeeper | 阅读次数: 字数统计: 482 字 | 阅读时长 ≈ 3 分钟 1.pom.xml123456789101112131415161718<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.5.0</version> </dependency></dependencies> 2.JAVA代码12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788package com.xbq.zookeeper.curator;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.RetryNTimes;/** * 使用Curator来实现分布式锁 * @author xbq */public class LockByCurator { // 此demo使用的集群,所以有多个ip和端口 private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183"; // session过期时间 private static int SESSION_TIMEOUT = 3000; // 连接超时时间 private static int CONNECTION_TIMEOUT = 3000; // 锁节点 private static final String CURATOR_LOCK = "/curatorLock"; /** * 获取锁操作 * @param cf */ public static void doLock(CuratorFramework cf){ System.out.println(Thread.currentThread().getName() + " 尝试获取锁!"); // 实例化 zk分布式锁 InterProcessMutex mutex = new InterProcessMutex(cf, CURATOR_LOCK); try { // 判断是否获取到了zk分布式锁 if(mutex.acquire(5, TimeUnit.SECONDS)){ System.out.println(Thread.currentThread().getName() + " 获取到了锁!-------"); // 业务操作 Thread.sleep(5000); } } catch (Exception e) { e.printStackTrace(); } finally { try { // 释放锁 mutex.release(); } catch (Exception e) { e.printStackTrace(); } } } /** * 测试 * @param args */ public static void main(String[] args) { // 定义线程池 ExecutorService service = Executors.newCachedThreadPool(); // 定义信号灯,只能允许10个线程并发操作 final Semaphore semaphore = new Semaphore(10); // 模拟10个客户端 for(int i=0; i < 10 ;i++){ Runnable runnable = new Runnable() { @Override public void run() { try { semaphore.acquire(); // 连接 ZooKeeper CuratorFramework framework = CuratorFrameworkFactory. newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new RetryNTimes(10,5000)); // 启动 framework.start(); doLock(framework); semaphore.release(); } catch (Exception e) { } } }; service.execute(runnable); } service.shutdown(); }} 坚持原创技术分享,您的支持将鼓励我继续创作! 打赏 微信支付 支付宝 -------------本文结束感谢您的阅读-------------