55.MQ高级特性

2023-12-13 11:23:51

目录

一、RabbitMQ部署指南。

1)单机部署。

1.1.下载镜像

1.2.安装MQ

2)安装DelayExchange插件。

2.1.下载插件

2.2.上传插件

2.3.安装插件

2.4.使用插件。

3)集群部署。

3.1.集群分类

3.2.获取cookie

3.3.准备集群配置

3.4.启动集群

3.5.测试

2.5.1.数据共享测试

2.5.2.可用性测试

4)镜像模式。

4.1.镜像模式的特征

4.2.镜像模式的配置

4.2.1.exactly模式(命令在容器里面执行)

4.2.2.all模式(命令在容器里面执行)

4.2.3.nodes模式(命令在容器里面执行)

4.3.测试

4.3.1.测试数据共享

4.3.2.测试高可用

5)仲裁队列。

5.1.添加仲裁队列

5.2.测试

5.3.集群扩容

5.3.1.加入集群

5.3.2.增加仲裁队列副本

二、MQ的一些常见问题。

三、rabbitmq有五种消息模型。

四、消息可靠性。

1)消息可靠性问题。

2)生产者消息确认。

3)消息持久化。

4)消费者消息确认。

5)消费失败重试机制。

五、死信交换机。

1)初识死信交换机。

2)TTL(Time-To-Live存活时间)。

3)延迟队列。

六、惰性队列。

1)消息堆积问题。

2)惰性队列。

七、MQ集群。

1)集群分类。

2)普通集群。

3)镜像集群。

4)仲裁队列。


一、RabbitMQ部署指南。

1)单机部署。

我们在Centos7虚拟机中使用Docker来安装。

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3.8-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

2)安装DelayExchange插件。

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。

课前资料也提供了下载好的插件:

2.2.上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的同学,请参考第一章部分,重新创建Docker容器。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

接下来,将插件上传到这个目录即可:

2.3.安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

2.4.使用插件。

3)集群部署。

接下来,我们看看如何安装RabbitMQ的集群。

3.1.集群分类

在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

  • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。

  • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

我们先来看普通模式集群,我们的计划部署3节点的mq集群:

主机名控制台端口amqp通信端口
mq18081 ---> 156728071 ---> 5672
mq28082 ---> 156728072 ---> 5672
mq38083 ---> 156728073 ---> 5672

集群中的节点标示默认都是:rabbit@[hostname],因此以上三个节点的名称分别为:

  • rabbit@mq1

  • rabbit@mq2

  • rabbit@mq3

3.2.获取cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

我们先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:

FXZMCVGLBIXZCDEMMVZQ

接下来,停止并删除当前的mq容器,我们重新搭建集群。

docker rm -f mq

3.3.准备集群配置

在/tmp目录新建一个配置文件 rabbitmq.conf:

cd /tmp
# 创建文件
touch rabbitmq.conf

文件内容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

在上面的示例中,"mq1"、"mq2" 和 "mq3" 应该是你为每个节点设置的实际主机名。在 /etc/hosts 文件或 DNS 中配置好主机名与 IP 地址的映射,以便节点之间能够相互识别和连接。

再创建一个文件,记录cookie

cd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的权限
chmod 600 .erlang.cookie

准备三个目录,mq1、mq2、mq3:

cd /tmp
# 创建目录
mkdir mq1 mq2 mq3

然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

#查看目录文件(-a 包含隐藏目录文件):

?ll -a mq1

3.4.启动集群

创建一个网络:

docker network create mq-net

运行命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

3.5.测试

在mq1这个节点上添加一个队列:

如图,在mq2和mq3两个控制台也都能看到:

2.5.1.数据共享测试

点击这个队列,进入管理页面:

然后利用控制台发送一条消息到这个队列:

结果在mq2、mq3上都能看到这条消息:

2.5.2.可用性测试

我们让其中一台节点mq1宕机:

docker stop mq1

然后登录mq2或mq3的控制台,发现simple.queue也不可用了:

说明数据并没有拷贝到mq2和mq3。

4)镜像模式。

在刚刚的案例中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。

官方文档地址:https://www.rabbitmq.com/ha.html

4.1.镜像模式的特征

