请选择 进入手机版 | 继续访问电脑版

[ASP.NET] 运用.net core中实例讲解RabbitMQ

[复制链接]
查看193 | 回复39 | 2021-9-15 07:31:09 | 显示全部楼层 |阅读模式
目次

一、RabbitMQ简介

是一个开源的消息代理和队列服务器,用来通过平凡 协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。

(1) AMQP协议

Advanced Message Queuing Protocol(高级消息队列协议)

(2)AMQP专业术语

(多路复用->在同一个线程中开启多个通道举行 操作)

  • Server:又称broker,担当 客户端的链接,实现AMQP实体服务
  • Connection:毗连 ,应用程序与broker的网络毗连
  • Channel:网络信道,几乎全部 的操作都在channel中举行 ,Channel是举行 消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话使命 。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body构成 .Properties可以对消息举行 修饰,必须消息的优先级、耽误 等高级特性;Body则是消息体内容。
  • virtualhost: 假造 地址,用于举行 逻辑隔离,最上层的消息路由。一个virtual host内里 可以有多少 个Exchange和Queue,同一个Virtual Host 内里 不能有类似 名称的Exchange 或 Queue。
  • Exchange:交换机,吸收 消息,根据路由键转单消息到绑定队列
  • Binding: Exchange和Queue之间的假造 链接,binding中可以包换routing key
  • Routing key: 一个路由规则,假造 机可用它来确定怎样 路由一个特定消息。(如负载均衡 )

(3)RabbitMQ团体 架构

运用.net core中实例讲解RabbitMQ

