消息队列-RockMQ-过滤发送消息实践
2024-01-08 21:36:07
    		过滤发送消息实战
我们有这样一个需求,我们上面的消费组只想感知A类消息,下面的消费者组只想感知BC类的消息。我们的消息都是发送到同一个Topic的,这个时候怎么做一个筛选呢?RockMQ是通过TAG来进行筛选。这个时候就又有一个问题如果我们上面这个消费者组订阅了Tag1和Tag2的消息,下面订阅了Tag1和Tag3的消息。那么这个时候如果上面这个消费组的第二个消费者遇到了Tag1的消息岂不是就直接丢弃掉,同理下面也是,可能无形中丢失了一些消息。
 
 验证是否丢失消息
 我们启动一个生产者发送TAG1和TAG2的消息,然后启动两个消费者(属于同一个消费者组分别订阅TAG1和TAG2)来消费看消费情况:
public class Consumer6 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-lost", "TAG1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
public class Consumer5 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-lost", "TAG2");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
我这里测试得到的结果是始终消费不到TAG2的消息,如果你先启动消费者TAG2也就消费不到TAG1的消息,产生了消息丢失。
为了避免这种情况,我们要保证一个消费者组订阅的都是同一个TAG:
 
 生产者1
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        // 生成3大类消息
        List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
        List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
        List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("filter-topic", "filter-topic-str", order.toString().getBytes());
            msg.setKeys("filter-topic-trace");
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}
生产者2
public class Producer2 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        // 生成3大类消息
        List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
        List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
        List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("filter-topic", order.getTag(), order.toString().getBytes());
            msg.setKeys("filter-topic-trace");
            msg.putUserProperty("idx", String.valueOf(order.getOrderID()));
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}
我们有这几种订阅方式
 第一种 全部订阅
public class Consumer4 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-all");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
            }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
第二种Tag订阅
 订阅一个TAG
public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag1");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "TAG1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
我们可以通过||的方式订阅多个TAG
public class Consumer2 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag2-tag3");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "TAG2 || TAG3");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
第三种SQL订阅
 这种方式是基于我们使用生产者2在发送消息的时候设置了属性,这样我们就可以在收到消息的时候使用SQL进行筛选。
public class Consumer3 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-sql");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", MessageSelector.bySql("idx > 10"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
            }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
    			文章来源:https://blog.csdn.net/qq_43259860/article/details/135466603
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
    	本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!