RocketMq实战

2023-12-13 09:43:27

目录

Mq生产消费架构

生产者

发送消息固定步骤

发送模式

1. 单向发送

2. 同步发送

3. 异步发送

生产消息完整代码

消费者

消费消息固定步骤

简单消费代码示例

消费确认机制

消息模型

广播消息

顺序消息

延迟消息

批量消息

过滤消息

简单过滤

SQL过滤

消费者代码

事务消息

作用

代码示例

两阶段原理?

RocketMq集成SpringBoot


Mq生产消费架构

  1. 生产者设置nameServer地址, 往topic发信息
  2. mq服务启动topic时候,自动给每一个topic创建MessageQueue,默认1:2
  3. 默认Push模式推送消息, MessageQueue把消息推送给消费者, 一个MessageQueue只能绑定一个消费实例,一个消费实例能对应多个MessageQueue, 也是保证顺序消费的基础??

生产者

发送消息固定步骤

1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址, 也可配置环境变量NAMESRV_ADDR
3.启动producer。 可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源

发送模式

1. 单向发送

        // 发送单向消息
        producer.sendOneway(buildOneMessage());

2. 同步发送

        // 同步发送消息
        SendResult sendResult = producer.send(buildOneMessage());
        System.out.println("同步发送消息结果:" + sendResult);

3. 异步发送

        // 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭
        producer.send(buildOneMessage(), new SendCallback() {
            // 成功回调
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步发送消息结果:" + sendResult);
            }

            // broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性
            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步发送消息出现异常:" + throwable.getMessage());
            }
        });

生产消息完整代码

public class ProducerTest {


    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("192.168.6.128:9876");
        // 启动生产者
        producer.start();
        // 发送单向消息
        producer.sendOneway(buildOneMessage());
        System.out.println("单向发送消息, 不知道结果");
        // 同步发送消息
        SendResult sendResult = producer.send(buildOneMessage());
        System.out.println("同步发送消息结果:" + sendResult);
        // 异步发送消息, 发送完回调, 注意:回调完之前, producer不能关闭
        producer.send(buildOneMessage(), new SendCallback() {
            // 成功回调
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步发送消息结果:" + sendResult);
            }

            // broker处理失败回调, 超时回调, 进入onException也可能发送下游成功, 做好幂等性
            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步发送消息出现异常:" + throwable.getMessage());
            }
        });
//        producer.shutdown();
    }

    private static Message buildOneMessage() {
        return new Message("kk_test_topic", "tagA", ("hello MQ" + new Random().nextInt(100)).getBytes());
    }
}

消费者

消费消息固定步骤

1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。

简单消费代码示例

public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("kk_consumer_group1");
        consumer.setNamesrvAddr("192.168.6.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("kk_test_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("收到消息:" + new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

消费确认机制

消息消费状态有以下两种:

  • CONSUME_SUCCESS: 消费成功
  • RECONSUME_LATER: 等会再来消费, 默认最多重试16次.

如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

重试逻辑:

????????每一个消费者组都会自动建一个重试队列, 如果消费者返回RECONSUME_LATER状态, 就会把消息转发到重试Topic,这样设计就可以不阻塞原有的messageQueue.如果一个消息经过多次(默认16)重试还无法正常处理, 就会把消息转发到对应的死信Topic.这时候, 可以对死信Topic的消息进行人工处理, 消息补偿或者直接删除.

消息模型

广播消息

消费者端可以设置两种消息模式, 广播模式和集群模式(默认)

  • 集群模式: 同一个消费者群组下, 一条消息只能被该消费者群组下面其中一个消费者消费
  • 广播模式: 会忽略消费者群组, 一条消息会被所有消费者消费
消费模式集群模式广播模式
设置方式默认MessageModel.BROADCASTING
消费次数每个消费者群组只会有一个实例消费忽略群组概念,所有消费者实例都会消费
维护Offsetbroker消费者自己维护,中途宕机会丢失Offset

设置广播模式代码:

consumer.setMessageModel(MessageModel.BROADCASTING);

顺序消息

实现原理:?

  1. 生产者将一批消息发送到同一个messageQueue, 就能保证这批消息发送有序.
  2. 消费者一次锁定一个MessageQueue消息, 保证消费局部有序.

生产者代码:

    @Test
    public void sendOrderMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("PG_kk_test_order");
        producer.setNamesrvAddr("192.168.6.128:9876");
        // 启动生产者
        producer.start();
        // 创建5个订单 
        for (int orderId = 0; orderId < 5; orderId++) {
            // 每个订单都要顺序经历0,1,2,3,4这几个步骤
            for (int i = 0; i < 5; i++) {
                Message message = new Message("MQ_kk_test_order", "tagA", ("开始下单, order = " + orderId + ", step =" + i)
                        .getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> queues, Message message, Object o) {
                        // 将相同orderId的消息放入同一个MessageQueue
                        int orderId = (int) o;
                        return queues.get(orderId % queues.size());
                    }
                }, orderId);
                System.out.println(sendResult);
            }
        }
        producer.shutdown();
    }

