首页 / REDIS / redis实现队列消息的ack
redis实现队列消息的ack
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了redis实现队列消息的ack,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1957字,纯文字阅读大概需要3分钟。
内容图文
原文链接:http://www.cnblogs.com/ViewState/p/6662191.html由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。
原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整。
大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉;每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从list中POP出来。
以下脚本使用lua实现,只需要在执行前加载到redis中即可。
消息本身需要包含id属性
push没什么问题,原生即可(此处以LPUSH为例)
pop时脚本
1 local not_empty = function(x) 2 return (type(x) == "table") and (not x.err) and (#x ~= 0) 3 end 4 5 local qName = ARGV[1] --队列名称 6 local currentTime = ARGV[2] --当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性 7 local considerAsFailMaxTimeSpan = ARGV[3] --超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队 8 9 local zsetName= qName ..'BACKUP' 10 local hashName= qName ..'CONTEXT' 11 12 local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1) 13 if (not_empty(tmp)) then 14 redis.call('ZREM', zsetName, tmp[1]) --此处拿出的为消息的唯一id 15 redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1])) 16 end 17 tmp = redis.call('RPOP', qName) 18 if (tmp) then 19 local msg = cjson.decode(tmp) 20 local id = msg['id'] 21 redis.call('ZADD', zsetName, tonumber(currentTime), id) 22 redis.call('HSET',hashName , id, tmp) 23 end 24 return tmp
ack时候比较简单,只需要将指定id从set和hash中删除即可
1 local key = ARGV[1] 2 local qName=ARGV[2] 3 redis.call('ZREM', qName..'BACKUP', key) 4 redis.call('HDEL', qName..'CONTEXT', key)
在程序中使用前需要显示load这两个脚本,后面直接调用这两个脚本的sha值即可执行。
转载于:https://www.cnblogs.com/ViewState/p/6662191.html
内容总结
以上是互联网集市为您收集整理的redis实现队列消息的ack全部内容,希望文章能够帮你解决redis实现队列消息的ack所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。