RocketMQ 的两种消息消费模式:Pull(拉取)和Push(推送)

2023-12-14 05:18:22

RocketMQ 支持两种消息消费模式:Pull(拉取)和Push(推送),它们之间有一些区别和联系。下面是它们的主要特点和比较:

Pull(拉取)模式:

  1. 主动权在消费者: 在Pull模式中,消费者主动向Broker请求消息,决定拉取的时机和拉取的数量。消费者有更大的控制权,可以按照自己的需求灵活地拉取消息。
  2. 消息拉取频率: 消费者可以根据实际情况决定拉取消息的频率,例如可以定时拉取,也可以根据业务负载动态调整拉取消息的速度。
  3. 流量控制: Pull模式下,消费者可以根据自身情况进行流量控制,避免瞬时大量消息涌入导致负载过重。
  4. 适用场景: Pull模式适用于需要更多消费者控制和自适应的场景,例如消费者需要根据实时负载情况自主调整拉取消息的速度。

Pull(拉取)模式示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageQueue;

public class PullConsumerDemo {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("your_consumer_group");
        consumer.setNamesrvAddr("your_namesrv_address");

        // 启动消费者
        consumer.start();

        // 订阅主题
        consumer.subscribe("YourTopic", "*");

        // 从指定队列拉取消息
        MessageQueue messageQueue = new MessageQueue("YourTopic", "your_broker_name", 0);
        PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, "*", getMessageQueueOffset(messageQueue), 32);

        // 处理拉取的消息
        if (pullResult.getPullStatus() == PullStatus.FOUND) {
            pullResult.getMsgFoundList().forEach(msg -> {
                // 处理消息
                System.out.println(new String(msg.getBody()));
            });
        }

        // 更新队列偏移量
        updateMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset());

        // 关闭消费者
        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        // TODO: 从存储中获取当前队列的偏移量
        return 0;
    }

    private static void updateMessageQueueOffset(MessageQueue mq, long offset) {
        // TODO: 将当前队列的偏移量更新到存储中
    }
}

Push(推送)模式:

  1. 被动接收消息: 在Push模式中,消息是由Broker推送给消费者的,消费者不再需要主动去请求消息,而是等待消息到达。
  2. 实时性: Push模式下,消息可以实时被推送给消费者,适用于需要实时响应的场景。
  3. 消息推送速率: Push模式下,消息的推送速率由Broker控制,可能会受到一些限制。如果消息推送速率过快,消费者可能需要自行处理流控。
  4. 适用场景: Push模式适用于对实时性要求较高、消费者希望被动接收消息的场景。

Push(推送)模式示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerDemo {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
        consumer.setNamesrvAddr("your_namesrv_address");

        // 设置消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                // 处理消息
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 订阅主题和标签
        consumer.subscribe("YourTopic", "*");

        // 启动消费者
        consumer.start();

        // 阻塞主线程,保持消费者一直运行
        while (true) {
            // do nothing
        }
    }
}

联系:

  • 消费者驱动: 无论是Pull还是Push,消费者都是主动发起消费的一方。Pull模式中,消费者主动发起拉取消息的请求;Push模式中,虽然消息是被动推送给消费者,但消费者仍然需要主动处理接收到的消息。
  • 消费者订阅: 无论是Pull还是Push,消费者都需要订阅消息的主题(Topic)。
  • 消费者负载均衡: 在Pull模式中,如果有多个消费者订阅了同一个Topic,它们之间可能需要进行负载均衡,以确保每个消费者都能获取到一定比例的消息。在Push模式中,Broker通常会进行消息推送的负载均衡。


?

文章来源:https://blog.csdn.net/weixin_41860630/article/details/134985184
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。