Springboot+死信实现RabbitMQ延迟队列
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Springboot+死信实现RabbitMQ延迟队列,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2732字,纯文字阅读大概需要4分钟。
内容图文
原理
??生产者把带有 ttl(Time-To-Live过期时间) 的消息发送到一个临时队列(DelayQueue),该队列没有消费者;
??该消息在DelayQueue中停留直至过期,同时该消息没有ReQueue(重新入队),就变成了死信(Dead-letter或Dead-message),死信自动地被发送给了配置好的DLX(Dead-Letter-Exchange);
??DLX根据路由规则把消息路由到了配置好的队列中(DeadLetterQueue),队列中的消息被消费者消费。
maven依赖
引入amqp的依赖, 生产者和消费者都需要
<!--amqp 适用rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
boostrap配置
同样,生产者和消费者都需要
properties.yaml:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
使用配置类创建各组件
rabbitmq的基本的组件是Exchange(交换机)、Queue(队列)、Binding(绑定)。实现延时队列需要定义如下组件:
- 一个带ttl的Queue临时队列;
- 一个普通的用于业务的Queue,即死信队列;
- 定义一个Exchange,即死信交换器DLX(Dead-Letter-Exchange);
- 再定义一个Bingding把死信队列和死信交换器绑定在一起。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueueConfig {
/**为了更贴合业务,参数名不使用DeadQueue之类的*/
/**延迟队列名*/
private static String DELAY_QUEUE = "delay.queue";
/**延迟队列(死信队列)交换器名*/
private static String DELAY_EXCHANGE = "delay.exchange";
/**处理业务的队列(死信队列)*/
private static String PROCESS_QUEUE = "process.queue";
/**ttl(10秒)*/
private static int DELAY_EXPIRATION = 10000;
/**
* 创建延迟队列
* "x-dead-letter-exchange"参数定义死信队列交换机
* "x-dead-letter-routing-key"定义死信队列中的消息重定向时的routing-key
* "x-message-ttl"定义消息的过期时间
*/
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE)
.withArgument("x-dead-letter-routing-key", PROCESS_QUEUE)
.withArgument("x-message-ttl", DELAY_EXPIRATION)
.build();
}
/**创建用于业务的队列*/
@Bean
public Queue processQueue(){
return QueueBuilder.durable(PROCESS_QUEUE)
.build();
}
/**创建一个DirectExchange*/
@Bean
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE);
}
/**绑定Exchange和queue,把消息重定向到业务queue*/
@Bean
Binding dlxBinding(DirectExchange directExchange, Queue processQueue){
return BindingBuilder.bind(processQueue)
.to(directExchange)
.with(PROCESS_QUEUE);
//绑定,以PROCESS_QUEUE为routing key
}
}
生产者创建发送消息的组件
@Component
public class MessageSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String routingKey, String msg) {
//amqpTemplate.convertAndSend("process.queue", msg);
amqpTemplate.convertAndSend(routingKey, msg);
}
}
消费者自动接收并处理消息的组件
@Component
//注意监听的Queue是用于业务的ProcessQueue, 而不是临时存放消息的DelayQueue
@RabbitListener(queues = "process.queue")
public class MessageReceiver {
@RabbitHandler()
public void doSth(String msg) {
//TODO
}
}
原文:https://www.cnblogs.com/life-of-coding/p/13222443.html
内容总结
以上是互联网集市为您收集整理的Springboot+死信实现RabbitMQ延迟队列全部内容,希望文章能够帮你解决Springboot+死信实现RabbitMQ延迟队列所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。