RabbitMQ 消息应答与发布

2023-12-16 15:33:44

消息应答是 RabbitMQ 中的一项重要功能,它确保消息在被消费者处理后得到适当的确认。通过消息应答,消费者可以告知 RabbitMQ 它已经成功处理了一条或多条消息,并且可以安全地从消息队列中删除这些消息。这种机制保证了消息的可靠传递和处理,即使在消费者出现故障或网络问题的情况下也能确保消息不会丢失。本文将深入探讨 RabbitMQ 中的消息应答和发布的概念、原理和最佳实践。

1.消息应答机制

在 RabbitMQ 中,消息应答机制是指消费者在处理消息后向 RabbitMQ 发送确认消息,告知 RabbitMQ 该消息已被成功处理。这种机制可以确保消息在被消费者正确处理后才从队列中移除,以避免消息丢失或重复处理。

消息应答机制的主要角色包括:

  1. 消费者(Consumer):从队列中获取消息并进行处理的应用程序。
  2. RabbitMQ 服务端(Broker):负责管理消息队列和消息传递。
  3. 消息确认(Acknowledgement):消费者处理消息后发送给 RabbitMQ 的确认消息。

RabbitMQ 提供了两种类型的消息应答分类:自动应答、手动应答。此外,RabbitMQ 提供了两种类型的消息应答方式:单条消息确认(Single Message Acknowledgement)、批量消息确认(Batch Message Acknowledgement)

通过消息应答机制,RabbitMQ 可以根据消费者的确认消息来确定是否将消息从队列中移除。如果消费者未发送确认消息,RabbitMQ 将认为消息未被成功处理,然后将其重新发送给其他消费者或者保留在队列中等待处理。

消息应答机制是 RabbitMQ 中实现消息可靠性传递的重要机制之一,可以确保消息能够被正确处理,并且在处理失败时进行重试。

2.消息应答分类

2.1 自动应答

2.1.1 概念说明

概念:在RabbitMQ中,自动应答机制(Automatic Acknowledgement)是指消费者接收到消息后是否自动向RabbitMQ发送确认消息(acknowledgement)的一种行为。RabbitMQ是一个消息代理(message broker)系统,它负责接收、存储和转发消息。

当消费者从RabbitMQ中获取消息并进行处理时,消息会被标记为"未确认"状态。如果消费者在处理消息期间遇到错误或异常情况,它可以选择不发送确认消息,从而告知RabbitMQ该消息处理失败,并将其重新放回队列中以便重新投递给其他消费者。

然而,如果消费者成功处理消息并且希望告知RabbitMQ该消息已被处理,消费者可以发送确认消息。在自动应答模式下,一旦RabbitMQ将消息发送给消费者,它会立即将消息标记为"已确认",而不必等待消费者发送确认消息。

自动应答机制的优点是简单和方便,消费者无需显式发送确认消息,RabbitMQ会自动将消息标记为已确认。然而,这也意味着如果消费者在处理消息期间崩溃或断开连接,RabbitMQ将自动假定消息未被正确处理,并将其重新放回队列中。因此,自动应答机制在某些情况下可能会导致消息重复处理的问题。

注意

  • 自动应答模式比手动应答要快,但在高并发场景下容易出现数据丢失问题;
  • 自动应答模式容易导致生产的速度远远大于消费消息的速度,从而导入消息积压,最终使得内存耗尽导致数据丢失;
  • 自动应答模式适合在消费者高效处理消息并且对数据可靠性要求不高的场景下使用;
2.1.2 案例说明

下面模拟RabbitMQ中消息自动应答机制及消息自动应答机制可能导致消息丢失的场景:

新建生产者模拟生产100条消息并启动程序

public class Producer {
    public static final String QUEUE_NAME = "auto_ack_message";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 1; i <= 100; i++) {
            String message = "Message " + i;
            // 发布消息到队列
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Sent message: " + message);
        }
    }
}

发现生产的100条消息已经就绪,如下

image-20231216134034742

新建消费者,使用 Thread.sleep(1000);模拟实际业务中处理逻辑,运行消费者

