残云cyun
残云cyun
发布于 2025-10-30 / 15 阅读
0
0

手写 Redis 分布式锁

锁与分布式锁

操作系统的概念中,”锁“是在并发环境下的一种同步机制,控制共享资源的并发访问,以确保数据的一致性与正确性。

flowchart LR 进入区 --> 临界区 临界区 --> 退出区 退出区 --> 剩余区

在分布式的场景下,每个微服务都会独立启动一个 JVM,这时再使用 JVM 锁(如 Synchronized)将不再有效。

要实现一个可靠的分布式锁需要具备以下几个条件:

  • 独占性:在任何时刻,只能有一个线程持有锁

  • 高可用:不能因为某个 Redis 节点宕机,导致微服务加锁和解锁失败

  • 防死锁:必须有超时解锁机制,避免微服务在临界区突然宕机,导致永远无法解锁

  • 安全性:锁只能被持有者解锁,不能被其他微服务解锁

  • 可重入性:同一个节点同一个线程,持有锁后可以再次上锁

JVM 锁

场景搭建

首先,在 Redis 中记录有商品库存量,每当商品被销售时库存自减。

127.0.0.1:6379> set inventory 1000
OK
127.0.0.1:6379> decr inventory
(integer) 999
127.0.0.1:6379> decr inventory
(integer) 998

然后,在 Java 中写一个服务,模拟销售商品的动作。

tips: 为了与后续代码结构保持一致,这里使用 ReentrantLock 而不是 Synchronized。

@Service
public class InventoryServiceImpl implements InventoryService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    private final Lock lock = new ReentrantLock();

    @Override
    public String sale() {
        lock.lock();
        try {
            String key = "inventory";
            // 查询库存
            String result = stringRedisTemplate.opsForValue().get(key);
            int inventoryNum = result == null ? 0 : Integer.parseInt(result);
            if (inventoryNum > 0) {
                // 扣减库存
                Long left = stringRedisTemplate.opsForValue().decrement(key);
                return String.format("成功卖出一个商品,库存剩余:%d", left);
            }
        } finally {
            lock.unlock();
        }
        return "商品已售罄";
    }
}

接着,为了复现分布式场景下存在的问题,将启动两个微服务并配合 Nginx 负载均衡。

upstream services {
    server localhost:8000 weight=1;
    server localhost:9000 weight=1;
}

server {
    listen 8888;
    server_name localhost;

    location / {
        proxy_pass http://services;
    }
}

最后,使用 JMeter 启动 1500 个线程(大于库存数量)向 Nginx 发送请求,结束后查询 Redis 库存量发现是 -1,出现了商品超卖现象。

127.0.0.1:6379> get inventory
"-1"

问题分析

前面在代码中已经使用 ReentrantLock 对销售动作加锁了,为什么还会出现超卖现象呢?

究其根本,无论是 ReentrantLock 还是 Synchronized 都是 JVM 内部的锁机制,当启动多个微服务时每个服务都会持有一把独立的锁,不会影响到其他微服务。

flowchart LR C[Client] N[Nginx :8888] A[Service :8000] B[Service :9000] C --> N N --> A & B A --> L1([Lock]) B --> L2([Lock]) L1 & L2 --> Redis

为什么结果库存是 -1 而不是其他值?

虽然 JVM 锁无法影响其他微服务,但是单体的锁还是生效的,即同一时刻只有一个线程会访问 Redis。另外,由于 Redis 工作线程是单线程且 decr 本身也是原子操作,就不存在产品被重复销售的问题。

因为两个微服务持有的锁不是同一把,会导致在最后时刻两个微服务请求 Redis 获取库存量时,可能都返回了还有 1 个库存,结果导致商品超售。随着微服务数量提升,超卖数量会更加严重。

sequenceDiagram participant A as Service:8000 participant B as Service:9000 A ->> +A: lock A ->> Redis: get inventory Redis -->> A: "1" B ->> +B: Lock B ->> Redis: get inventory Redis -->> B: "1" A ->> Redis: decr inventory Note over Redis: inventory = 0 A ->> -A: unlock B ->> Redis: decr inventory Note over Redis: inventory = -1 B ->> -B: unlock

Redis 分布式锁

既然知道了单机版锁的问题所在,那么将锁从 JVM 内部移动到第三方服务中,让所有微服务竞争同一把锁就行了。

flowchart LR C[Client] N[Nginx :8888] A[Service :8000] B[Service :9000] C --> N N --> A & B A & B --> L([Redis Lock]) L --> R[Redis Cache]

