快速开始
在上一篇文章中,已经从头到尾手写了一把基于 Redis 的分布式锁,除了自研外还可以直接使用 Redisson 框架,它提供了更加完善的基于 Redis 的分布式锁方案。
在项目 pom.xml 中引入 Redisson 依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.52.0</version>
</dependency>创建配置对象,并初始化 Redisson 实例:
// 配置对象
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379");
// 使用配置对象创建实例
RedissonClient redissonClient = Redisson.create(config);简单使用 Redisson 分布式锁:
// 创建分布式锁,指定锁的 Key
RLock lock = redisson.getLock("myLock");
lock.lock();
try {
// ...
} finally {
lock.unlock();
}可重入锁
在 Redisson 中分布式锁实现了 java.util.concurrent.locks.Lock 接口,且使用发布/订阅通道通知其他等待中的线程获取锁。
加锁
先看看 RedissonLock 中最关键的加锁逻辑,这里写了一段 Lua 脚本与 Redis 交互,原子化的实现了加锁与设置过期时间的逻辑。
进一步分析 Lua 脚本:
KEYS[1]不存在时,表示当前没有任何人持有锁,可以直接加锁,反之有人持有锁ARGV[2]是锁持有者的标识,对于持有者可以多次加锁hincrby是实现可重入锁的核心机制,记录当前锁重入次数,当 Key 不存在时会自动创建若加锁成功返回 nil,否则返回当前锁的过期时间
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}为了避免线程持有锁后突然宕机产生死锁,分布式锁一定要设置过期时间。将代码往前翻,发现当用户有传递过期时间时,使用用户传递的过期时间,否则使用默认的过期时间。
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// ...
}默认的过期时间在配置类中定义为 30 秒,并且是 private 权限不允许修改。
public class Config {
// ...
private long lockWatchdogTimeout = 30 * 1000;
// ...
}继续往前翻代码,终于到了实际调用的 lock() 方法,用户可以根据需求定义锁的过期时间,默认传递 -1 即使用配置类中定义的 30 秒。
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// ...
}在加锁逻辑中,RedissonLock 会立刻尝试加锁,根据前面的分析已经知道加锁成功返回 null,否则会返回当前锁还剩余的过期时间。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// ...
}若加锁失败,则需要等待一段时间再次尝试加锁,但是要等待多久呢?在 RedissonLock 中使用了Redis 的发布订阅机制,而不是固定的等待时间,减少了线程与 Redis 无效交互的次数。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// ...
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
// ...
}最后就是自旋锁, entry.latch 是一个 Semaphore 信号量,使用 tryAcquire 方法最多等待到锁过期为止,等待到锁过期还未抢到 permit 则立刻与 Redis 交互尝试获取锁或刷新锁过期时间 TTL。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// ...
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
}解锁
RedissonLock 解锁也是靠一段 Lua 脚本原子化实现,将锁重入次数 -1,减到 0 时删除对应的锁并通过发布/订阅通道通知其他线程开始抢锁。
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}虽然 Redis 是单线程执行命令,但是客户端可能会出现重复发送请求的情况。RedissonLock 使用 UnlockLatch 记录了短时间内的请求,避免了短时间内出现的多次重复请求造成锁逻辑错误的问题。
看门狗机制
在前面 RedissonLock 加锁成功后,还启动了一个看门狗机制,用于自动续期锁持有时间。
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}RedissonLock 看门狗实现了 Netty 的 TimerTask 接口,通过 HashedWheelTimer 时间轮定时给锁续期。
在 LockRenewalScheduler 中定义了全局唯一的定时任务,并使用 add 方法将当前线程与持有锁相关的信息加入定时任务中。
public void renewLock(String name, Long threadId, String lockName) {
reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
LockTask task = reference.get();
task.add(name, lockName, threadId);
}这里传递的线程与持有锁相关信息,还会被封装成 LockEntry ,最终被保存在线程安全的 ConcurrentHashMap 中。
abstract class RenewalTask implements TimerTask {
// ...
final Map<String, LockEntry> name2entry = new ConcurrentHashMap<>();
// ...
}最终会在 schedule 方法中启用时间轮,每过超时时间的 1/3 就自动续期一次锁。
public void schedule() {
if (!running.get()) {
return;
}
long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}定时任务最后又会再次异步调用 schedule 方法,递归实现无限需求。
@Override
public void run(Timeout timeout) {
if (executor.getServiceManager().isShuttingDown()) {
return;
}
CompletionStage<Void> future = execute();
future.whenComplete((result, e) -> {
if (e != null) {
log.error("Can't update locks {} expiration", name2entry.keySet(), e);
schedule();
return;
}
schedule();
});
}在 LockTask 中定义的 renew 方法是实现锁续期的核心逻辑,通过迭代器获取所有要续期的锁,并通过 Lua 脚本一次性续期所有锁。
但是,Redis 是单线程工作如果一次性续期大量的锁,可能会导致主线程长时间无法完成工作,影响其他业务逻辑正常运行。这时 chunkSize 参数就很重要,限制了单次最多续期的锁数量,避免影响其他业务。
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
Map<String, Long> name2threadId = new HashMap<>(chunkSize);
List<Object> args = new ArrayList<>(chunkSize + 1);
args.add(internalLockLeaseTime);
List<String> keys = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
LockEntry entry = name2entry.get(key);
if (entry == null) {
continue;
}
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
args.add(entry.getLockName(threadId));
name2threadId.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
// ...
}接着将所有锁信息打包成一个 Lua 脚本交给 Redis 执行,每个锁都会判断其存在性。
锁存在,续期并记录 1
锁不存在,记录 0
最终返回记录的列表
对于已经不存在的锁,则取消其自动续期的任务。
另外,一次可能无法给所有锁都续期,需要递归调用 renew 方法继续给其他锁续期。
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
// ...
String firstName = keys.get(0);
CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
"local result = {} " +
"for i = 1, #KEYS, 1 do " +
"if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
"redis.call('pexpire', KEYS[i], ARGV[1]); " +
"table.insert(result, 1); " +
"else " +
"table.insert(result, 0); " +
"end; " +
"end; " +
"return result;",
new ArrayList<>(keys),
args.toArray());
return f.thenCompose(existingNames -> {
keys.removeAll(existingNames);
for (String key : keys) {
cancelExpirationRenewal(key, name2threadId.get(key));
}
return renew(iter, chunkSize);
});
}单点故障
前面的讨论都是建立在单个 Redis 节点正常的情况,如果这个 Redis 宕机了是否还有效?
Redis 天生就支持故障转移机制,无论是哨兵还是集群,都能让在一个 Master 出现故障时,将另一个 Slave 提升为 Master 继续工作。
但是,Redis 是 AP 模型不支持强一致性,当 Master 异步将锁同步给 Slave 期间突然宕机,由于 Slave 未同步到锁就被提升成了 Master,客户端又可以再次获取锁,出现客户端一锁多持的情况。
为了避免这种单点故障的问题,可以使用 MultiLock 将多个独立的 Redis 组合在一起使用。
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
multiLock.lock();
try {
// ...
} finally {
multiLock.unlock();
}RedLock
Redis 官方文档中还有关于 RedLock 的算法介绍,该算法也存在一些问题在新版本的 Redisson 中已经被废弃。
既然已经被废弃那就不讨论,有兴趣可以自己看看官方文档与相关文章。