Spring Boot整合RocketMQ

2023-12-20 00:45:45

1.创建生产者应用,名称为springboot-producer

2.在pom.xml加入依赖

<!-- RocketMQ message middleware: Spring Boot starter for Apache RocketMQ -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

3.在application.yml加入下面配置:

# RocketMQ configuration
rocketmq:
  # Address of the RocketMQ name server (host:port)
  name-server: 127.0.0.1:9876
  producer:
    # Producer group name used by this application
    group: rocketmq-demo
    # Send timeout; presumably in milliseconds (see the rocketmq-spring property docs)
    send-message-timeout: 3000
    # Retry count for failed synchronous sends
    retry-times-when-send-failed: 3
    # Retry count for failed asynchronous sends
    retry-times-when-send-async-failed: 3

4.创建RocketMQ配置信息类

/**
 * Spring configuration that registers the RocketMQ-backed message sender.
 * The configuration is conditional on the {@code rocketmq.name-server} property.
 */
@Configuration
@ConditionalOnProperty(prefix = "rocketmq", name = "name-server")
public class RocketMQConfig {

    /**
     * Exposes a {@link RocketMQMessageSender} as the primary sender bean.
     *
     * @return a new sender instance
     */
    @Primary
    @Bean
    public RocketMQMessageSender rocketMQMessageSender() {
        RocketMQMessageSender sender = new RocketMQMessageSender();
        return sender;
    }

}

5.创建基于RocketMQ的消息生产者实现类(实现项目中自定义的MqMessageSender接口,该接口的定义本文未给出)

/**
 * {@link MqMessageSender} implementation that delegates to {@link RocketMQTemplate}.
 *
 * <p>Supports synchronous, asynchronous, delayed, batch and transactional sends. Every
 * send logs its outcome together with the JSON form of the payload.
 */
@Slf4j
public class RocketMQMessageSender implements MqMessageSender, SmartInitializingSingleton {

	/**
	 * Maps a delay in seconds to its RocketMQ delay level (1-based).
	 * The preset delays are: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m,
	 * 20m, 30m, 1h, 2h.
	 */
	private static final Map<Integer, Integer> delayLevelMap = new LinkedHashMap<>();

	static {
		// Filled at class-initialisation time so the lookup works for every instance,
		// including ones used before (or without) the Spring singleton callback.
		int[] presetDelaySeconds = {
				1, 5, 10, 30,
				1 * 60, 2 * 60, 3 * 60, 4 * 60, 5 * 60, 6 * 60, 7 * 60, 8 * 60, 9 * 60, 10 * 60,
				20 * 60, 30 * 60,
				1 * 60 * 60, 2 * 60 * 60
		};
		for (int i = 0; i < presetDelaySeconds.length; i++) {
			delayLevelMap.put(presetDelaySeconds[i], i + 1);
		}
	}

	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	/**
	 * Sends a single message synchronously and logs the send result.
	 *
	 * @param topic   destination topic
	 * @param message payload to send
	 */
	@Override
	public void syncSend(String topic, Serializable message) {
		SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
		log.info("【MQ同步消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
				sendResult.getSendStatus(), JSON.toJSONString(message));
	}

	/**
	 * Sends a single delayed message synchronously and logs the send result.
	 *
	 * @param topic     destination topic
	 * @param payload   payload to send
	 * @param delayTime delay in seconds; must be one of the preset delays
	 * @throws MessagingException if {@code delayTime} is not a preset delay
	 */
	@Override
	public void syncSend(String topic, Serializable payload, int delayTime) {
		int delayLevel = resolveDelayLevel(delayTime);

		Message<?> message = MessageBuilder.withPayload(payload).build();
		SendResult sendResult = rocketMQTemplate.syncSend(topic, message, sendTimeoutMillis(), delayLevel);
		log.info("【MQ同步延迟消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
				sendResult.getSendStatus(), JSON.toJSONString(payload));
	}

