RabbitMQ的基础使用

2023-12-22 13:51:27
/**
 * 使用rabbitMQ
 * 1.引用amqp场景 RabbitAutoConfiguration就会自动生效
 * 2.给容器中自动配置了各种api RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate
 * 所有属性都是 spring.rabbitmq开头
 * 3.通过注解@EnableRabbit使用
 * 4.监听消息 使用@RabbitListener 注解 必须有@EnableRabbit才能生效 如果是创建交换机,创建队列 不需要有@EnableRabbit注解
 * @RabbitListener 可以标在类和方法上
 * @RabbitHandler 可以标在方法上 场景 一个队列返回的类型不同 使用这个注解来重载
 */

1.引入依赖

<!--        mq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#虚拟主机
spring.rabbitmq.virtual-host=/

#开启消息生产者发送消息确认
# NONE:禁用发布确认模式,是默认值
#CORRELATED:发布消息成功到交换器后会触发回调方法
#SIMPLE

spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调这个
spring.rabbitmq.template.mandatory=true

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3.注入

 @Autowired
    AmqpAdmin amqpAdmin;

操作创建交换机,创建队列,创建绑定关系

 /**
     * 1.如何创建Exchange    Queue Binding?  使用AmqpAdmin进行创建
     * 2.如何接收消息
     */
    @Test
    public void createExchange(){
        //创建一个交换机
        //String name  交换机名称, boolean durable 是否持久化, boolean autoDelete  是否自动删除
        DirectExchange directExchange = new DirectExchange("hallo-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("单点交换机创建成功"+directExchange);
    }

    @Test
    public void createQueue(){
        //String name 队列名称, boolean durable 是否持久化, boolean exclusive 是否排他,
        // boolean autoDelete 是否自动删除, @Nullable Map<String, Object> arguments
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
    }

    /**
     * 测试创建绑定关系
     */
    @Test
    public void createBing(){
        //String destination, 目的地
        // DestinationType destinationType,目的地类型
        //String exchange 交换机
        // String exchange, String routingKey, 路由key
        // @Nullable Map<String, Object> arguments 自定义参数
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                "hallo-java-exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
    }
/ 用来发送消息的
   @Autowired
    RabbitTemplate rabbitTemplate;
/**
     * 测试发送消息功能
     */
    @Test
    public void sendMessageTest(){
        for(int i=0;i<10;i++){
            if(i%2 == 0){
                OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
                reasonEntity.setId(1L);
                reasonEntity.setCreateTime(new Date());
                reasonEntity.setName("哈哈");
                //发送消息 如果发送的消息是个对象,我们会使用序列化机制,将对象写出去 对象必须实现 Serializable
                //也可以用config的方法 将对象类型的消息转为json
                //String exchange 发送的交换机, String routingKey 路由key, Object object 发送的内容,
                // correlationData 指定uuid,生产者发送消息给服务时候,获得这个参数,从而确定是哪条消息
                rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",reasonEntity,
                        new CorrelationData(UUID.randomUUID().toString()));
            }else{
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",orderEntity,
                        new CorrelationData(UUID.randomUUID().toString()));
            }
        }

    }

接收消息在业务层操作 相关的注解有

@RabbitListener 可以标在类和方法上

@RabbitHandler 可以标在方法上 场景 一个队列返回的类型不同 使用这个注解来重载

@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    /**
     * 测试接收消息
     * 注解中的queues 声明需要监听的队列,可以是多个
     * org.springframework.amqp.core.Message 第一个参数是原生的详细信息
     * OrderReturnApplyEntity 第二个参数 消息的返回类型
     * Channel 第三个参数 服务获取消息的通道
     *
     * 场景
     * 多个客户端启动时,一个消息只能被一个客户端接收 轮询制度
     * 多个消息等待服务处理时,处理完一个后,才会接收下一个
     *
     * 设置ack配置 确认前是unack状态 若宕机或其他原因失败,系统再次重启状态为ready 手动签收后消息才被处理掉
     *
     */
   @RabbitHandler
    public void recieveMessage(Message message, OrderReturnApplyEntity returnApplyEntity,
                               Channel channel) throws IOException {
        //获取消息体
        byte[] body = message.getBody();
        //获取消息头
        MessageProperties messageProperties = message.getMessageProperties();
       System.out.println(returnApplyEntity);
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       //channel内按照顺自增的
       if(deliveryTag%2 == 0){
           //当业务流程执行完后 手动签收消息 deliveryTag 签收的消息序号, b 是否批量签收
           channel.basicAck(deliveryTag,false);
       }else{
           //拒签 拒签的序号,是否批量操作,是否归队 若false则丢弃 若ture则归队
           channel.basicNack(deliveryTag,false,false);
           // 和上面作用一样 只是不能设置批量操作这个参数
           // channel.basicReject(deliveryTag,false);
       }
    }

    @RabbitHandler
    public void recieveMessage2(Message message, OrderEntity order,
                               Channel channel){
        //获取消息体
        byte[] body = message.getBody();
        //获取消息头
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println(order);
    }

4.config文件

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 定制RabbitTemplate
     */
    @PostConstruct //MyRabbitConfig对象创建完成后,执行这个方法
    public void initRabbitTemplate(){
        //设置确认回调

        //1.生产者发送消息到服务这一步的确认
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            //CorrelationData correlationData, 当前消息的唯一关联数据(可以理解为id)
            // boolean b, 判断消息是否成功收到
            // String s 如果失败,显示失败原因
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("correlationData==="+correlationData+"b==="+b+"s==="+s);
            }
        });

        //2.设置消息没有投递给指定的队列,就触发这个失败的回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                //获得投递失败的消息的详细信息
                Message message = returnedMessage.getMessage();
                //获得投递消息失败的交换机
                String exchange = returnedMessage.getExchange();
                //获得投递消息失败的状态码
                int replyCode = returnedMessage.getReplyCode();
                //获得投递消息失败的文本
                String replyText = returnedMessage.getReplyText();
                //获得投递消息失败的路由key
                String routingKey = returnedMessage.getRoutingKey();
            }
        });
        //3.消费端确认(保证每个消息 被正确消费,此时才可以删除这个消息)
    }

保证消息可靠投递

生产者到交换机 使用confirmCallback; 得到消息投递的结果

交换机到队列 使用 returnCallback; 如果消息接收失败,将会触发,得到失败消息的信息

队列到消费者 使用ack机制;进行手动确认

常见的模式

1.点对点模式,如果路由key和绑定关系完全匹配,交换机才能收到;

2.主题订阅模式,路由key和绑定关系以单词维度匹配,路由key中可以用? “#”代表匹配多个或0个

“*” 代表匹配一个

3.广播模式,不受路由key局限,只要交换机和队列有绑定关系,就可以收到消息;

4.helder性能比较低,一般不适用,私信队列不太熟悉

rabbitmq的基本原理

文章来源:https://blog.csdn.net/wmd13164306712/article/details/135146792
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。