ClientA(生产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传入的RouteKey的队列,把消息转发到对应的队列,消耗 者Client1,Client2,Client3只必要 指定对应的队列名即可以消耗 队列数据。

交换机和队列多对多关系,实际 开辟 中一样平常 是一个交换机对多个队列,防止计划 复杂化。

二、安装RabbitMQ

安装方式不影响下面的使用 ,这里用Docker安装

  1.  #15672端口为web管理端的端口,5672为RabbitMQ服务的端口
  2. docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
复制代码

输入:ip:5672访问验证。

运用.net core中实例讲解RabbitMQ

建一个名为develop的Virtual host(假造 主机)使用 ,项目中一样平常 是一个项目建一个Virtual host用,可以或许 隔离队列。

运用.net core中实例讲解RabbitMQ

切换Virtual host

运用.net core中实例讲解RabbitMQ

三、RabbitMQ六种队列模式在.NetCore中使用

(1)简单队列

最简单的工作队列,此中 一个消息生产者,一个消息消耗 者,一个队列。也称为点对点模式

运用.net core中实例讲解RabbitMQ

形貌 :一个生产者 P 发送消息到队列 Q,一个消耗 者 C 吸收

建一个RabbitMQHelper.cs类

  1. /// <summary>
  2. /// RabbitMQ帮助类
  3. /// </summary>
  4. public class RabbitMQHelper
  5. {
  6. private static ConnectionFactory factory;
  7. private static object lockObj = new object();
  8. /// <summary>
  9. /// 获取单个RabbitMQ连接
  10. /// </summary>
  11. /// <returns></returns>
  12. public static IConnection GetConnection()
  13. {
  14. if (factory == null)
  15. {
  16. lock (lockObj)
  17. {
  18. if (factory == null)
  19. {
  20. factory = new ConnectionFactory
  21. {
  22. HostName = "172.16.2.84",//ip
  23. Port = 5672,//端口
  24. UserName = "admin",//账号
  25. Password = "123456",//密码
  26. VirtualHost = "develop" //虚拟主机
  27. };
  28. }
  29. }
  30. }
  31. return factory.CreateConnection();
  32. }
  33. }
复制代码

生产者代码

新建发送类Send.cs

  1. public static void SimpleSendMsg()
  2. {
  3. string queueName = "simple_order";//队列名
  4. //创建连接
  5. using (var connection = RabbitMQHelper.GetConnection())
  6. {
  7. //创建信道
  8. using (var channel = connection.CreateModel())
  9. {//创建队列
  10. channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
  11. for (var i = 0; i < 10; i++)
  12. {
  13. string message = $"Hello RabbitMQ MessageHello,{i + 1}";
  14. var body = Encoding.UTF8.GetBytes(message);//发送消息
  15. channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
  16. Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
  17. }
  18. }
  19. }
  20. }
复制代码

创建队列参数分析 :

durable:是否持久化。

exclusive:排他队列,只有创建它的毗连 (connection)能连,创建它的毗连 关闭,会自动 删除队列。

autoDelete:被消耗 后,消耗 者数目 都断开时自动 删除队列。

arguments:创建队列的参数。

发送消息参数分析 :

exchange:交换机,为什么能传空呢,由于 RabbitMQ内置有一个默认的交换机,假如 传空时,就会用默认交换机。

routingKey:路由名称,这里用队列名称做路由key。

mandatory:true告诉服务器至少将消息route到一个队列种,否则就将消息return给发送者;false:没有找到路由则消息丢弃。

实行 效果 :

运用.net core中实例讲解RabbitMQ

队列产生10条消息。

运用.net core中实例讲解RabbitMQ

消耗 者代码

新建Recevie.cs类

  1. public static void SimpleConsumer()
  2. {
  3. string queueName = "simple_order";
  4. var connection = RabbitMQHelper.GetConnection();
  5. {
  6. //创建信道
  7. var channel = connection.CreateModel();
  8. {
  9. //创建队列
  10. channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
  11. var consumer = new EventingBasicConsumer(channel);
  12. int i = 0;
  13. consumer.Received += (model, ea) =>
  14. {
  15. //消费者业务处理
  16. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  17. Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
  18. i++;
  19. };
  20. channel.BasicConsume(queueName, true, consumer);
  21. }
  22. }
  23. }
复制代码

消耗 者只必要 知道队列名就可以消耗 了,不必要 Exchange和routingKey。

注:消耗 者这里有一个创建队列,它本身不必要 ,是防备 消耗 端程序先实行 ,没有队列会报错。

实行 效果 :

运用.net core中实例讲解RabbitMQ

运用.net core中实例讲解RabbitMQ

消息已经被消耗 完。

(2)工作队列模式

一个消息生产者,一个交换器,一个消息队列,多个消耗 者。同样也称为点对点模式

运用.net core中实例讲解RabbitMQ

生产者P发送消息到队列,多个消耗 者C消耗 队列的数据。

工作队列也称为公平性队列模式,循环分发,RabbitMQ将按次序 将每条消息发送给下一个消耗 者,每个消耗 者将获得类似 数目 的消息。

生产者

Send.cs代码:

  1. /// <summary>
  2. /// 工作队列模式
  3. /// </summary>
  4. public static void WorkerSendMsg()
  5. {
  6. string queueName = "worker_order";//队列名
  7. //创建连接
  8. using (var connection = RabbitMQHelper.GetConnection())
  9. {
  10. //创建信道
  11. using (var channel = connection.CreateModel())
  12. {
  13. //创建队列
  14. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  15.              var properties = channel.CreateBasicProperties();
  16.              properties.Persistent = true; //消息持久化
  17. for ( var i=0;i<10;i++)
  18. {
  19. string message = $"Hello RabbitMQ MessageHello,{i+1}";
  20. var body = Encoding.UTF8.GetBytes(message);
  21. //发送消息到rabbitmq
  22. channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
  23. Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");
  24. }
  25. }
  26. }
  27. }
复制代码

参数durable:true,必要 持久化,实际 项目中肯定必要 持久化的,不然重启RabbitMQ数据就会丢失了。

实行 效果 :

运用.net core中实例讲解RabbitMQ

写入10条数据,有持久化标识D

运用.net core中实例讲解RabbitMQ

消耗 端

Recevie代码:

  1. public static void WorkerConsumer()
  2. {
  3. string queueName = "worker_order";
  4. var connection = RabbitMQHelper.GetConnection();
  5. {
  6. //创建信道
  7. var channel = connection.CreateModel();
  8. {
  9. //创建队列
  10. channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
  11. var consumer = new EventingBasicConsumer(channel);
  12. //prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
  13. channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
  14. int i = 1;
  15. int index = new Random().Next(10);
  16. consumer.Received += (model, ea) =>
  17. {
  18. //处理业务
  19. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  20. Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}");
  21. channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了               Thread.Sleep(1000); i++;
  22. };
  23. channel.BasicConsume(queueName,autoAck:false, consumer);
  24. }
  25. }
  26. }
复制代码

BasicQos参数分析 :

prefetchSize:每条消息大小,一样平常 设为0,表示不限定 。

