分布式锁(2)Redission之Fair Lock
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了分布式锁(2)Redission之Fair Lock,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4756字,纯文字阅读大概需要7分钟。
内容图文
![分布式锁(2)Redission之Fair Lock](/upload/InfoBanner/zyjiaocheng/910/5a6527a0e32f4ec683d9aa49864a69a6.jpg)
Fair Lock
代码示例:
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.31.114:7001")
.addNodeAddress("redis://192.168.31.184:7002");
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getFairLock("anyLock");
lock.lock();
lock.unlock();
基本逻辑和可重入锁类似,最大区别是加锁的逻辑。
获取锁的核心逻辑:RedissonFairLock
RedissonLock#tryLockInnerAsync()
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do "
//弹出队列中的第一个元素
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
//如果队列为空,跳出循环
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
//得到有序集合中第一个元素的过期时间,timeout = set集合里面线程对应的过期时间
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
//如果过期时间<当前时间
+ "if timeout <= tonumber(ARGV[4]) then "
//清空集合
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
//清空队列
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
//锁不存在(未被持有)并且队列为空或者队列的第一个元素是当前线程
+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//把队列的第一个元素弹出来
"redis.call('lpop', KEYS[2]); " +
//从有序set集合中删除当前线程对应的元素
"redis.call('zrem', KEYS[3], ARGV[2]); " +
//给当前线程加锁,key:anyLock,field:当前线程ID,value:1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//设置锁的过期时间为30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//anyLock锁中,存在field:当前线程ID,value:1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//给value+1,即field:当前线程ID,value:2
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//设置锁的过期时间为30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//得到队列中的第一个元素
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
//队列中的第一个元素存在并且线程id不等于当前线程
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
//tl = 队列第一个元素过期时间 - 当前时间
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else "
//如果队列是空,ttl = anyLock的存活时间
+ "ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
//timeout= ttl + 当前时间 + 50秒(等待时间)
"local timeout = ttl + tonumber(ARGV[3]);" +
//往set集合插入一个元素,过期时间为timeout,并把它放入队列中
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
}
释放锁的核心代码:RedissonFairLock
RedissonLock#unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do "
//获取队列中的第一个元素
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
//如果队列为空,跳出循环
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
//timeout = set集合中线程对应的元素存活时间
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
//如果timeout <=当前时间
+ "if timeout <= tonumber(ARGV[4]) then "
//清空set集合
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
//清空队列
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
//锁不存在(即锁未被持有)
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
//取出队列中的第一个元素
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
//队列中还存在线程
"if nextThreadId ~= false then " +
//发布一个解锁消息事件
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//anylock的map结构中没有任何线程
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//递减anyLock的threadId的value
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//说明加了多次锁
"if (counter > 0) then " +
//重新设置锁的过期时间为30秒
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
//删除anyLock这个key
"redis.call('del', KEYS[1]); " +
//得到队列的第一个元素
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
//如果从队列中取得出第一个元素
"if nextThreadId ~= false then " +
//队列中的第一个元素通过消息事件,尝试去获取锁
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
redisson 公平锁的原理:
内容总结
以上是互联网集市为您收集整理的分布式锁(2)Redission之Fair Lock全部内容,希望文章能够帮你解决分布式锁(2)Redission之Fair Lock所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。