庆阳有人做农资网站吗,1688货源网官方网站,企业电子网站建设,文案写作网站为了避免Redis实现的分布式锁超时#xff0c;Redisson中引入了watch dog的机制#xff0c;他可以帮助我们在Redisson实例被关闭前#xff0c;不断的延长锁的有效期。
自动续租#xff1a;当一个Redisson客户端实例获取到一个分布式锁时#xff0c;如果没有指定锁的超时时…为了避免Redis实现的分布式锁超时Redisson中引入了watch dog的机制他可以帮助我们在Redisson实例被关闭前不断的延长锁的有效期。
自动续租当一个Redisson客户端实例获取到一个分布式锁时如果没有指定锁的超时时间Watchdog会基于Netty的时间轮启动一个后台任务定期向Redis发送命令重新设置锁的过期时间通常是锁的租约时间的1/3。这确保了即使客户端处理时间较长所持有的锁也不会过期。每次续期的时长默认情况下每10s钟做一次续期续期时长是30s。停止续期当锁被释放或者客户端实例被关闭时Watchdog会自动停止对应锁的续租任务。 底层实现
RedissonBaseLock.renewExpiration()
protected void scheduleExpirationRenewal(long threadId) {// 创建一个新的过期续期条目ExpirationEntry entry new ExpirationEntry();// 尝试将新的过期续期条目放入到过期续期映射中如果已存在则不替换ExpirationEntry oldEntry EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry ! null) {// 如果已存在则将线程ID添加到旧的过期续期条目中oldEntry.addThreadId(threadId);} else {// 如果是新的过期续期条目则添加线程ID并尝试续期entry.addThreadId(threadId);try {// 尝试续期renewExpiration();} finally {// 如果当前线程被中断则取消续期if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}// 定时任务执行续期
private void renewExpiration() {// 从过期续期映射中获取当前的过期续期条目ExpirationEntry ee EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee null) {// 如果没有找到则直接返回return;}// 创建一个新的定时任务用于执行续期逻辑Timeout task getServiceManager().newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {// 再次检查过期续期条目是否仍然存在ExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent null) {return;}// 获取线程IDLong threadId ent.getFirstThreadId();if (threadId null) {return;}// 使用LUA脚本异步续期CompletionStageBoolean future renewExpirationAsync(threadId);future.whenComplete((res, e) - {if (e ! null) {// 如果有异常发生记录错误并从映射中移除过期续期条目log.error(Cant update lock {} expiration, getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// 如果续期成功则重新调度续期任务renewExpiration();} else {// 如果续期失败则取消续期cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 将定时任务与过期续期条目关联ee.setTimeout(task);
}// 使用LUA脚本进行续期
protected CompletionStageBoolean renewExpirationAsync(long threadId) {// 使用evalWriteAsync方法异步执行LUA脚本用于续期return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;,Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));
}可以看到上面的代码的主要逻辑就是用了一个TimerTask来实现了一个定时任务设置了internalLockLeaseTime / 3的时长进行一次锁续期。默认的超时时长是30s那么他会每10s进行一次续期通过LUA脚本进行续期再续30s
不过这个续期也不是无脑续他也是有条件的其中ExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());这个值得我们关注他会从EXPIRATION_RENEWAL_MAP中尝试获取一个KV对如果查不到就不续期了。
EXPIRATION_RENEWAL_MAP这个东西会在unlock的时候操作的对他进行remove所以一个锁如果被解了那么就不会再继续续期了
Override
public void unlock() {try {// 异步执行解锁操作get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {// 检查异常是否是由于线程没有持有锁导致的if (e.getCause() instanceof IllegalMonitorStateException) {// 如果是则抛出原始的 IllegalMonitorStateException异常throw (IllegalMonitorStateException) e.getCause();} else {// 如果不是则抛出原始的RedisException异常throw e;}}
}Override
public RFutureVoid unlockAsync(long threadId) {// 使用getServiceManager执行解锁操作return getServiceManager().execute(() - unlockAsync0(threadId));
}private RFutureVoid unlockAsync0(long threadId) {// 异步执行解锁操作CompletionStageBoolean future unlockInnerAsync(threadId);// 处理异步操作的结果或异常CompletionStageVoid f future.handle((opStatus, e) - {// 取消续期任务cancelExpirationRenewal(threadId);if (e ! null) {// 如果有异常发生抛出CompletionExceptionif (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}if (opStatus null) {// 如果解锁操作失败抛出IllegalMonitorStateExceptionIllegalMonitorStateException cause new IllegalMonitorStateException(attempt to unlock lock, not locked by current thread by node id: id thread-id: threadId);throw new CompletionException(cause);}return null;});// 将CompletableFuture包装为RFuturereturn new CompletableFutureWrapper(f);
}protected void cancelExpirationRenewal(Long threadId) {// 从过期续期映射中获取过期续期条目ExpirationEntry task EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task null) {// 如果没有找到则直接返回return;}if (threadId ! null) {// 如果线程ID不为空则从过期续期条目中移除该线程IDtask.removeThreadId(threadId);}if (threadId null || task.hasNoThreads()) {// 如果线程ID为空或者过期续期条目中没有线程ID则取消定时任务Timeout timeout task.getTimeout();if (timeout ! null) {timeout.cancel();}// 从过期续期映射中移除过期续期条目EXPIRATION_RENEWAL_MAP.remove(getEntryName()); // 取消续期关键点}
}核心EXPIRATION_RENEWAL_MAP.remove(getEntryName());
一次unlock过程中对EXPIRATION_RENEWAL_MAP进行移除进而取消下一次锁续期的实现细节。
并且在unlockAsync方法中不管unlockInnerAsync是否执行成功还是抛了异常都不影响cancelExpirationRenewal的执行也可以理解为只要unlock方法被调用了即使解锁未成功那么也可以停止下一次的锁续期。 续期
加锁代码
/*** 尝试异步获取分布式锁。** param waitTime 等待获取锁的最大时间如果设置为-1则表示无限等待。* param leaseTime 锁的过期时间如果设置为-1则表示使用默认的过期时间。* param unit 时间单位用于将leaseTime转换为毫秒。* param threadId 当前线程的唯一标识符。* return 一个RFuture对象表示异步操作的结果如果成功获取锁则返回剩余的过期时间毫秒。* throws InterruptedException 如果当前线程在等待过程中被中断。*/
private RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 尝试获取锁的异步方法RFutureLong ttlRemainingFuture;// 如果锁的过期时间大于0则使用指定的过期时间if (leaseTime 0) {ttlRemainingFuture tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 如果锁的过期时间不大于0则使用内部锁的过期时间ttlRemainingFuture tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 处理没有同步获取锁的情况CompletionStageLong s handleNoSync(threadId, ttlRemainingFuture);// 将处理后的CompletionStage包装为RFuturettlRemainingFuture new CompletableFutureWrapper(s);// 当ttlRemainingFuture完成时如果ttlRemaining为null则表示锁已成功获取CompletionStageLong f ttlRemainingFuture.thenApply(ttlRemaining - {// 锁已获取if (ttlRemaining null) {// 如果锁的过期时间大于0则设置锁的过期时间if (leaseTime 0) {internalLockLeaseTime unit.toMillis(leaseTime);} else {// 如果锁的过期时间不大于0则安排锁的过期时间续期scheduleExpirationRenewal(threadId);}}// 返回ttlRemaining如果为null则表示锁已获取return ttlRemaining;});// 将处理后的CompletionStage包装为RFuturereturn new CompletableFutureWrapper(f);
}停止续期
如果一个锁的unlock方法被调用了那么就会停止续期。
那么取消续期的核心代码如下
/*** 取消与锁关联的自动续期任务。** param threadId 如果不为null则只取消与特定线程ID关联的续期任务。*/
protected void cancelExpirationRenewal(Long threadId) {// 从过期续期映射中获取当前的过期续期条目ExpirationEntry task EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task null) {// 如果没有找到对应的续期条目则直接返回return;}if (threadId ! null) {// 如果提供了线程ID则从续期条目中移除该线程IDtask.removeThreadId(threadId);}if (threadId null || task.hasNoThreads()) {// 如果没有提供线程ID或者续期条目中没有其他线程ID则取消定时任务Timeout timeout task.getTimeout();if (timeout ! null) {// 取消定时任务timeout.cancel();}// 从过期续期映射中移除过期续期条目EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}主要就是通过 EXPIRATION_RENEWAL_MAP.remove来做的。那么cancelExpirationRenewal还有下面一处调用
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry new ExpirationEntry();ExpirationEntry oldEntry EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry ! null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}也就是说在尝试开启续期的过程中如果线程被中断了那么就会取消续期动作了。
目前Redisson是没有针对最大续期次数和最大续期时间的支持的。所以正常情况下如果没有解锁是会一直续期下去的。 客户端挂了锁会不会一直续期
Redission 是 redis 的客户端