Rabbitmq 消息可靠性保证

2024-01-10 10:31:24
1、简介

????????消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,本文详细介绍两个环节的消息可靠性传递方式:1)、消息传递到交换机的 confirm 模式;2)、消息传递到队列的 Return 模式。

消息从 producer 到 exchange 则会返回一个 confirmCallback;

?消息从 exchange?到 queue 投递失败则会返回一个 returnCallback;

2、Confirm模式

????????消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障。

2.1、使用confirm模式的具体步骤

1)、配置文件application.yml 开启确认模式

spring:
  rabbitmq:
    host: 192.168.30.88
    port: 5672
    username: admin
    password: admin
    virtual-host: test
    publisher-confirm-type: correlated   # 开启数据关联确认机制

2)、组件绑定关系配置(采用直连交换机模式)

@SpringBootConfiguration
@ConfigurationProperties(prefix = "rabbit.confirm")
public class RabbitConfigCommit {
    private String exchangeName;
    private String queueName;
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }
    public Binding bindingA(DirectExchange exchange, Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("info");
    }
}

3)、写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送记录日志等处理;设置rabbitTemplate的确认回调方法rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

@Service
public class MessageServiceConfirm implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello long".getBytes()).build();
        // 设置消息关联数据
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("123");
        rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);
    }
// 消息确认返回判断
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b){
            System.out.println("消息发送到交换机成功!" + correlationData.getId());
            return;
        }
        System.out.println("消息发送到交换机失败!" + correlationData.getId());
    }
}
3、Return 模式
3.1、使用 retrun 模式的具体步骤(和confirm 模式一致)

1)、配置文件application.yml 开启 retrun 模式

spring:
  rabbitmq:
    host: 192.168.30.88
    port: 5672
    username: admin
    password: admin
    virtual-host: test
    publisher-confirm-type: correlated
    publisher-returns: true  # 开启Return模式

2)、组件配置(参考confirm模式)

3)、使用rabbitTemplate.setReturnCallback设置退回函数,当消息从?exchange?路由到 queue?失败后,则会将消息退回给producer,并执行回调函数 returnedMessage(采用匿名内部类/lambda表达式实现)

@Service
public class MessageServiceReturn {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        // 不使用匿名内部类,就要实现接口 implements RabbitTemplate.ReturnsCallback
        rabbitTemplate.setReturnsCallback(msg -> {
            System.out.println("消息发送结果:" + msg.getReplyText());
        });
//        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
//            @Override
//            public void returnedMessage(ReturnedMessage returnedMessage) {
//                System.out.println("消息发送结果:" + returnedMessage.getReplyText());
//            }
//        });
    }
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello long".getBytes()).build();
        rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);
    }
}
4、总结

? ? ? ? 本文详细介绍两种模式的消息可靠性保证,关于Rabbitmq 消息持久化、集群配置等更高级的内容关注下面公众号查阅。

????????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

文章来源:https://blog.csdn.net/zwl2220943286/article/details/135492779
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。