在 Redis 中,可以使用 SETNX 命令来抢锁,该命令只有在 Key 不存在时才能设置成功,若 Key 已存在则返回 0 代表抢锁失败。反之,解锁只需要将 Key 删除即可。

SETNX key value

修改前面的代码,将 ReentrantLock 改成自旋锁,若加锁失败则休眠 20 ms 再次尝试加锁,避免 CPU 不停空转。

@Service
public class InventoryServiceImpl implements InventoryService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public String sale() {
        String redisLockKey = "redisLock";
        // 自旋锁,重复尝试加锁
        while(Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1"))) {
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        try {
            String key = "inventory";
            // 查询库存
            String result = stringRedisTemplate.opsForValue().get(key);
            int inventoryNum = result == null ? 0 : Integer.parseInt(result);
            if (inventoryNum > 0) {
                // 扣减库存
                Long left = stringRedisTemplate.opsForValue().decrement(key);
                return String.format("成功卖出一个商品,库存剩余:%d", left);
            }
        } finally {
            // 最后一定要解锁
            stringRedisTemplate.delete(redisLockKey);
        }
        return "商品已售罄";
    }
}

超时解锁

至此,已经基于 SETNX 实现了一个最简单的分布式锁。但是,在分布式的场景中,如果有一个微服务在加锁后,还未解锁就宕机了,会发生什么?

sequenceDiagram participant A as Service:8000 participant B as Service:9000 participant L as Redis Lock participant R as Redis Cache A ->> L: setnx L -->> A: true loop B ->> L: setnx L -->> B: false B ->> B: sleep end A ->> R: process A --x A: crash

答案是会产生死锁,其他微服务永远都无法得到也无法解开这个锁。

为了避免因微服务宕机而导致死锁尝试,在获取锁时必须给 Key 设置过期时间!!若微服务突然宕机,等待超时后自动将 Key 移除,其他微服务可以继续工作。

127.0.0.1:6379> setnx redisLock 1
(integer) 1
127.0.0.1:6379> expire redisLock 30
(integer) 1

像这样使用两条命令,也有可能会在上锁后未设置过期时间服务器就宕机了,正确做法是合并成一个原子操作。

127.0.0.1:6379> set redisLock 1 NX EX 30
OK
127.0.0.1:6379> set redisLock 1 NX EX 30
(nil)

在 Java 代码中添加过期时间

while(Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", 10, TimeUnit.SECONDS))) {
    try {
        TimeUnit.MILLISECONDS.sleep(20);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

防误删

当分布式锁加入过期时间后,又出现了一个新的问题。假设, 其中一个微服务 A 因为网络延迟等原因(微服务 A 并没有宕机)处理时间超过了分布式锁的生存时间。

这时其他服务会立刻开始抢夺锁并进入临界区,随后,微服务 A 处理完成执行了解锁操作,导致微服务 A 解开了其他微服务上的锁。

sequenceDiagram participant A as Service:8000 participant B as Service:9000 participant L as Redis Lock participant R as Redis Cache A ->> L: setnx L -->> A: true Note over L: lock activate L A ->> R: process L ->> L: timeout Note over L: unlock deactivate L B ->> L: setnx L -->> B: true Note over L: lock activate L B ->> R: process A ->> L: del lock Note over L: unlock deactivate L

这就需要加入防误删机制,每个线程都只能删除自己持有的锁。在 Java 中可以生成一个 UUID 并加上当前的线程 ID 作为分布式锁的值。

@Override
public String sale() {
    String redisLockKey = "redisLock";
    // 将 UUID:ThreadId 作为分布式锁的值
    String redisLockValue = String.format("%s:%s", UUID.randomUUID(), Thread.currentThread().getId());
    while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue()
            .setIfAbsent(redisLockKey, redisLockValue, 10, TimeUnit.SECONDS))) {
        // ...
    }

    try {
        // ...
    } finally {
        // 只能删除自己持有的锁
        if (Objects.requireNonNull(stringRedisTemplate.opsForValue().get(redisLockKey)).equalsIgnoreCase(redisLockValue)) {
            stringRedisTemplate.delete(redisLockKey);
        }
    }
    return "商品已售罄";
}

这么写好像很合理,先从 Redis 中获取锁的值,再判断是不是自己持有的锁,只删除自己持有的锁。

但是,这里与 Redis 交互了两次,考虑一个极端的情况:当微服务 A 获取到锁的值后,正好锁过期了,然后微服务 B 开始上锁,紧接着微服务 A 向 Redis 发送了解锁命令。

