JUC
java
分布式锁
约 3366 个字 405 行代码 8 张图片 预计阅读时间 16 分钟
分布式锁基本介绍
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
在一个单体系统中,使用 Synchronized 或者 ReentrantLock 等本地锁就能保证线程安全问题,但是在分布式场景下,因为synchronized是本地锁 ,只能提供线程级别 的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁 ,当一个线程进入被 synchronized 关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而如果现在存在两个节点,即两台 JVM,那么synchronized 锁会失效
1. 分布式锁特点
多线程可见。
互斥。 分布式锁必须能够确保在任何时刻只有一个节点能够获得锁,其他节点需要等待。
高可用。 分布式锁应该具备高可用性,即使在网络分区或节点故障的情况下,仍然能够正常工作。(容错性)当持有锁的节点发生故障或宕机时,系统需要能够自动释放该锁,以确保其他节点能够继续获取锁。
高性能。 分布式锁需要具备良好的性能,尽可能减少对共享资源的访问等待时间,以及减少锁竞争带来的开销。
安全性。(可重入性) 如果一个节点已经获得了锁,那么它可以继续请求获取该锁而不会造成死锁。(锁超时机制)为了避免某个节点因故障或其他原因无限期持有锁而影响系统正常运行,分布式锁通常应该设置超时机制,确保锁的自动释放。
2. 基于Redis实现分布式锁
使用 setnx 指令
setnx指令的特点 :setnx只能设置key不存在的值,值不存在设置成功,返回 1 ;值存在设置失败,返回 0
获取锁
Bash # 添加锁
setnx [ key] [ value]
# 为锁设置过期时间,超时释放,避免死锁
expire [ key] [ time]
方式二: (两个指令变成一个指令,从而保障指令的原子性)
Bash # 添加锁
set [ key] [ value] ex [ time] nx
释放锁
Bash # 释放锁(除了使用del手动释放,还可超时释放)
del [ key]
Java 代码
Java public class SimpleRedisLock implements Lock {
/**
* RedisTemplate
*/
private StringRedisTemplate stringRedisTemplate ;
/**
* 锁的名称
*/
private String name ;
public SimpleRedisLock ( StringRedisTemplate stringRedisTemplate , String name ) {
this . stringRedisTemplate = stringRedisTemplate ;
this . name = name ;
}
/**
* 获取锁
*
* @param timeoutSec 超时时间
* @return
*/
@Override
public boolean tryLock ( long timeoutSec ) {
String id = Thread . currentThread (). getId () + "" ;
// SET lock:name id EX timeoutSec NX
Boolean result = stringRedisTemplate . opsForValue ()
. setIfAbsent ( "lock:" + name , id , timeoutSec , TimeUnit . SECONDS );
return Boolean . TRUE . equals ( result );
}
/**
* 释放锁
*/
@Override
public void unlock () {
stringRedisTemplate . delete ( "lock:" + name );
}
}
3. 超卖问题优化
优化 1
问题背景:当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题 !
解决办法:我们为分布式锁添加一个线程标识,在释放锁时判断当前锁是否是自己的锁,是自己的就直接释放,不是自己的就不释放锁,从而解决多个线程同时获得锁的情况导致出现超卖
实现细节:
Java public class SimpleRedisLock implements Lock {
/**
* RedisTemplate
*/
private StringRedisTemplate stringRedisTemplate ;
/**
* 锁的名称
*/
private String name ;
/**
* key前缀
*/
public static final String KEY_PREFIX = "lock:" ;
/**
* ID前缀
*/
public static final String ID_PREFIX = UUID . randomUUID (). toString ( true ) + "-" ;
public SimpleRedisLock ( StringRedisTemplate stringRedisTemplate , String name ) {
this . stringRedisTemplate = stringRedisTemplate ;
this . name = name ;
}
/**
* 获取锁
*
* @param timeoutSec 超时时间
* @return
*/
@Override
public boolean tryLock ( long timeoutSec ) {
String threadId = ID_PREFIX + Thread . currentThread (). getId () + "" ;
// SET lock:name id EX timeoutSec NX
Boolean result = stringRedisTemplate . opsForValue ()
. setIfAbsent ( KEY_PREFIX + name , threadId , timeoutSec , TimeUnit . SECONDS );
return Boolean . TRUE . equals ( result );
}
/**
* 释放锁
*/
@Override
public void unlock () {
// 判断 锁的线程标识 是否与 当前线程一致
String currentThreadFlag = ID_PREFIX + Thread . currentThread (). getId ();
String redisThreadFlag = stringRedisTemplate . opsForValue (). get ( KEY_PREFIX + name );
if ( currentThreadFlag != null || currentThreadFlag . equals ( redisThreadFlag )) {
// 一致,说明当前的锁就是当前线程的锁,可以直接释放
stringRedisTemplate . delete ( KEY_PREFIX + name );
}
// 不一致,不能释放
}
}
优化 2
问题背景:当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时 ,但就在此时发生了阻塞(发生了 JVM 的垃圾回收机制) ,结果锁被超时释放 了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题
解决办法:使用Lua脚本保障 判断锁 和 释放锁 这段代码的原子性
Lua 脚本如何保证原子性
在Redis中,Lua脚本能够保证原子性的主要原因还是Redis采用了单线程执行模型。也就是说,当Redis执行Lua脚本时,Redis会把Lua脚本作为一个整体并把它当作一个任务加入到一个队列中,然后单线程按照队列的顺序依次执行这些任务,在执行过程中Lua脚本是不会被其他命令或请求打断,因此可以保证每个任务的执行都是原子性的。
注意 :虽然Redis在单个Lua脚本的执行期间会暂停其他脚本和Redis命令,以确保脚本的执行是原子的,但如果Lua脚本本身出错,那么无法完全保证原子性。也就是说Lua脚本中的Redis指令出错,会发生回滚以确保原子性,但Lua脚本本身出错就无法保障原子性
深入阅读:阿里 P7二面:Redis 执行 Lua,能保证原子性吗?-腾讯云开发者社区-腾讯云
Lua 代码:
Lua -- 比较缓存中的线程标识与当前线程标识是否一致
if ( redis . call ( 'get' , KEYS [ 1 ]) == ARGV [ 1 ]) then
-- 一致,直接删除
return redis . call ( 'del' , KEYS [ 1 ])
end
-- 不一致,返回0
return 0
Java 代码:
Java public class SimpleRedisLock implements Lock {
/**
* RedisTemplate
*/
private StringRedisTemplate stringRedisTemplate ;
/**
* 锁的名称
*/
private String name ;
/**
* key前缀
*/
private static final String KEY_PREFIX = "lock:" ;
/**
* ID前缀
*/
private static final String ID_PREFIX = UUID . randomUUID (). toString ( true ) + "-" ;
public SimpleRedisLock ( StringRedisTemplate stringRedisTemplate , String name ) {
this . stringRedisTemplate = stringRedisTemplate ;
this . name = name ;
}
/**
* 获取锁
*
* @param timeoutSec 超时时间
* @return
*/
@Override
public boolean tryLock ( long timeoutSec ) {
String threadId = ID_PREFIX + Thread . currentThread (). getId () + "" ;
// SET lock:name id EX timeoutSec NX
Boolean result = stringRedisTemplate . opsForValue ()
. setIfAbsent ( KEY_PREFIX + name , threadId , timeoutSec , TimeUnit . SECONDS );
return Boolean . TRUE . equals ( result );
}
/**
* 加载Lua脚本
*/
private static final DefaultRedisScript < Long > UNLOCK_SCRIPT ;
static {
UNLOCK_SCRIPT = new DefaultRedisScript <> ();
UNLOCK_SCRIPT . setLocation ( new ClassPathResource ( "lua/unlock.lua" ));
UNLOCK_SCRIPT . setResultType ( Long . class );
}
/**
* 释放锁
*/
@Override
public void unlock () {
// 执行lua脚本
stringRedisTemplate . execute (
UNLOCK_SCRIPT ,
Collections . singletonList ( KEY_PREFIX + name ),
ID_PREFIX + Thread . currentThread (). getId ()
);
}
}
Redisson
经过优化1和优化2,我们实现的分布式锁已经达到生产可用级别了,但是还不够完善,比如:
分布式锁不可重入 :不可重入是指同一线程不能重复获取同一把锁。比如,方法A中调用方法B,方法A需要获取分布式锁,方法B同样需要获取分布式锁,线程1进入方法A获取了一次锁,进入方法B又获取一次锁,由于锁不可重入,所以就会导致死锁
分布式锁不可重试 :获取锁只尝试一次就返回false,没有重试机制,这会导致数据丢失,比如线程1获取锁,然后要将数据写入数据库,但是当前的锁被线程2占用了,线程1直接就结束了而不去重试,这就导致数据发生了丢失
分布式锁超时释放 :超时释放机机制虽然一定程度避免了死锁发生的概率,但是如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。锁的有效期过短,容易出现业务没执行完就被释放,锁的有效期过长,容易出现死锁,所以这是一个大难题!
我们可以设置一个较短的有效期,但是加上一个心跳机制和自动续期 :在锁被获取后,可以使用心跳机制并自动续期锁的持有时间。通过定期发送心跳请求,显示地告知其他线程或系统锁还在使用中,同时更新锁的过期时间。如果某个线程持有锁的时间超过了预设的有效时间,其他线程可以尝试重新获取锁。
主从一致性问题 :如果Redis提供了主从集群,主从同步存在延迟,线程1获取了锁
而上述这些问题,Redisson 都给出了解决方案,接下来对 Redisson 的可重入锁和锁重试做原理讲解
Redisson可重入锁
Redisson通过维护一个计数器(本质上是 hash 结构) 来实现锁的可重入特性。
当同一个线程第一次获取锁时,Redis会记录下这个线程的 threadId
,并将锁的持有次数设置为1。
如果这个线程再次请求锁(即可重入操作),Redisson会检测到当前持有锁的 threadId
与当前线程相同,则不会重新设置锁,而是简单地增加计数器,表示这个线程再次持有了锁。
每次释放锁时,Redisson会减少计数器,只有当计数器减为0时,锁才会真正释放。
Redisson内部释放锁,并不是直接执行 del
命令将锁给删除,而是将锁以 hash
数据结构的形式存储在Redis中,每次获取锁,都将 value
的值+1,每次释放锁,都将value的值-1,只有锁的value值归0时才会真正的释放锁,从而确保锁的可重入性
获取锁对应的 Lua 脚本如下:
源码位置: org.redisson.RedissonLock#tryLockInnerAsync
Lua -- 大 key 不存在,说明没有线程获取到锁
if (( redis . call ( 'exists' , KEYS [ 1 ]) == 0 )
-- 大 key 存在,但是hash 的 key(线程 id)相同,代表锁重入
or ( redis . call ( 'hexists' , KEYS [ 1 ], ARGV [ 2 ]) == 1 )) then
-- value + 1,并设置过期时间
redis . call ( 'hincrby' , KEYS [ 1 ], ARGV [ 2 ], 1 );
-- p 开头表示以毫秒为单位
redis . call ( 'pexpire' , KEYS [ 1 ], ARGV [ 1 ]);
return nil ;
end ;
-- 获取失败,其他线程占有锁,返回锁的剩余有效期(毫秒)
return redis . call ( 'pttl' , KEYS [ 1 ]);
释放锁对应的 Lua 脚本如下:
源码位置:
org.redisson.RedissonLock#unlockInnerAsync
Java protected RFuture < Boolean > unlockInnerAsync ( long threadId , String requestId , int timeout ) {
return evalWriteSyncedNoRetryAsync ( getRawName (), LongCodec . INSTANCE , RedisCommands . EVAL_BOOLEAN ,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; " ,
Arrays . asList ( getRawName (), getChannelName (), getUnlockLatchName ( requestId )),
LockPubSub . UNLOCK_MESSAGE , internalLockLeaseTime ,
getLockName ( threadId ), getSubscribeService (). getPublishCommand (), timeout );
}
Lua -- 尝试获取释放锁的标记 key(KEYS[3]),如果已经存在,就直接返回该值,防止重复释放锁
local val = redis . call ( 'get' , KEYS [ 3 ])
if val ~= false then
return tonumber ( val ) -- 如果释放标记已存在,则直接返回标记值(0 或 1)
end
-- 检查锁的哈希表 (KEYS[1]) 是否包含当前线程 ID(ARGV[3])
if redis . call ( 'hexists' , KEYS [ 1 ], ARGV [ 3 ]) == 0 then
return nil -- 当前线程没有持有该锁,返回 nil,表示无法释放锁
end
-- 减少当前线程持有的锁次数(可重入锁,每次调用 -1)
local counter = redis . call ( 'hincrby' , KEYS [ 1 ], ARGV [ 3 ], - 1 )
if counter > 0 then
-- 说明锁还有剩余次数(可重入锁仍未完全释放)
-- 重新设置锁的过期时间,确保锁不会因超时被删除
redis . call ( 'pexpire' , KEYS [ 1 ], ARGV [ 2 ])
-- 释放标记 key 设为 0,表示锁仍然有效
redis . call ( 'set' , KEYS [ 3 ], 0 , 'px' , ARGV [ 5 ])
return 0 -- 返回 0,表示锁未完全释放
else
-- 说明该线程的可重入锁已全部释放,需要彻底删除锁
-- 删除锁的哈希表 (KEYS[1])
redis . call ( 'del' , KEYS [ 1 ])
-- 使用 `ARGV[4]` 这个 Redis 命令(通常是 `publish`),通知其他线程锁已释放
redis . call ( ARGV [ 4 ], KEYS [ 2 ], ARGV [ 1 ])
-- 释放标记 key 设为 1,表示锁已完全释放
redis . call ( 'set' , KEYS [ 3 ], 1 , 'px' , ARGV [ 5 ])
return 1 -- 返回 1,表示锁已释放
end
变量/参数
作用
KEYS[1]
存储锁的 Redis 哈希表 ,用于支持可重入锁,HSET key threadId count 记录当前线程持有的锁次数
KEYS[2]
发布订阅消息的频道 ,用于通知其他等待的线程(一般是 redisson_lock__channel:{锁名称})
KEYS[3]
释放锁标记 key ,用于防止锁被重复释放(0 代表锁未释放,1 代表锁已释放)
ARGV[1]
锁的名称 ,用于唯一标识锁
ARGV[2]
锁的过期时间(毫秒) ,用于重新设置锁的 TTL
ARGV[3]
当前线程 ID ,用于检查该线程是否持有锁
ARGV[4]
Redis 命令(如 publish) ,用于通知其他线程锁已释放
ARGV[5]
释放锁标记 key 的过期时间(毫秒) ,用于避免 KEYS[3] 占用 Redis 资源
黑马的流程图中其实少了一个重要的参数判断,即 KEYS[3],这里对 KEYS[3] 做详细介绍
问题
如果没有 KEYS[3] 可能出现的问题
KEYS[3] 如何解决
锁的重复释放
线程 A 释放锁后,线程 B 可能仍然尝试释放它,导致误删
KEYS[3] 记录锁是否已经释放,防止误删
性能问题
每次释放锁都需要执行 Redis 命令
通过 KEYS[3] 快速返回锁状态,减少 Redis 操作
锁状态不可知
其他线程无法判断锁是否真的释放
KEYS[3] 0 表示未释放,1 表示已释放
Redisson锁重试
Redisson 获取锁的方法有两个,分别是 lock()
和 tryLock()
区别如下:
方法
适用场景
订阅机制
超时处理
lock()
必须获取锁 ,否则阻塞
订阅 Redis 频道 ,等待锁释放
无超时
tryLock()
尝试获取锁 ,超时后放弃
订阅 Redis 频道(等待时间内)
超时后直接返回 false
流程解析图
这里以 tryLock()
方法为例
代码位置: org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
Java @Override
public boolean tryLock ( long waitTime , long leaseTime , TimeUnit unit ) throws InterruptedException {
long time = unit . toMillis ( waitTime ); // 将等待时间转换为毫秒
long current = System . currentTimeMillis (); // 记录当前时间
long threadId = Thread . currentThread (). getId (); // 获取当前线程 ID
// ① **尝试直接获取锁**
Long ttl = tryAcquire ( waitTime , leaseTime , unit , threadId );
// ② **成功获取锁,直接返回 true**
if ( ttl == null ) {
return true ;
}
// ③ **计算剩余等待时间**
time -= System . currentTimeMillis () - current ;
if ( time <= 0 ) {
acquireFailed ( waitTime , unit , threadId ); // 记录获取锁失败
return false ;
}
// ④ **订阅锁释放事件**
current = System . currentTimeMillis ();
CompletableFuture < RedissonLockEntry > subscribeFuture = subscribe ( threadId );
try {
// ⑤ **等待订阅结果**
subscribeFuture . get ( time , TimeUnit . MILLISECONDS );
} catch ( TimeoutException e ) {
// **超时处理**
if ( ! subscribeFuture . completeExceptionally ( new RedisTimeoutException (
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters." ))) {
subscribeFuture . whenComplete (( res , ex ) -> {
if ( ex == null ) {
unsubscribe ( res , threadId );
}
});
}
acquireFailed ( waitTime , unit , threadId );
return false ;
} catch ( ExecutionException e ) {
// **执行异常处理**
LOGGER . error ( e . getMessage (), e );
acquireFailed ( waitTime , unit , threadId );
return false ;
}
try {
// ⑥ **更新剩余等待时间**
time -= System . currentTimeMillis () - current ;
if ( time <= 0 ) {
acquireFailed ( waitTime , unit , threadId );
return false ;
}
while ( true ) {
long currentTime = System . currentTimeMillis ();
ttl = tryAcquire ( waitTime , leaseTime , unit , threadId ); // **再次尝试获取锁**
// **成功获取锁,返回 true**
if ( ttl == null ) {
return true ;
}
// ⑦ **检查剩余等待时间**
time -= System . currentTimeMillis () - currentTime ;
if ( time <= 0 ) {
acquireFailed ( waitTime , unit , threadId );
return false ;
}
// ⑧ **阻塞等待锁释放**
currentTime = System . currentTimeMillis ();
if ( ttl >= 0 && ttl < time ) {
// **等待锁的过期时间**
commandExecutor . getNow ( subscribeFuture ). getLatch (). tryAcquire ( ttl , TimeUnit . MILLISECONDS );
} else {
// **等待剩余的时间**
commandExecutor . getNow ( subscribeFuture ). getLatch (). tryAcquire ( time , TimeUnit . MILLISECONDS );
}
// **更新剩余时间**
time -= System . currentTimeMillis () - currentTime ;
if ( time <= 0 ) {
acquireFailed ( waitTime , unit , threadId );
return false ;
}
}
} finally {
// ⑨ **取消订阅**
unsubscribe ( commandExecutor . getNow ( subscribeFuture ), threadId );
}
}
WatchDog 实现超时续约
但不管调用的是 lock()
还是 tryLock()
,最终会调用到 tryAcquireAsync
其中有一个非常重要的判断逻辑
Java if ( leaseTime > 0 ) {
ttlRemainingFuture = tryLockInnerAsync ( waitTime , leaseTime , unit , threadId , RedisCommands . EVAL_LONG );
} else {
ttlRemainingFuture = tryLockInnerAsync ( waitTime , internalLockLeaseTime ,
TimeUnit . MILLISECONDS , threadId , RedisCommands . EVAL_LONG );
}
如果 leaseTime > 0 ,那么 Redisson 会按照指定的 leaseTime 进行锁的过期控制 ,不会启动 看门狗机制 (自动续期)。
而如果 leaseTime <= 0 (默认-1) ,那么 Redisson 会使用内部的internalLockLeaseTime ,并启动看门狗机制
internalLockLeaseTime
在构造函数处初始化
Java public RedissonLock ( CommandAsyncExecutor commandExecutor , String name ) {
super ( commandExecutor , name );
this . commandExecutor = commandExecutor ;
this . internalLockLeaseTime = getServiceManager (). getCfg (). getLockWatchdogTimeout ();
this . pubSub = commandExecutor . getConnectionManager (). getSubscribeService (). getLockPubSub ();
}
默认值为 30 * 1000ms,即 30 秒
如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。这样也存在线程安全问题,加入一个线程拿到了锁设置了30s超时,在30s后这个线程还没有执行完毕,锁超时释放了,就会导致问题,Redisson给出了自己的答案,就是 watch dog 自动延期机制。
Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。
默认情况下,看门狗的续期时间是30s ,也可以通过修改 Config.lockWatchdogTimeout
来另行指定。另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期。
继续跟踪源码,以下是 tryAcquireAsync
的方法全貌
源码位置: org.redisson.RedissonLock#tryAcquireAsync
Java private RFuture < Long > tryAcquireAsync ( long waitTime , long leaseTime , TimeUnit unit , long threadId ) {
RFuture < Long > ttlRemainingFuture ;
if ( leaseTime > 0 ) {
ttlRemainingFuture = tryLockInnerAsync ( waitTime , leaseTime , unit , threadId , RedisCommands . EVAL_LONG );
} else {
ttlRemainingFuture = tryLockInnerAsync ( waitTime , internalLockLeaseTime ,
TimeUnit . MILLISECONDS , threadId , RedisCommands . EVAL_LONG );
}
CompletionStage < Long > s = handleNoSync ( threadId , ttlRemainingFuture );
ttlRemainingFuture = new CompletableFutureWrapper <> ( s );
CompletionStage < Long > f = ttlRemainingFuture . thenApply ( ttlRemaining -> {
// lock acquired
if ( ttlRemaining == null ) {
if ( leaseTime > 0 ) {
internalLockLeaseTime = unit . toMillis ( leaseTime );
} else {
//这里是定时执行 当前锁自动延期的动作,leaseTime为-1时,才会自动延期
scheduleExpirationRenewal ( threadId );
}
}
return ttlRemaining ;
});
return new CompletableFutureWrapper <> ( f );
}
重点在于 scheduleExpirationRenewal
这个自动延期方法,最终调用 org.redisson.renewal.RenewalTask#add
Java final void add ( String rawName , String lockName , long threadId , LockEntry entry ) {
addSlotName ( rawName );
LockEntry oldEntry = name2entry . putIfAbsent ( rawName , entry );
if ( oldEntry != null ) {
oldEntry . addThreadId ( threadId , lockName );
} else {
if ( tryRun ()) {
schedule ();
}
}
}
tryRun()
方法是一个 CAS 操作,保证同一时刻只有 1 个线程进行续约操作
Java boolean tryRun () {
return running . compareAndSet ( false , true );
}
schedule()
方法最终向一个线程池(其实没看懂,应该是线程池类似物)中传入一个定时任务,时间间隔为 internalLockLeaseTime / 3 ,即 10s 执行一次续约
Java public void schedule () {
if ( ! running . get ()) {
return ;
}
long internalLockLeaseTime = executor . getServiceManager (). getCfg (). getLockWatchdogTimeout ();
executor . getServiceManager (). newTimeout ( this , internalLockLeaseTime / 3 , TimeUnit . MILLISECONDS );
}
疑问:既然有了开门狗机制,为什么还要设置默认过期释放时间为 30s?既然目的是防止锁超时释放导致线程安全问题,那么直接设置为-1 永不过期不就好了?
其实上文中有提到,如果当前节点的进程崩溃了,过期时间却设置为永不过期的话,就会导致死锁问题,其他节点无法获得锁,从而一直阻塞。同时也是保证即使看门狗机制失效了,也不会一直持有锁。
主从一致
主从一致性:集群模式下,主从同步存在延迟,当加锁后主服务器宕机时,从服务器还没同步主服务器中的锁数据,此时从服务器升级为主服务器,其他线程又可以获取到锁
将服务器升级为多主多从:
获取锁需要从所有主服务器 SET 成功才算获取成功
某个 master 宕机,slave 还没有同步锁数据就升级为 master,其他线程尝试加锁会加锁失败,因为其他 master 上已经存在该锁
2025-03-07
GitHub