请选择 进入手机版 | 继续访问电脑版

[ASP.NET] 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

[复制链接]
查看115 | 回复21 | 2021-9-15 07:25:02 | 显示全部楼层 |阅读模式
目次

一、死信队列

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

形貌 :Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2)

       P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定环境 ,主动 把消息经X2(交换机2)路由到Q2(队列2),C(斲丧 者)直接消息Q2的消息。

特定环境 有哪些呢:

  • 1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
  • 2.当前队列中的消息数目 已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数目 )。
  • 3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;

这里演示环境 1:

假如场景:Q1中队列数据不完备 ,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列别的 处理。

生产者

  1. public static void SendMessage()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. //消息交换机
  8. string exchange = "direct-exchange";
  9. //消息队列
  10. string queueName = "queue_a";
  11. using (var connection = RabbitMQHelper.GetConnection())
  12. {
  13. using (var channel = connection.CreateModel())
  14. {
  15. //创建死信交换机
  16. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  17. //创建死信队列
  18. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  19. //死信队列绑定死信交换机
  20. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  21. // 创建消息交换机
  22. channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  23. //创建消息队列,并指定死信队列
  24. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
  25. new Dictionary<string, object> {
  26. { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
  27. { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
  28. });
  29. //消息队列绑定消息交换机
  30. channel.QueueBind(queueName, exchange, routingKey: queueName);
  31. string message = "hello rabbitmq message";
  32. var properties = channel.CreateBasicProperties();
  33. properties.Persistent = true;
  34. //发布消息
  35. channel.BasicPublish(exchange: exchange,
  36. routingKey: queueName,
  37. basicProperties: properties,
  38. body: Encoding.UTF8.GetBytes(message));
  39. Console.WriteLine($"向队列:{queueName}发送消息:{message}");
  40. }
  41. }
  42. }
复制代码

斲丧 者

  1. public static void Consumer()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. //消息交换机
  8. string exchange = "direct-exchange";
  9. //消息队列
  10. string queueName = "queue_a";
  11. var connection = RabbitMQHelper.GetConnection();
  12. {
  13. //创建信道
  14. var channel = connection.CreateModel();
  15. {
  16. //创建死信交换机
  17. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  18. //创建死信队列
  19. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  20. //死信队列绑定死信交换机
  21. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  22. // 创建消息交换机
  23. channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  24. //创建消息队列,并指定死信队列
  25. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
  26. new Dictionary<string, object> {
  27. { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX
  28. { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
  29. });
  30. //消息队列绑定消息交换机
  31. channel.QueueBind(queueName, exchange, routingKey: queueName);
  32. var consumer = new EventingBasicConsumer(channel);
  33. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
  34. consumer.Received += (model, ea) =>
  35. {
  36. //处理业务
  37. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  38. Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");
  39. //channel.BasicAck(ea.DeliveryTag, false);
  40. //不ack(BasicNack),且不把消息放回队列(requeue:false)
  41. channel.BasicNack(ea.DeliveryTag, false, requeue: false);
  42. };
  43. channel.BasicConsume(queueName, autoAck: false, consumer);
  44. }
  45. }
  46. }
复制代码

斲丧 者加上channel.BasickNack()模拟 消息处理不了,不ack确认。

实行 效果 :

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

RabbitMQ管理界面:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

看到消息队列为queue_a,特性有DLX(死信交换机),DLK(死信路由)。由于 斲丧 端不nack,触发了死信,被转发到了死信队列dlx.queue。

二、延时队列

延时队列着实 也是共同 死信队列一起用,着实 就是上面死信队列的第二中环境 。给队列添加消息过时时间(TTL),变成延时队列。

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

简单的形貌 就是:P(生产者)发送消息到Q1(延时队列),Q1的消息有过期时间,比如10s,那10s后消息过期就会触发死信,从而把消息转发到Q2(死信队列)。

办理 题目 场景:像商城下单,未付出 时取消订单场景。下单时写一条记录入Q1,延时30分钟后转到Q2,斲丧 Q2,检查订单,付出 则不做操作,没付出 则取消订单,恢复库存。