sequenceDiagram participant A as Service:8000 participant B as Service:9000 participant L as Redis Lock participant R as Redis Cache A ->> L: get L -->> A: value L ->> L: timeout Note over L: unlock B ->> L: setnx L -->> B: true Note over L: lock A ->> L: del Note over L: unlock

又再次出现了锁误删的情况,原因是判断锁与删除锁不是一套原子操作,这是完全不同的两条命令无法像上面 SETNX 一样一条命令完成加锁与设置过期时间两个操作。

基于此,Redis 提供了 EVAL 命令结合 Lua 脚本,可以将一系列操作原子化。

EVAL script numkeys [key [key ...]] [arg [arg ...]]

在 Lua 脚本中可以使用 redis.call 方法执行 Redis 相关命令,其中 KEYSARGV 是通过参数传递的值,值得注意的是其索引从 1 开始

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

修改 Java 解锁操作,使用 Lua 脚本保证其原子性。

try {
    // ...
} finally {
    // 使用 Lua 脚本原子化删除自己持有的锁
    String luaScript = """
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1])
            else
                return 0
            end
            """;
    stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), List.of(redisLockKey), redisLockValue);
}

可重入锁

以上已经基于 SETNX 实现了相对较完善的分布式锁,但还不满足可重入性,当同一个线程第二次进行 SETNX 时由于第一把锁未解开,导致出现死锁现象。

要实现一把可重入锁 SETNX 已经不满足需求了,可重入锁在原先的基础上还需要能够记录同一线程加锁的次数,可以使用 HSET 实现。

{
    "redisLock": {
        "uuid:threadId": "count"
    }
}

在加锁时,需要判断锁是否存在,只有不存在时才能对其上锁;若锁已存在,则判断是否为自己持有的锁,需要将值自增。

这多个操作必须使用 Lua 脚本保证其原则性。

if redis.call("exists", KEYS[1]) == 0 then
    redis.call("hset", KEYS[1], KEYS[2], 1)
    redis.call("expire", KEYS[1], ARGV[1])
    return 1
elseif redis.call("hexists", KEYS[1], KEYS[2]) == 1 then
    redis.call("hincrby", KEYS[1], KEYS[2], 1)
    redis.call("expire", KEYS[1], ARGV[1])
    return 1
else
    return 0
end

由于 HINCRBY 自增操作,当 Key 不存在时会有隐式的 SET 操作,所以可以将以上脚本简化。

if redis.call("exists", KEYS[1]) == 0 or redis.call("hexists", KEYS[1], KEYS[2]) == 1 then
    redis.call("hincrby", KEYS[1], KEYS[2], 1)
    redis.call("expire", KEYS[1], ARGV[1])
    return 1
else
    return 0
end

解锁时反向操作,若锁不存在直接返回,存在则自减,当值自减到 0 时将其删除。

if redis.call("hexists", KEYS[1], KEYS[2]) == 0 then
    return nil
elseif redis.call("hincrby", KEYS[1], KEYS[2], -1) == 0 then
    return redis.call("del", KEYS[1])
else
    return 0
end

封装分布式锁

遵循 Java 开发规范,既然要封装一把锁,就应该实现 java.util.concurrent.locks.Lock 接口。

public interface Lock {
    // 获取锁
    void lock();
    // 除非当前线程被中断,否则获取锁
    void lockInterruptibly() throws InterruptedException;
    // 仅在调用时锁是空闲的情况下才获取锁
    boolean tryLock();
    // 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 解锁
    void unlock();
    // 返回一个绑定到此 Lock 实例的新 Condition 实例
    Condition newCondition();
}

目前先不考虑中断与获取新实例,基于 Redis 的分布锁很自然的需要能够与 Redis 交互,并确定分布式锁使用的 Key 以及过期时间。

public class RedisDistributedLock implements Lock {

    private final StringRedisTemplate stringRedisTemplate;

    private final String redisLockKey;

    private final String redisLockField;

    private final long expireTime;

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate) {
        this(stringRedisTemplate, "redisLock");
    }

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String redisLockKey) {
        this(stringRedisTemplate, redisLockKey,
                String.format("%s:%s", UUID.randomUUID(), Thread.currentThread().getId()));
    }

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,
                                String redisLockKey, String redisLockField) {
        this(stringRedisTemplate, redisLockKey, redisLockField, 30L);
    }

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,
                                String redisLockKey, String redisLockField, long expireTime) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.redisLockKey = redisLockKey;
        this.redisLockField = redisLockField;
        this.expireTime = expireTime;
    }
}

按照上文分析可重入锁加/解锁需要配合 Lua 脚本保证原子性。

