锁与分布式锁
操作系统的概念中,”锁“是在并发环境下的一种同步机制,控制共享资源的并发访问,以确保数据的一致性与正确性。
在分布式的场景下,每个微服务都会独立启动一个 JVM,这时再使用 JVM 锁(如 Synchronized)将不再有效。
要实现一个可靠的分布式锁需要具备以下几个条件:
独占性:在任何时刻,只能有一个线程持有锁
高可用:不能因为某个 Redis 节点宕机,导致微服务加锁和解锁失败
防死锁:必须有超时解锁机制,避免微服务在临界区突然宕机,导致永远无法解锁
安全性:锁只能被持有者解锁,不能被其他微服务解锁
可重入性:同一个节点同一个线程,持有锁后可以再次上锁
JVM 锁
场景搭建
首先,在 Redis 中记录有商品库存量,每当商品被销售时库存自减。
127.0.0.1:6379> set inventory 1000
OK
127.0.0.1:6379> decr inventory
(integer) 999
127.0.0.1:6379> decr inventory
(integer) 998然后,在 Java 中写一个服务,模拟销售商品的动作。
tips: 为了与后续代码结构保持一致,这里使用 ReentrantLock 而不是 Synchronized。
@Service
public class InventoryServiceImpl implements InventoryService {
@Resource
private StringRedisTemplate stringRedisTemplate;
private final Lock lock = new ReentrantLock();
@Override
public String sale() {
lock.lock();
try {
String key = "inventory";
// 查询库存
String result = stringRedisTemplate.opsForValue().get(key);
int inventoryNum = result == null ? 0 : Integer.parseInt(result);
if (inventoryNum > 0) {
// 扣减库存
Long left = stringRedisTemplate.opsForValue().decrement(key);
return String.format("成功卖出一个商品,库存剩余:%d", left);
}
} finally {
lock.unlock();
}
return "商品已售罄";
}
}接着,为了复现分布式场景下存在的问题,将启动两个微服务并配合 Nginx 负载均衡。
upstream services {
server localhost:8000 weight=1;
server localhost:9000 weight=1;
}
server {
listen 8888;
server_name localhost;
location / {
proxy_pass http://services;
}
}最后,使用 JMeter 启动 1500 个线程(大于库存数量)向 Nginx 发送请求,结束后查询 Redis 库存量发现是 -1,出现了商品超卖现象。
127.0.0.1:6379> get inventory
"-1"问题分析
前面在代码中已经使用 ReentrantLock 对销售动作加锁了,为什么还会出现超卖现象呢?
究其根本,无论是 ReentrantLock 还是 Synchronized 都是 JVM 内部的锁机制,当启动多个微服务时每个服务都会持有一把独立的锁,不会影响到其他微服务。
为什么结果库存是 -1 而不是其他值?
虽然 JVM 锁无法影响其他微服务,但是单体的锁还是生效的,即同一时刻只有一个线程会访问 Redis。另外,由于 Redis 工作线程是单线程且 decr 本身也是原子操作,就不存在产品被重复销售的问题。
因为两个微服务持有的锁不是同一把,会导致在最后时刻两个微服务请求 Redis 获取库存量时,可能都返回了还有 1 个库存,结果导致商品超售。随着微服务数量提升,超卖数量会更加严重。
Redis 分布式锁
既然知道了单机版锁的问题所在,那么将锁从 JVM 内部移动到第三方服务中,让所有微服务竞争同一把锁就行了。
在 Redis 中,可以使用 SETNX 命令来抢锁,该命令只有在 Key 不存在时才能设置成功,若 Key 已存在则返回 0 代表抢锁失败。反之,解锁只需要将 Key 删除即可。
SETNX key value修改前面的代码,将 ReentrantLock 改成自旋锁,若加锁失败则休眠 20 ms 再次尝试加锁,避免 CPU 不停空转。
@Service
public class InventoryServiceImpl implements InventoryService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public String sale() {
String redisLockKey = "redisLock";
// 自旋锁,重复尝试加锁
while(Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1"))) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
String key = "inventory";
// 查询库存
String result = stringRedisTemplate.opsForValue().get(key);
int inventoryNum = result == null ? 0 : Integer.parseInt(result);
if (inventoryNum > 0) {
// 扣减库存
Long left = stringRedisTemplate.opsForValue().decrement(key);
return String.format("成功卖出一个商品,库存剩余:%d", left);
}
} finally {
// 最后一定要解锁
stringRedisTemplate.delete(redisLockKey);
}
return "商品已售罄";
}
}超时解锁
至此,已经基于 SETNX 实现了一个最简单的分布式锁。但是,在分布式的场景中,如果有一个微服务在加锁后,还未解锁就宕机了,会发生什么?
答案是会产生死锁,其他微服务永远都无法得到也无法解开这个锁。
为了避免因微服务宕机而导致死锁尝试,在获取锁时必须给 Key 设置过期时间!!若微服务突然宕机,等待超时后自动将 Key 移除,其他微服务可以继续工作。
127.0.0.1:6379> setnx redisLock 1
(integer) 1
127.0.0.1:6379> expire redisLock 30
(integer) 1像这样使用两条命令,也有可能会在上锁后未设置过期时间服务器就宕机了,正确做法是合并成一个原子操作。
127.0.0.1:6379> set redisLock 1 NX EX 30
OK
127.0.0.1:6379> set redisLock 1 NX EX 30
(nil)在 Java 代码中添加过期时间
while(Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", 10, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}防误删
当分布式锁加入过期时间后,又出现了一个新的问题。假设, 其中一个微服务 A 因为网络延迟等原因(微服务 A 并没有宕机)处理时间超过了分布式锁的生存时间。
这时其他服务会立刻开始抢夺锁并进入临界区,随后,微服务 A 处理完成执行了解锁操作,导致微服务 A 解开了其他微服务上的锁。
这就需要加入防误删机制,每个线程都只能删除自己持有的锁。在 Java 中可以生成一个 UUID 并加上当前的线程 ID 作为分布式锁的值。
@Override
public String sale() {
String redisLockKey = "redisLock";
// 将 UUID:ThreadId 作为分布式锁的值
String redisLockValue = String.format("%s:%s", UUID.randomUUID(), Thread.currentThread().getId());
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(redisLockKey, redisLockValue, 10, TimeUnit.SECONDS))) {
// ...
}
try {
// ...
} finally {
// 只能删除自己持有的锁
if (Objects.requireNonNull(stringRedisTemplate.opsForValue().get(redisLockKey)).equalsIgnoreCase(redisLockValue)) {
stringRedisTemplate.delete(redisLockKey);
}
}
return "商品已售罄";
}这么写好像很合理,先从 Redis 中获取锁的值,再判断是不是自己持有的锁,只删除自己持有的锁。
但是,这里与 Redis 交互了两次,考虑一个极端的情况:当微服务 A 获取到锁的值后,正好锁过期了,然后微服务 B 开始上锁,紧接着微服务 A 向 Redis 发送了解锁命令。
又再次出现了锁误删的情况,原因是判断锁与删除锁不是一套原子操作,这是完全不同的两条命令无法像上面 SETNX 一样一条命令完成加锁与设置过期时间两个操作。
基于此,Redis 提供了 EVAL 命令结合 Lua 脚本,可以将一系列操作原子化。
EVAL script numkeys [key [key ...]] [arg [arg ...]]在 Lua 脚本中可以使用 redis.call 方法执行 Redis 相关命令,其中 KEYS 与 ARGV 是通过参数传递的值,值得注意的是其索引从 1 开始。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end修改 Java 解锁操作,使用 Lua 脚本保证其原子性。
try {
// ...
} finally {
// 使用 Lua 脚本原子化删除自己持有的锁
String luaScript = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
""";
stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), List.of(redisLockKey), redisLockValue);
}可重入锁
以上已经基于 SETNX 实现了相对较完善的分布式锁,但还不满足可重入性,当同一个线程第二次进行 SETNX 时由于第一把锁未解开,导致出现死锁现象。
要实现一把可重入锁 SETNX 已经不满足需求了,可重入锁在原先的基础上还需要能够记录同一线程加锁的次数,可以使用 HSET 实现。
{
"redisLock": {
"uuid:threadId": "count"
}
}在加锁时,需要判断锁是否存在,只有不存在时才能对其上锁;若锁已存在,则判断是否为自己持有的锁,需要将值自增。
这多个操作必须使用 Lua 脚本保证其原则性。
if redis.call("exists", KEYS[1]) == 0 then
redis.call("hset", KEYS[1], KEYS[2], 1)
redis.call("expire", KEYS[1], ARGV[1])
return 1
elseif redis.call("hexists", KEYS[1], KEYS[2]) == 1 then
redis.call("hincrby", KEYS[1], KEYS[2], 1)
redis.call("expire", KEYS[1], ARGV[1])
return 1
else
return 0
end由于 HINCRBY 自增操作,当 Key 不存在时会有隐式的 SET 操作,所以可以将以上脚本简化。
if redis.call("exists", KEYS[1]) == 0 or redis.call("hexists", KEYS[1], KEYS[2]) == 1 then
redis.call("hincrby", KEYS[1], KEYS[2], 1)
redis.call("expire", KEYS[1], ARGV[1])
return 1
else
return 0
end解锁时反向操作,若锁不存在直接返回,存在则自减,当值自减到 0 时将其删除。
if redis.call("hexists", KEYS[1], KEYS[2]) == 0 then
return nil
elseif redis.call("hincrby", KEYS[1], KEYS[2], -1) == 0 then
return redis.call("del", KEYS[1])
else
return 0
end封装分布式锁
遵循 Java 开发规范,既然要封装一把锁,就应该实现 java.util.concurrent.locks.Lock 接口。
public interface Lock {
// 获取锁
void lock();
// 除非当前线程被中断,否则获取锁
void lockInterruptibly() throws InterruptedException;
// 仅在调用时锁是空闲的情况下才获取锁
boolean tryLock();
// 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 解锁
void unlock();
// 返回一个绑定到此 Lock 实例的新 Condition 实例
Condition newCondition();
}目前先不考虑中断与获取新实例,基于 Redis 的分布锁很自然的需要能够与 Redis 交互,并确定分布式锁使用的 Key 以及过期时间。
public class RedisDistributedLock implements Lock {
private final StringRedisTemplate stringRedisTemplate;
private final String redisLockKey;
private final String redisLockField;
private final long expireTime;
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate) {
this(stringRedisTemplate, "redisLock");
}
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String redisLockKey) {
this(stringRedisTemplate, redisLockKey,
String.format("%s:%s", UUID.randomUUID(), Thread.currentThread().getId()));
}
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,
String redisLockKey, String redisLockField) {
this(stringRedisTemplate, redisLockKey, redisLockField, 30L);
}
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,
String redisLockKey, String redisLockField, long expireTime) {
this.stringRedisTemplate = stringRedisTemplate;
this.redisLockKey = redisLockKey;
this.redisLockField = redisLockField;
this.expireTime = expireTime;
}
}按照上文分析可重入锁加/解锁需要配合 Lua 脚本保证原子性。
在 Lock 接口中 tryLock 方法规定在指定时间内获得到锁,可以假设传递 -1 时将无时间限制尝试加锁。
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
return tryLock(-1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1) {
String luaScript = """
if redis.call("exists", KEYS[1]) == 0 or redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
redis.call("hincrby", KEYS[1], ARGV[1], 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
else
return 0
end
""";
while (!stringRedisTemplate.execute(
new DefaultRedisScript<>(luaScript, Boolean.class),
List.of(redisLockKey), redisLockField, String.valueOf(expireTime))) {
TimeUnit.MILLISECONDS.sleep(20);
}
return true;
}
return false;
}
@Override
public void unlock() {
String luaScript = """
if redis.call("hexists", KEYS[1], ARGV[1]) == 0 then
return nil
elseif redis.call("hincrby", KEYS[1], ARGV[1], -1) == 0 then
return redis.call("del", KEYS[1])
else
return 0
end
""";
stringRedisTemplate.execute(
new DefaultRedisScript<>(luaScript, Boolean.class),
List.of(redisLockKey), redisLockField);
}单例工厂模式
封装好分布式锁后,若每次需要锁都单独 New 一个实例会因为每个实例生成的 UUID 不相同导致锁无法重入,再次陷入死锁。
可以使用单例工厂模式,只生成一次 UUID 确保锁的可重入性,搭配线程 id 确保 Key 的唯一性。
@Component
public class DistributedLockFactory {
@Resource
private StringRedisTemplate stringRedisTemplate;
private final UUID uuid = UUID.randomUUID();
public Lock getInstance() {
return new RedisDistributedLock(stringRedisTemplate, "redisLock",
String.format("%s:%s", uuid, Thread.currentThread().getId()));
}
}一切返璞归真,现在又可能像调用 JVM 锁那么简单的调用分布式锁了。
@Service
public class InventoryServiceImpl implements InventoryService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private DistributedLockFactory distributedLockFactory;
@Override
public String sale() {
// 获取分布式锁
Lock lock = distributedLockFactory.getInstance();
// 加锁
lock.lock();
try {
String key = "inventory";
// 查询库存
String result = stringRedisTemplate.opsForValue().get(key);
int inventoryNum = result == null ? 0 : Integer.parseInt(result);
if (inventoryNum > 0) {
// 扣减库存
Long left = stringRedisTemplate.opsForValue().decrement(key);
return String.format("成功卖出一个商品,库存剩余:%d", left);
}
} finally {
// 解锁
lock.unlock();
}
return "商品已售罄";
}
}自动续期
前面已经分析过,为了避免其中一台微服务宕机导致陷入死锁,于是加入了锁生存时间;为了避免锁超时后被其他微服务误删,于是加入了防误删机制。
考虑介于这两者之间还存在一种情况,如果其中一个微服务因为网络延迟等原因,在临界区的处理时间超过了锁的生存时间,服务器并没有宕机。在锁过期的一瞬间,其他微服务将开始抢夺锁的持有权并进入临界区,此时出现了两个线程同时进入临界区的情况。
为了避免这种情况,还需要引入“看门狗”每隔一段时间判断是否为自己持有的锁,并为其续期。
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end在 Java 中可以使用一个定时任务完成该操作,并在加锁成功后自动启动该定时任务。
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1) {
// ...
resetExpire();
return true;
}
return false;
}
private void resetExpire() {
String luaScript = """
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
""";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (stringRedisTemplate.execute(
new DefaultRedisScript<>(luaScript, Boolean.class),
List.of(redisLockKey), redisLockField, String.valueOf(expireTime))) {
// 递归再次续期
resetExpire();
}
}
}, (this.expireTime * 1000) / 3);
}