Redis中的Stream数据类型作为消息队列的尝试
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Redis中的Stream数据类型作为消息队列的尝试,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5181字,纯文字阅读大概需要8分钟。
内容图文
Redis的List数据类型作为消息队列,已经比较合适了,但存在一些不足,比如只能独立消费,订阅发布又无法支持数据的持久化,相对前两者,Redis Stream作为消息队列的使用更为有优势。 ? 相信球迷小伙伴们对文字直播这个东西都不陌生,时常在想,这个功能是怎么实现的? 具体说就是用什么技术实现最为合适?如何面对数以百万计的读压力?广告消息是如何插播进来的?最后的历史消息如何归档,如何持久化存储? 文字直播其实就是解说员作为生产者,生产消息(文字信息),各种客户端作为消费者,消费信息(刷新文字内容)。 典型的消息队列实现,可以用队列或者类似队列的功能实现,这里只是简单想象一下,结合redis中的stream数据类型,来学习stream作为消息队列的功能实现。 ? ? 1,生成者:生产者队列的创建,与消息的增删改 1.1 创建并写入消息 语法:xadd?queue_name?Id?filed value(filed value) ? ?? 1,每一组消息需要一个唯一的Id,*号表示服务器自动生成ID,后面顺序跟着一组或者多组消息(filed value) ? ? ?2,消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第5条消息。 ? ??? ? ?? 消息ID可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。 ? ? ?3,消息元素的的结构为key-value,必须成对出现,如果key或者value元素中有空格,必须用"abc ?def"或者'abc ?def'括起来 ? 1.2?生产者写入消息 语法:xadd?queue_name *|Id filed value? ?? ? 1.3 xlen 当前stream的长度:xlen stream_namexlen "NBA_Match_001"
?,也就是上面写入的10条消息
1.4 限制某一个stream的最大长度,maxlen?
依据先进先出的原则,自动删除超出最长长度的消息
xadd?"NBA_Match_001" maxlen 50000 *?"2019-07-13?08:26:39"?"反击哈腾,一条龙上篮得分"
1.5 查询消息(查询是生产者查询自己生产的消息,跟消费者的消费是两码事)
正向查询
xrange "NBA_Match_001" # 查询所有消息
xrange "NBA_Match_001" - + # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 + # 指定最小消息ID的列表
xrange "NBA_Match_001"- 1562980142175-0 # 指定最大消息ID的列表
反向查询
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -
xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -
1.6 删除消息
xdel stream_name id,删除消息并不是真正的物理删除,队列的长度不变,指示标记当前消息被删除
? ? 2 xread:独立消费 类似于List,生产者往list中写数据,消费者从list中读数据,只能有一个消费者 ? 2. 1,从头部读取消息,从某个streams中读取n条消息,0-0只从头开始,或者指定从streams的Id开始 xread?count 1 streams?"NBA_Match_001"?0-0 xread?count 1 streams?"NBA_Match_001"?1562980142175-0 2.2,从尾部读取最新的一条消息 xread?count?1?streams?"NBA_Match_001"?$ 此时默认不返回任何消息 xread??block?0?count?1?streams?"NBA_Match_001"?$ 以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来 ?
3 多消费者xgroup
:消费组,每个组中的消费者独立消费stream中的消息
典型的比如文字直播的安卓App客户端,苹果App客户端,网页客户端等等。多个终端,都可以独立地消费队列里面的
3.1 创建消费组
对消息队列"NBA_Match_001"创建了两个消费组,一个是cg1,一个是cg2,比如网页客户端与App客户端
1,xgroup?create?"NBA_Match_001"?cg1?0-0??#??表示从头开始消费 创建消费组cg1,消费组必须绑定一个steam(NBA_Match_001),从头(0-0?)开始消费"NBA_Match_001"中的消息 2,xgroup?create?"NBA_Match_001"?cg2?0-0??#??表示从头开始消费 3,2 从消费组中创建消费者 xreadgroup指令可以进行消费组的组内消费xreadgroup GROUP cg1 c1 count 1 streams?"NBA_Match_001"?>
>号表示从当前消费组的last_delivered_id后面开始读 ,?每当消费者读取一条消息,last_delivered_id变量就会前进? 当一个组的消费则消费完全部消息之后,就没有新的消息了 ?
每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。
同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。
每个消费者者有一个组内唯一名称。
关于消费组,可能不太好理解,举个例子就比较清楚
假设有2个消费组cg1,cg2,对于cg1,其组内共有3个消费者c1,、c2、c3。一个消息队列中共有5条消息a,b,c,d,e,那么一种可能的消费方式如下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是说3个消费者,对于消息的消费是互斥的,消费的消息是没有交集的
而对于cg2,同样可以消费a,b,c,d,e这5条消息,不依赖于cg1消费组以及消费情况,同理,具体怎么消费,取决于其组内的消费者数量
就好比体育直播的客户端,正常情况下,网页客户端可以收到所有的直播消息,手机App客户端也可以收到所有的直播消息一样,不同消费组间对消息的消费互不干扰。
4 多个生产者和多个消费者
这种情况类似以上,不用的是增加了多个消费者,在上面的基础上做了扩展。
其实不难想象,文字直播插播的广告消息,可能是类似如下结构,是另外一个独立的生产者,与文字直播员一样生成写入消息到队列,然后客户端看到的就是夹杂了广告的直播。
目前就个人认识而言,stream数据类型实现消息队列并不完美,最大的问题就是单点压力问题:这里是说单点压力,而不是单点故障,stream类型数据,其实从逻辑上看,是一个key值(stream_name),跟着一系列value(消息),这些消息只能存储在一个Redis实例中,如何缓解多个消费者对单个Key值中的消息消费压力?说来说去,不就是想说kafka的partition么……
参考:
http://database.51cto.com/art/201812/588189.htm
https://www.zhihu.com/question/279540635
内容总结
以上是互联网集市为您收集整理的Redis中的Stream数据类型作为消息队列的尝试全部内容,希望文章能够帮你解决Redis中的Stream数据类型作为消息队列的尝试所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。