prefetchCount:1,作用限流,告诉RabbitMQ不要同时给一个消耗 者推送多于N个消息,消耗 者会把N条消息缓存到本地一条条消耗 ,假如 不设,RabbitMQ会进大概 快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频仍 从RabbitMQ 获取能提拔 消耗 速率 和性能,设的太多的话则会增大本地内存,必要 根据机器性能合理设置,官方建议设为30。

global:是否为全局设置。

这些限流设置针对消耗 者autoAck:false时才有用 ,假如 是自动 Ack的,限流不见效 。

实行 两个消耗 者,效果 :

运用.net core中实例讲解RabbitMQ

可以看到消耗 者号的标识,8,2,8,2是均匀 的,一个消耗 者5个,RabbitMQ上也能看到有2个消耗 者,Unacked数是2,由于 每个客户端的限流数是1。

运用.net core中实例讲解RabbitMQ

工作队列模式也是很常用的队列模式。

(3)发布订阅模式

Pulish/Subscribe,无选择吸收 消息,一个消息生产者,一个交换机(交换机范例 为fanout),多个消息队列,多个消耗 者。称为发布/订阅模式

在应用中,只必要 简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部 队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。

运用.net core中实例讲解RabbitMQ

生产者P只需把消息发送到交换机X,绑定这个交换机的队列都会获得一份一样的数据。

应用场景:得当 于用同一份数据源做不同的业务。

生产者代码

  1. /// <summary>
  2. /// 发布订阅, 扇形队列
  3. /// </summary>
  4. public static void SendMessageFanout()
  5. {
  6. //创建连接
  7. using (var connection = RabbitMQHelper.GetConnection())
  8. {
  9. //创建信道
  10. using (var channel = connection.CreateModel())
  11. {
  12. string exchangeName = "fanout_exchange";
  13. //创建交换机,fanout类型
  14. channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
  15. string queueName1 = "fanout_queue1";
  16. string queueName2 = "fanout_queue2";
  17. string queueName3 = "fanout_queue3";
  18. //创建队列
  19. channel.QueueDeclare(queueName1, false, false, false);
  20. channel.QueueDeclare(queueName2, false, false, false);
  21. channel.QueueDeclare(queueName3, false, false, false);
  22. //把创建的队列绑定交换机,routingKey不用给值,给了也没意义的
  23. channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
  24. channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
  25. channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
  26.              var properties = channel.CreateBasicProperties();
  27.              properties.Persistent = true; //消息持久化
  28. //向交换机写10条消息
  29. for (int i = 0; i < 10; i++)
  30. {
  31. string message = $"RabbitMQ Fanout {i + 1} Message";
  32. var body = Encoding.UTF8.GetBytes(message);
  33. channel.BasicPublish(exchangeName, routingKey: "", null, body);
  34. Console.WriteLine($"发送Fanout消息:{message}");
  35. }
  36. }
  37. }
  38. }
复制代码

实行 代码:

运用.net core中实例讲解RabbitMQ

运用.net core中实例讲解RabbitMQ

向交换机发送10条消息,则绑定这个交换机的3个队列都会有10条消息。

消耗 端的代码和工作队列的一样,只需知道队列名即可消耗 ,声明时要和生产者的声明一样。

(4)路由模式(保举 使用 )

在发布/订阅模式的基础上,有选择的吸收 消息,也就是通过 routing 路由举行 匹配条件是否满意 吸收 消息。

运用.net core中实例讲解RabbitMQ

上图是一个连合 日记 消耗 级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的

  1. direct
复制代码
模式。

生产者P发送数据是要指定交换机(X)和routing发送消息 ,指定的routingKey=error,则队列Q1和队列Q2都会有一份数据,假如 指定routingKey=into,或=warning,交换机(X)只会把消息发到Q2队列。