生产者代码:

  1. public static void SendMessage()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. //消息交换机
  8. string exchange = "direct-exchange";
  9. //消息队列
  10. string queueName = "delay_queue";
  11. using (var connection = RabbitMQHelper.GetConnection())
  12. {
  13. using (var channel = connection.CreateModel())
  14. {
  15. //创建死信交换机
  16. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  17. //创建死信队列
  18. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  19. //死信队列绑定死信交换机
  20. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  21. // 创建消息交换机
  22. channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  23. //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
  24. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
  25. new Dictionary<string, object> {
  26. { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
  27. { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
  28. { "x-message-ttl",10000} //设置队列的消息过期时间
  29. });
  30. //消息队列绑定消息交换机
  31. channel.QueueBind(queueName, exchange, routingKey: queueName);
  32. string message = "hello rabbitmq message";
  33. var properties = channel.CreateBasicProperties();
  34. properties.Persistent = true;
  35. //发布消息
  36. channel.BasicPublish(exchange: exchange,
  37. routingKey: queueName,
  38. basicProperties: properties,
  39. body: Encoding.UTF8.GetBytes(message));
  40. Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message}");
  41. }
  42. }
  43. }
复制代码

斲丧 者代码:

  1. public static void Consumer()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. var connection = RabbitMQHelper.GetConnection();
  8. {
  9. //创建信道
  10. var channel = connection.CreateModel();
  11. {
  12. //创建死信交换机
  13. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  14. //创建死信队列
  15. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  16. //死信队列绑定死信交换机
  17. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  18. var consumer = new EventingBasicConsumer(channel);
  19. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
  20. consumer.Received += (model, ea) =>
  21. {
  22. //处理业务
  23. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24. Console.WriteLine($"{DateTime.Now},队列{dlxQueueName}消费消息:{message}");
  25. channel.BasicAck(ea.DeliveryTag, false);
  26. };
  27. channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
  28. }
  29. }
  30. }
复制代码

实行 代码:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

向延时队列发送消息,监听死信队列,发送和收到消息时间刚好是设置的10s。

RabbitMQ管理界面:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

三、延时消息设置不同过期时间

上面的延时队列能办理 消息过期时间都是类似 的场景,能不能办理 消息的过期时间是不一样的呢?

比方 场景:机器人客服,为了更像人为操作,收到消息后要随机3-10秒回复客户。

  • 1)队列不设置TTL(消息过期时间),把过期时间设置在消息上。

生产者代码:

  1. public static void SendMessage()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. //消息交换机
  8. string exchange = "direct-exchange";
  9. //消息队列
  10. string queueName = "delay_queue";
  11. using (var connection = RabbitMQHelper.GetConnection())
  12. {
  13. using (var channel = connection.CreateModel())
  14. {
  15. //创建死信交换机
  16. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  17. //创建死信队列
  18. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  19. //死信队列绑定死信交换机
  20. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  21. // 创建消息交换机
  22. channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  23. //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
  24. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
  25. new Dictionary<string, object> {
  26. { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
  27. { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
  28. //{ "x-message-ttl",10000} //设置队列的消息过期时间
  29. });
  30. //消息队列绑定消息交换机
  31. channel.QueueBind(queueName, exchange, routingKey: queueName);
  32. string message = "hello rabbitmq message 10s后处理";
  33. var properties = channel.CreateBasicProperties();
  34. properties.Persistent = true;
  35. properties.Expiration = "10000";//消息的有效期10s
  36. //发布消息,延时10s
  37. channel.BasicPublish(exchange: exchange,
  38. routingKey: queueName,
  39. basicProperties: properties,
  40. body: Encoding.UTF8.GetBytes(message));
  41. Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s");
  42. string message2 = "hello rabbitmq message 5s后处理";
  43. var properties2 = channel.CreateBasicProperties();
  44. properties2.Persistent = true;
  45. properties2.Expiration = "5000";//消息有效期5s
  46. //发布消息,延时5s
  47. channel.BasicPublish(exchange: exchange,
  48. routingKey: queueName,
  49. basicProperties: properties2,
  50. body: Encoding.UTF8.GetBytes(message2));
  51. Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message2},延时:5s");
  52. }
  53. }
  54. }
