首页 / RABBITMQ / [心得体会]RabbitMQ
[心得体会]RabbitMQ
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了[心得体会]RabbitMQ,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含17057字,纯文字阅读大概需要25分钟。
内容图文
![[心得体会]RabbitMQ](/upload/InfoBanner/zyjiaocheng/1312/f93b67bfd00645bbb1266fa7f3fd6a6c.jpg)
-
RabbitMQ是什么?
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091931428.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091931719.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091931873.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932019.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932161.jpg)
-
有什么用? 有什么应用场景?
-
为什么使用RabbitMQ?
-
AMQP是什么?
AMQP是什么?
AMQP,即Advanced Message Queuing Protocol,
一个提供统一消息服务的应用层标准高级消息队列协议,
是应用层协议的一个开放标准,为面向消息的中间件设计。
基于此协议的客户端与消息中间件可传递消息,
并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
AMQP,即Advanced Message Queuing Protocol,
一个提供统一消息服务的应用层标准高级消息队列协议,
是应用层协议的一个开放标准,为面向消息的中间件设计。
基于此协议的客户端与消息中间件可传递消息,
并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
-
AMQP的应用场景
存储转发(多个消息发送者,单个消息接收者)。
分布式事务(多个消息发送者,多个消息接收者)。
发布订阅(多个消息发送者,多个消息接收者)。
基于内容的路由(多个消息发送者,多个消息接收者)。
文件传输队列(多个消息发送者,多个消息接收者)。
点对点连接(单个消息发送者,单个消息接收者)。
-
RabbitMQ安装
-
Linux安装
rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm
rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm
rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm
rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm
rpm -qa|grep rabbitmq
rpm -qa|grep rabbitmq
$ sudo chkconfig rabbitmq-server on # 添加开机启动RabbitMQ服务
$ sudo /sbin/service rabbitmq-server start # 启动服务
$ sudo /sbin/service rabbitmq-server status # 查看服务状态
$ sudo /sbin/service rabbitmq-server stop # 停止服务
# 查看当前所有用户
$ sudo rabbitmqctl list_users
# 查看默认guest用户的权限
$ sudo rabbitmqctl list_user_permissions guest
# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
$ sudo rabbitmqctl delete_user guest
# 添加新用户
$ sudo rabbitmqctl add_user username password
# 设置用户tag
$ sudo rabbitmqctl set_user_tags username administrator
# 赋予用户默认vhost的全部操作权限
$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
# 查看用户的权限
$ sudo rabbitmqctl list_user_permissions username
$ sudo chkconfig rabbitmq-server on # 添加开机启动RabbitMQ服务
$ sudo /sbin/service rabbitmq-server start # 启动服务
$ sudo /sbin/service rabbitmq-server status # 查看服务状态
$ sudo /sbin/service rabbitmq-server stop # 停止服务
# 查看当前所有用户
$ sudo rabbitmqctl list_users
# 查看默认guest用户的权限
$ sudo rabbitmqctl list_user_permissions guest
# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
$ sudo rabbitmqctl delete_user guest
# 添加新用户
$ sudo rabbitmqctl add_user username password
# 设置用户tag
$ sudo rabbitmqctl set_user_tags username administrator
# 赋予用户默认vhost的全部操作权限
$ sudo rabbitmqctl set_permissions -p / username ".*"".*"".*"
# 查看用户的权限
$ sudo rabbitmqctl list_user_permissions username
开启web管理接口
如果只从命令行操作RabbitMQ,多少有点不方便。幸好RabbitMQ自带了web管理界面,只需要启动插件便可以使用。
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo rabbitmq-plugins enable rabbitmq_management
然后通过浏览器访问
输入用户名和密码访问web管理界面了。
配置RabbitMQ
关于RabbitMQ的配置,可以下载RabbitMQ的配置文件模板到/etc/rabbitmq/rabbitmq.config
, 然后按照需求更改即可。
关于每个配置项的具体作用,可以参考官方文档。
更新配置后,别忘了重启服务哦!
开启用户远程访问
默认情况下,RabbitMQ的默认的guest
用户只允许本机访问, 如果想让guest
用户能够远程访问的话,只需要将配置文件中的loopback_users
列表置为空即可,如下:
{loopback_users, []}
{loopback_users,[]}
另外关于新添加的用户,直接就可以从远程访问的,如果想让新添加的用户只能本地访问,可以将用户名添加到上面的列表, 如只允许admin
用户本机访问。
{loopback_users, ["admin"]}
{loopback_users,["admin"]}
更新配置后,别忘了重启服务哦!
-
window安装方法
1. 安装erlang
2. 安装RabbitMQ
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932295.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932425.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932596.jpg)
注意事项
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091932864.jpg)
rabbitMQ消息确认
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; // 手动Ack功能开始 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgedlinux
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
公平调度
int prefetchCount = 1; channel.basicQos(prefetchCount);prefetchCount = 1, 调用basicQos方法这个消费者就会只拿出一个消息进行操作, 其他的消息它都拒绝掉, 拒绝的消息将被滞留带消息队列中, 但是这个消息将会丢给新的消费者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; publicclassNewTask{ privatestaticfinal String TASK_QUEUE_NAME = "task_queue"; publicstaticvoidmain(String[] argv)throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent ‘" + message + "‘"); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; publicclassWorker{ privatestaticfinal String TASK_QUEUE_NAME = "task_queue"; publicstaticvoidmain(String[] argv)throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } privatestaticvoiddoWork(String task){ for (char ch : task.toCharArray()) { if (ch == ‘.‘) { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091931428.jpg)
1. 消息确认
2. 消息的持久化
3. 公平调度原则
至于啥是TTL???
1. 如何给消息加上TTL?
* 为消息添加 TTL 时间的方式有两种
* (1) 为添加进入队列的每个消息都加上 TTL
* (2) 在消息即将公布的时候 , 给消息添加上消息的 ttl
(1) 为添加进入队列的每个消息都加上 TTL
Map<String, Object> arg = new HashMap<>();
arg.put("x-message-ttl", 60000);
channel.queueDeclare(QUEUE, true, false, false, arg);
(2) 在消息即将公布的时候 , 给消息添加上消息的 ttl
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("600000").build();
channel.basicPublish("my-exchange", QUEUE, properties, message.getBytes());
-
同一个消息在不同消息队列中是否会同时寿终正寝?
-
如何修改TTL的时间?
rabbitmqctl set_policy TTL ".*"‘{"message-ttl":60000}‘ --apply-to queueswindow:
rabbitmqctl set_policy TTL ".*""{""message-ttl"":60000}" --apply-to queues上面的命令将是 60 秒
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 60000); channel.queueDeclare("myqueue", false, false, false, args);临时修改60秒时间
需要注意:
- 在消息即将公布的时候, 给消息添加上ttl时间
2. 如何给队列加上TTL?
rabbitmqctl set_policy expiry ".*"‘{"expires":1800000}‘ --apply-to queueswindow:
rabbitmqctl.bat set_policy expiry ".*""{""expires"":1800000}" --apply-to queues也可以在消息队列声明的时候为队列加上TTL
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args);
// 为添加进入队列的每个消息都加上 TTL
// Map<String, Object> arg = new HashMap<>();
// arg.put("x-message-ttl", 60000);
// 为队列添加 TTL , 半个小时的时间
Map<String, Object> arg = new HashMap<>();
arg.put("x-expires", 1800000);
channel.queueDeclare(QUEUE, true, false, false, arg);
String message = "Hello World";
// 在消息即将公布的时候, 给消息添加上消息的ttl
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("600000").build();
channel.basicPublish("my-exchange", QUEUE, properties, message.getBytes());
package com.xuecheng.manage_cms.rabbit ;
import com.rabbitmq.client. Channel ;
import com.rabbitmq.client. Connection ;
import com.rabbitmq.client.ConnectionFactory ;
import com.rabbitmq.client.MessageProperties ;
/**
* @author zhengwei
* @version 1.0.0
* @date 2019/12/4 20:29
* @msg
**/
public class Producer01 {
private final static String QUEUE = "helloworld" ;
public static void main ( String [] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "Hello World";
// 将生产者的通道设置为持久化状态
channel.basicPublish("", QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] Sent‘" + message + "‘");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
package com.xuecheng.manage_cms.rabbit ;
import com.rabbitmq.client. *;
import java.io.IOException ;
import java.nio.charset.StandardCharsets ;
/**
* @author zhengwei
* @version 1.0.0
* @date 2019/12/4 20:50
* @msg
**/
public class Consumer01 {
private static final String QUEUE = "helloworld" ;
public static void main ( String [] args) throws Exception {
func1 () ;
}
public static void func1 () throws Exception {
ConnectionFactory factory = new ConnectionFactory () ;
factory . setHost ( "localhost" ) ;
Connection connection = factory . newConnection () ;
Channel channel = connection . createChannel () ;
channel . queueDeclare ( QUEUE , true , false , false , null ) ;
channel . basicQos ( 1 ) ;
DeliverCallback deliverCallback = (consumerTag , delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("receive message..." + message);
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
// 应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开启消费者的自动 Ack 功能
boolean autoAck = false;
channel.basicConsume(QUEUE, autoAck, deliverCallback, (consumerTag) -> {
System.out.println("consumerTag: " + consumerTag);
});
}
public static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == ‘.‘) {
Thread.sleep(1000);
}
}
}
public static void func() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body, "utf-8");
System.out.println("receive message..." + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
//true if the rejected message should be requeued rather than discarded/dead-lettered
// 如果拒绝的方式是 true 则这个消息不被丢弃, false的话直接丢弃消息
channel.basicReject(envelope.getDeliveryTag(), true);
}
};
channel.basicConsume(QUEUE, true, consumer);
}
}
前言, 了解学习第二阶段的目标是什么???
1. 发布和订阅
1. Exchanges交换器
sudo rabbitmqctl list_exchanges
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091933307.jpg)
channel.basicPublish( "logs", "", null, message.getBytes());
2. Temporary queues
是什么?
作用
3. Bindings
(1) 用于交换机和队列之间的绑定
(2) 如何绑定:
channel.queueBind(queueName, "logs", "");
(3) 查看先有的绑定
4. Putting it all together
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091933595.jpg)
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
// 声明一个扇形交换机, 向所有的消费者发布消息
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "zhazha Hello World";
// 指定 logs(EXCHANGE_NAME) 交换器, 将消息发布出去
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("message = " + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {
});
}
}
路由选择
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091933727.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091933878.jpg)
直接交换
1. 是什么?
是一个算法, 消息发送到与其binding key与消息routing key完成匹配的队列
绑定键就是图中的orange, black, green, 直接交换就是图中的 direct 就是那个 X
核心: 关注
routing key, 这个就是将消息传递到这个绑定键的东西
多重绑定
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091934029.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091934166.jpg)
发出日志
emmmmmmmmmmmm
订阅
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091934309.jpg)
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091934619.jpg)
总结
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091934890.jpg)
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
String message = "zhazha";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");
}
}
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
String[] argv = new String[]{"info", "warning", "error"};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
for (String routingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received ‘" + delivery.getEnvelope().getRoutingKey() + "‘:‘" + message + "‘");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
主题 -- Topics
![技术分享图片](/upload/getfiles/default/2022/10/30/20221030091935030.jpg)
原文:https://www.cnblogs.com/bangiao/p/12418392.html
内容总结
以上是互联网集市为您收集整理的[心得体会]RabbitMQ全部内容,希望文章能够帮你解决[心得体会]RabbitMQ所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。