整合消息队列RabbitMQ

2023-12-15 16:30:53
为什么使用消息队列MQ?

因为使用消息队列有多个好处:可以实现系统服务的解耦、异步和削峰:

  1. 异步通信:消息队列提供了一种异步通信的方式,发送方可以将消息发送到队列中,然后继续执行其他任务,而不需要等待接收方的响应。这种异步通信方式可以提高系统的响应速度和吞吐量。
  2. 解耦系统:通过引入消息队列,不同的系统或模块可以通过消息进行通信,而不直接依赖于彼此的实时可用性。这种解耦可以降低系统之间的耦合度,使系统更加灵活、可维护和可扩展。
  3. 削峰填谷:在高并发场景下,消息队列可以作为一个缓冲层,平衡生产者和消费者之间的速度差异。当生产者的请求量暴增时,消息队列可以暂时存储消息,使消费者可以按照自身的处理能力逐渐消费这些消息,从而避免系统崩溃或过载。

A服务直接把某个功能的消息发给消息队列,B服务从消息队列中取出消息,然后执行任务,这样完成了项目的解耦,同时异步执行了这个任务。

特点:
  1. 灵活的消息路由:RabbitMQ支持多种灵活的消息路由方式,包括直接交换、主题交换、扇形交换等,可以根据消息的特征将消息路由到不同的队列。
  2. 可靠性:RabbitMQ提供持久化机制,可以将消息持久化到磁盘上,确保消息不会丢失。同时,它还支持发布确认和消费确认机制,保证消息的可靠传递。
  3. 高可用性:RabbitMQ支持集群部署,可以将多个节点组成一个集群,提供高可用性和负载均衡。
  4. 多语言支持:RabbitMQ提供了多种编程语言的客户端库,使得开发者可以使用各种语言来与RabbitMQ进行交互。
  5. 可扩展性:RabbitMQ提供了插件机制,可以根据需要添加新的功能和特性。
基本概念:
  1. Producer(生产者):负责向RabbitMQ发送消息。
  2. Consumer(消费者):负责接收和处理RabbitMQ中的消息。
  3. Queue(队列):消息在RabbitMQ中的存储区域,消息发送到交换机后最终被投递到队列中等待消费者消费。
  4. Exchange(交换机):接收生产者发送的消息,并根据规则将消息路由到一个或多个队列中。
  5. Binding(绑定):用于将交换机和队列之间建立关联关系,定义了消息如何从交换机路由到队列。
  6. Routing Key(路由键):生产者在发送消息时,可以指定一个路由键,交换机根据路由键将消息路由到相应的队列。
使用:

1、引入依赖:

        <!--        消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、引入配置

  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3、创建交换机和队列

    public static void doInit(){
        try{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("host");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("name");
            connectionFactory.setPassword("pass");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            String EXCHANGE_NAME = "code_exchange"; //交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//            创建队列,写一个队列名
            String queueName = "code_queue";
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE_NAME,"my_routingKey");
            log.info("创建MQ成功");
        }catch (Exception e){
            log.error("创建MQ失败");
            e.printStackTrace();
        }
    }
  1. 生产者代码
package com.stukk.stuojbackendquestionservice.message;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @Author: stukk
 * @Description:
 * @DateTime: 2023-12-07 21:08
 **/
@Component
public class MyMessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange,String routingKey, String message){
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }

}
  1. 消费者代码
package com.stukk.stuojbackendjudgeservice.message;

import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @Author: stukk
 * @Description: 消费者
 * @DateTime: 2023-12-07 21:17
 **/
@Component
@Slf4j
public class MyMessageConsumer {

    //    指定监听的队列和确认机制
    @SneakyThrows
    @RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliverTag){
        log.info("获取到消息:{}", message);
//   根据传递的消息来执行需要执行的业务
        channel.basicAck(deliverTag,false);
    }

}

确定好要传递的消息,就可以执行你需要的业务了。

文章来源:https://blog.csdn.net/m0_54409739/article/details/134887788
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。