RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及事务消息收发、最大重试消费实战

2023-12-18 11:31:16

事务消息收发

流程如下:

  1. 发送给 MQ 一条任务操作
  2. MQ 的 Broker 成功收到后,那么发送方就开始执行原子 db 业务
  3. 如果执行原子 db 业务失败,并没有将执行成功状态同步给 Broker
  4. 那么 Broker 会去检查 db 事务是否成功,最后要么事务提交,可以被生产者消费,要么事务回滚,生产者无法消费

事务消息收发流程图如下:

在这里插入图片描述

事务消息收发消费者如下:

public class TransactionConsumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("Transaction-Test-Topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

这里模拟事务成功执行的生产者,执行该生产者之后,消费者可以收到消息并消费:

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer(
                "transaction_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                /**
                 * 这里执行本地事务,如果本地事务执行成功,就返回成功
                 * 如果本地事务失败,就返回失败
                 */
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Transaction-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Transaction_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.sendMessageInTransaction(msg, null);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

这里模拟事务执行失败的生产者,执行该生产者之后,消费者不会收到消息:

public class TransactionProducerFail {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer(
                "transaction_producer_group_fail");
        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                /**
                 * 这里执行本地事务,如果本地事务执行成功,就返回成功
                 * 如果本地事务失败,就返回失败
                 */
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 触发事务的检查,提供给到生产者一个检查事务是否成功提交的机会
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        producer.start();

        List<TransactionProducer.Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            TransactionProducer.Order order = new TransactionProducer.Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (TransactionProducer.Order order : list) {
            Message msg = new Message(
                    "Transaction-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Transaction_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.sendMessageInTransaction(msg, null);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

最大重试消费

重试分为两种:生产者重试、消费者重试

生产者重试设置:

  • 生产者配置重试次数

    // 同步
    producer.setRetryTimesWhenSendFailed(3)
    // 异步
    producer.setRetryTimesWhenSendAsyncFailed(3);
    // 如果发送失败,是否尝试发送到其他 Broker 节点
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);
    
    
  • 生产者设置重试的策略

    producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);
    

    ?

消费者重试设置:

  • 消费者有序消费时,如果消费失败,返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 即可
  • 消费者并发消费时,如果消费失败,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可

生产者代码如下(消费者代码就不贴了,只需要消费时返回需要重试的状态码即可):

public class RetryProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 设置一些重试的策略
        producer.addRetryResponseCode(ResponseCode.FLUSH_DISK_TIMEOUT);
        // 设置发送失败最大重试次数
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Filter-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Filter_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));

            // 直接将 msg 发送出去
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

文章来源:https://blog.csdn.net/qq_45260619/article/details/135052598
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。