生产者代码

  1. /// <summary>
  2. /// 路由模式,点到点直连队列
  3. /// </summary>
  4. public static void SendMessageDirect()
  5. {
  6. //创建连接
  7. using (var connection = RabbitMQHelper.GetConnection())
  8. {
  9. //创建信道
  10. using (var channel = connection.CreateModel())
  11. {
  12. //声明交换机对象,fanout类型
  13. string exchangeName = "direct_exchange";
  14. channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
  15. //创建队列
  16. string queueName1 = "direct_errorlog";
  17. string queueName2 = "direct_alllog";
  18. channel.QueueDeclare(queueName1, true, false, false);
  19. channel.QueueDeclare(queueName2, true, false, false);
  20. //把创建的队列绑定交换机,direct_errorlog队列只绑定routingKey:error
  21. channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
  22. //direct_alllog队列绑定routingKey:error,info
  23. channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
  24. channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
  25.              var properties = channel.CreateBasicProperties();
  26.              properties.Persistent = true; //消息持久化
  27. //向交换机写10条错误日志和10条Info日志
  28. for (int i = 0; i < 10; i++)
  29. {
  30. string message = $"RabbitMQ Direct {i + 1} error Message";
  31. var body = Encoding.UTF8.GetBytes(message);
  32. channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
  33. Console.WriteLine($"发送Direct消息error:{message}");
  34. string message2 = $"RabbitMQ Direct {i + 1} info Message";
  35. var body2 = Encoding.UTF8.GetBytes(message);
  36. channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
  37. Console.WriteLine($"info:{message2}");
  38. }
  39. }
  40. }
  41. }
复制代码

这里创建一个direct范例 的交换机,两个路由key,一个error,一个info,两个队列,一个队列只绑定error,一个队列绑定error和info,向error和info各发10条消息。

实行 代码:

运用.net core中实例讲解RabbitMQ

查看RabbitMQ管理界面,direct_errorlog队列10条,而direct_alllog有20条,由于 direct_alllog队列两个routingKey的消息都进去了。

运用.net core中实例讲解RabbitMQ

点进去看下两个队列绑定的交换机和routingKey

运用.net core中实例讲解RabbitMQ

运用.net core中实例讲解RabbitMQ

消耗 者代码

消耗 者和工作队列一样,只需根据队列名消耗 即可,这里只消耗 direct_errorlog队列作示例

  1. public static void DirectConsumer()
  2. {
  3. string queueName = "direct_errorlog";
  4. var connection = RabbitMQHelper.GetConnection();
  5. {
  6. //创建信道
  7. var channel = connection.CreateModel();
  8. {
  9. //创建队列
  10. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  11. var consumer = new EventingBasicConsumer(channel);
  12. ///prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能
  13. ///global:是否设为全局的
  14. ///prefetchSize:单条消息大小,通常设0,表示不做限制
  15. //是autoAck=false才会有效
  16. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
  17. int i = 1;
  18. consumer.Received += (model, ea) =>
  19. {
  20. //处理业务
  21. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  22. Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");
  23. channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了
  24. i++;
  25. };
  26. channel.BasicConsume(queueName, autoAck: false, consumer);
  27. }
  28. }
  29. }
复制代码

平凡 场景中保举 使用 路由模式,由于 路由模式有交换机,有路由key,可以或许 更好的拓展各种应用场景。

(5)主题模式

topics(主题)模式跟routing路由模式类似 ,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以含糊 匹配路由键 routingKey,类似 于SQL中 = 和 like 的关系。

运用.net core中实例讲解RabbitMQ

P 表示为生产者、 X 表示交换机、C1C2 表示为消耗 者,红色表示队列。

topics 模式与 routing 模式比较相近,topics 模式不能具有恣意 的 routingKey,必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.a"。topics routingKey 中可以存在两种特殊 字符"*"与“#”,用于做含糊 匹配,此中 “*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

以上图为例:

假如 发送消息的routingKey设置为:

aaa.orange.rabbit,那么消息会路由到Q1与Q2,

routingKey=aaa.orange.bb的消息会路由到Q1,

routingKey=lazy.aa.bb.cc的消息会路由到Q2;

routingKey=lazy.aa.rabbit的消息会路由到 Q2(只会投递给Q2一次,固然 这个routingKey 与 Q2 的两个 bindingKey 都匹配);

没匹配routingKey的消息将会被丢弃。

