RocketMq集成SpringBoot(待完善)

2023-12-13 04:15:07

环境

jdk1.8, springboot2.7.3

Maven依赖

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.3</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>        

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.5</version>
        </dependency>

配置文件

rocketmq.name-server=192.168.6.128:9876
#生产通用群组, 也可单独指定
rocketmq.producer.group=springBootGroup
#消费通用群组, 也可单独指定
rocketmq.consumer.group=testGroup
server.port=9000

代码

生产者发送消息

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    // 发送同步消息
    @PostMapping("/sendSync")
    public Object sendSync(@RequestBody MessageReq req) {
        return producerService.sendSyncMessage(req.getTopic(), req.getTag(), req.getMessage());
    }

    // 发送异步消息
    @PostMapping("/sendAsync")
    public Object sendAsyncMessage(@RequestBody MessageReq req) {
        producerService.sendAsyncMessage(req.getTopic(), req.getTag(), req.getMessage());
        return "200";
    }
}
@Service
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息.
     * @return 发送结果
     */
    public SendResult sendSyncMessage(String topic, String tag, String message) {
        // param1: topic和tag冒号分隔
        return rocketMQTemplate.syncSend(topic + ":" + tag, message);
    }

    /**
     * 发送异步消息.
     */
    public void sendAsyncMessage(String topic, String tag, String message) {
        rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
    }
}

消费者

@Component
@RocketMQMessageListener(
        consumerGroup = "SimpleStringConsumerGroup",  // consumerGroup:消费者组名
        topic = "MQ_sp_test1",                         // topic:订阅的主题
        selectorExpression = "Tag-kk||Tag-kk2",         // selectorExpression, 1. 根据Tag过滤, 多个用||分割, 也可设置*; 2. 根据SQL92语法过滤
//        selectorExpression = "*",
//        selectorType = SelectorType.SQL92,             // 设置SQL92语法过滤, 不设置默认TAG
        messageModel = MessageModel.CLUSTERING,  // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式
        consumeMode= ConsumeMode.CONCURRENTLY    // CONCURRENTLY: 无序消费; ORDERLY: 有序消费
)
public class SimpleConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

测试

同步消息

异步消息?

TAG过滤消息

1. 消费者指定了TAG, 不满足的不会消费, 状态是CONSUMED_BUT_FILTERED

消费端接收消息?

文章来源:https://blog.csdn.net/weixin_64027360/article/details/134894334
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。