Spring RabbitMQ那些事(3-消息可靠传输和订阅)

03-13 阅读 0评论

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
    • 三、消费者保证消息被处理
    • 四、Spring RabbitMQ支持代码示例
      • 1、 application.yml
      • 2、RabbigtMQ配置
      • 3、可靠生产者配置
      • 4、可靠消费者配置
      • 5、测试用例

        一、序言

        在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

        Spring RabbitMQ那些事(3-消息可靠传输和订阅),Spring RabbitMQ那些事(3-消息可靠传输和订阅),词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,我们,配置,.com,第1张
        (图片来源网络,侵删)

        消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

        保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


        二、生产者确保消息发送成功

        1、为什么需要Publisher Confirms

        在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

        在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

        最直接的解决方案是通过事务,但是通过事务有两个问题:

        • 事务阻塞:发布者必须等待Broker处理完每条消息。
        • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

          备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

          Spring RabbitMQ那些事(3-消息可靠传输和订阅),Spring RabbitMQ那些事(3-消息可靠传输和订阅),词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,我们,配置,.com,第2张
          (图片来源网络,侵删)

          Spring RabbitMQ那些事(3-消息可靠传输和订阅)

          而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

          2、哪些消息会被确认处理成功

          当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

          • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
          • 非持久化消息在入队时会被确认。
          • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

            三、消费者保证消息被处理

            消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


            四、Spring RabbitMQ支持代码示例

            1、 application.yml

            server:
              port: 8080
            spring:
              rabbitmq:
                addresses: localhost:5672
                username: admin
                password: admin
                virtual-host: /
                publisher-returns: true
                publisher-confirm-type: correlated
                listener:
                  type: simple
                  simple:
                    acknowledge-mode: manual
                    concurrency: 5
                    max-concurrency: 20
                    prefetch: 5
                template:
                  mandatory: true
            

            备注:

            • 这里一定要设置spring.rabbitmq.publisher-returns为true,并且设置spring.rabbitmq.publisher-confirm-type为correlated,同时设置spring.rabbitmq.template.mandatory为true。
            • 上面我们将消费者的确认模式改为了手动确认。

              2、RabbigtMQ配置

              @Configuration
              public class RabbitReliableTransportConfig {
              	/**
              	 * RabbitTemplate消息转换器配置,自动将对象转换为json字符串
              	 *
              	 * @return
              	 */
              	@Bean
              	public MessageConverter jackson2JsonMessageConverter() {
              		Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
              		messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());
              		return messageConverter;
              	}
              	@Bean
              	public Queue reliableQueue() {
              		return QueueBuilder.durable("reliable-queue").build();
              	}
              }
              

              3、可靠生产者配置

              @Slf4j
              @Component
              @RequiredArgsConstructor
              public class RabbitMqReliableProducer {
              	private final RabbitTemplate rabbitTemplate;
              	public void sendReliableMsg(String body) {
              		// 发送可靠消息
              		ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();
              		CorrelationData correlationData = new CorrelationData();
              		rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);
              		// 发送确认逻辑
              		CompletableFuture future = correlationData.getFuture().completable();
              		future.whenComplete((confirm, throwable) -> {
              			if (confirm.isAck()) {
              				log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));
              				return;
              			}
              			log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);
              			// 5秒后再发送
              			LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);
              			rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);
              		});
              	}
              }
              

              4、可靠消费者配置

              @Slf4j
              @Component
              public class RabbitMQReliableConsumer {
              	@RabbitListener(queues = "reliable-queue")
              	public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
              		channel.basicAck(tag, false);
              		// channel.basicNack(tag, false, false);
              		log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));
              	}
              }
              

              备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

              Spring RabbitMQ那些事(3-消息可靠传输和订阅),Spring RabbitMQ那些事(3-消息可靠传输和订阅),词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,我们,配置,.com,第4张
              (图片来源网络,侵删)

              5、测试用例

              测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

              2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
              2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}
              

              Spring RabbitMQ那些事(3-消息可靠传输和订阅)


免责声明
本网站所收集的部分公开资料来源于AI生成和互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

发表评论

快捷回复: 表情:
评论列表 (暂无评论,人围观)

还没有评论,来说两句吧...

目录[+]