又长又细,万字长文带你解读Redisson分布式锁的源码
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了又长又细,万字长文带你解读Redisson分布式锁的源码,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含16395字,纯文字阅读大概需要24分钟。
内容图文
前言
上一篇文章写了Redis分布式锁的原理和缺陷,觉得有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年前项目忙的差不多了,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。
虽说是一时兴起,但仔细研究之后发现Redisson的源码解读工作量还是挺大的,其中用到了大量的Java并发类,并且引用了Netty作为通信工具,实现与Redis组件的远程调用,这些知识点如果要全部讲解的话不太现实,本文的重点主要是关于Redisson分布式锁的实现原理,所以网络通信和并发原理这块的代码解读不会太仔细,有不足之处还望见谅!
Redis 发布订阅
之前说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是我们研究Redisson分布式锁原理的方向。
在学习之前,我们有必要先了解一个知识点,就是有关Redis的发布订阅功能。
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。
订阅的命令是SUBSCRIBE channel[channel ...]
,可以订阅一个或多个频道,当有新消息通过PUBLISH命令发送给频道时,订阅者就能收到消息,就好像这样
开启两个客户端,一个订阅了频道channel1,另一个通过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不同客户端之间的通信。
当然,关于这种通信模式有哪些妙用场景我们就不展开了,大家可以自己去网上查阅一下,我们的主角还是Redisson,热身完毕,该上主菜了。
Redisson源码
在使用Redisson加锁之前,需要先获取一个RLock实例对象,有了这个对象就可以调用lock、tryLock方法来完成加锁的功能
Config?config?=?new?Config();
config.useSingleServer()
??.setPassword("")
??.setAddress("redis://127.0.0.1:6379");
RedissonClient?redisson?=?Redisson.create(config);
//?RLock对象
RLock?lock?=?redisson.getLock("myLock");
配置好对应的host,然后就可以创建一个RLock对象。RLock是一个接口,具体的同步器需要实现该接口,当我们调用redisson.getLock()
时,程序会初始化一个默认的同步执行器RedissonLock
这里面初始化了几个参数,
commandExecutor:异步的Executor执行器,Redisson中所有的命令都是通过...Executor 执行的 ;
id:唯一ID,初始化的时候是用UUID创建的;
internalLockLeaseTime:等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;
同时,图片里我还标注了一个方法getEntryName
,返回的是 “ID :锁名称” 的字符串,代表的是当前线程持有对应锁的一个标识,这些参数有必要留个印象,后面的源码解析中经常会出现。
说完了初始化的东西,我们就可以开始学习加锁和解锁的源码了。
加锁
Redisson的加锁方法有两个,tryLock和lock,使用上的区别在于tryLock可以设置锁的过期时长leaseTime
和等待时长waitTime
,核心处理的逻辑都差不多,我们先从tryLock讲起。
tryLock
代码有点长啊。。。整成图片不太方便,直接贴上来吧,
/**
?*?@param?waitTime?等待锁的时长?
?*?@param?leaseTime?锁的持有时间?
?*?@param?unit?时间单位
?*?@return
?*?@throws?InterruptedException
?*/
public?boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException?{????
????????//?剩余的等待锁的时间
????????long?time?=?unit.toMillis(waitTime);
????????long?current?=?System.currentTimeMillis();
????????
????????final?long?threadId?=?Thread.currentThread().getId();
????????//?尝试获取锁,如果没取到锁,则返回锁的剩余超时时间
????????Long?ttl?=?tryAcquire(leaseTime,?unit,?threadId);
????????//?ttl为null,说明可以抢到锁了,返回true
????????if?(ttl?==?null)?{
????????????return?true;
????????}
????????
????????//?如果waitTime已经超时了,就返回false,代表申请锁失败
????????time?-=?(System.currentTimeMillis()?-?current);
????????if?(time?<=?0)?{
????????????acquireFailed(threadId);
????????????return?false;
????????}
????????
????????current?=?System.currentTimeMillis();
????????//?订阅分布式锁,?解锁时进行通知,看,这里就用到了我们上面说的发布-订阅了吧
????????final?RFuture<RedissonLockEntry>?subscribeFuture?=?subscribe(threadId);
????????//?阻塞等待锁释放,await()返回false,说明等待超时了
????????if?(!await(subscribeFuture,?time,?TimeUnit.MILLISECONDS))?{
????????????if?(!subscribeFuture.cancel(false))?{
????????????????subscribeFuture.addListener(new?FutureListener<RedissonLockEntry>()?{
????????????????????@Override
????????????????????public?void?operationComplete(Future<RedissonLockEntry>?future)?throws?Exception?{
????????????????????????if?(subscribeFuture.isSuccess())?{
?????????????????????????//?等待都超时了,直接取消订阅
????????????????????????????unsubscribe(subscribeFuture,?threadId);
????????????????????????}
????????????????????}
????????????????});
????????????}
????????????acquireFailed(threadId);
????????????return?false;
????????}
????????try?{
????????????time?-=?(System.currentTimeMillis()?-?current);
????????????if?(time?<=?0)?{
????????????????acquireFailed(threadId);
????????????????return?false;
????????????}
?????????//?进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑一样
????????????while?(true)?{
????????????????long?currentTime?=?System.currentTimeMillis();
????????????????ttl?=?tryAcquire(leaseTime,?unit,?threadId);
????????????????//?lock?acquired
????????????????if?(ttl?==?null)?{
????????????????????return?true;
????????????????}
????????????????time?-=?(System.currentTimeMillis()?-?currentTime);
????????????????if?(time?<=?0)?{
????????????????????acquireFailed(threadId);
????????????????????return?false;
????????????????}
????????????????//?waiting?for?message
????????????????currentTime?=?System.currentTimeMillis();
????????????????if?(ttl?>=?0?&&?ttl?<?time)?{
????????????????????getEntry(threadId).getLatch().tryAcquire(ttl,?TimeUnit.MILLISECONDS);
????????????????}?else?{
????????????????????getEntry(threadId).getLatch().tryAcquire(time,?TimeUnit.MILLISECONDS);
????????????????}
????????????????time?-=?(System.currentTimeMillis()?-?currentTime);
????????????????if?(time?<=?0)?{
????????????????????acquireFailed(threadId);
????????????????????return?false;
????????????????}
????????????}
????????}?finally?{
????????????unsubscribe(subscribeFuture,?threadId);
????????}
//????????return?get(tryLockAsync(waitTime,?leaseTime,?unit));
????}
代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
拿锁的方法是tryAcquire
,传入的参数分别是锁的持有时间,时间单位以及代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync
的方法:
private?Long?tryAcquire(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
????return?get(tryAcquireAsync(leaseTime,?unit,?threadId));
}
private?<T>?RFuture<Long>?tryAcquireAsync(long?leaseTime,?TimeUnit?unit,?final?long?threadId)?{
????????//?如果有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁
????????if?(leaseTime?!=?-1)?{
????????????return?tryLockInnerAsync(leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
????????}
????????//?没有设置等待锁的时长的话,加多一个监听器,也就是调用lock.lock()会跑的逻辑,后面会说
????????RFuture<Long>?ttlRemainingFuture?=?tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),?TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
????????ttlRemainingFuture.addListener(new?FutureListener<Long>()?{
????????????@Override
????????????public?void?operationComplete(Future<Long>?future)?throws?Exception?{
????????????????if?(!future.isSuccess())?{
????????????????????return;
????????????????}
????????????????Long?ttlRemaining?=?future.getNow();
????????????????//?lock?acquired
????????????????if?(ttlRemaining?==?null)?{
????????????????????scheduleExpirationRenewal(threadId);
????????????????}
????????????}
????????});
????????return?ttlRemainingFuture;
????}
我们继续跟,看看tryLockInnerAsync
方法的源码:
<T>?RFuture<T>?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId,?RedisStrictCommand<T>?command)?{
????internalLockLeaseTime?=?unit.toMillis(leaseTime);
????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?command,
??????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
??????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
??????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
??????????????????"return?nil;?"?+
??????????????"end;?"?+
??????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
??????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
??????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
??????????????????"return?nil;?"?+
??????????????"end;?"?+
??????????????"return?redis.call('pttl',?KEYS[1]);",
????????????????Collections.<Object>singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
}
String?getLockName(long?threadId)?{
????return?id?+?":"?+?threadId;
}
这里就是底层的调用栈了,直接操作命令,整合成lua脚本后,调用netty的工具类跟redis进行通信,从而实现获取锁的功能。
这段脚本命令还是有点意思的,简单解读一下:
先用 exists key
命令判断是否锁是否被占据了,没有的话就用hset
命令写入,key为锁的名称,field为“客户端唯一ID:线程ID”,value为1;锁被占据了,判断是否是当前线程占据的,是的话value值加1; 锁不是被当前线程占据,返回锁剩下的过期时长;
命令的逻辑并不复杂,但不得不说,作者的设计还是很有心的,用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby
命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。
每一步命令对应的逻辑我都在下面的图中标注了,大家可以读一下:
我们继续跟代码吧,根据上面的命令可以看出,如果线程拿到锁的话,tryLock方法会直接返回true,万事大吉。
拿不到的话,就会返回锁的剩余过期时长,这个时长有什么作用呢?我们回到tryLock方法中死循环的那个地方:
这里有一个针对waitTime和key的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。
那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,各位看官应该还记得,我们上面贴的tryLock代码中还有这一段:
current?=?System.currentTimeMillis();
//?订阅分布式锁,?解锁时进行通知
final?RFuture<RedissonLockEntry>?subscribeFuture?=?subscribe(threadId);
订阅的逻辑显然是在subscribe
方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:
这段代码的作用在于将当前线程的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。
一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。
释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。
这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。
这就是tryLock获取锁的整个过程了,画一张流程图的话表示大概是这样:
lock
除了tryLock,一般我们还经常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire
方法来获取锁的,不同的是,它会跑到这部分的逻辑:
这段代码做了两件事:
1、预设30秒的过期时长,然后去获取锁
2、开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长
刷新过期时长的方法是scheduleExpirationRenewal
,贴一下源码吧:
private?void?scheduleExpirationRenewal(final?long?threadId)?{
?//?expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
????????if?(expirationRenewalMap.containsKey(getEntryName()))?{
????????????return;
????????}
????????Timeout?task?=?commandExecutor.getConnectionManager().newTimeout(new?TimerTask()?{
????????????@Override
????????????public?void?run(Timeout?timeout)?throws?Exception?{
????????????????//?检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长
????????????????RFuture<Boolean>?future?=?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
????????????????????????????"return?1;?"?+
????????????????????????"end;?"?+
????????????????????????"return?0;",
??????????????????????????Collections.<Object>singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
????????????????
????????????????future.addListener(new?FutureListener<Boolean>()?{
????????????????????@Override
????????????????????public?void?operationComplete(Future<Boolean>?future)?throws?Exception?{
????????????????????????expirationRenewalMap.remove(getEntryName());
????????????????????????if?(!future.isSuccess())?{
????????????????????????????log.error("Can't?update?lock?"?+?getName()?+?"?expiration",?future.cause());
????????????????????????????return;
????????????????????????}
????????????????????????
????????????????????????if?(future.getNow())?{
????????????????????????????//?reschedule?itself
????????????????????????????scheduleExpirationRenewal(threadId);
????????????????????????}
????????????????????}
????????????????});
????????????}
????????},?internalLockLeaseTime?/?3,?TimeUnit.MILLISECONDS);
????????if?(expirationRenewalMap.putIfAbsent(getEntryName(),?task)?!=?null)?{
????????????task.cancel();
????????}
????}
代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3
的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime
,也就是30秒的时间。
而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap
中,存储的key就为“线程ID:key名称”,如果发现expirationRenewalMap
中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。
上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。
其他的逻辑就跟tryLock()
基本没什么两样啦,大家看一下就知道了
解锁
有拿锁的方法,自然也就有解锁。Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数
@Override
????public?void?unlock()?{
?????//?发起释放锁的命令请求
????????Boolean?opStatus?=?get(unlockInnerAsync(Thread.currentThread().getId()));
????????if?(opStatus?==?null)?{
????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
????????}
????????if?(opStatus)?{
?????????//?成功释放锁,取消"看门狗"的续时线程
????????????cancelExpirationRenewal();
????????}
????}
解锁相关的命令操作在unlockInnerAsync
方法中定义,
又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点,不过没关系,我们简单梳理一下,命令的逻辑大概是这么几步:
1、判断锁是否存在,不存在的话用publish
命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
2、锁存在但不是当前线程持有,返回空置nil;
3、当前线程持有锁,用hincrby
命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;
当线程完全释放锁后,就会调用cancelExpirationRenewal()
方法取消"看门狗"的续时线程
void?cancelExpirationRenewal()?{
?//?expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了
????Timeout?task?=?expirationRenewalMap.remove(getEntryName());
????if?(task?!=?null)?{
????????task.cancel();
????}
}
这就是释放锁的过程了,怎么样,是不是还是比较简单的,阅读起来比加锁那份代码舒服多了,当然啦,简单归简单,为了方便你们理清整个分布式锁的过程,我当然还是费心费力的给你们画流程图展示下啦(就冲这点,是不是该给我来个三连啊,哈哈):
RedLock
以上就是Redisson分布式锁的原理讲解,总的来说,就是简单的用lua脚本整合基本的set
命令实现锁的功能,这也是很多Redis分布式锁工具的设计原理。除此之外,Redisson还支持用"RedLock算法"来实现锁的效果,这个工具类就是RedissonRedLock
。
用法也很简单,创建多个Redisson Node, 由这些无关联的Node就可以组成一个完整的分布式锁
RLock?lock1?=?Redisson.create(config1).getLock(lockKey);
RLock?lock2?=?Redisson.create(config2).getLock(lockKey);
RLock?lock3?=?Redisson.create(config3).getLock(lockKey);
RedissonRedLock?redLock?=?new?RedissonRedLock(lock1,?lock2,?lock3);
try?{
???redLock.lock();
}?finally?{
???redLock.unlock();
}
RedLock算法原理方面我就不细说了,大家有兴趣可以看我之前的文章,或者是网上搜一下,简单的说就是能一定程度上能有效防止Redis实例单点故障的问题,但并不完全可靠,不管是哪种设计,光靠Redis本身都是无法保证锁的强一致性的。
还是那句话,鱼和熊掌不可兼得,性能和安全方面也往往如此,Redis强大的性能和使用的方便足以满足日常的分布式锁需求,如果业务场景对锁的安全隐患无法忍受的话,最保底的方式就是在业务层做幂等处理。
总结
看了本文的源码解析,相信各位看官对Redisson分布式锁的设计也有了足够的了解,当然啦,虽然是讲解源码,我们的主要精力还是放在分布式锁的原理上,一些无关流程的代码就没有带大家字斟酌句的解读了,大家有兴趣的话可以自己去阅读看看,源码中很多地方都展示了一些基础并发工具和网络通信的妙用之处,学习一下还是挺有收获的。
最后我还是想吐槽一下,Redisson的注释是真的少啊。。。。。。
如果您觉得文章有用的话,欢迎点个赞支持一下,这将是对我创作的最好鼓励!
作者:鄙人薛某,一个不拘于技术的互联网人,喜欢用通俗易懂的语言来解构后端技术的知识点,想看更多精彩文章的可以关注我的公众号,微信搜索【鄙人薛某】即可关注
内容总结
以上是互联网集市为您收集整理的又长又细,万字长文带你解读Redisson分布式锁的源码全部内容,希望文章能够帮你解决又长又细,万字长文带你解读Redisson分布式锁的源码所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。