生产者代码

  1. public static void SendMessageTopic()
  2. {
  3. //创建连接
  4. using (var connection = RabbitMQHelper.GetConnection())
  5. {
  6. //创建信道
  7. using (var channel = connection.CreateModel())
  8. {
  9. //声明交换机对象,fanout类型
  10. string exchangeName = "topic_exchange";
  11. channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
  12. //队列名
  13. string queueName1 = "topic_queue1";
  14. string queueName2 = "topic_queue2";
  15. //路由名
  16. string routingKey1 = "*.orange.*";
  17. string routingKey2 = "*.*.rabbit";
  18. string routingKey3 = "lazy.#";
  19. channel.QueueDeclare(queueName1, true, false, false);
  20. channel.QueueDeclare(queueName2, true, false, false);
  21. //把创建的队列绑定交换机,routingKey指定routingKey
  22. channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
  23. channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
  24. channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
  25. //向交换机写10条消息
  26. for (int i = 0; i < 10; i++)
  27. {
  28. string message = $"RabbitMQ Direct {i + 1} Message";
  29. var body = Encoding.UTF8.GetBytes(message);
  30. channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
  31. channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
  32. Console.WriteLine($"发送Topic消息:{message}");
  33. }
  34. }
  35. }
  36. }
复制代码

这里演示了 routingKey为aaa.orange.rabbit,和lazy.aa.rabbit的环境 ,第一个匹配到Q1和Q2,第二个匹配到Q2,以是 应该Q1是10条,Q2有20条,

实行 后看rabbitMQ界面:

运用.net core中实例讲解RabbitMQ

(6)RPC模式

与上面其他5种所不同之处,该模式是拥有哀求 /回复的。也就是有相应 的,上面5种都没有。

RPC是指长途 过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续实行 下去,把异步的消息以同步的方式实行 。

运用.net core中实例讲解RabbitMQ

客户端(C)声明一个排他队列本身 订阅,然后发送消息到RPC队列同时也把这个排他队列名也在消息里传进去,服务端监听RPC队列,处理完业务后把处理效果 发送到这个排他队列,然后客户端收到效果 ,继续处理本身 的逻辑。

RPC的处理流程:

  • 当客户端启动时,创建一个匿名的回调队列。
  • 客户端为RPC哀求 设置2个属性:replyTo:设置回调队列名字;correlationId:标记request。
  • 哀求 被发送到rpc_queue队列中。
  • RPC服务器端监听rpc_queue队列中的哀求 ,当哀求 到来时,服务器端会处理并且把带有用 果 的消息发送给客户端。吸收 的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列,当有消息时,检查correlationId属性,假如 与request中匹配,那就是效果 了。

服务端代码

  1. public class RPCServer
  2. {
  3. public static void RpcHandle()
  4. {
  5. var connection = RabbitMQHelper.GetConnection();
  6. {
  7. var channel = connection.CreateModel();
  8. {
  9. string queueName = "rpc_queue";
  10. channel.QueueDeclare(queue: queueName, durable: false,
  11. exclusive: false, autoDelete: false, arguments: null);
  12. channel.BasicQos(0, 1, false);
  13. var consumer = new EventingBasicConsumer(channel);
  14. channel.BasicConsume(queue: queueName,
  15. autoAck: false, consumer: consumer);
  16. Console.WriteLine("【服务端】等待RPC请求...");
  17. consumer.Received += (model, ea) =>
  18. {
  19. string response = null;
  20. var body = ea.Body.ToArray();
  21. var props = ea.BasicProperties;
  22. var replyProps = channel.CreateBasicProperties();
  23. replyProps.CorrelationId = props.CorrelationId;
  24. try
  25. {
  26. var message = Encoding.UTF8.GetString(body);
  27. Console.WriteLine($"【服务端】接收到数据:{ message},开始处理");
  28. response = $"消息:{message},处理完成";
  29. }
  30. catch (Exception e)
  31. {
  32. Console.WriteLine("错误:" + e.Message);
  33. response = "";
  34. }
  35. finally
  36. {
  37. var responseBytes = Encoding.UTF8.GetBytes(response);
  38. channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
  39. basicProperties: replyProps, body: responseBytes);
  40. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  41. multiple: false);
  42. }
  43. };
  44. }
  45. }
  46. }
  47. }
复制代码

