初识RabbitMQ

2023-12-14 22:32:35

一、消息队列

1、消息队列的介绍

????????在介绍RabbitMQ之前,首先来介绍下消息队列。消息队列是生产者-消费者模型的一个典型的代表,由一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。当新的消息入队时,就会通知接收方进行处理,消息的发送方称为生产者,消息的接收方称为消费者。那么,当我们发起HTTP请求的时候,就可以将请求丢到消息队列中,由消费者取出。这种加入“中间商”的方式,很好的实现了解耦,并且在高并发的情况下,由于消费者能力有限,消息队列也可以扮演“削峰填谷”的作用,先堆积一部分的请求,然后由消费者进行慢处理,避免大量请求导致接口崩溃的情况。常见的消息队列有以下几种:

  • RabbitMQ:性能很强、吞吐量很高,且支持多种协议、集群化,适合于企业级开发
  • Kafka:提供了超高的吞吐量,ms级别的延迟,具有极高的可用性以及可靠性,且分布式可以任意扩展
  • RocketMQ:阿里巴巴推出的消息队列,单机吞吐量高、消息的高可靠性,扩展性很强,支持事务

?2、消息队列的作用

(1)解耦

????????假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。

(2)异步

????????一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

(3)削峰填谷

????????假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。

此处参考博客:https://blog.csdn.net/Rok728/article/details/123106242

二、RabbitMQ介绍

1、RabbitMQ

(1)RabbitMQ的设计架构? ? ? ?

????????RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件,其架构如下:

可以看到:

  • 生产者:消息的发送方
  • 消费者:消息的接收方
  • Channel:每个客户端连接都会使用一个Channel,再通过Channel去访问到RabbitMQ服务器,这里的通信协议不是HTTP,而是amqp协议
  • Exchange:类似于交换机,会根据我们的请求转发给相应的消息队列,每个队列都可以绑定到Exchange上,这样Exchange就可以将数据转发给队列了。Exchange可以存在很多个,不同的Exchange类型可以用于实现不同的消息的模式。
  • Queue:消息队列本体,生产者的所有消息都在消息队列中,由消费者取出
  • Virtual Host:类似于环境隔离。不同环境可以单独配置一个Virtual Host,每个Virtual Host可以包含很多个Exchange和Queue,每个Virtual Host之间互不影响

(2)RabbitMQ的特点

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

2、RabbitMQ安装

????????前面介绍了RabbitMQ是一款使用Erlang语言开发的,因此在RabbitMQ的安装的过程中需要下载ErLang安装包和RabbitMQ安装包,这里需要注意的是ErLang和RabbitMQ的版本的匹配,可参考此网站,查看Erlang和RabbitMQ不同版本之间的对应关系RabbitMQ Erlang Version Requirements — RabbitMQ

我在安装使用的Erlang的版本是26.1.2,RabbitMQ的版本是3.12.2,这里以Windows下的RabbitMQ安装为例进行介绍。

(1)Erlang安装

Erlang官网:http://www.erlang.org/downloads

下载之后一路点击next即可。

(2)RabbitMQ安装

?RabbitMQ官网:http://www.rabbitmq.com/

?下载安装之后,在RabbitMQ的安装路径的sbin目录下,打开终端,执行:rabbitmq-plugins enable rabbitmq_management命令安装管理页面的插件。

?

然后双击rabbitmq-server.bat启动脚本,打开浏览器输入http://localhost:15672,账号密码默认是:guest/guest

?

3、消息发送接收案例

这里我们先通过界面模拟一个消息的发送案例。

(1)新建队列

点击Quues and Streams创建一个队列first_queue。

(2)查看交换机

点击Exchanges,查看当前的交换机。

(3)发送消息

这里先选择一个交换机,以amp.direct为例,点击进入。

?在其下方可以看到Publish message,然后在Payload中填写消息信息:this is a message,点击发送,这时弹出提示可以看到:消息发布了,但是没有被路由。在前面我们介绍了:消息发送给交换机之后,交换机会将消息路由到队列中。但是这里并没有路由到队列中,这是为什么呢?很简单,因为目前amq.redirect交换机和first_queue队列之间并没有绑定,那么自然而然的交换机就不会将消息路由到队列中。从这里我们可以看到:交换机是负责消息的路由转发的,并没有存储消息的能力。那么如何实现交换机和队列的绑定呢?请继续。

(4)绑定交换机和队列

在amq.direct交换机里面可以看到:目前该交换机并没有绑定任何队列。

因此,我们可以在这里选择first_queue队列,并与之绑定。在Bingdings里面填写队列名称之后,即可绑定。

?再回到队列里面查看,同样也可以看到队列已经绑定。

(5)发送消息验证

前面绑定了队列,这里我们再次发送消息,可以看到:消息已经发送成功!

我们看下交换机的信息,可以看到有个消息已经发送。?

?

?我们再去队列里面看下,同样可以在Overview里面看到有一个消息,在 get message里面也可以看到消息。

?

三、Java操作RabbitMQ

1、基本准备

(1)引入依赖包

创建Maven工程,引入下面的依赖:

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

(2)创建工具类