	/**
	 * Sends a batch of messages synchronously and logs the send result.
	 * Each element is tagged with the key {@code "KEY_" + index}.
	 *
	 * @param topic    destination topic
	 * @param messages payloads to send
	 */
	@Override
	public void syncSend(String topic, List<?> messages) {
		List<Message<?>> messageList = toKeyedMessages(messages);

		SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);
		log.info("【MQ同步批量消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
				sendResult.getSendStatus(), JSON.toJSONString(messages));
	}

	/**
	 * Sends a single message asynchronously; success or failure is logged from the callback.
	 *
	 * @param topic   destination topic
	 * @param message payload to send
	 */
	@Override
	public void asyncSend(String topic, Serializable message) {
		rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				log.info("【MQ异步消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
						sendResult.getSendStatus(), JSON.toJSONString(message));
			}

			@Override
			public void onException(Throwable e) {
				log.error("【MQ异步消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(message), e.getMessage(), e);
			}
		});
	}

	/**
	 * Sends a single delayed message asynchronously; success or failure is logged from the callback.
	 *
	 * @param topic     destination topic
	 * @param payload   payload to send
	 * @param delayTime delay in seconds; must be one of the preset delays
	 * @throws MessagingException if {@code delayTime} is not a preset delay
	 */
	@Override
	public void asyncSend(String topic, Serializable payload, int delayTime) {
		int delayLevel = resolveDelayLevel(delayTime);

		Message<?> message = MessageBuilder.withPayload(payload).build();
		rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				log.info("【MQ异步延迟消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
						sendResult.getSendStatus(), JSON.toJSONString(payload));
			}

			@Override
			public void onException(Throwable e) {
				log.error("【MQ异步延迟消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(payload), e.getMessage(), e);
			}
		}, sendTimeoutMillis(), delayLevel);
	}

	/**
	 * Sends a batch of messages asynchronously; success or failure is logged from the callback.
	 * Each element is tagged with the key {@code "KEY_" + index}.
	 *
	 * @param topic    destination topic
	 * @param messages payloads to send
	 */
	@Override
	public void asyncSend(String topic, List<?> messages) {
		List<Message<?>> messageList = toKeyedMessages(messages);

		rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {
			@Override
			public void onSuccess(SendResult sendResult) {
				log.info("【MQ异步批量消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
						sendResult.getSendStatus(), JSON.toJSONString(messages));
			}

			@Override
			public void onException(Throwable e) {
				log.error("【MQ异步批量消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(messages), e.getMessage(), e);
			}
		});
	}

	/**
	 * Sends a transactional message and logs the send result.
	 * The payload is also passed as the transaction argument.
	 *
	 * @param topic   destination topic
	 * @param payload payload to send
	 */
	@Override
	public void sendMessageInTransaction(String topic, Serializable payload) {
		// A fresh id per message; a constant id would make all transactions indistinguishable.
		String transactionId = java.util.UUID.randomUUID().toString();
		Message<?> message = MessageBuilder.withPayload(payload)
				.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
				.build();
		SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, payload);
		log.info("【MQ事务消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(), 
				sendResult.getSendStatus(), JSON.toJSONString(payload));
	}

	/**
	 * Retained so the class keeps implementing {@link SmartInitializingSingleton}.
	 * Intentionally does nothing: the delay-level table is populated in the static initializer.
	 */
	@Override
	public void afterSingletonsInstantiated() {
		// no-op
	}

	/** Looks up the delay level for a delay in seconds, rejecting values that are not preset. */
	private static int resolveDelayLevel(int delayTime) {
		Integer delayLevel = delayLevelMap.get(delayTime);
		if (delayLevel == null) {
			throw new MessagingException("延时参数delayTime错误!");
		}
		return delayLevel;
	}

	/** Wraps each payload in a message whose KEYS header is {@code "KEY_" + index}. */
	private static List<Message<?>> toKeyedMessages(List<?> messages) {
		List<Message<?>> messageList = new ArrayList<>(messages.size());
		for (int i = 0; i < messages.size(); i++) {
			messageList.add(MessageBuilder.withPayload(messages.get(i))
				.setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
		}
		return messageList;
	}

	/** Returns the producer's configured send timeout so delayed sends match the other sends. */
	private long sendTimeoutMillis() {
		return rocketMQTemplate.getProducer().getSendMsgTimeout();
	}

}

