分布式锁 – Redisson的看门狗(watchdog)机制

前言

本篇文章从Redisson的加锁(tryLock)入手,带大家由源码来了解一下watchdog的自动延迟加锁操作,如果对Redisson的加锁机制没有了解,建议可以先看一下本人的另一篇博客分布式锁-Redisson的使用及源码分析

结论

  • 想要触发Redisson看门狗机制,不能自定义 leaseTime(或者传参 -1)
  • Redisson默认加锁30秒,每隔10秒刷新加锁时间
  • watchdog的延时时间 可以由 lockWatchdogTimeout指定默认延时时间,但是不要设置太小
  • Redisson是通过Future和Timeout功能来实现异步延时

源码分析

前文讲到 tryLock() 中最重要的加锁逻辑是 tryAcquire(waitTime, leaseTime, unit, threadId),今天我们就来从加锁的方法入手看一下如何完成WatchDog

Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

接下来,我们进入这个方法看一下

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// get() 方法异步阻塞式获取加锁结果return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}/** * 重点方法 看这里 */private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Boolean> acquiredFuture;// 1、判断锁的持有时间是否由用户自定义(这里我不在细节讨论,感兴趣的童鞋们可以去看上一篇文章)if (leaseTime != -1) {acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {// 2、当用户没有自定义锁占有时间时,默认传入 internalLockLeaseTime// private long lockWatchdogTimeout = 30 * 1000; 默认30秒acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {// lock acquiredif (acquired) {if (leaseTime != -1) {// 3、如果用户传入占用时间直接转换,把默认值internalLockLeaseTime 更新为用户自定义的占有时间internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 4、看这里,看这里,没错,这里就是触发看门狗机制的方法// 重点:只有当 leaseTime == -1时才会触发看门狗机制scheduleExpirationRenewal(threadId);}}return acquired;});return new CompletableFutureWrapper<>(f);}

我们来看一下方法:scheduleExpirationRenewal(threadId);

protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// 1、EXPIRATION_RENEWAL_MAP 是一个全局的静态常量MapExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// 2、oldEntry != null 说明该线程不是第一次触发oldEntry.addThreadId(threadId);} else {// 3、oldEntry == null 说明该线程是第一次触发entry.addThreadId(threadId);try {// 4、更新过期时间renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}

接下来,我们一起看一下renewExpiration()是如何更新过期时间的

private void renewExpiration() {// 1、获取当前线程的更新对象ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 2、创建了一个定时任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 3、异步更新过期时间RFuture<Boolean> future = renewExpirationAsync(threadId);// 4、res 执行结果,e 异常future.whenComplete((res, e) -> {if (e != null) {// 5、如果出现异常,从map中删除,直接返回log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// 6、如果没有报错,就再次定时延期renewExpiration();} else {// 7、否则取消定时cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 8、延迟时间为:internalLockLeaseTime / 3 也就是10秒ee.setTimeout(task);}// 通过lua脚本判断给key重新设置过期时间protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getRawName()),internalLockLeaseTime, getLockName(threadId));}