package com.yht.RabbitMQDemo.simple.utils;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {
    //队列名称
    public    static final String QUEUE_NAME = "learn_queue";
    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址  如果是本机就是localhost   如果在其他 地方比如:虚拟机中 那么就是ip地址
        connectionFactory.setHost("localhost");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称   就是和你用户绑定的虚拟机  在创建用户时候就指定了
        connectionFactory.setVirtualHost("/virtualHost1");
        //连接用户名
        connectionFactory.setUsername("guest");
        //连接密码
        connectionFactory.setPassword("guest");
        //创建连接
        return connectionFactory.newConnection();
    }

}

2、生产者生产消息

创建生产者Producer

package com.yht.RabbitMQDemo.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yht.RabbitMQDemo.simple.utils.RabbitMQUtils;


public class Producer {
    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = RabbitMQUtils.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(RabbitMQUtils.QUEUE_NAME, true, false, false, null);

        // 要发送的信息
        String message = "Hello RabbitMQ!";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("", RabbitMQUtils.QUEUE_NAME, null, message.getBytes());
        System.out.println("消息:" + message + "已经发送!");
        // 关闭资源
        channel.close();
        connection.close();
    }
}

执行上述代码之后,可以看到控制台的打印信息:

同时,也可以进入RabbitMQ管理界面,即在浏览器地址栏输入:?http://localhost:15672/??,并输入用户名和密码之后,可以看到:

?

?

?

3、消费者取出消息

创建消费者Consumer

package com.yht.RabbitMQDemo.simple;

import com.rabbitmq.client.*;
import com.yht.RabbitMQDemo.simple.utils.RabbitMQUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

//消费者
public class Consumer {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(RabbitMQUtils.QUEUE_NAME, true, false, false, null);

        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(RabbitMQUtils.QUEUE_NAME, true, consumer);

        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

这里可以执行下消费者,控制台输出如下:

我们再登进网页端可以去看下队列里面的消息,提示队列为空,这是因为在basicConsume()方法中,我们将第二个参数设置为true,代表当取出这个消息之后,就会将其删除;如果取出消息后不想删除,则可以将其置为false。

四、SpringBoot整合RabbitMQ

1、整合过程

(1)依赖包

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

(2)编写yml文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /virtualHost1

(3)编写配置类


import org.springframework.amqp.core.*;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    //定义交换机Bean
    @Bean("directExchange")
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("amq.direct").build();
    }

    //定义消息队列
    @Bean("learnQueue")
    public Queue queue(){
        return QueueBuilder.nonDurable("learn_queue").build();
    }

    @Bean("binding")
    public Binding binding(@Qualifier("directExchange")Exchange exchange, @Qualifier("learnQueue")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my-key").noargs();
    }
}

(4)生产者生产消息

在测试类里编写测试方法

    @Test
	void producer() {
		//最后一个是消息
		rabbitTemplate.convertAndSend("amq.direct", "my-key", "Hello SpringBoot");
	}

执行完成之后,可以在learn_queue里面看到在该队列里面已经有一个消息了。?

(5)消费者消费消息

创建监听器


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MQListener {

    @RabbitListener(queues = "learn_queue")
    public void getMessage(Message message){
        System.out.println( "消费者取出消息:" + new String(message.getBody()));
    }
}

启动SpringBoot,就可以看到消费者取出了消息。?

五、RabbitMQ案例

1、Work Queues模型

前面在SpringBoot里面整合RabbitMQ的案例是比较简单的,只有一个消费者和一个生产者,这里我们介绍一个新的模型,实现一个队列绑定多个消费者。如下图:

这里我们先来模拟一个场景,即:生产者生产50个消息,由两个消费者来进行消费。因此,我们对上面的代码做一个改变。

(1)创建一个队列:work.queue

?

(2)生产者生产50个消息到work.queue

    @Test
	void producer() {
		for (int i = 1; i <= 50; i++) {
			String msg = "Hello SpringBoot: + 【" + i + "】";
			rabbitTemplate.convertAndSend("amq.direct", "my-key", msg);
		}
		System.out.println("消息已经发送");
	}

?(3)消费者取出消息

在监听器里面创建两个消费者,都监听work.queue队列。

@Component
public class MQListener {

