【并发】9、借助redis 实现生产消费,消息订阅发布模式队列
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了【并发】9、借助redis 实现生产消费,消息订阅发布模式队列,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3703字,纯文字阅读大概需要6分钟。
内容图文
这个就是一个消息可以被多次消费的范例了
其实这个实现的方式可以参考我之前的设计模式,观察者模式
https://www.cnblogs.com/cutter-point/p/5249780.html
不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象
不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些不可见的字符应该是被截掉了
消息发布者
package queue.redisQueue; import queue.fqueue.vo.TempVo; import redis.clients.jedis.Jedis; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.util.UUID; /** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueProducter3 * @Author: xiaof * @Description: 订阅,发布模式 发布消息 * @Date: 2019/6/12 16:47 * @Version: 1.0 */ public class RedisQueueProducter3 implements Runnable { private Jedis jedis; private String queueKey; public RedisQueueProducter3(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void putMessage() { try { Thread.sleep((long) (Math.random() * 1000)); //不存在则创建,存在则直接插入 //向redis队列中存放数据 //生成数据 TempVo tempVo = new TempVo(); tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID()); try { int i = 0; while(i < 10) { //反馈订阅的数量 long num = jedis.publish(queueKey.getBytes(), tempVo.toString().getBytes()); if(num > 0) { System.out.println("成功!num:" + num); break; } ++i; } } catch (Exception e) { System.out.println("失败!"); } } catch (Exception e) { e.printStackTrace(); } } @Override public void run() { while(true) { putMessage(); } } }
消息消费者
package queue.redisQueue; import queue.fqueue.vo.EventVo; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import redis.clients.util.SafeEncoder; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.UnsupportedEncodingException; /** * @ProjectName: cutter-point * @Package: queue.redisQueue * @ClassName: RedisQueueConsume3 * @Author: xiaof * @Description: 发布订阅消息,订阅线程 * @Date: 2019/6/12 16:53 * @Version: 1.0 */ public class RedisQueueConsume3 implements Runnable { private Jedis jedis; private String queueKey; class myJedisPubSub extends JedisPubSub { /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] **/ @Override public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message); //接收到exit消息后退出 System.out.println(message); } } public RedisQueueConsume3(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void consumerMessage() { jedis.subscribe(new myJedisPubSub(), queueKey); } @Override public void run() { while (true) { consumerMessage(); } } }
测试代码:
@Test public void test4() throws InterruptedException { //读写取数据 for(int i = 0; i < 2; ++i) { System.out.println("输出测试" + i); RedisQueueProducter3 producter = new RedisQueueProducter3(jedisPool.getResource(), "xiaof"); Thread t = new Thread(producter); t.start(); } while(true) { Thread.sleep(1000); } } @Test public void test5() throws InterruptedException { //读写取数据 for(int i = 0; i < 5; ++i) { System.out.println("输出测试" + i); //切记一定要重新获取Resource,不然无法并发操作 RedisQueueConsume3 fqueueConsume = new RedisQueueConsume3(jedisPool.getResource(), "xiaof"); Thread t = new Thread(fqueueConsume); t.setDaemon(true); t.start(); } while(true) { Thread.sleep(1000); } }
效果展示
同一消息被多个订阅者同步消费
内容总结
以上是互联网集市为您收集整理的【并发】9、借助redis 实现生产消费,消息订阅发布模式队列全部内容,希望文章能够帮你解决【并发】9、借助redis 实现生产消费,消息订阅发布模式队列所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。