分布式锁

分布式锁是用于协调分布式系统中多个进程服务互斥访问共享资源的机制。其核心目标是确保在分布式环境下,同一时刻只有一个客户端能操作共享资源(如数据库、文件等),以解决并发导致的数据不一致问题 。

核心特性

  1. 互斥性:同一时刻仅有一个客户端持有锁,其他请求需等待或失败 。
  2. 超时机制:锁自动过期释放,避免因客户端故障导致死锁(如 Redis 的 PX 参数) 。
  3. 可重入性:同一线程可重复获取同一把锁(如 Redisson 通过 Hash 结构记录线程标识和重入次数) 。
  4. 高可用与容错:锁服务需支持集群部署,避免单点故障(如 Redis 的 RedLock 算法、ZooKeeper 集群) 。

常见实现方式

  1. 基于数据库

    • 唯一约束:通过插入唯一键(如业务单号)实现加锁,删除记录释放锁 。
    • 悲观锁:使用 SELECT ... FOR UPDATE 锁定数据行 。
    缺点:性能低、死锁风险高,仅适用于低并发场景 。
  2. 基于 Redis
  • SETNX + EXPIRE
    SET key value NX PX 3000 原子性加锁,Lua 脚本保证解锁原子性 。
  • Redisson 框架
    支持可重入锁、看门狗自动续期(默认每 10 秒重置超时)、订阅机制实现阻塞等待 。

    缺点:主从切换时可能丢锁(RedLock 算法通过多节点投票解决) 。
  1. 基于 ZooKeeper
  • 临时顺序节点
    客户端创建临时顺序节点,仅最小序号节点获锁;其他节点监听前序节点释放事件 。
  • Curator 框架
    封装 InterProcessMutex 实现可重入锁,节点断开自动删除临时节点 。
缺点:性能低于 Redis,但强一致性和顺序性更可靠 。

简易锁

以Spring Data Redis为例,用RedisTemplate来操作Redis

// 加锁
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
    return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}