复制代码

斲丧 者代码还是上面延时队列的不变,先试下效果 。

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

生产者向队列中发送一条延时10s的消息再发一条延时5秒的消息,但斲丧 者却先拿到延时10s的,再拿到延时5秒的,我想要的效果 是先拿到延时5s的再拿到延时10s的,是什么缘故因由 呢。

缘故因由 是:队列是先辈 先出的,而RabbitMQ只会对首位第一条消息做检测,第一条没过期,那么后面的消息就会壅闭 住等待前面的过期。

办理 办法:增长 一个斲丧 者对延时队列斲丧 ,不ack,把第一条消息放到队列尾部。不停 让消息在活动 ,如许 就能检测到了。

  • 2)新增斲丧 者代码
  1. public static void SendMessage()
  2. {
  3. //死信交换机
  4. string dlxexChange = "dlx.exchange";
  5. //死信队列
  6. string dlxQueueName = "dlx.queue";
  7. //消息交换机
  8. string exchange = "direct-exchange";
  9. //消息队列
  10. string queueName = "delay_queue";
  11. using (var connection = RabbitMQHelper.GetConnection())
  12. {
  13. using (var channel = connection.CreateModel())
  14. {
  15. //创建死信交换机
  16. channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  17. //创建死信队列
  18. channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
  19. //死信队列绑定死信交换机
  20. channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
  21. // 创建消息交换机
  22. channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
  23. //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
  24. channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
  25. new Dictionary<string, object> {
  26. { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
  27. { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
  28. //{ "x-message-ttl",10000} //设置队列的消息过期时间
  29. });
  30. //消息队列绑定消息交换机
  31. channel.QueueBind(queueName, exchange, routingKey: queueName);
  32. string message = "hello rabbitmq message 10s后处理";
  33. var properties = channel.CreateBasicProperties();
  34. properties.Persistent = true;
  35. properties.Expiration = "10000";//消息的有效期10s
  36. //发布消息,延时10s
  37. channel.BasicPublish(exchange: exchange,
  38. routingKey: queueName,
  39. basicProperties: properties,
  40. body: Encoding.UTF8.GetBytes(message));
  41. Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s");
  42. string message2 = "hello rabbitmq message 5s后处理";
  43. var properties2 = channel.CreateBasicProperties();
  44. properties2.Persistent = true;
  45. properties2.Expiration = "5000";//消息有效期5s
  46. //发布消息,延时5s
  47. channel.BasicPublish(exchange: exchange,
  48. routingKey: queueName,
  49. basicProperties: properties2,
  50. body: Encoding.UTF8.GetBytes(message2));
  51. Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message2},延时:5s");
  52. }
  53. }
  54. }
复制代码

实行 效果 :

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

这会得到了想要的效果 。

RabbitMQ管理界面:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

四、延时消息用延时插件的方式实现

相比上面第三的延时消息,这里的插件方式会显的更加简单,也保举 用这种。

由于 这里只必要 一个交换机和一个对队列,生产者向队列发送消息,会直接是延时才会到队列。

安装插件:

地址:https://www.rabbitmq.com/community-plugins.html

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

找到和本身 RabbitMQ一样的版本,下载下来上传到Linux,或F12查看这个文件的地址,直接Linux上下载(这里用这种)

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

Linux下载插件:

  1. #下载插件
  2. wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
复制代码

已经下载到Linux上

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

  1. #把文件复制到rabbitmq docker容器下的plugins文件夹
  2. docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
复制代码
  1. #进入rabbitmq docker容器
  2. docker exec -it rabbitmq bash
复制代码
  1. #开启插件:
  2. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码

做完上面这些在RabbitMQ管理界面可以看到多了一个延时消息的交换机。

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

