RocketMQ 的两种消息消费模式:Pull(拉取)和Push(推送)
2023-12-14 05:18:22
RocketMQ 支持两种消息消费模式:Pull(拉取)和Push(推送),它们之间有一些区别和联系。下面是它们的主要特点和比较:
Pull(拉取)模式:
- 主动权在消费者: 在Pull模式中,消费者主动向Broker请求消息,决定拉取的时机和拉取的数量。消费者有更大的控制权,可以按照自己的需求灵活地拉取消息。
- 消息拉取频率: 消费者可以根据实际情况决定拉取消息的频率,例如可以定时拉取,也可以根据业务负载动态调整拉取消息的速度。
- 流量控制: Pull模式下,消费者可以根据自身情况进行流量控制,避免瞬时大量消息涌入导致负载过重。
- 适用场景: Pull模式适用于需要更多消费者控制和自适应的场景,例如消费者需要根据实时负载情况自主调整拉取消息的速度。
Pull(拉取)模式示例代码:
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageQueue;
public class PullConsumerDemo {
public static void main(String[] args) throws MQClientException {
// 创建消费者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("your_consumer_group");
consumer.setNamesrvAddr("your_namesrv_address");
// 启动消费者
consumer.start();
// 订阅主题
consumer.subscribe("YourTopic", "*");
// 从指定队列拉取消息
MessageQueue messageQueue = new MessageQueue("YourTopic", "your_broker_name", 0);
PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, "*", getMessageQueueOffset(messageQueue), 32);
// 处理拉取的消息
if (pullResult.getPullStatus() == PullStatus.FOUND) {
pullResult.getMsgFoundList().forEach(msg -> {
// 处理消息
System.out.println(new String(msg.getBody()));
});
}
// 更新队列偏移量
updateMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset());
// 关闭消费者
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
// TODO: 从存储中获取当前队列的偏移量
return 0;
}
private static void updateMessageQueueOffset(MessageQueue mq, long offset) {
// TODO: 将当前队列的偏移量更新到存储中
}
}
Push(推送)模式:
- 被动接收消息: 在Push模式中,消息是由Broker推送给消费者的,消费者不再需要主动去请求消息,而是等待消息到达。
- 实时性: Push模式下,消息可以实时被推送给消费者,适用于需要实时响应的场景。
- 消息推送速率: Push模式下,消息的推送速率由Broker控制,可能会受到一些限制。如果消息推送速率过快,消费者可能需要自行处理流控。
- 适用场景: Push模式适用于对实时性要求较高、消费者希望被动接收消息的场景。
Push(推送)模式示例代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumerDemo {
public static void main(String[] args) throws MQClientException {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
consumer.setNamesrvAddr("your_namesrv_address");
// 设置消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
// 处理消息
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 订阅主题和标签
consumer.subscribe("YourTopic", "*");
// 启动消费者
consumer.start();
// 阻塞主线程,保持消费者一直运行
while (true) {
// do nothing
}
}
}
联系:
- 消费者驱动: 无论是Pull还是Push,消费者都是主动发起消费的一方。Pull模式中,消费者主动发起拉取消息的请求;Push模式中,虽然消息是被动推送给消费者,但消费者仍然需要主动处理接收到的消息。
- 消费者订阅: 无论是Pull还是Push,消费者都需要订阅消息的主题(Topic)。
- 消费者负载均衡: 在Pull模式中,如果有多个消费者订阅了同一个Topic,它们之间可能需要进行负载均衡,以确保每个消费者都能获取到一定比例的消息。在Push模式中,Broker通常会进行消息推送的负载均衡。
?
文章来源:https://blog.csdn.net/weixin_41860630/article/details/134985184
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!