RabbitMQ
文章目录
RabbitMQ
1 介绍
RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP,Advanced Message Queuing Protocol)。它设计用于在分布式系统中传递消息,提供了一种可靠的、异步的通信方式,帮助不同的应用程序或组件之间进行解耦。
以下是 RabbitMQ 的一些主要特点和概念:
- 消息代理(Message Broker): RabbitMQ 充当消息代理,负责接收、存储和转发消息。
- 消息队列(Message Queue): RabbitMQ 使用消息队列来存储消息。生产者将消息发送到队列,然后消费者从队列中接收和处理消息。队列采用先进先出(FIFO)的原则,即先发送的消息会先被消费。
- 生产者(Producer): 生产者是消息的发送方,负责将消息发送到 RabbitMQ 的队列。
- 消费者(Consumer): 消费者是消息的接收方,负责从队列中获取消息并进行处理。
- 交换器(机)(Exchange): 交换机用于将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机根据规则将消息路由到相应的队列。
- 路由键(Routing Key): 路由键是用于指定消息路由规则的关键字。在发送消息时,生产者通过指定路由键将消息发送到交换器。在某些交换器类型中,路由键用于匹配与之绑定的队列,决定消息将被发送到哪个队列。
- 绑定(Binding): 绑定是交换机和队列之间的关联规则,它定义了消息应该如何从交换机路由到队列。
- 虚拟主机(Virtual Host): 虚拟主机提供了一种逻辑隔离机制,允许在同一物理服务器上运行多个独立的消息代理。
- 持久化(Durable): RabbitMQ 允许将队列和消息标记为持久的,确保在代理重启时消息不会丢失。
使用 RabbitMQ 可以有效地处理系统之间的异步通信,提高系统的可伸缩性和可维护性。它在分布式系统、微服务架构和异步任务处理等场景中广泛应用。
工作原理图:
1.1 为什么使用 RabbitMQ
RabbitMQ 提供了以下优势:
-
解耦与可靠性: 通过消息队列,系统的不同部分可以独立工作,提高可维护性和可扩展性。消息的可靠传递确保消息不会丢失,即使某个组件不可用。
-
异步通信: 消息队列支持异步通信,生产者将消息发送到队列,而消费者从队列中接收并处理消息,实现了松耦合和高效通信。
-
处理负载峰值: RabbitMQ 能够缓冲和调整消息流,有助于处理系统中的负载峰值,防止系统过载。
-
消息路由与灵活性: 不同类型的交换器使得消息能够以灵活的方式进行路由,满足多样化的应用场景。
1.2 RabbitMQ 的关键特性
-
多种交换器类型: 包括直连、扇出、主题和头交换器,支持不同的消息路由策略。
-
消息持久化: RabbitMQ 允许将消息和队列标记为持久的,确保消息不会在代理重启时丢失。
-
灵活的消息路由: 使用 Routing Key 和交换器,可以根据需求定义复杂的消息路由规则。
-
可扩展性与集群支持: RabbitMQ 提供了水平扩展的能力,支持构建高可用性的集群。
-
安全性: 支持虚拟主机,提供权限控制和加密传输,确保消息的安全性。
2 RabbitMQ 安装与配置
2.1 先安装Docker
安装gcc
yum -y install gcc gcc-c++
安装软件包
yum install -y yum-utils device-mapper-persistent-data lvm2
设置镜像仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
更新yum
yum makecache fast
安装免费版本的docker-ce
yum -y install docker-ce
启动docker
systemctl start docker
入门hello-world
docker run hello-world
证明docker安装成功!
2.2 配置RabbitMQ
先执行
docker search rabbitmq:management
拉取镜像
docker pull macintoshplus/rabbitmq-management
查看镜像
docker images
创建并运行一个RabbitMQ容器:
设置容器的主机名为kdxing,设置容器指定名称为 rabbitmq,设置RabbitMQ的默认用户名和密码,
将容器的15672端口映射到主机的15672端口,15672端口是RabbitMQ的Web管理界面端口。
将容器的5672端口映射到主机的5672端口,5672端口是RabbitMQ的AMQP协议端口。
设置Docker镜像的名称或ID为c20
docker run -d --hostname kdxing --name rabbitmq -e rabbitmq_default_user=guest -e rabbitmq_user_pass=guest -p 15672:15672 -p 5672:5672 c20
查看容器
docker ps -a
然后打开浏览器输入http://192.168.64.128:15672,界面如下:
输入用户名和密码guest进入RabbitMQ的Web管理界面:
此时,RabbitMQ配置成功!
3 Spring AMQP入门案例
3.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
spring.application.name=mq-demo01
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自定义一个属性设置队列
mq.queue.name=hello-queue01
3.3 创建生产者
创建一个生产者类,用于发送消息到 RabbitMQ:
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.queue.name}")
private String queueName;
//发送消息
public void send(String msg){
amqpTemplate.convertAndSend(queueName,msg);
}
}
3.4 创建消费者
创建一个消费者类,用于接收并处理从 RabbitMQ 收到的消息:
package com.kdx.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//接收Consumer消息的消费者
@Component
public class Receiver {
@RabbitListener(queues = {"${mq.queue.name}"})
public void process(String msg){
System.out.println("Receiver:" + msg);
}
}
3.5 创建配置类
package com.kdx.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Value("${mq.queue.name}")
private String queueName;
@Bean
public Queue createQueue(){
return new Queue(queueName);
}
}
3.6 测试
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo01ApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend() {
sender.send("RabbitMQ入门案例");
}
}
启动Application,测试testSend()方法,查看控制台:
发送消息和接收消息成功!
查看RabbitMQ的Web管理界面,点击队列,发现hello-queue01:
4 交换器(Exchange)类型
交换器是消息的分发中心,负责将消息路由到一个或多个队列。生产者将消息发送到交换器,而交换器根据规则将消息发送到与之绑定的队列。不同类型的交换器定义了不同的路由策略,包括直连交换器、扇出交换器、主题交换器和头交换器。
在 RabbitMQ 中,有几种不同类型的交换器(Exchange Types),每种类型都定义了不同的消息路由规则。以下是 RabbitMQ 支持的主要交换器类型:
4.1 Direct Exchange(直连交换器)
- 根据消息的路由键(Routing Key)将消息直接发送到指定队列。
- 在发送消息时,指定的 Routing Key 必须与队列绑定时指定的 Routing Key 相匹配。
4.2 Fanout Exchange(扇出交换器)
- 将消息广播到绑定到该交换器的所有队列,无论消息的 Routing Key 是什么。
- 不关心消息的 Routing Key,消息会被发送到所有与交换器绑定的队列。
4.3 Topic Exchange(主题交换器)
- 使用通配符的方式进行消息的路由。
- 在发送消息时,可以使用通配符模式匹配 Routing Key,将消息发送到与模式匹配的队列。
- 通配符有两种:
*
匹配一个单词,#
匹配零个或多个单词。
4.4 Headers Exchange(头交换器)
- 根据消息的头部属性来进行路由。
- 在发送消息时,可以通过设置消息的头部属性,交换器会根据头部属性匹配规则将消息发送到对应的队列。
4.5 System Exchange(默认交换器)
- 默认交换器是一个特殊的直连交换器,无需指定交换器的名称。
- 当消息的 Routing Key 与队列的名称匹配时,消息会被发送到该队列。
选择交换器类型取决于消息路由需求。例如,如果希望将消息直接发送到指定队列,可以选择 Direct Exchange;如果希望消息广播到所有队列,可以选择 Fanout Exchange;如果需要根据复杂的条件进行消息路由,可以选择 Topic Exchange 或 Headers Exchange。
面试题
RabbitMQ为什么需要信道? 为什么不是TCP直接通道 ?
- Tcp创建和销毁开销特别大。
- 如果不用信道,大量的请求过来,会造成性能的瓶颈。
- 信道的原理是一条线程一条信道,多条线程多条通道同用一条TCP连接。
- 一条TCP连接可能容纳无限的信道,处理高并发的请求不会造成性能瓶颈。
5 Direct Exchange案例
1.在consumer服务中,编写两个消费者方法,分别监听log.info和log.error
2.在publisher中编写测试方法,向log. direct发送消息
5.1 消费者
5.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
5.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8081
spring.application.name=mq-demo02-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.direct
#设置队列info
mq.config.queue.info=log.info
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key
#设置队列error
mq.config.queue.error=log.error
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key
5.1.3 ErrorReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
key = "${mq.config.error.routing.key}"
))
public class ErrorReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("errorReceiver:" + msg);
}
}
5.1.4 InfoReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
key = "${mq.config.info.routing.key}"
))
public class InfoReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("infoReceiver:" + msg);
}
}
5.2 生产者
5.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
5.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8082
spring.application.name=mq-demo03-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.direct
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key
5.2.3 Sender
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
@Value("${mq.config.info.routing.key}")
private String routingKey1;
@Value("${mq.config.error.routing.key}")
private String routingKey2;
public void send1(String msg){
amqpTemplate.convertAndSend(exchange,routingKey1,msg);
}
public void send2(String msg){
amqpTemplate.convertAndSend(exchange,routingKey2,msg);
}
}
5.3 测试
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo03ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend1() {
sender.send1("hello mq 1");
}
@Test
void testSend2() {
sender.send2("hello mq 2");
}
}
启动两个Application,执行testSend1和testSend2:
结果看到Direct交换器根据RoutingKey判断路由给哪个队列
6 Topic Exchange案例
1.在consumer服务中,编写三个消费者方法,分别监听log.info、log.error和log.all
2.在publisher中编写测试方法,向 topic发送消息
6.1 消费者
6.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8083
spring.application.name=mq-demo05-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.topic
#设置队列info
mq.config.queue.info=log.info
#设置队列error
mq.config.queue.error=log.error
#设置队列logs
mq.config.queue.logs=log.all
6.1.3 ErrorReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.error"
))
public class ErrorReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("errorReceiver:" + msg);
}
}
6.1.4 InfoReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public class InfoReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("infoReceiver:" + msg);
}
}
6.1.5 LogsReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public class LogsReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("logsReceiver:" + msg);
}
}
6.2 生产者
6.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8084
spring.application.name=mq-demo04-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.topic
6.2.3 GoodServer
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class GoodServer {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
amqpTemplate.convertAndSend(exchange,"good.log.debug","good.log.debug:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.info","good.log.info:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.warn","good.log.warn:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.error","good.log.error:" + msg);
}
}
6.3 测试
package com.kdx;
import com.kdx.provider.GoodServer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo04ProviderApplicationTests {
@Autowired
private GoodServer goodServer;
@Test
void test1() {
goodServer.send("hello mq");
}
}
启动消费者和生产者服务,执行test1():
结果看到使用通配符模式匹配 Routing Key,并将消息发送到与模式匹配的队列。
Topic交换器与队列绑定时的bindingKey可以指定通配符,而且Topic交换器接收的消息RoutingKey必须是多个单词,以
.
分割
7 Fanout Exchange案例
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听order.sms和order.push
3.在publisher中编写测试方法,向log.fanout发送消息
7.1 消费者
7.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8086
spring.application.name=mq-demo07-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.fanout
#设置队列Q1
mq.config.queue.sms=order.sms
#设置队列Q2
mq.config.queue.push=order.push
7.1.3 SmsReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class SmsReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("sms:" + msg);
}
}
7.1.4 PushReceiver
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.push}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class PushReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("push:" + msg);
}
}
7.2 生产者
7.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8085
spring.application.name=mq-demo06-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.fanout
7.2.3 Sender
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
amqpTemplate.convertAndSend(exchange,"",msg); //routingKey不写
}
}
7.3 测试
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo06ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend() {
sender.send("fanout广播");
}
}
启动消费者和生产者服务,测试testSend():
结果看到SmsReceiver和PushReceiver都接收到了交换器广播消息。
8 RabbitMQ 持久化
RabbitMQ 提供了消息的持久化机制,确保即使在 RabbitMQ 服务器重启后,消息仍然能够被恢复。这主要涉及到队列和消息的持久化。
在 5 Direct Echange的案例基础上修改测试方法
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo03ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend1() {
for (int i = 1; i < 1000; i++) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sender.send1("hello mq 1 ..." + i);
}
}
}
必须确保在
autoDelete = "false"
:
出现在Queue中:当所有的消费者客户连接断开后,是否自动删除队列。
出现在Exchange中:当所有的绑定队列都不再使用时,是否自动删除交换器。
启动消费者和生产者服务,执行testSend1()方法:
现在停止消费者服务,结束在infoReceiver:hello mq 1 ...7
然后再重启消费者服务,控制台:
发现没有重新从infoReceiver:hello mq 1 ...1
开始输出,而是接着7从infoReceiver:hello mq 1 ...8
开始,这就是消息的持久化。
9 RabbitMQ ACK确认机制
RabbitMQ 的 Acknowledgment(简称 ack)机制是确保消息在消费者正确处理后才被确认的一种机制。它有助于提高消息传递的可靠性。在 RabbitMQ 中,有三种 Acknowledgment 模式:自动确认、手动确认(单条消息)、手动批量确认(foreach遍历)。
1. 自动确认模式(Automatic Acknowledgment)
在自动确认模式下,消息一旦被消费者接收,RabbitMQ 就会立即确认消息的接收。这种模式下,消费者无法明确知道消息是否被正确处理。
// 默认是自动确认模式
@RabbitListener(queues = "myQueue")
public void handleMessage(String message) {
// 处理消息的业务逻辑...
}
2. 手动确认模式(Manual Acknowledgment)
在手动确认模式下,消费者需要显式地告诉 RabbitMQ 是否成功处理了消息。如果消费者成功处理消息,则调用 channel.basicAck
进行确认;如果处理失败,则可以调用 channel.basicNack
或 channel.basicReject
进行拒绝。
@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
try {
// 处理消息的业务逻辑...
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理异常,可以选择拒绝消息或者进行其他处理
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
在手动确认模式下,消费者需要谨慎处理异常情况,以确保消息在处理失败时能够得到适当的处理。手动确认模式提供了更精细的控制,确保消息在被消费者正确处理后才被确认。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!