消费者代码:

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_kk_test_order");
        consumer.setNamesrvAddr("192.168.6.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("MQ_kk_test_order", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt messageExt : list) {
                    System.out.println(" 收到消息:" + new String(messageExt.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

结果:

注意:

  1. 只能保证局部有序, 不能保证全局有序.如果要做全局有序, 那就只用一个MessageQueue,但是性能会下降
  2. 生产者应尽可能讲不同批次的有序消息打散到不同的MessageQueue, 避免queue排队竞争
  3. 消费者只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理
  4. 如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序
  5. 消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT替代

延迟消息

?延迟消息是指消息发送到Apache RocketMQ后,并不会马上投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

生产者核心代码:

// 给消息设置延时级别
message.setDelayTimeLevel(5);

默认18个延时级别, 也可修改默认配置

实现原理:?

????????RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟queue,会把延时消息转发到SCHEDULE_TOPIC。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直定时扫描所有TOPIC。

批量消息

将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量

生产者代码:

    @Test
    public void sendBatchMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("PG_kk_test_order");
        producer.setNamesrvAddr("192.168.6.128:9876");
        // 启动生产者
        producer.start();
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message message = new Message("MQ_kk_test_order", "tagA", ("hello, MQ  i = " + i)
                    .getBytes());
            messages.add(message);
        }
        // 一次发送多条消息
        SendResult sendResult = producer.send(messages);
        System.out.println(sendResult);
        producer.shutdown();
    }

过滤消息

简单过滤

生产者发送消息带上TAG标识

        Message message = new Message("test_topic", "tagA", "hello".getBytes());
        SendResult sendResult = producer.send(message);

消费者订阅指定TAG消息

consumer.subscribe("test_topic", "tagA");

SQL过滤

首先去bin/broker.conf文件打开配置,

1. 修改选择消息模式为 SelectorType.SQL92,默认模式是SelectorType.TAG。

selectorType=SelectorType.SQL92

2. 设置允许属性过滤?

enablePropertyFilter=true

?不设置会报错

Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2315)
	at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:450)
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:665)
	at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:707)
	at com.kk.mq.demo.simple.SqlFilterConsumer.main(SqlFilterConsumer.java:32)

生产者代码

    @Test
    public void sendSqlMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("PG_SqlFilterTest2");
        producer.setNamesrvAddr("192.168.6.128:9876");
        // 启动生产者
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest2",
                    tags[i % tags.length],
                    ("Hello RocketMQ " + i).getBytes()
            );
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }

消费者代码

public class SqlFilterConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_SqlFilterTest1");
        consumer.setNamesrvAddr("192.168.6.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("test_sql_topic",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'tagB'))" +
                        "and (a is not null and a between 0 and 5)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("收到消息:" + new String(messageExt.getBody()) + "\n" + messageExt.getProperties() + "\n");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

事务消息

作用

????????和传统的分布式事务不同, 只保证生产者端事务一致, 消费者端自己保证

代码示例

1. 生产者发送消息代码

@Test
    public void sendTransactionMessage() throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.6.128:9876");
        producer.setSendMsgTimeout(8000);
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                // 依次发送10条消息, 每个Tag分别发两次
                Message msg =
                        new Message("TopicTest_Transaction", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

2. 定义生产者事务监听器

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

3. 消费者代码, 正常消费逻辑

public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_TopicTest_Transaction");
        consumer.setNamesrvAddr("192.168.6.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest_Transaction", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
//                    System.out.println("收到消息:" + new String(messageExt.getBody()));
                    System.out.println(new Date() + " 收到消息:" + messageExt);

                }
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

消费者运行结果:

可以看到消费者先收到两个TagA的消息, 过一分钟后收到两个TagC消息.类似于两阶段提交.

两阶段原理?

  1. 第一阶段, 生产者消息发送完, 先回调executeLocalTransaction方法, 检查状态
    1. 如果状态是COMMIT, 消息直接推送给消费者
    2. 如果状态是ROLLBACK, 消息丢弃, 生产者端回滚事务
    3. 如果状态是UNKNOW, 过一分钟(可配置)再来检查状态
  2. 如果1阶段返回是UNKNOW, 进入第二阶段, 调checkLocalTransaction方法,再次检查状态
    1. 如果状态是COMMIT, 消息直接推送给消费者
    2. 如果状态是ROLLBACK, 消息丢弃, 生产者端回滚事务
    3. 如果状态是UNKNOW, 过一分钟(可配置)再来检查状态, 递归调自己.最大重试15次(可配置)

broker配置项,配置间隔时间和重试次数, 应基于自身业务, 配置是broker级别.
?

# 最大重试次数
transactionCheckMax=15
# 重试间隔, 60秒
transactionCheckInterval=60000

RocketMq集成SpringBoot

RocketMq集成SpringBoot-CSDN博客

文章来源:https://blog.csdn.net/weixin_64027360/article/details/134843860
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。