// 解锁,防止删错别人的锁,以uuid为value校验是否自己的锁
public void unlock(String lockName, String uuid) {
    if(uuid.equals(redisTemplate.opsForValue().get(lockName))
       {        
         redisTemplate.opsForValue().del(lockName);    
       }
}

这是一个简易的锁,但是加锁和删锁不是原子性的,并发一旦大了,无法保证线程安全

  • 线程A执行get操作,获取到锁的值"uuidA",判断与自己持有的uuid相等,准备执行del
  • 此时,锁的过期时间到了(或者因为其他原因,比如Redis的过期策略),锁自动失效(即被Redis删除)。
  • 线程B此时尝试获取锁"lock1",由于锁已失效,线程B成功获取锁(setIfAbsent成功),设置值为"uuidB"。
  • 线程A继续执行del操作,删除了锁"lock1"(此时删除的是线程B刚设置的锁)。
  • 结果:线程B的锁被线程A删除,导致线程B在持有锁的过程中,锁被意外删除,其他线程(比如线程C)可能又会获得锁,导致多个线程同时进入临界区。

Lua脚本保证原子性

Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval/evalsha命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作

if redis.call('get', KEYS[1]) == ARGV[1] 
    then 
 -- 执行删除操作
        return redis.call('del', KEYS[1]) 
    else 
 -- 不成功,返回0
        return 0 
end
// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
 
// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

而这种写法,同一个线程不允许获取同一把锁,在有些场景不适用,例如

public void methodA() {
    lock.lock(); // 第一次加锁
    try {
        // 业务逻辑1...
        methodB(); // 调用methodB
    } finally {
        lock.unlock(); // 最终释放锁(需确保只释放一次)
    }
}
 
public void methodB() {
    lock.lock(); // 嵌套调用,需要再次加锁
    try {
        // 业务逻辑2...
    } finally {
        lock.unlock();
    }
}

如果锁是不可重入的,当methodA获取锁后调用methodB时,methodB尝试再次加锁会被阻塞(因为锁已被当前线程持有),导致死锁,由此引出可重入锁

可重入锁

Redisson的可重入锁通过自动维护重入次数解决了这一问题:

  • 当同一线程首次获取锁时,重入次数初始化为1;
  • 若同一线程再次获取同一把锁(如嵌套调用),重入次数递增(如2、3...);
  • 释放锁时,重入次数递减,直到次数为0时才真正释放锁(通知Redis删除锁)。

开发者无需手动维护重入次数,锁的获取和释放逻辑与普通单次加锁一致,大大降低了复杂度。

由此,我们可以改造lua脚本

  1. redis存储锁名称,该锁线程id和对应线程进入次数
  2. 每次线程获取锁时,判断是否获取过锁,若获取过,次数+1
  3. 解锁时,判断是否已存在该锁,有则-1,然后判断次数是否为0,若为0则删除该锁

存储结构

为了方便维护这个对象,我们用Hash结构来存储这些字段。Redis的Hash类似Java的HashMap,适合存储对象

lockname   锁名称
    key1:   threadId   唯一键,线程id
    value1:  count     计数器,记录该线程获取锁的次数

计数器

当同一个线程获取同一把锁时,我们需要对对应线程的计数器count做加减

判断一个redis key是否存在,可以用exists,而判断一个hash的key是否存在,可以用hexists

而redis也有hash自增的命令hincrby,每次自增1时 hincrby lockname1 threadId 1,自减1时 hincrby lockname1 threadId -1

解锁

加锁部分代码

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
 
-- lockname不存在
if(redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
 
-- 当前线程已id存在
if(redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;

解锁部分代码

local key = KEYS[1];
local threadId = ARGV[1];
 
-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;
 
-- 计数器-1
local count = redis.call('hincrby', key, threadId, -1);
 
-- 删除lock
if (count == 0) then
    redis.call('del', key);
    return nil;
end;

这种写法在面临一些场景时依旧会出现问题

  • A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题
  • 如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态

所以我们希望在这种情况时,可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果,这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约,由此引出Redisson分布式锁

Redisson介绍

Redisson是一个高级的分布式协调Redis客服端,能帮助用户在分布式环境中轻松实现一些Java的对象。Redisson、Jedis、Lettuce 是三个常用的操作 Redis 的客户端,Jedis、Lettuce 的 API 更侧重对 Reids 数据库的 CRUD(增删改查);而 Redisson API 侧重于分布式开发,提供了多种分布式Java对象和服务,包括分布式锁、分布式集合、分布式对象、分布式限流器、可重入锁等。

安装依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

配置

@Configuration
public class RedissionConfig {
    @Value("${spring.redis.host}")
    private String redisHost;
 
    @Value("${spring.redis.password}")
    private String password;
 
    private int port = 6379;
 
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer().
                setAddress("redis://" + redisHost + ":" + port).
                setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

启用

@Resource
private RedissonClient redissonClient;
 
RLock rLock = redissonClient.getLock(lockName);
try {
    boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {
        // TODO
                }
    } catch (Exception e) {
            rLock.unlock();
    }

redissonClient是Redisson的核心客户端实例,用于连接Redis服务器并操作分布式对象

getLock(lockName)通过客户端获取一个分布式锁对象(RLock),lockName是锁的唯一标识符

RLock接口继承自java.util.concurrent.locks.Lock,提供分布式环境下的可重入锁能力

RLock

从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
   if (leaseTime != -1L) {
      return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
   } 
   else {
      RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
         if (e == null) {
            if (ttlRemaining == null) {
               this.scheduleExpirationRenewal(threadId);
            }
         }
      });
      return ttlRemainingFuture;
   }
}

分析有leaseTime时间判断的2个分支

加锁

先分析有过期时间tryLockInnerAsync部分

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS
[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(this.getName()),
            new Object[] { this.internalLockLeaseTime, this.getLockName(threadId) });
}

观察可得,这里返回的是用eval命令执行Lua脚本,此处的脚本展开得

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then 
  -- 新增该锁并且hash中该线程id对应的count置1
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  -- 设置过期时间
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
 
-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  -- 线程重入次数++
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
return redis.call('pttl', KEYS[1]);

​ 分析参数可得

// 锁名称
KEYS[1] = Collections.singletonList(this.getName())
// 锁失效时间
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
ARGV[2] = this.getLockName(threadId)
  • 判断该锁是否已经有对应hash表存在,
  • 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
  • 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
  • 最后返回这把锁的ttl剩余时间

