rocketmq(三 java操作rocket API, rocketmq 幂等性)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了rocketmq(三 java操作rocket API, rocketmq 幂等性),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7657字,纯文字阅读大概需要11分钟。
内容图文
- JAVA操作rocketmq:
1.导入rocketmq所需要的依赖:
< dependency > < groupId >com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>3.0.10</version></dependency><dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-all</artifactId><version>3.0.10</version><type>pom</type></dependency>
2.创建生产者
package com.example.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { // Thread.sleep(1000); // 每秒发送一次MQ Message msg = new Message("producer-topic", // topic 主题名称 "msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费 ("pushmsg-" + i).getBytes()// body 内容 ); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
3.创建消费者
package com.example.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("producer-topic", "msg");//此处是根据Message对象的参数来获取 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息id:"+msg.getMsgId() + "---" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
4.运行结果:
生产者运行结果:
消费者运行结果:
- rocetmq幂等性问题:
在Activemq中 jms规范支持两种消息模型:点对点和发布订阅,在rocketmq中 有两种消费模式:广播消费,和集群消费。
在消费的过程中,如果消费者出现异常或者超时,导致mq没有及时的相应消费的状态,则可能让mq重试,重试机制就有可能导致出现幂等性,而rocketmq的幂等性 只会出现在集群消费(类似activemq中的点对点消息模型)
生产者:
package com.example.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { Message msg = new Message("topic", // topic 主题名称 "msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费 (i + "条消息").getBytes()// body 内容 ); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
消费者:
package com.example.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("topic1", "msg"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody())); } // 超时的情况 或者程序异常int i = 2 / 0; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
消费结果:
消息id:C0A81FB100002A9F00000000000268EC---5条消息 消息id:C0A81FB100002A9F000000000002686E---4条消息 消息id:C0A81FA900002A9F0000000000037E6A---1条消息 消息id:C0A81FB100002A9F000000000002696A---6条消息 消息id:C0A81FB100002A9F00000000000269E8---7条消息 消息id:C0A81FA900002A9F0000000000038062---9条消息 消息id:C0A81FA900002A9F0000000000037EE8---2条消息 消息id:C0A81FA900002A9F0000000000037FE4---8条消息 消息id:C0A81FA900002A9F0000000000037F66---3条消息 消息id:C0A81FA900002A9F0000000000037DEC---0条消息 消息id:C0A81FA900002A9F0000000000038704---1条消息 消息id:C0A81FA900002A9F000000000003880C---9条消息 消息id:C0A81FA900002A9F0000000000038914---2条消息 消息id:C0A81FA900002A9F0000000000038A1C---0条消息 消息id:C0A81FA900002A9F0000000000038B24---3条消息 消息id:C0A81FA900002A9F0000000000038C2C---8条消息 消息id:C0A81FB100002A9F0000000000026E7E---4条消息 消息id:C0A81FB100002A9F0000000000026F86---7条消息 消息id:C0A81FB100002A9F0000000000027196---5条消息 消息id:C0A81FB100002A9F000000000002708E---6条消息
在Activimq中,可以通过消息id 来作为全局变量,检测是不是重复消费。但是在rocketmq中消费重试的结果中,任意选出两条相同的消息,可以看出 重试的时候消息id是不同的,此时在用消息id作为全局变量判断是否重复消费显然是不可能的。rocketmq中提供了一个消息的key,可以将业务id作为该key。例如:订单号什么的。可以将消息设置的key 在第一次消费的时候存放到数据库之中
幂等性消费者:
package com.example.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static Map<String, String> map = new HashMap<String, String>();// 模拟内存,实际情况可以将key放在redis之中publicstaticvoid main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("topic1", "msg"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { if (!map.containsKey(msg.getKeys())) { // 如果此时的业务逻辑是将收到的消息存放到数据库 System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody())); map.put(msg.getKeys(), new String(msg.getBody())); } else { System.out.println("重复消费"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } // 超时的情况 或者程序异常int i = 2 / 0; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
原文:https://www.cnblogs.com/920913cheng/p/10730497.html
内容总结
以上是互联网集市为您收集整理的rocketmq(三 java操作rocket API, rocketmq 幂等性)全部内容,希望文章能够帮你解决rocketmq(三 java操作rocket API, rocketmq 幂等性)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。