public class Consumer {
    public static final String QUEUE_NAME = "auto_ack_message";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 设置自动应答
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 模拟耗时操作
            try {
                System.out.println("Processing message: " + message);
                Thread.sleep(1000);
                System.out.println("Finished processing message: " + message);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, consumerTag -> {
        });
    }
}

正常情况下,消费者能消费完生产者生产的消息,但是如果消费者在消费的过程中宕机了,就会造成数据丢失的问题。

image-20231216134856379

2.2 手动应答

2.2.1 概念说明

在RabbitMQ中,手动应答机制(Manual Acknowledgement)是指消费者接收到消息后,需要显式地发送确认消息(acknowledgement)给RabbitMQ,以告知消息已经被成功处理。手动应答机制的引入主要是为了解决消息消费的可靠性问题。

手动应答机制需要消费者在处理消息完成后,主动发送确认消息给RabbitMQ,确认消息的发送可以基于以下两种方式之一:

1)手动确认单条消息:消费者在处理完一条消息后,发送确认消息给RabbitMQ,告知该消息已被成功处理。

**2)批量手动确认消息:**消费者在处理一批消息后,发送确认消息给RabbitMQ,告知这批消息已被成功处理。

2.2.2 案例说明

生产者生产100条消息

public class Producer {
    public static final String QUEUE_NAME = "manual_ack_message";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 1; i <= 100; i++) {
            String message = "Message " + i;
            // 发布消息到队列
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Sent message: " + message);
        }
    }
}

消费者以单条消息确认的方式确认消息

public class Consumer {
    public static final String QUEUE_NAME = "manual_ack_message";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 设置手动应答
        channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 模拟耗时操作
            try {
                System.out.println("Processing message: " + message);
                Thread.sleep(1000);
                System.out.println("Finished processing message: " + message);
                //单条手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, consumerTag -> {
        });
    }
}

运行后查询RabbitMQ消息消费情况,如下图Unacked 是指未确认的消息总数,Unacked +acked = Total

image-20231216140845437

为了验证手动确认消息是否和自动确认消息一样可能造成消息丢失,在消费者还没确认完消息时,手动把消费者关闭,模拟消费者宕机的情况,发现未确认的消息被保存在队列中,并不会丢失数据

image-20231216141256055

3.两种消息应答方式

手动消息确认机制有两种消息应答的方式:单条消息确认、批量消息确认。以上代码案例中为单条消息确认,批量消息确认在单条消息确认基础上进行简单改造,如下

// 手动发送批量消息确认
if (delivery.getEnvelope().getDeliveryTag() % BATCH_SIZE == 0) {
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
}

BATCH_SIZE 是一批中消息的个数,当消息达成BATCH_SIZE时,就会触发手动确认消息的方法。

4.消息自动重新入队

从上述的案例中,我们知道如果消费者宕机,消息会重新保存到队列中,那么如果消费者是多个,重新入队的消息会被其他消费者消费吗?

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

手动应答案例

  • 默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答
  • 消费者启用两个线程,消费 1 一秒消费一个消息,消费者 2 十秒消费一个消息,然后在消费者 2 消费消息的时候,停止运行,这时正在消费的消息是否会重新进入队列,而后给消费者 1 消费呢?
public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

消息生产者:

public class Task02 {

    //队列名称
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        //在控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入信息:");
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }

    }

}

消费者 1:

public class Work03 {

    //队列名称
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接受消息处理时间较短");

        DeliverCallback deliverCallback =(consumerTag, message) ->{

            //沉睡1S
            SleepUtils.sleep(1);
            System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");

        });
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者 2:

将工作线程2睡眠时间改为 30 秒:

public class Work04 {

    //队列名称
    public static final String TASK_QUEUE_NAME = "ACK_QUEUE";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C2等待接受消息处理时间较长");

        DeliverCallback deliverCallback =(consumerTag, message) ->{
            //沉睡1S
            SleepUtils.sleep(30);
            System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");

        });
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

效果演示

  • 发送AA消息,C1工作线程经过1s后接收到消息;

image-20230710205101424

  • 发送BB消息后,C2工作线程经过30s后接收到消息。CC和DD消息同理;

image-20230710205230975

  • 手动应答不会丢失数据
    image-20230710205857030

文章来源:https://blog.csdn.net/qq_37606901/article/details/135032824
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。