【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7142字,纯文字阅读大概需要11分钟。
内容图文
前言
前面已经写了Redisson大多的内容,我们再看看Redisson官网共有哪些组件:
image.png
剩下还有Semaphore和CountDownLatch两块,我们就趁热打铁,赶紧看看Redisson是如何实现的吧。
我们在JDK中都知道Semaphore和CountDownLatch两兄弟,这里就不多赘述,不了解的可以再回头看看。
Semaphore使用示例
先看下Semaphore原理图如下:
image.png
接着我们看下Redisson中使用的案例:
RSemaphore?semaphore?=?redisson.getSemaphore("semaphore");
//?同时最多允许3个线程获取锁
semaphore.trySetPermits(3);
for(int?i?=?0;?i?<?10;?i++)?{
??new?Thread(new?Runnable()?{
????@Override
????public?void?run()?{
??????try?{
????????System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]尝试获取Semaphore锁");?
????????semaphore.acquire();
????????System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]成功获取到了Semaphore锁,开始工作");?
????????Thread.sleep(3000);??
????????semaphore.release();
????????System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]释放Semaphore锁");?
??????}?catch?(Exception?e)?{
????????e.printStackTrace();
??????}
????}
??}).start();
}
Semaphore源码解析
接着我们根据上面的示例,看看源码是如何实现的:
第一步:
semaphore.trySetPermits(3);
public?class?RedissonSemaphore?extends?RedissonExpirable?implements?RSemaphore?{
????@Override
????public?boolean?trySetPermits(int?permits)?{
????????return?get(trySetPermitsAsync(permits));
????}
????@Override
????public?RFuture<Boolean>?trySetPermitsAsync(int?permits)?{
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????"local?value?=?redis.call('get',?KEYS[1]);?"?+
????????????????"if?(value?==?false?or?value?==?0)?then?"
????????????????????+?"redis.call('set',?KEYS[1],?ARGV[1]);?"
????????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
????????????????????+?"return?1;"
????????????????+?"end;"
????????????????+?"return?0;",
????????????????Arrays.<Object>asList(getName(),?getChannelName()),?permits);
????}
}
执行流程为:
- get semaphore,获取到一个当前的值
- 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3
- 然后发布一些消息,返回1
接着看看semaphore.acquire();
和semaphore.release();
逻辑:
public?class?RedissonSemaphore?extends?RedissonExpirable?implements?RSemaphore?{
????@Override
????public?RFuture<Boolean>?tryAcquireAsync(int?permits)?{
????????if?(permits?<?0)?{
????????????throw?new?IllegalArgumentException("Permits?amount?can't?be?negative");
????????}
????????if?(permits?==?0)?{
????????????return?RedissonPromise.newSucceededFuture(true);
????????}
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
??????????????????"local?value?=?redis.call('get',?KEYS[1]);?"?+
??????????????????"if?(value?~=?false?and?tonumber(value)?>=?tonumber(ARGV[1]))?then?"?+
??????????????????????"local?val?=?redis.call('decrby',?KEYS[1],?ARGV[1]);?"?+
??????????????????????"return?1;?"?+
??????????????????"end;?"?+
??????????????????"return?0;",
??????????????????Collections.<Object>singletonList(getName()),?permits);
????}
????@Override
????public?RFuture<Void>?releaseAsync(int?permits)?{
????????if?(permits?<?0)?{
????????????throw?new?IllegalArgumentException("Permits?amount?can't?be?negative");
????????}
????????if?(permits?==?0)?{
????????????return?RedissonPromise.newSucceededFuture(null);
????????}
????????return?commandExecutor.evalWriteAsync(getName(),?StringCodec.INSTANCE,?RedisCommands.EVAL_VOID,
????????????"local?value?=?redis.call('incrby',?KEYS[1],?ARGV[1]);?"?+
????????????"redis.call('publish',?KEYS[2],?value);?",
????????????Arrays.<Object>asList(getName(),?getChannelName()),?permits);
????}
}
先看看加锁的逻辑tryAcquireAsync()
:
- get semaphore,获取到一个当前的值,比如说是3,3 > 1
- decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
- decrby semaphore 1
- decrby semaphore 1
- 执行3次加锁后,semaphore值为0
此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁,如下图:
image.png
接着看看解锁逻辑releaseAsync()
:
- incrby semaphore 1,每次一个客户端释放掉这个锁的话,就会将信号量的值累加1,信号量的值就不是0了
看到这里大家就明白了了,Redisson实现Semaphore其实是很简单了
CountDownLatch使用示例
使用案例:
RCountDownLatch?latch?=?redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(3);
System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]设置了必须有3个线程执行countDown,进入等待中。。。");?
for(int?i?=?0;?i?<?3;?i++)?{
??new?Thread(new?Runnable()?{
????@Override
????public?void?run()?{
??????try?{
????????System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]在做一些操作,请耐心等待。。。。。。");?
????????Thread.sleep(3000);?
????????RCountDownLatch?localLatch?=?redisson.getCountDownLatch("anyCountDownLatch");
????????localLatch.countDown();
????????System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]执行countDown操作");?
??????}?catch?(Exception?e)?{
????????e.printStackTrace();?
??????}
????}
??}).start();
}
latch.await();
System.out.println(new?Date()?+?":线程["?+?Thread.currentThread().getName()?+?"]收到通知,有3个线程都执行了countDown操作,可以继续往下走");?
CountDownLatch 源码解析
源码如下:
?public?class?RedissonCountDownLatch?extends?RedissonObject?implements?RCountDownLatch?{
????@Override
????public?RFuture<Boolean>?trySetCountAsync(long?count)?{
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????"if?redis.call('exists',?KEYS[1])?==?0?then?"
????????????????????+?"redis.call('set',?KEYS[1],?ARGV[2]);?"
????????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
????????????????????+?"return?1?"
????????????????+?"else?"
????????????????????+?"return?0?"
????????????????+?"end",
????????????????Arrays.<Object>asList(getName(),?getChannelName()),?newCountMessage,?count);
????}
????@Override
????public?RFuture<Void>?countDownAsync()?{
????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????????????"local?v?=?redis.call('decr',?KEYS[1]);"?+
????????????????????????"if?v?<=?0?then?redis.call('del',?KEYS[1])?end;"?+
????????????????????????"if?v?==?0?then?redis.call('publish',?KEYS[2],?ARGV[1])?end;",
????????????????????Arrays.<Object>asList(getName(),?getChannelName()),?zeroCountMessage);
????}
}
先分析trySetCount()
方法逻辑:
- exists anyCountDownLatch,第一次肯定是不存在的
- set redisson_countdownlatch__channel__anyCountDownLatch 3
- 返回1
接着分析latch.await();
方法,如下图:
image.png
这个方法其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环
最后分析localLatch.countDown();
方法:
- decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1
await()
方面已经分析过,死循环去判断anyCountDownLatch对应存储的值是否为0,如果为0则接着执行自己的逻辑
总结
看到了这里 这两个组件是不是很简单?
到了这里,Redisson部分的学习都已经结束了,后面还会学习ZK实现分布式锁的原理。
申明
本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!
感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫
内容总结
以上是互联网集市为您收集整理的【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理全部内容,希望文章能够帮你解决【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。