默认情况下,队列只保存在创建该队列的节点上。而镜像模式下,创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。

但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点

用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用

当主节点接收到消费者的ACK时,所有镜像都会删除节点中的数据。

总结如下:

  • 镜像队列结构是一主多从(从就是镜像)

  • 所有操作都是主节点完成,然后同步给镜像节点

  • 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)

  • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)

4.2.镜像模式的配置

镜像模式的配置有3种模式:

ha-modeha-params效果
准确模式exactly队列的副本量count集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all(none)队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodesnode names指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

这里我们以rabbitmqctl命令作为案例来讲解配置语法。

语法示例:

4.2.1.exactly模式(命令在容器里面执行)
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定写法

  • ha-two:策略名称,自定义

  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称

  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容

    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量

    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像

    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

4.2.2.all模式(命令在容器里面执行)
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义

  • "^all\.":匹配所有以all.开头的队列名

  • '{"ha-mode":"all"}':策略内容

    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

4.2.3.nodes模式(命令在容器里面执行)
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法

  • ha-nodes:策略名称,自定义

  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称

  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容

    • "ha-mode":"nodes":策略模式,此处是nodes模式

    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

4.3.测试

我们使用exactly模式的镜像,因为集群节点数量为3,因此镜像数量就设置为2.

运行下面的命令:

docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

下面,我们创建一个新的队列:

在任意一个mq控制台查看队列:

4.3.1.测试数据共享

给two.queue发送一条消息:

然后在mq1、mq2、mq3的任意控制台查看消息:

4.3.2.测试高可用

现在,我们让two.queue的主节点mq1宕机:

docker stop mq1

查看集群状态:

查看队列状态:

发现依然是健康的!并且其主节点切换到了rabbit@mq2上

5)仲裁队列。

从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能,但使用更加方便。

5.1.添加仲裁队列

在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。

在任意控制台查看队列:

可以看到,仲裁队列的 + 2字样。代表这个队列有2个镜像节点。

因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3.

5.2.测试

可以参考对镜像集群的测试,效果是一样的。

5.3.集群扩容

5.3.1.加入集群

1)启动一个新的MQ容器:

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)进入容器控制台:

docker exec -it mq4 bash

3)停止mq进程

rabbitmqctl stop_app

4)重置RabbitMQ中的数据:

rabbitmqctl reset

5)加入mq1:

rabbitmqctl join_cluster rabbit@mq1

6)再次启动mq进程

rabbitmqctl start_app

5.3.2.增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:

docker exec -it mq1 bash

执行命令:

rabbitmq-queues quorum_status "quorum.queue"

结果:

现在,我们让mq4也加入进来:

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

结果:

再次查看:

rabbitmq-queues quorum_status "quorum.queue"

查看控制台,发现quorum.queue的镜像数量也从原来的 +2 变成了 +3:

二、MQ的一些常见问题。

三、rabbitmq有五种消息模型。

RabbitMQ支持多种消息模型,包括以下五种:

  1. 简单模式(Simple Mode):最基本的消息模型,包括生产者发送消息到队列,然后消费者从队列中接收消息。这是 RabbitMQ 最简单的工作模式。

  2. 工作队列模式(Work Queues):也称为任务队列模式,多个消费者共享一个队列,当消息到达时,会被平均分配给不同的消费者进行处理。

  3. 发布/订阅模式(Publish/Subscribe):包括一个生产者将消息发送到交换机,然后多个消费者绑定到交换机上的队列,实现广播消息的发送和接收。

  4. 路由模式(Routing):在发布/订阅模式的基础上,增加了对消息的路由规则,消费者可以根据指定的路由键来选择接收特定类型的消息。

  5. 主题模式(Topics):类似于路由模式,但是更加灵活,支持对消息的多重条件匹配,消费者可以使用通配符进行订阅,从而接收符合指定模式的消息。

这些消息模型可以根据具体的业务需求和场景进行选择和应用,以实现不同的消息传递和处理方式。

四、消息可靠性。

1)消息可靠性问题。

2)生产者消息确认。

调用回调函数的时候会传进来那几个参数。

