残云cyun
残云cyun
发布于 2025-11-02 / 9 阅读
0
0

Redisson 分布式锁源码分析

快速开始

在上一篇文章中,已经从头到尾手写了一把基于 Redis 的分布式锁,除了自研外还可以直接使用 Redisson 框架,它提供了更加完善的基于 Redis 的分布式锁方案。

在项目 pom.xml 中引入 Redisson 依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.52.0</version>
</dependency>

创建配置对象,并初始化 Redisson 实例:

// 配置对象
Config config = new Config();
config.useSingleServer()
        .setAddress("redis://localhost:6379");

// 使用配置对象创建实例
RedissonClient redissonClient = Redisson.create(config);

简单使用 Redisson 分布式锁:

// 创建分布式锁,指定锁的 Key
RLock lock = redisson.getLock("myLock");

lock.lock();
try {
    // ...
} finally {
    lock.unlock();
}

可重入锁

在 Redisson 中分布式锁实现了 java.util.concurrent.locks.Lock 接口,且使用发布/订阅通道通知其他等待中的线程获取锁。

classDiagram direction TB Lock <|-- RLock RLock <|.. RedissonBaseLock RedissonBaseLock <|-- RedissonLock PubSubEntry <|-- RedissonLockEntry PubSubEntry <.. PublishSubscribe PublishSubscribe <|-- LockPubSub LockPubSub <.. RedissonLock RedissonLockEntry <.. RedissonLock class Lock { <<interface>> } class RLock { <<interface>> } class PublishSubscribe { <<abstract>> } class LockPubSub class RedissonBaseLock { + unlock() # unlockInnerAsync() } class RedissonLock { ~ internalLockLeaseTime ~ pubSub # commandExecutor + lock() - tryAcquire() - tryAcquireAsync() ~ tryLockInnerAsync() # unlockInnerAsync() } class RedissonLockEntry { - latch - promise - listeners + getLatch() } class PubSubEntry { <<interface>> }

加锁

先看看 RedissonLock 中最关键的加锁逻辑,这里写了一段 Lua 脚本与 Redis 交互,原子化的实现了加锁与设置过期时间的逻辑。

进一步分析 Lua 脚本:

  • KEYS[1] 不存在时,表示当前没有任何人持有锁,可以直接加锁,反之有人持有锁

  • ARGV[2] 是锁持有者的标识,对于持有者可以多次加锁

  • hincrby 是实现可重入锁的核心机制,记录当前锁重入次数,当 Key 不存在时会自动创建

  • 若加锁成功返回 nil,否则返回当前锁的过期时间

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                    "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

为了避免线程持有锁后突然宕机产生死锁,分布式锁一定要设置过期时间。将代码往前翻,发现当用户有传递过期时间时,使用用户传递的过期时间,否则使用默认的过期时间。

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  RFuture<Long> ttlRemainingFuture;
  if (leaseTime > 0) {
    ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  } else {
    ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                           TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  }
  // ...
}

默认的过期时间在配置类中定义为 30 秒,并且是 private 权限不允许修改。

public class Config {
    // ...
    private long lockWatchdogTimeout = 30 * 1000;
    // ...
}

继续往前翻代码,终于到了实际调用的 lock() 方法,用户可以根据需求定义锁的过期时间,默认传递 -1 即使用配置类中定义的 30 秒。

@Override
public void lock() {
  try {
    lock(-1, null, false);
  } catch (InterruptedException e) {
    throw new IllegalStateException();
  }
}

@Override
public void lock(long leaseTime, TimeUnit unit) {
  try {
    lock(leaseTime, unit, false);
  } catch (InterruptedException e) {
    throw new IllegalStateException();
  }
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  // ...
}

在加锁逻辑中,RedissonLock 会立刻尝试加锁,根据前面的分析已经知道加锁成功返回 null,否则会返回当前锁还剩余的过期时间。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }

  // ...
}

若加锁失败,则需要等待一段时间再次尝试加锁,但是要等待多久呢?在 RedissonLock 中使用了Redis 的发布订阅机制,而不是固定的等待时间,减少了线程与 Redis 无效交互的次数。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  // ...

  CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  pubSub.timeout(future);
  RedissonLockEntry entry;
  if (interruptibly) {
    entry = commandExecutor.getInterrupted(future);
  } else {
    entry = commandExecutor.get(future);
  }

  // ...
}