解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], 
-1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
            Arrays.asList(this.getName(), this.getChannelName()),
            new Object[] { LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId) });
}
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  -- 过期时间重设
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0; 
else
  -- 删除并发布解锁消息
  redis.call('del', KEYS[1]); 
  redis.call('publish', KEYS[2], ARGV[1]); 
  return 1;
end; 
return nil;
  • 如果该锁不存在则返回nil;
  • 如果该锁存在则将其线程的hash key计数器-1,
  • 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

分析可得,该lua有两个key,即Arrays.asList(this.getName(), this.getChannelName())

  1. name 锁名称
  2. channelName,用于pubSub发布消息的channel名称

ARGV变量有三个LockPubSub.UNLOCK_MESSAGE,internalLockLeaseTimegetLockName(threadId)

  1. LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
  2. internalLockLeaseTime,watchDog配置的超时时间,默认为30s
  3. lockName,这里的lockName指的是uuid和threadId组合的唯一值

续约

其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。

而订阅的步骤就在RedissonLock的加锁入口的lock方法里

long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);//尝试获取分布式锁,返回锁的剩余存活时间
        if (ttl != null) {
            // 订阅
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);//向 Redis 订阅当前锁的释放事件
            if (interruptibly) {//是否可中断
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                this.commandExecutor.syncSubscription(future);
            }

当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放

进入LockPubSub,这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作

protected void onMessage(RedissonLockEntry value, Long message) {
    Runnable runnableToExecute;
    if (message.equals(unlockMessage)) {
        // 从监听器队列取监听线程执行监听回调
        runnableToExecute = (Runnable) value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // getLatch()返回的是Semaphore,信号量,此处是释放信号量
        // 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
        value.getLatch().release();
    } else if (message.equals(readUnlockMessage)) {
        while (true) {
            runnableToExecute = (Runnable) value.getListeners().poll();
            if (runnableToExecute == null) {
                value.getLatch().release(value.getLatch().getQueueLength());
                break;
            }
            runnableToExecute.run();
        }
    }
}

LockPubSub监听最终执行了2件事

  1. runnableToExecute.run() 执行监听回调
  2. value.getLatch().release(); 释放信号量

Redisson通过LockPubSub监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。

这时再回来看tryAcquireOnceAsync另一分支

else {
  RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime,
          this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
          TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
      if (e == null) {
          if (ttlRemaining) {
              this.scheduleExpirationRenewal(threadId);
          }
      }
  });

可以看到,无过期时间时,在执行加锁操作后,还执行了一段逻辑

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            })

执行异步加锁的操作后,加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务

查看RedissonLock.this.scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(long threadId) {
    // 1. 创建锁续期任务入口
    ExpirationEntry entry = new ExpirationEntry();
    // 2. 原子操作:注册续期任务到全局Map(KEY=锁名称)
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 3. 重入场景:同一线程多次获取锁时,仅记录线程ID
        oldEntry.addThreadId(threadId);
    } else {
        // 4. 首次加锁:记录线程ID并启动续期任务
        entry.addThreadId(threadId);
        renewExpiration(); // 核心续期逻辑
    }
}
 
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) return; // 锁已释放
    
    // 1. 创建定时任务(默认间隔 = 锁超时时间/3,即10秒)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) return;
            
            // 2. 获取锁持有者线程ID(首次加锁线程)
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) return;
            
            // 3. 异步续期:通过Lua脚本延长锁超时时间
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("续期失败", e); // 网络异常等
                } else if (res) {
                    renewExpiration(); // 递归调用实现循环续期
                } // else: 锁已被释放或线程失去所有权
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 默认30s/3=10s
    
    ee.setTimeout(task); // 绑定任务到ExpirationEntry
}

renewExpirationAsync 的Lua如下

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return 1; 
end; 
return 0;

因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约

流程即

  1. A、B线程争抢一把锁,A获取到后,B阻塞
  2. B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息
  3. A操作完成释放了锁,B线程收到订阅消息通知
  4. B被唤醒开始继续抢锁,拿到锁
最后修改:2025 年 08 月 01 日
如果觉得我的文章对你有用,请随意赞赏