判断是否是延迟消息,如果是延迟消息,则忽略这个错误提示。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode,replayText, exchange, routingKey) -> {
            //判断是否是延迟消息
            if (message.getMessageProperties().getReceivedDelay() > 0) {
                //如果大于0,那就是延迟消息,则忽略这个错误提示。
                return;
            }
            //记录日志 这里大括号{}代表占位符
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replayText,exchange,routingKey,message.toString());
            //如果有需要的话,重发消息
        });
    }
}

如果有收到消息确认就走第一个方法,没有收到消息确认就调用第二个方法。只要有收到消息确认,就一定会执行第一个方法。如果交换机发送到队列失败,收到消息确认,那就执行第一个方法,并且调用上面写的回调方法。(第一个方法是指如果收到成功发送到交换机的消息确认就调用第一个方法,然后交换机再发送到队列如果失败则调用上面写的回调方法。所以上面的回调方法和下面的回调方法是应用在不同的阶段的方法)

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        //登录rabbitmq的控制台,创建队列和绑定到交换机
        //1.准备消息
        String message = "hello, spring amqp!";
        //2.准备CorrelationData
        //2.1准备消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(confirm -> {
            //判断结果
            if (confirm.isAck()){
                //ACK,到达交换机执行这里(后面如果交换机到达队列失败,则调用return回调方法)
                log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
            }else {
                //NACK,未到达交换机执行这里
                log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());
                //可以重发消息
            }
        }, throwable -> {
            //记录日志
            log.error("消息发送失败!",throwable);
            //有需要的话,可以重发消息
        });
        //3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message,correlationData);
    }
}

SpringAMQP中处理消息确认的几种情况:

publisher- comfirm
? 消息成功发送到 exchange ,返回 ack
? 消息发送失败,没有到达交换机,返回 nack
? 消息发送过程中出现异常,没有收到回执
消息成功发送到 exchange ,但没有路由到 queue ,调用 ReturnCallback

3)消息持久化。

交换机、队列和消息默认情况下就是持久化的,所以只需要用以前的那种创建方式就可以了。(如果不需要持久化就可以进行下面的配置,把参数改变一下)

下面的配置作用:启动类一启动,rabbitmq就会创建对应的交换机和队列。(下面的配置作用就是创建交换机和队列等)(默认就是持久化的,可以直接创建)

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simplieDirect(){
        return new DirectExchange("simple.direct",true,false);
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}

下面是创建持久化消息:(默认就是持久化的,可以直接发送)

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDurableMessage(){
        //1.准备消息
        Message message = MessageBuilder.withBody("hello,spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        //2.发送消息
        rabbitTemplate.convertAndSend("simple.queue",message);
    }
}

4)消费者消息确认。

5)消费失败重试机制。

如果default-requeue-rejected: false 和 retry都写,则执行重试耗尽后,再执行死信,如果是RepublishMessageRecoverer模式(该模式就是把失败消息发送到其他队列),只执行重试,不执行死信。
spring:
  rabbitmq:
    host: 192.168.203.129 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次取
        acknowledge-mode: auto #默认就是auto
        default-requeue-rejected: false #加上这个才会失败后就会被认为是死信
        retry: # 如果default-requeue-rejected: false 和 retry都写,则执行重试耗尽后,再执行死信,如果是RepublishMessageRecoverer模式,只执行重试,不执行死信
          enabled: true
          initial-interval: 1000
          multiplier: 3
          max-attempts: 3
          stateless: true

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

如何确保RabbitMQ消息的可靠性?

? 开启生产者确认机制,确保生产者的消息能到达队列
? 开启持久化功能,确保消息未消费前在队列中不会丢失
? 开启消费者确认机制为 auto ,由 spring 确认消息处理成功后完成 ack
? 开启消费者失败重试机制,并设置 MessageRecoverer ,多次重试失败后将消息投递到异常交换机,交由人工处理

五、死信交换机。

1)初识死信交换机。

spring:
  rabbitmq:
    host: 192.168.203.129 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次取
        acknowledge-mode: auto #默认就是auto
        default-requeue-rejected: false #加上这个才会失败后就会被认为是死信

什么样的消息会成为死信?

? 消息被消费者 reject 或者返回 nack
? 消息超时未消费
? 队列满了

如何给队列绑定死信交换机?

? 给队列设置 dead-letter-exchange 属性,指定一个交换机
? 给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey

