C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含11857字,纯文字阅读大概需要17分钟。
内容图文
这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
1:RabbitMQ是个啥?(专业术语参考自网络)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
2:使用RabbitMQ有啥好处?
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。
对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,
3:RabbitMq的安装以及环境搭建等:
网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!
3.1:运行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?
4.1什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长
4.2什么时候不使用MQ?
需要实时关注执行结果 (eg:同步调用)
5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)
6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:
Code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
//简单生产端 ui调用者
using
System;
namespace
RabbitMqPublishDemo
{
using
MyRabbitMqService;
using
System.Runtime.CompilerServices;
class
Program
{
static
void
Main(
string
[] args)
{
//就是简单的队列,生产者
Console.WriteLine(
"====RabbitMqPublishDemo===="
);
for
(
int
i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg(
"smapleMsg"
, $
"nihaifengge:{i}"
);
}
Console.WriteLine(
"生成完毕!"
);
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public
static
void
PublishSampleMsg(
string
queueName,
string
msg)
{
using
(IConnection conn = connectionFactory.CreateConnection())
{
using
(IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable:
false
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange:
""
, routingKey: queueName, basicProperties:
null
, body: msgBody);
}
}
}
//简单消费端
using
System;
namespace
RabbitMqConsumerDemo
{
using
MyRabbitMqService;
using
System.Runtime.InteropServices;
class
Program
{
static
void
Main(
string
[] args)
{
Console.WriteLine(
"====RabbitMqConsumerDemo===="
);
ZrfRabbitMqHelper.ConsumeSampleMsg(
"smapleMsg"
, isBasicNack:
true
, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($
"订阅到消息:{DateTime.Now}:{handleMsgStr}"
);
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public
static
void
ConsumeSampleMsg(
string
queueName,
bool
isBasicNack =
false
, Action<
string
> handleMsgStr =
null
)
// bool ifBasicReject = false,
{
Console.WriteLine(
"ConsumeSampleMsg Waiting for messages...."
);
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable:
false
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
var consumer =
new
EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte
[] bymsg = ea.Body.ToArray();
string
msg = Encoding.UTF8.GetString(bymsg);
if
(handleMsgStr !=
null
)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($
"{DateTime.Now}->收到消息:{msg}"
);
}
};
channel.BasicConsume(queueName, autoAck:
true
, consumer);
}
#endregion
|
7:Work模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
//简单生产端 ui调用者
using
System;
namespace
RabbitMqPublishDemo
{
using
MyRabbitMqService;
using
System.Runtime.CompilerServices;
class
Program
{
static
void
Main(
string
[] args)
{
//就是简单的队列,生产者
Console.WriteLine(
"====RabbitMqPublishDemo===="
);
for
(
int
i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg(
"smapleMsg"
, $
"nihaifengge:{i}"
);
}
Console.WriteLine(
"生成完毕!"
);
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public
static
void
PublishSampleMsg(
string
queueName,
string
msg)
{
using
(IConnection conn = connectionFactory.CreateConnection())
{
using
(IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable:
false
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange:
""
, routingKey: queueName, basicProperties:
null
, body: msgBody);
}
}
}
//简单消费端
using
System;
namespace
RabbitMqConsumerDemo
{
using
MyRabbitMqService;
using
System.Runtime.InteropServices;
class
Program
{
static
void
Main(
string
[] args)
{
Console.WriteLine(
"====RabbitMqConsumerDemo===="
);
ZrfRabbitMqHelper.ConsumeSampleMsg(
"smapleMsg"
, isBasicNack:
true
, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($
"订阅到消息:{DateTime.Now}:{handleMsgStr}"
);
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public
static
void
ConsumeSampleMsg(
string
queueName,
bool
isBasicNack =
false
, Action<
string
> handleMsgStr =
null
)
// bool ifBasicReject = false,
{
Console.WriteLine(
"ConsumeSampleMsg Waiting for messages...."
);
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable:
false
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
var consumer =
new
EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte
[] bymsg = ea.Body.ToArray();
string
msg = Encoding.UTF8.GetString(bymsg);
if
(handleMsgStr !=
null
)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($
"{DateTime.Now}->收到消息:{msg}"
);
}
};
channel.BasicConsume(queueName, autoAck:
true
, consumer);
}
#endregion
|
8:Fanout
Code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
//就如下的code, 多次生产,3个消费者都可以自动开始消费
//生产者
using
System;
namespace
RabbitMqPublishDemo
{
using
MyRabbitMqService;
using
System.Runtime.CompilerServices;
class
Program
{
static
void
Main(
string
[] args)
{
for
(
int
i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishWorkQueueModel(
"workqueue"
, $
" :发布消息成功{i}"
);
}
Console.WriteLine(
"工作队列模式 生成完毕......!"
);
Console.ReadLine();
}
}
}
//生产者后端逻辑
public
static
void
PublishWorkQueueModel(
string
queueName,
string
msg)
{
using
(var connection = connectionFactory.CreateConnection())
using
(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable:
true
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
var body = Encoding.UTF8.GetBytes(msg);
var properties = channel.CreateBasicProperties();
properties.Persistent =
true
;
channel.BasicPublish(exchange:
""
, routingKey: queueName, basicProperties: properties, body: body);
Console.WriteLine($
"{DateTime.Now},SentMsg: {msg}"
);
}
}
//work消费端
using
System;
namespace
RabbitMqConsumerDemo
{
using
MyRabbitMqService;
using
System.Runtime.InteropServices;
class
Program
{
static
void
Main(
string
[] args)
{
Console.WriteLine(
"====Work模式开启了===="
);
ZrfRabbitMqHelper.ConsumeWorkQueueModel(
"workqueue"
, handserMsg: msg =>
{
Console.WriteLine($
"work模式获取到消息{msg}"
);
});
Console.ReadLine();
}
}
}
//work后端逻辑
public
static
void
ConsumeWorkQueueModel(
string
queueName,
int
sleepHmao = 90, Action<
string
> handserMsg =
null
)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName, durable:
true
, exclusive:
false
, autoDelete:
false
, arguments:
null
);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global:
false
);
var consumer =
new
EventingBasicConsumer(channel);
Console.WriteLine(
" ConsumeWorkQueueModel Waiting for messages...."
);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if
(handserMsg !=
null
)
{
if
(!
string
.IsNullOrEmpty(message))
{
handserMsg.Invoke(message);
}
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple:
false
);
};
channel.BasicConsume(queue: queueName, autoAck:
false
, consumer: consumer);
}
|
9:Direct
Code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
//同一个消息会被多个订阅者消费
//发布者
using
System;
namespace
RabbitMqPublishDemo
{
using
MyRabbitMqService;
using
System.Runtime.CompilerServices;
class
Program
{
static
void
Main(
string
[] args)
{
#region 发布订阅模式,带上了exchange
for
(
int
i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishExchangeModel(
"exchangemodel"
, $
"发布的消息是:{i}"
);
}
Console.WriteLine(
"发布ok!"
);
#endregion
Console.ReadLine();
}
}
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
public
static
void
PublishExchangeModel(
string
exchangeName,
string
message)
{
using
(var connection = connectionFactory.CreateConnection())
using
(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey:
""
, basicProperties:
null
, body: body);
Console.WriteLine($
" Sent {message}"
);
}
}
//订阅者
using
System;
namespace
RabbitMqConsumerDemo
{
using
MyRabbitMqService;
using
System.Runtime.InteropServices;
class
Program
{
static
void
Main(
string
[] args)
{
#region 发布订阅模式 Exchange
ZrfRabbitMqHelper.SubscriberExchangeModel(
"exchangemodel"
, msg =>
{
Console.WriteLine($
"订阅到消息:{msg}"
);
});
#endregion
Console.ReadLine();
}
}
}
//订阅者后端的逻辑
public
static
void
SubscriberExchangeModel(
string
exchangeName, Action<
string
> handlerMsg =
null
)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
//Fanout 扇形分叉
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey:
""
);
Console.WriteLine(
" Waiting for msg...."
);
var consumer =
new
EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if
(handlerMsg !=
null
)
{
if
(!
string
.IsNullOrEmpty(message))
{
handlerMsg.Invoke(message);
}
}
else
{
Console.WriteLine($
"订阅到消息:{message}"
);
}
};
channel.BasicConsume(queue: queueName, autoAck:
true
, consumer: consumer);
}
|
到此这篇关于C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的文章就介绍到这了,更多相关C#使用RabbitMq队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
https://www.jb51.net/article/197580.htm
原文:https://www.cnblogs.com/proxyz/p/13938013.html
内容总结
以上是互联网集市为您收集整理的C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)全部内容,希望文章能够帮你解决C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。