6.创建消息实体类GoodsStockMQ

/**
 * Message payload carrying a product id and a stock quantity.
 *
 * <p>Implements {@link java.io.Serializable} so it can be passed to sender methods that
 * take a {@code Serializable} payload, and so the declared {@code serialVersionUID} is
 * actually used.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class GoodsStockMQ implements java.io.Serializable {

	private static final long serialVersionUID = 1342006257783472024L;

	// Operation names; no field of this class references them here.
	public static final String DEDUCT = "deduct";
	public static final String INCREASE = "increase";

	/**
	 * Product id
	 */
	private Long productId;

	/**
	 * Product stock quantity
	 */
	private Long num;

}

7.创建测试控制器RedisAndMysqlDataConsistenceController

/**
 * Test controller that publishes a sample {@link GoodsStockMQ} message.
 */
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/consistence")
public class RedisAndMysqlDataConsistenceController {

    private final MqMessageSender mqMessageSender;

    /**
     * Builds a sample stock message (productId = 1, num = 2), logs it and sends it
     * synchronously to {@code MessageConstants.GOODS_STOCK_TOPIC}.
     *
     * @return the literal string {@code "success"} once the send call has returned
     */
    @GetMapping("testRedisAndMysqlData")
    public String testRedisAndMysqlData() {
        GoodsStockMQ stockMessage = GoodsStockMQ.builder()
                .productId(1L)
                .num(2L)
                .build();

        String rendered = stockMessage.toString();
        log.info("生产端开始发送消息={}", rendered);

        mqMessageSender.syncSend(MessageConstants.GOODS_STOCK_TOPIC, stockMessage);
        return "success";
    }
}

启动应用后,使用Postman访问地址:http://localhost:9084/consistence/testRedisAndMysqlData

看到应用控制台输出以下信息:

8.创建消费者应用springboot-consumer,与生产者应用相同的步骤(引入依赖、配置name-server等)不再赘述

9.创建RocketMQ消息统一处理器类

/**
 * Common base class for RocketMQ message handlers.
 * Declares no members of its own; subclasses supply the {@link MqMessageHandler} behaviour.
 *
 * @param <T> type of message handled
 */
public abstract class RocketMQMessageHandler<T> implements MqMessageHandler<T> {

}

10.创建异步消息处理类

@Slf4j
@Component
@RocketMQMessageListener(topic = MessageConstants.GOODS_STOCK_TOPIC,
        consumerGroup = MessageConstants.GOODS_STOCK_CONSUMER_GROUP)
public class GoodsStockMessageHandler extends RocketMQMessageHandler<GoodsStockMQ>
        implements RocketMQListener<GoodsStockMQ> {

    @Override
    public void onMessage(GoodsStockMQ message) {
        handleMessage(message);
    }

    @Override
    public void handleMessage(GoodsStockMQ message) {
        log.info("【消费端异步处理消息】, 请求报文:{}", JSON.toJSONString(message));
        long start = System.currentTimeMillis();
        try {
            Long productId = message.getProductId();
            Long num = message.getNum();
            log.info("productId={}",productId);
            log.info("num={}",num);
        } catch (Exception e) {
            log.error("处理【扣减商品库存任务】出现异常, 原因:{}", e.getMessage(), e);

        } finally {
            long end = System.currentTimeMillis();
            log.info("完成【扣减商品库存任务】, 耗时(ms):{}", end - start);
        }
    }
}

11.启动应用后,控制台立即输出以下信息:

文章来源:https://blog.csdn.net/xiao297328/article/details/135089162
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。