外贸网站,做公司门户网站的重点,中国甘肃网,wap网页制作工具前言
我之前写了一篇快速上手ZK的文章#xff1a;https://blog.csdn.net/qq_38974073/article/details/135293106
本篇最要是进一步加深学习ZK#xff0c;算是一次简单的实践#xff0c;巩固学习成果。
设计一个分布式锁
对锁的基本要求
可重入#xff1a;允许同一个应…前言
我之前写了一篇快速上手ZK的文章https://blog.csdn.net/qq_38974073/article/details/135293106
本篇最要是进一步加深学习ZK算是一次简单的实践巩固学习成果。
设计一个分布式锁
对锁的基本要求
可重入允许同一个应用内的同一个线程重复调用同一个方法阻塞没有拿到锁的线程将进入阻塞。公平的先来先得。
实现原理
使用zk作为发号器每个线程申请锁时会创建一个临时有序节点
节点编号最小的获得锁完成业务操作之后删除临时节点如果不是最小编号的节点就监听前一个节点的删除事件并进入阻塞状态当触发回调的事件时唤醒阻塞线程并重新进行获取锁操作。
锁要求实现的描述
可重入对同一个线程不用重复获取锁重入计数1即可阻塞利用CountDownLatch实现当触发回调时唤醒线程公平的利用zk临时有序节点的特点进行排队先到先申请锁。
问申请到锁之后网络中断怎么办
临时节点随客户端关闭而被删除
问如何避免羊群效应
每个线程只监听前一个节点
关键流程 关键代码实现
锁的关键方法
加锁lock解锁unLock尝试加锁tryLock public boolean lock() {if(Thread.currentThread().equals(thread)) {lockCount.incrementAndGet();return true;}while (true) {if (tryLock()) {thread Thread.currentThread();lockCount.incrementAndGet();return true;}try {await();} catch (Exception e) {throw new RuntimeException(e);}}
}public synchronized boolean unlock() {if (!thread.equals(Thread.currentThread())) {return false;}int newLockCount lockCount.decrementAndGet();if (newLockCount 0) {throw new IllegalMonitorStateException(重入锁计数不可为负数 );}// 是否剩余重入次数if (newLockCount ! 0) {return true;}// 到这一步意味着lockCount已经为0可以删除临时节点了try{if(client.isNodeExist(properties.getZkPath())) {client.deleteNode(lockedPathMap.get(thread));}} catch (Exception e) {return false;} finally {lockedPathMap.remove(thread);priorPathMap.remove(thread);}return true;
}protected boolean tryLock() {String lockedPath lockedPathMap.get(Thread.currentThread());if (null lockedPath || !client.isNodeExist(lockedPath)) {lockedPathMap.put(Thread.currentThread(), lockedPath client.createEphemeralSeqNode(getLockPrefix()));}// 取得加锁的排队编号String lockedShortPath getShorPath(lockedPath);ListString waiters getWaiters();// 如果自己是所有等待锁中的第一个则获得锁if (checkLocked(waiters, lockedShortPath)) {return true;}// 当前线程节点是否在排队int index Collections.binarySearch(waiters, lockedShortPath);if(index 0) {throw new NullPointerException(可能网络抖动连接断开临时节点失效);}// waiters最后面的节点写入map用来监听priorPathMap.put(Thread.currentThread(), getLockPrefix() waiters.get(index - 1));return false;
}private boolean await() throws Exception {String priorPath priorPathMap.get(Thread.currentThread());if (null priorPath) {throw new NullPointerException(prior_path error);}final CountDownLatch latch new CountDownLatch(1);// 删除事件Watcher w watchedEvent - {// 监测到前一个节点发生变化接下来就可以唤起等待线程重新尝试获取锁latch.countDown();};try{// 监听前一个节点的删除时间client.watcher(w, priorPath);} catch (KeeperException.NoNodeException e) {e.printStackTrace();return false;}return latch.await(properties.getTimeout(), TimeUnit.MILLISECONDS);
}好了如果你对这个感兴趣不妨拉一下完整源码 https://gitee.com/liangshij/zk-lock-demo
源码简要说明
模块说明
lsj-zk-lock核心实现。lsj-zk-lock-spring-boot-starter整合springbootlsj-zk-lock-test使用demo
安装 经典三步走导包、配置、使用 拉取代码将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。引入依赖
dependencygroupIdcn.lsj/groupIdartifactIdlsj-zk-lock-spring-boot-starter/artifactIdversion2.4.2/version
/dependency配置
配置locks和dataSource
spring:zk:dataSource:url: localhostport: 2181locks:- zkPath: /test/locklockName: countLock# 获取锁失败时进入等待的时间等待结束将重新尝试获取锁timeout: 5000- zkPath: /test2/locklockName: locktimeout: 5000使用
使用方式1通过GlobalLock注解指定要使用那个lock
GetMapping(test2)
GlobalLock(countLock)
public String test2() {// 业务代码return ;
}使用方式2通过Qualifier注解指定要使用那个lock
RestController
public class TestController {int count 0;ResourceQualifier(lock)private ReentrantLock lock;ResourceQualifier(countLock)private ReentrantLock countLock;GetMapping(test)public String test() {countLock.lock();try{for (int i 0; i 10000; i) {count;}} finally {countLock.unlock();}return String.valueOf(count);}
}