RabbitMQ整合spring
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了RabbitMQ整合spring,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7855字,纯文字阅读大概需要12分钟。
内容图文
![RabbitMQ整合spring](/upload/InfoBanner/zyjiaocheng/1331/f4b287ab9ad546659a742eb4c6f25b0f.jpg)
1 <? xml version="1.0" encoding="UTF-8" ?> 2 < beans xmlns ="http://www.springframework.org/schema/beans" 3 xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:context ="http://www.springframework.org/schema/context" 4 xmlns:mvc ="http://www.springframework.org/schema/mvc" xmlns:jdbc ="http://www.springframework.org/schema/jdbc" 5 xmlns:jee ="http://www.springframework.org/schema/jee" xmlns:aop ="http://www.springframework.org/schema/aop" 6 xmlns:tx ="http://www.springframework.org/schema/tx" xmlns:task ="http://www.springframework.org/schema/task" 7 xmlns:rabbit ="http://www.springframework.org/schema/rabbit" 8 xsi:schemaLocation ="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans.xsd 10 http://www.springframework.org/schema/context 11 http://www.springframework.org/schema/context/spring-context-4.3.xsd 12 http://www.springframework.org/schema/mvc 13 http://www.springframework.org/schema/mvc/spring-mvc.xsd 14 http://www.springframework.org/schema/tx 15 http://www.springframework.org/schema/tx/spring-tx.xsd 16 http://www.springframework.org/schema/aop 17 http://www.springframework.org/schema/aop/spring-aop.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task.xsd 20 http://www.springframework.org/schema/rabbit 21 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd" > 22 23 < description >rabbitmq 连接服务配置</description> 24<!-- 不适用【发布确认】连接配置 --> 25<rabbit:connection-factory id="rabbitConnectionFactory" 26 host="172.18.112.102" username="woms" password="woms" port="5672" 27 virtual-host="lingyi" channel-cache-size="25" cache-mode="CHANNEL" publisher-confirms="true" publisher-returns="true" connection-timeout="200"/> 28 29 30 31<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 32<property name="backOffPolicy"> 33<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 34<property name="initialInterval" value="200"/> 35<property name="maxInterval" value="30000"/> 36</bean> 37</property> 38<property name="retryPolicy"> 39<bean class="org.springframework.retry.policy.SimpleRetryPolicy"> 40<property name="maxAttempts" value="5"/> 41</bean> 42</property> 43</bean> 44 45 46 47<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 如果使用多exchange必须配置declared-by="connectAdmin" --> 48<rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/> 49 50<rabbit:template id="ampqTemplate" connection-factory="connectionFactory" 51 exchange="test-mq-exchange" return-callback="sendReturnCallback" 52 message-converter="jsonMessageConverter" routing-key="test_queue_key" 53 mandatory="true" confirm-callback="confirmCallback" retry-template="retryTemplate"/> 54 55 56<bean id="confirmCallback" class="ly.net.rabbitmq.MsgSendConfirmCallBack"/> 57<bean id="sendReturnCallback" class="ly.net.rabbitmq.MsgSendReturnCallback"/> 58<!-- 消息对象json转换类 --> 59<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74<!-- queue配置 --> 75<!-- durable:是否持久化 --> 76<!-- exclusive: 仅创建者可以使用的私有队列,断开后自动删除 --> 77<!-- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 --> 78<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" declared-by="rabbitAdmin"/> 79 80 81 82 83<!-- exchange配置 --> 84<!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 --> 85<!-- rabbit:binding:设置消息queue匹配的key --> 86<rabbit:direct-exchange name="test-mq-exchange" 87 durable="true" auto-delete="false" id="test-mq-exchange" declared-by="rabbitAdmin"> 88<rabbit:bindings> 89<rabbit:binding queue="test_queue_key" key="test_queue_key"/> 90</rabbit:bindings> 91</rabbit:direct-exchange> 92 93<!-- <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> --> 94<!-- <rabbit:bindings> --> 95<!-- 设置消息Queue匹配的pattern (direct模式为key) --> 96<!-- <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> --> 97<!-- </rabbit:bindings> --> 98<!-- </rabbit:topic-exchange> --> 99100101<bean id="mqConsumer" class="ly.net.rabbitmq.MQConsumer"/>102<bean id="mqConsumer1" class="ly.net.rabbitmq.MQConsumerManual"/>103104<!-- listener配置 消费者 自动确认 -->105<!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 -->106<rabbit:listener-container 107connection-factory="connectionFactory" acknowledge="auto"108 message-converter="jsonMessageConverter">109<rabbit:listener queues="test_queue_key" ref="mqConsumer"/>110</rabbit:listener-container>111<!-- 消费者 手动确认 -->112<rabbit:listener-container 113connection-factory="connectionFactory" acknowledge="manual">114<rabbit:listener queues="test_queue_key" ref="mqConsumer1"/>115</rabbit:listener-container>116117118119120121122123</beans>
1 package ly.net.rabbitmq; 2 3 4 import org.springframework.amqp.rabbit.core.RabbitTemplate; 5 import org.springframework.amqp.rabbit.support.CorrelationData; 6 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { 7 8 @Override 9publicvoid confirm(CorrelationData correlationData, boolean ack, String cause) { 10// TODO Auto-generated method stub11if (ack) { 12 System.out.println("消息确认成功"); 13 } else { 14//处理丢失的消息 15 System.out.println("消息确认失败,"+cause); 16 } 17 } 1819 }
package ly.net.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.beans.factory.annotation.Autowired; public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate errorTemplate; @Override publicvoid returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String msgJson = new String(message.getBody()); System.out.println("Returned Message:"+msgJson); //重新发布 // RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey"); // Throwable cause = new Exception(new Exception("route_fail_and_republish")); // recoverer.recover(message,cause); // System.out.println("Returned Message:"+replyText); // } }
1 package ly.net.rabbitmq; 2 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 5 6 import com.rabbitmq.client.Channel; 7 8 public class MQConsumerManual implements ChannelAwareMessageListener { 910 @Override 11publicvoid onMessage(Message message, Channel channel) throws Exception { 12// TODO Auto-generated method stub 13//手动确认14 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); 15 } 1617 }
@Service public class MQProducerImpl implements MQProducer { @Autowired private AmqpTemplate amqpTemplate; privatefinalstatic Logger LOGGER = Logger.getLogger(MQProducerImpl.class); /* * convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 * 原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. */ @Override publicvoid sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(object); } catch (Exception e) { LOGGER.error(e); } } }
public interface MQProducer { /** * 发送消息到指定队列 * @param queueKey * @param object */ public void sendDataToQueue(String queueKey, Object object); }
原文:http://www.cnblogs.com/woms/p/7040902.html
内容总结
以上是互联网集市为您收集整理的RabbitMQ整合spring全部内容,希望文章能够帮你解决RabbitMQ整合spring所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。