    @RabbitListener(queues = "learn_queue")
    public void getMessage1(Message message){
        System.out.println( "【消费者1】取出消息:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "learn_queue")
    public void getMessage2(Message message){
        System.err.println( "【消费者2】取出消息:" + new String(message.getBody()));
    }
}

这里我们来看下执行的结果:

从上面的图我们可以看到几个关键点:

  • 两个消费者是以轮询的方式来处理消息的,即生产者的消息平均分给了两个消费者
  • 每个消息只会被消费者消费一次

这里就带来了一个问题:如果两个消费者的能力不一样,那么这种轮询的方式就无法很好的利用消费者的性能。我们更希望的是如果消费者1的性能比较好,处理的比较快,那么完全可以让消费者1多处理一些消息,这样更符合我们的实际场景。

综上所述,在默认情况下,RabbitMQ会将消息依次轮询的绑定到队列的每一个消费者,这种情况下因为消费者的性能不同会导致出现消息堆积的情况。因此,我们就需要修改配置文件,设置preFetch值为1,确保同一时刻最多给消费者一条消息,即:每个消费者只有在处理完当前消息之后,才能获取下一个消息。如下:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /virtualHost1
    listener:
      simple:
        prefetch: 1

这里修改下消费者的代码,模拟两个消费者的能力不同。

@Component
public class MQListener {

    @RabbitListener(queues = "learn_queue")
    public void getMessage1(Message message) throws InterruptedException {
        Thread.sleep(20);
        System.out.println( "【消费者1】取出消息:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "learn_queue")
    public void getMessage2(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.err.println( "【消费者2】取出消息:" + new String(message.getBody()));
    }
}

执行结果如下:

2、Fanout交换机

Fanout交换机会将接收到的消息光波导每一个跟其绑定的queue,因此也称为广播模式,如下:

(1) 在RabbitMQ控制台创建两个队列,并绑定到fanout交换机。

?

(2)生产者生产消息

    @Test
	void fanout() {
		rabbitTemplate.convertAndSend("amq.fanout", "fanout", "Hello Everyone");

	}

(3)消费者监听消息

     @RabbitListener(queues = "fanout.queue1")
    public void fanout1(Message message) throws InterruptedException {
        System.out.println( "fanout.queue1的消息:" + new String(message.getBody()));
    }
    @RabbitListener(queues = "fanout.queue2")
    public void fanout(Message message) throws InterruptedException {
        System.err.println( "fanout.queue2的消息:" + new String(message.getBody()));
    }

执行结果如下:?

3、Direct交换机

该交换机会将接收到的消息根据规则路由到指定的Queue,因此又被称为定向路由。发布者在发送消息时,会指定消息的RoutingKey,而每个Queue都与Exchange会设置一个BindingKey,之后交换机会将消息路由到BindingKey与消息队列RoutingKey一致的队列。?如下图:

(1) 在RabbitMQ控制台创建两个队列,并绑定到fanout交换机。

?

(2)创建消费者

    @RabbitListener(queues = "direct.queue1")
    public void direct1(Message message){
        System.out.println( "direct.queue1的消息:" + new String(message.getBody()));
    }
    @RabbitListener(queues = "direct.queue2")
    public void direct2(Message message) {
        System.err.println( "direct.queue2的消息:" + new String(message.getBody()));
    }

(3)生产者生产消息

	@Test
	void direct() {
		rabbitTemplate.convertAndSend("amq.direct", "red", "明天暴雨");
	}

	@Test
	void direct() {
		rabbitTemplate.convertAndSend("amq.direct", "blue", "明天微风,适合游玩");
	}

?

?

4、Topic交换机

该交换机与Direct Exchange类似,区别在于routingKey可以时多个单词的列表,并且以.分割。也就是说,Topic交换机接收的消息的RoutingKey可以是多个单词,单词之间以.分割。

Topic交换机的通配符有两种形式:

  • *(星号):表示匹配一个单词。
  • #(井号):表示匹配零个或多个单词。

?(1)创建两个队列

(2)修改消费者

    @RabbitListener(queues = "topic.queue1")
    public void topic1(Message message) {
        System.out.println( "topic.queue1的消息:" + new String(message.getBody()));
    }
    @RabbitListener(queues = "topic.queue2")
    public void topic2(Message message){
        System.err.println( "topic.queue2的消息:" + new String(message.getBody()));
    }

(3)创建生产者

?第一种:两个队列的routingkey都匹配成功

	@Test
	void Topic() {
		rabbitTemplate.convertAndSend("amq.topic", "zhejiang.weather", "今天天气挺好");
	}

??第二种:只有一个队列的routingkey匹配成功

	@Test
	void Topic() {
		rabbitTemplate.convertAndSend("amq.topic", "zhejiang.news", "杭州发优惠券了");
	}

5、声明队列和交换机

(1)方式一:代码方式

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder创建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectListener {
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("amq.direct");
    }

    @Bean
    public Queue directQueue(){
        return new Queue("direct.queue1");
    }

    @Bean
    public Binding directBind(Queue directQueue, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue).to(directExchange).withQueueName();
    }
}

(2)方式二:注解方式

Spring AMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1", durable = "true"),
            exchange = @Exchange(name = "amq.direct", type = ExchangeTypes.DIRECT),
            key = {"red"}
    ))
    public void directByAnno(Message message){
        System.out.println( "direct.queue1的消息:" + new String(message.getBody()));
    }

6、消息转换器

如果我们需要利用Spring AMQP发送对象类型的消息,需要基于JDK的ObejctOutputStream完成序列化。否则的话,拿到的消息就是有问题的,如下图:

(1)引入依赖

        <dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>

(2)配置消息转换器MessageConverter

    @Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}

(3)发送消息

    @Test
	void fanout() {
		Map<String, Object> map = new HashMap<>();
		map.put("name", "tom");
		map.put("age", "23");
		rabbitTemplate.convertAndSend("amq.fanout", "fanout", map);

	}

?执行之后,可以去获取消息,如下:


?

文章来源:https://blog.csdn.net/weixin_47382783/article/details/134656653
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。