最后就是自旋锁, entry.latch 是一个 Semaphore 信号量,使用 tryAcquire 方法最多等待到锁过期为止,等待到锁过期还未抢到 permit 则立刻与 Redis 交互尝试获取锁或刷新锁过期时间 TTL。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  // ...

  try {
    while (true) {
      ttl = tryAcquire(-1, leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }

      // waiting for message
      if (ttl >= 0) {
        try {
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          if (interruptibly) {
            throw e;
          }
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
      } else {
        if (interruptibly) {
          entry.getLatch().acquire();
        } else {
          entry.getLatch().acquireUninterruptibly();
        }
      }
    }
  } finally {
    unsubscribe(entry, threadId);
  }
}

解锁

RedissonLock 解锁也是靠一段 Lua 脚本原子化实现,将锁重入次数 -1,减到 0 时删除对应的锁并通过发布/订阅通道通知其他线程开始抢锁。

protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
  return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                     "local val = redis.call('get', KEYS[3]); " +
                                     "if val ~= false then " +
                                     "return tonumber(val);" +
                                     "end; " +

                                     "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                     "return nil;" +
                                     "end; " +
                                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                     "if (counter > 0) then " +
                                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                     "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                                     "return 0; " +
                                     "else " +
                                     "redis.call('del', KEYS[1]); " +
                                     "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                                     "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                                     "return 1; " +
                                     "end; ",
                                     Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                                     LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                                     getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}

虽然 Redis 是单线程执行命令,但是客户端可能会出现重复发送请求的情况。RedissonLock 使用 UnlockLatch 记录了短时间内的请求,避免了短时间内出现的多次重复请求造成锁逻辑错误的问题。

sequenceDiagram Client ->> Redis: Unlock Note over Redis: get KEYS[3] -> nil<br/>...<br/>set KEYS[3] Redis --x Client: 未及时收到响应 Client ->> Redis: Retry Note over Redis: get KEYS[3] Redis -->> Client: result

看门狗机制

在前面 RedissonLock 加锁成功后,还启动了一个看门狗机制,用于自动续期锁持有时间。

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  RFuture<Long> ttlRemainingFuture;
  if (leaseTime > 0) {
    ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  } else {
    ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                           TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  }
  CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
  ttlRemainingFuture = new CompletableFutureWrapper<>(s);

  CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
    // lock acquired
    if (ttlRemaining == null) {
      if (leaseTime > 0) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
      } else {
        scheduleExpirationRenewal(threadId);
      }
    }
    return ttlRemaining;
  });
  return new CompletableFutureWrapper<>(f);
}

RedissonLock 看门狗实现了 Netty 的 TimerTask 接口,通过 HashedWheelTimer 时间轮定时给锁续期。