客户端

  1. public class RPCClient
  2. {
  3. private readonly IConnection connection;
  4. private readonly IModel channel;
  5. private readonly string replyQueueName;
  6. private readonly EventingBasicConsumer consumer;
  7. private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
  8. private readonly IBasicProperties props;
  9. public RPCClient()
  10. {
  11. connection = RabbitMQHelper.GetConnection();
  12. channel = connection.CreateModel();
  13. replyQueueName = channel.QueueDeclare().QueueName;
  14. consumer = new EventingBasicConsumer(channel);
  15. props = channel.CreateBasicProperties();
  16. var correlationId = Guid.NewGuid().ToString();
  17. props.CorrelationId = correlationId; //给消息id
  18. props.ReplyTo = replyQueueName;//回调的队列名,Client关闭后会自动删除
  19. consumer.Received += (model, ea) =>
  20. {
  21. var body = ea.Body.ToArray();
  22. var response = Encoding.UTF8.GetString(body);
  23. //监听的消息Id和定义的消息Id相同代表这条消息服务端处理完成
  24. if (ea.BasicProperties.CorrelationId == correlationId)
  25. {
  26. respQueue.Add(response);
  27. }
  28. };
  29. channel.BasicConsume(
  30. consumer: consumer,
  31. queue: replyQueueName,
  32. autoAck: true);
  33. }
  34. public string Call(string message)
  35. {
  36. var messageBytes = Encoding.UTF8.GetBytes(message);
  37. //发送消息
  38. channel.BasicPublish(
  39. exchange: "",
  40. routingKey: "rpc_queue",
  41. basicProperties: props,
  42. body: messageBytes);
  43. //等待回复
  44. return respQueue.Take();
  45. }
  46. public void Close()
  47. {
  48. connection.Close();
  49. }
  50. }
复制代码

实行 代码

  1. static void Main(string[] args)
  2. {
  3. Console.WriteLine("Hello World!");
  4. //启动服务端,正常逻辑是在另一个程序
  5. RPCServer.RpcHandle();
  6. //实例化客户端
  7. var rpcClient = new RPCClient();
  8. string message = $"消息id:{new Random().Next(1, 1000)}";
  9. Console.WriteLine($"【客服端】RPC请求中,{message}");
  10. //向服务端发送消息,等待回复
  11. var response = rpcClient.Call(message);
  12. Console.WriteLine("【客服端】收到回复响应:{0}", response);
  13. rpcClient.Close();
  14. Console.ReadKey();
  15. }
复制代码

测试效果 :

运用.net core中实例讲解RabbitMQ

z实行 完,客服端close后,可以接着本身 的下一步业务处理。

总结

以上便是RabbitMQ的6中模式在.net core中实际 使用 ,此中 (1)简单队列,(2)工作队列,(4)路由模式,(6)RPC模式的交换机范例 都是direct,(3)发布订阅的交换机是fanout,(5)topics的交换机是topic。正常场景用的是direct,默认交换机也是direct范例 的,保举 用(4)路由模式,由于 指定交换机名比起默认的交换机会轻易 扩展场景,其他的交换机看业务场景所需使用 。

下面位置可以看到交换机范例 ,amq.开头那几个是内置的,避免交换机过多可以直接使用 。

运用.net core中实例讲解RabbitMQ

到此这篇关于运用.net core中实例讲解RabbitMQ的文章就先容 到这了,更多相干 .net core RabbitMQ内容请搜索 脚本之家从前 的文章或继续欣赏 下面的相干 文章盼望 大家以后多多支持脚本之家!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

avatar 嘴边反复的话普 | 2021-9-24 20:12:17 | 显示全部楼层
顶一个!
回复

使用道具 举报

avatar 雨荷424 | 2021-9-24 20:12:20 | 显示全部楼层
admin楼主加油,看好你哦!
回复

使用道具 举报

avatar 我爱萨其马虞co | 2021-9-26 04:28:44 | 显示全部楼层
好多兽医在广场上义诊,admin楼主去看看吧!
回复

使用道具 举报

avatar 江左岸右郧 | 2021-9-26 20:41:48 | 显示全部楼层
突然觉得admin楼主说的很有道理,赞一个!
回复

使用道具 举报

avatar 喜欢吃芒果干俺 | 2021-9-26 23:18:14 | 显示全部楼层
admin楼主的帖子越来越有深度了!
回复

使用道具 举报

avatar 爱晚风愁制 | 2021-10-1 22:18:55 | 显示全部楼层
admin楼主给脑残下了定义!
回复

使用道具 举报

avatar 象棋达人 | 2021-10-4 14:06:24 | 显示全部楼层
楼上的说的很好!
回复

使用道具 举报

avatar 塞上云烟辣 | 2021-10-4 14:18:24 | 显示全部楼层
我和我的小伙伴都惊呆了!
回复

使用道具 举报

avatar 司驴迁咏 | 2021-10-4 14:26:42 | 显示全部楼层
支持楼上的!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则