插件装好了,生产者代码

  1. public static void SendMessage()
  2. {
  3. //延时消息交换机
  4. string delayExchange = "delay.exchange";
  5. //延时消息队列
  6. string delayQueueName = "delay_queue";
  7. using (var connection = RabbitMQHelper.GetConnection())
  8. {
  9. using (var channel = connection.CreateModel())
  10. {
  11. Dictionary<string, object> args = new Dictionary<string, object>();
  12. args.Add("x-delayed-type", "direct"); //x-delayed-type必须加
  13. //创建延时交换机,type类型为x-delayed-message
  14. channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false,arguments: args);
  15. //创建延时消息队列
  16. channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);
  17. //交换机绑定队列
  18. channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName);
  19. string message = "hello rabbitmq message 10s后处理";
  20. var properties = channel.CreateBasicProperties();
  21. properties.Persistent = true;
  22. //延时时间从header赋值
  23. Dictionary<string, object> headers = new Dictionary<string, object>();
  24. headers.Add("x-delay", 10000);
  25. properties.Headers = headers;
  26. //发布消息,按时10s
  27. channel.BasicPublish(exchange: delayExchange,
  28. routingKey: delayQueueName,
  29. basicProperties: properties,
  30. body: Encoding.UTF8.GetBytes(message));
  31. Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message},延时:10s");
  32. string message2 = "hello rabbitmq message 5s后处理";
  33. var properties2 = channel.CreateBasicProperties();
  34. properties2.Persistent = true;
  35. //延时时间从header赋值
  36. Dictionary<string, object> headers2 = new Dictionary<string, object>();
  37. headers2.Add("x-delay", 5000);
  38. properties2.Headers = headers2;
  39. //发布消息,延时5s
  40. channel.BasicPublish(exchange: delayExchange,
  41. routingKey: delayQueueName,
  42. basicProperties: properties2,
  43. body: Encoding.UTF8.GetBytes(message2));
  44. Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message2},延时:5s");
  45. }
  46. }
  47. }
复制代码

斲丧 者代码

  1. public static void DelayMessageConsumer()
  2. {
  3. //延时队列
  4. string queueName = "delay_queue";
  5. var connection = RabbitMQHelper.GetConnection();
  6. {
  7. //创建信道
  8. var channel = connection.CreateModel();
  9. {
  10. var consumer = new EventingBasicConsumer(channel);
  11. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
  12. consumer.Received += (model, ea) =>
  13. {
  14. //处理业务
  15. var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  16. Console.WriteLine($"{DateTime.Now},接收到消息:{message}");
  17. channel.BasicAck(ea.DeliveryTag, false);
  18. };
  19. channel.BasicConsume(queueName, autoAck: false, consumer);
  20. }
  21. }
  22. }
复制代码

实行 代码:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

RabbitMQ管理界面,只有一个队列:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

到此这篇关于运用.NetCore实例讲解RabbitMQ死信队列,延时队列的文章就先容 到这了,更多干系 .NetCore RabbitMQ死信队列,延时队列内容请搜刮 脚本之家从前 的文章或继续欣赏 下面的干系 文章渴望 大家以后多多支持脚本之家!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

avatar 小仙女要起飞 | 2021-9-15 10:45:26 | 显示全部楼层
admin楼主很有艺术范!
回复

使用道具 举报

avatar 强绝商爸摇 | 2021-9-15 23:24:25 | 显示全部楼层
看帖不回帖的人就是耍流氓,我回复了!
回复

使用道具 举报

avatar luly靓 | 2021-9-16 15:17:06 | 显示全部楼层
我对admin楼主的敬仰犹如滔滔江水绵延不绝!
回复

使用道具 举报

avatar 新北狂纱 | 2021-9-18 14:12:14 | 显示全部楼层
最近精神病院在打折,admin楼主去看看吧?
回复

使用道具 举报

avatar 落日五湖W | 2021-9-20 14:05:09 | 显示全部楼层
脑残片admin楼主今天吃了么?
回复

使用道具 举报

avatar XY890 | 2021-9-23 01:39:13 | 显示全部楼层
吹牛的人越来越多了!
回复

使用道具 举报

avatar wangliqxm | 2021-9-24 08:07:40 | 显示全部楼层
顶顶更健康!
回复

使用道具 举报

avatar 刘冠华 | 2021-9-24 10:19:02 | 显示全部楼层
我默默的回帖,从不声张!
回复

使用道具 举报

avatar 囝囝刚 | 2021-9-26 04:05:05 | 显示全部楼层
对牛弹琴的人越来越多了!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则