classDiagram direction LR LockTask <.. LockRenewalScheduler RenewalTask <|-- LockTask TimerTask <|.. RenewalTask class LockRenewalScheduler { - reference - executor + renewLock() } class LockTask { + renew() + add() } class RenewalTask { <<abstract>> - executor - name2entry # execute() + schedule() + run() } class TimerTask { <<interface>> + run() }

在 LockRenewalScheduler 中定义了全局唯一的定时任务,并使用 add 方法将当前线程与持有锁相关的信息加入定时任务中。

public void renewLock(String name, Long threadId, String lockName) {
  reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
  LockTask task = reference.get();
  task.add(name, lockName, threadId);
}

这里传递的线程与持有锁相关信息,还会被封装成 LockEntry ,最终被保存在线程安全的 ConcurrentHashMap 中。

abstract class RenewalTask implements TimerTask {
  // ...
  final Map<String, LockEntry> name2entry = new ConcurrentHashMap<>();
  // ...
}

最终会在 schedule 方法中启用时间轮,每过超时时间的 1/3 就自动续期一次锁。

public void schedule() {
  if (!running.get()) {
    return;
  }

  long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
  executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}

定时任务最后又会再次异步调用 schedule 方法,递归实现无限需求。

@Override
public void run(Timeout timeout) {
  if (executor.getServiceManager().isShuttingDown()) {
    return;
  }

  CompletionStage<Void> future = execute();
  future.whenComplete((result, e) -> {
    if (e != null) {
      log.error("Can't update locks {} expiration", name2entry.keySet(), e);
      schedule();
      return;
    }

    schedule();
  });
}

在 LockTask 中定义的 renew 方法是实现锁续期的核心逻辑,通过迭代器获取所有要续期的锁,并通过 Lua 脚本一次性续期所有锁。

但是,Redis 是单线程工作如果一次性续期大量的锁,可能会导致主线程长时间无法完成工作,影响其他业务逻辑正常运行。这时 chunkSize 参数就很重要,限制了单次最多续期的锁数量,避免影响其他业务。

@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
  if (!iter.hasNext()) {
    return CompletableFuture.completedFuture(null);
  }

  Map<String, Long> name2threadId = new HashMap<>(chunkSize);
  List<Object> args = new ArrayList<>(chunkSize + 1);
  args.add(internalLockLeaseTime);

  List<String> keys = new ArrayList<>(chunkSize);
  while (iter.hasNext()) {
    String key = iter.next();

    LockEntry entry = name2entry.get(key);
    if (entry == null) {
      continue;
    }
    Long threadId = entry.getFirstThreadId();
    if (threadId == null) {
      continue;
    }

    keys.add(key);
    args.add(entry.getLockName(threadId));
    name2threadId.put(key, threadId);

    if (keys.size() == chunkSize) {
      break;
    }
  }

  if (keys.isEmpty()) {
    return CompletableFuture.completedFuture(null);
  }

  // ...
}

接着将所有锁信息打包成一个 Lua 脚本交给 Redis 执行,每个锁都会判断其存在性。

  • 锁存在,续期并记录 1

  • 锁不存在,记录 0

  • 最终返回记录的列表

对于已经不存在的锁,则取消其自动续期的任务。

另外,一次可能无法给所有锁都续期,需要递归调用 renew 方法继续给其他锁续期。

@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
  // ...
  String firstName = keys.get(0);

  CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
                   new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
                   "local result = {} " +
                   "for i = 1, #KEYS, 1 do " +
                       "if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
                            "redis.call('pexpire', KEYS[i], ARGV[1]); " +
                            "table.insert(result, 1); " +
                       "else " +
                            "table.insert(result, 0); " +
                       "end; " +
                   "end; " +
                   "return result;",
                   new ArrayList<>(keys),
                   args.toArray());

  return f.thenCompose(existingNames -> {
    keys.removeAll(existingNames);
    for (String key : keys) {
      cancelExpirationRenewal(key, name2threadId.get(key));
    }
    return renew(iter, chunkSize);
  });
}

单点故障

前面的讨论都是建立在单个 Redis 节点正常的情况,如果这个 Redis 宕机了是否还有效?

Redis 天生就支持故障转移机制,无论是哨兵还是集群,都能让在一个 Master 出现故障时,将另一个 Slave 提升为 Master 继续工作。

但是,Redis 是 AP 模型不支持强一致性,当 Master 异步将锁同步给 Slave 期间突然宕机,由于 Slave 未同步到锁就被提升成了 Master,客户端又可以再次获取锁,出现客户端一锁多持的情况。

sequenceDiagram participant Thread A participant Thread B participant Master participant Slave Thread A ->> Master: lock Master --x Slave: sync data Note over Master: 突然宕机 Note over Slave: 提升为 Master Thread B ->> Slave: lock Note over Thread A,Thread B: 一锁多持

为了避免这种单点故障的问题,可以使用 MultiLock 将多个独立的 Redis 组合在一起使用。

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

multiLock.lock();
try {
  // ...
} finally {
  multiLock.unlock();
}

RedLock

Redis 官方文档中还有关于 RedLock 的算法介绍,该算法也存在一些问题在新版本的 Redisson 中已经被废弃。

既然已经被废弃那就不讨论,有兴趣可以自己看看官方文档与相关文章。

参考资料

https://www.bilibili.com/video/BV13R4y1v7sP?p=139https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html


评论