2)TTL(Time-To-Live存活时间)。

这里监听消费信息,同时也是启动就创建dl交换机、队列和路由键。

这里给队列设置超时时间和绑定私信交换机等,如果出现私信就发到绑定的交换机。

这里给信息设置超时时间。

当消息(只针对当前消息的超时时间)和队列都有设置超时时间时,以时间较短的为准。

消息超时的两种方式是?

? 给队列设置 ttl 属性,进入队列后超过 ttl 时间的消息变为死信
? 给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信
? 两者共存时,以时间短的 ttl 为准

如何实现发送一个消息20秒后消费者才收到消息?(这种方式不设置消费者,让其超时)

? 给消息的目标队列指定死信交换机
? 消费者监听与死信交换机绑定的队列
? 发送消息时给消息设置 ttl 20

3)延迟队列。

延迟的使用看红色圈圈 圈起来的就行,改动的地方不多。

前面写的交换机发送到队列失败就执行的回调方法,在这延迟下也调用了,明显不合理。(因为这里只不过是延迟了发送到队列,并不是失败)

判断是否是延迟消息,如果是延迟消息,则忽略这个错误提示。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode,replayText, exchange, routingKey) -> {
            //判断是否是延迟消息
            if (message.getMessageProperties().getReceivedDelay() > 0) {
                //如果大于0,那就是延迟消息,则忽略这个错误提示。
                return;
            }
            //记录日志 这里大括号{}代表占位符
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replayText,exchange,routingKey,message.toString());
            //如果有需要的话,重发消息
        });
    }
}

延迟队列插件的使用步骤包括哪些?

? 声明一个交换机,添加 delayed 属性为 true
? 发送消息时,添加 x-delay 头,值为超时时间

六、惰性队列。

1)消息堆积问题。

2)惰性队列。

惰性队列是消息一进来就放到磁盘存储。

lazy-queue$(写的是正则表达式):需要改为惰性队列的队列名称。

"queue-mode":"lazy":这里是指队列模式是惰性队列。

--apply-to queues:应用到所以匹配的队列。

发送100万条消息时,发送到惰性队列没失败,但是发送到非惰性队列就出现失败。

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testLazyQueue(){
        for (int i = 0; i < 1000000; i++) {
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,Spring".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2.发送消息
            rabbitTemplate.convertAndSend("lazy.queue",message);
        }
    }

    @Test
    public void testNormalQueue(){
        for (int i = 0; i < 1000000; i++) {
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,Spring".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2.发送消息
            rabbitTemplate.convertAndSend("normal.queue",message);
        }
    }
}

消息堆积问题的解决方案?

? 队列上绑定多个消费者,提高消费速度
? 给消费者开启线程池,提高消费速度
? 使用惰性队列,可以再 mq 中保存更多消息

惰性队列的优点有哪些?

? 基于磁盘存储,消息上限高
? 没有间歇性的 page-out ,性能比较稳定

惰性队列的缺点有哪些?

? 基于磁盘存储,消息时效性会降低
? 性能受限于磁盘的 IO

七、MQ集群。

1)集群分类。

在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

  • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。

  • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

2)普通集群。

这种很像分片集群,只不过是没有备份副本分片,所以当有一节点宕机时,当前节点的队列都使用不了。

3)镜像集群。

很像es的分片集群。

exactly模式:count设置为2(即镜像节点为1),当mq1节点宕机时,two.queue的镜像节点变为主节点,然后再选一个mq3为镜像节点,当mq1恢复后,它已经不是two.queue的主节点和镜像节点了。

注意:这里的镜像节点是只有匹配的队列才能创建该队列的镜像节点。(也就是说,不匹配的队列跟普通集群那么一样,只有匹配的队列才有副本节点(从节点或备份))

4)仲裁队列。

镜像队列的数据同步不是强一致,有可能导致数据丢失,所以才有了仲裁队列。

注意:只要创建的是仲裁队列,它默认形成镜像模式,count默认值为5(即前面所说的镜像集群,如果节点数量低于count,相当于使用镜像模式中的all模式)。(前面的镜像集群需要我们指定匹配的队列才会形成镜像模式)

文章来源:https://blog.csdn.net/khmff/article/details/134770618
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。