启明办公

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 97|回复: 1

《RabbitMQ系列》之RabbitMQ的发布确认

[复制链接]

2

主题

6

帖子

10

积分

新手上路

Rank: 1

积分
10
发表于 2022-12-31 10:41:32 | 显示全部楼层 |阅读模式
大家好,我是 @明人只说暗话,本文为大家介绍RabbitMQ的【发布确认】相关的概念和用法,水平一般,能力有限,若有错误之处,欢迎指正。
本文参考RabbitMQ官网文档:
用过消息中间件的朋友,即使没遇到过,也应该听说过消失丢失的问题。但凡使用消息中间件,如何解决消息丢失问题便是一个绕不过去的坎。
消失丢失大概分为三种情况:
一、生产者问题。因为应用程序故障、网络抖动等各种原因,生产者没有成功向broker发送消息。
二、消息中间件自身问题。broker没有将消息保存好,也许在消息持久化之前broker宕机了,导致消息丢失。
三、消费者问题。消费者在消费消息时,因为没有合适的处理措施,导致broker将消费失败的消息从队列中删除了。
为了解决消失丢失的问题,各个消息中间件都有自己的解决方案,本文主要讲述RabbitMQ是如何解决上述第一种情况的(第二种情况可以采用持久化机制;第三种情况可以采用消息应答)。
其实,还有一种情况,因为路由或者交换机等配置的不对,导致消息发送失败。


相关参数说明

spring.rabbitmq.publisher-confirm-type

要开启发布确认功能,需要配置spring.rabbitmq.publisher-confirm-type参数,可选值有三个:SIMPLE、CORRELATED和NONE。
spring.rabbitmq.template.mandatory

是否启用强制消息。
当服务器无法将消息路由到队列时,如果将spring.rabbitmq.template.mandatory设置为true,它将使用return方法返回不可路由的消息。
代码示例

配置信息



生产者代码

@Slf4j
@Component
public class PublisherConfirmsProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        /*
         * 回调函数
         * @param correlationData 消息内容
         * @param ack 是否成功
         * @param cause 失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("已经收到消息,Id为 : {}", correlationData.getId());
            } else {
                log.info("未接收到消息:{},原因:{}", correlationData.getId(), cause);
            }
        });

        /*
         * 消息退回,指消息未达到目的地被回退
         */
        rabbitTemplate.setReturnsCallback(returnCallback -> log.info("【消息被退回】交换机:{},消息:{},退回原因:{}",
                returnCallback.getExchange(), new String(returnCallback.getMessage().getBody()), returnCallback.getReplyText()));
    }


    public void send1() {
        init();
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend("directExchange", "rk001", "这是一条可以消费的消息", correlationData);
    }

    public void send2() {
        init();
        CorrelationData correlationData = new CorrelationData("2");
        rabbitTemplate.convertAndSend("directExchange", "rk002", "这是一条无法到达的消息", correlationData);
    }
}如上面代码所示。
我们首先通过rabbitTemplate.setConfirmCallback方法,设置了发送消息的回调函数(需要将参数spring.rabbitmq.publisher-confirm-type设置为SIMPLE或者CORRELATED)。
correlationData参数表示我们发送消息时附带的额外参数,可以是业务ID等,方便执行其他业务逻辑。
ack参数表示消息是否发送成功,true表示消息发送成功,false表示发送失败。
cause参数表示没有发送到消息的错误信息。
其次,通过rabbitTemplate.setReturnsCallback方法,设置了消息没有正常到达broker(如routing key 不匹配等)时的处理逻辑(需要将参数spring.rabbitmq.template.mandatory设置为true)。
最后,我们发送了两条消息,第一条消息可以正常被消费,第二条消息因为routing key 不匹配,所以无法到达队列中。
消费者代码

@Component
@Slf4j
public class PublisherConfirmsConsumer {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.001"),
            exchange = @Exchange(value = "directExchange"), key = {"rk.001"})})
    public void process1(Message message, Channel channel) {
        log.info(">>>>>>>>>> Receive: {}", message.getPayload());
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue("queue.pc.002"),
            exchange = @Exchange("directExchange"), key = {"rk.001"})})
    public void process2(Message message, Channel channel) {
        log.info(">>>>>>>>>> Receive: {}", message.getPayload());
    }
}通过routing key绑定交换机和队列。
单元测试代码

@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
class PublisherConfirmsProducerTest {

    @Resource
    private PublisherConfirmsProducer publisherConfirmsProducer;

    @Test
    void send() {
        publisherConfirmsProducer.send1();
        publisherConfirmsProducer.send2();
    }
}测试结果



如上图所示,第一条消息被正常消费了,触发了rabbitTemplate.setConfirmCallback方法;
第二条消息因为routing key不匹配,因此没有路由到队列中被消费(NO_ROUTE),触发了rabbitTemplate.setReturnsCallback方法。
<hr/>我是 @明人只说暗话,以上就是本文内容,欢迎点赞、评论、关注。
回复

使用道具 举报

1

主题

3

帖子

4

积分

新手上路

Rank: 1

积分
4
发表于 2022-12-31 10:42:05 | 显示全部楼层
[酷]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|天恒办公

Copyright © 2001-2013 Comsenz Inc.Template by Comsenz Inc.All Rights Reserved.

Powered by Discuz!X3.4

快速回复 返回顶部 返回列表