在 Lock 接口中 tryLock 方法规定在指定时间内获得到锁,可以假设传递 -1 时将无时间限制尝试加锁。

@Override
public void lock() {
    tryLock();
}

@Override
public boolean tryLock() {
    try {
        return tryLock(-1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    if (time == -1) {
        String luaScript = """
                if redis.call("exists", KEYS[1]) == 0 or redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
                    redis.call("hincrby", KEYS[1], ARGV[1], 1)
                    redis.call("expire", KEYS[1], ARGV[2])
                    return 1
                else
                    return 0
                end
                """;
        while (!stringRedisTemplate.execute(
                new DefaultRedisScript<>(luaScript, Boolean.class),
                List.of(redisLockKey), redisLockField, String.valueOf(expireTime))) {
            TimeUnit.MILLISECONDS.sleep(20);
        }
        return true;
    }
    return false;
}

@Override
public void unlock() {
    String luaScript = """
            if redis.call("hexists", KEYS[1], ARGV[1]) == 0 then
                return nil
            elseif redis.call("hincrby", KEYS[1], ARGV[1], -1) == 0 then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """;
    stringRedisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Boolean.class),
            List.of(redisLockKey), redisLockField);
}

单例工厂模式

封装好分布式锁后,若每次需要锁都单独 New 一个实例会因为每个实例生成的 UUID 不相同导致锁无法重入,再次陷入死锁。

可以使用单例工厂模式,只生成一次 UUID 确保锁的可重入性,搭配线程 id 确保 Key 的唯一性。

@Component
public class DistributedLockFactory {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    private final UUID uuid = UUID.randomUUID();

    public Lock getInstance() {
        return new RedisDistributedLock(stringRedisTemplate, "redisLock",
                String.format("%s:%s", uuid, Thread.currentThread().getId()));
    }
}

一切返璞归真,现在又可能像调用 JVM 锁那么简单的调用分布式锁了。

@Service
public class InventoryServiceImpl implements InventoryService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private DistributedLockFactory distributedLockFactory;

    @Override
    public String sale() {
        // 获取分布式锁
        Lock lock = distributedLockFactory.getInstance();
        // 加锁
        lock.lock();
        try {
            String key = "inventory";
            // 查询库存
            String result = stringRedisTemplate.opsForValue().get(key);
            int inventoryNum = result == null ? 0 : Integer.parseInt(result);
            if (inventoryNum > 0) {
                // 扣减库存
                Long left = stringRedisTemplate.opsForValue().decrement(key);
                return String.format("成功卖出一个商品,库存剩余:%d", left);
            }
        } finally {
            // 解锁
            lock.unlock();
        }
        return "商品已售罄";
    }
}

自动续期

前面已经分析过,为了避免其中一台微服务宕机导致陷入死锁,于是加入了锁生存时间;为了避免锁超时后被其他微服务误删,于是加入了防误删机制。

考虑介于这两者之间还存在一种情况,如果其中一个微服务因为网络延迟等原因,在临界区的处理时间超过了锁的生存时间,服务器并没有宕机。在锁过期的一瞬间,其他微服务将开始抢夺锁的持有权并进入临界区,此时出现了两个线程同时进入临界区的情况。

sequenceDiagram participant A as Service:8000 participant B as Service:9000 participant L as Redis Lock participant R as Redis Cache A ->> L: setnx L -->> A: true Note over L: lock A ->> R: process activate A L ->> L: timeout Note over L: unlock B ->> L: setnx L -->> B: true Note over L: lock B ->> R: process Note over A,B: 同时进入临界区 deactivate A

为了避免这种情况,还需要引入“看门狗”每隔一段时间判断是否为自己持有的锁,并为其续期。

if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
    return redis.call('expire', KEYS[1], ARGV[2])
else
    return 0
end

在 Java 中可以使用一个定时任务完成该操作,并在加锁成功后自动启动该定时任务。

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    if (time == -1) {
        // ...
        resetExpire();
        return true;
    }
    return false;
}

private void resetExpire() {
    String luaScript = """
            if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
                return redis.call('expire', KEYS[1], ARGV[2])
            else
                return 0
            end
            """;
    new Timer().schedule(new TimerTask() {
        @Override
        public void run() {
            if (stringRedisTemplate.execute(
                    new DefaultRedisScript<>(luaScript, Boolean.class),
                    List.of(redisLockKey), redisLockField, String.valueOf(expireTime))) {
                // 递归再次续期
                resetExpire();
            }
        }
    }, (this.expireTime * 1000) / 3);
}

参考资料

https://www.bilibili.com/video/BV13R4y1v7sP?p=129https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/


评论