RocketMQ(三)——————javaAPI(7.事务消息)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了RocketMQ(三)——————javaAPI(7.事务消息),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2823字,纯文字阅读大概需要5分钟。
内容图文
![RocketMQ(三)——————javaAPI(7.事务消息)](/upload/InfoBanner/zyjiaocheng/596/e4cfb880c1a243b68c764544f3de2acd.jpg)
Half Message:
预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:
Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,
每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:
如果超过回查次数,默认回滚消息
TransactionListener的两个方法
executeLocalTransaction
半消息发送成功触发此方法来执行本地事务
checkLocalTransaction
broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
本地事务执行状态
LocalTransactionState.COMMIT_MESSAGE
执行事务成功,确认提交
LocalTransactionState.ROLLBACK_MESSAGE
回滚消息,broker端会删除半消息
LocalTransactionState.UNKNOW
暂时为未知状态,等待broker回查
1、生产者样例
//1.发送事务消息 public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("==e xecuteLocalTransaction=="); System.out.println("message-body : "+message.getBody()); System.out.println("message-TransactionId : "+message.getTransactionId()); try { //业务 }catch (Exception e){ //回滚消息,broker端会删除半消息 return LocalTransactionState.ROLLBACK_MESSAGE; } //执行事务成功,确认提交 return LocalTransactionState.COMMIT_MESSAGE; } public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("==c heckLocalTransaction=="); System.out.println("message-body : "+new String(messageExt.getBody())); System.out.println("message-TransactionId : "+messageExt.getTransactionId()); //暂时为未知状态,等待broker回查 //return LocalTransactionState.UNKNOW; //回滚消息,broker端会删除半消息 //return LocalTransactionState.ROLLBACK_MESSAGE; //执行事务成功,确认提交 return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); TransactionSendResult sendResult = producer.sendMessageInTransaction(new Message ("TransactionTopic", "事务消息!".getBytes()), null); System.out.println("sendResult : "+sendResult); producer.shutdown(); System.out.println("生产者下线!"); }
2、消费者样例
//1.接收事务消息 public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("TransactionTopic","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt mes: list) { System.out.println("mes : "+new String(mes.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer start..."); }
内容总结
以上是互联网集市为您收集整理的RocketMQ(三)——————javaAPI(7.事务消息)全部内容,希望文章能够帮你解决RocketMQ(三)——————javaAPI(7.事务消息)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。