基于Rocket MQ扩展的无限延迟消息队列
2023-12-23 06:13:01
基于Rocket MQ扩展的无限延迟消息队列
背景:
- Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.
实现原理:
-
简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.
-
源码如下:
-
/* * Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved. */ package com.example.xxx.utils; import com.vevor.bmp.crm.common.constants.MQConstants; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.Serializable; import java.util.Calendar; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @version :1.8.0 * @description :基于Rocket MQ的任意延迟时长工具 * @program :user-growth * @date :Created in 2023/5/22 3:35 下午 * @since :1.8.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP, topic = MQConstants.CRM_DELAY_QUEUE_TOPIC, // 消息消费顺序 consumeMode = ConsumeMode.CONCURRENTLY, // 最大消息重复消费次数 maxReconsumeTimes = 3) public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> { /** * Rocket MQ客户端 */ @Resource private RocketMQTemplate rocketMQTemplate; /** * MQ默认延迟等级 */ private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L, 30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L, 480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L}; @SneakyThrows @Override public void onMessage(DelayTable<Object> message) { Date endTime = message.getEndTime(); int delayLevel = getDelayLevel(endTime); // 继续延迟 if (delayLevel != 0) { int currentDelayCount = message.getCurrentDelayCount(); currentDelayCount++; message.setCurrentDelayCount(currentDelayCount); message.setCurrentDelayLevel(delayLevel); message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]); this.sendDelayMessage(message); return; } // 执行业务 log.info("delay message end! start to process business..."); Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler(); if (messageHandler != null) { DelayMessageHandler delayMessageHandler = messageHandler.newInstance(); delayMessageHandler.handle(); } } /** * 延迟消息体 * * @param <E> 消息类型 */ @Data public static class DelayTable<E> implements Serializable { private static final long serialVersionUID = 2405172041950251807L; /** * 延迟消息体 */ private E content; /** * 消息延迟结束时间 */ private Date endTime; /** * 总延迟毫秒数 */ private long totalDelayTime; /** * 总延迟时间单位 */ private TimeUnit totalDelayTimeUnit; /** * 当前延迟次数 */ private int currentDelayCount; /** * 当前延迟等级 */ private int currentDelayLevel; /** * 当前延迟毫秒数 */ private long currentDelayMillis; /** * 延迟处理逻辑 */ private Class<? extends DelayMessageHandler> messageHandler; } /** * 发送延迟消息 * * @param message 消息体 * @param delay 延迟时长 * @param timeUnit 延迟时间单位 * @param handler 延迟时间到了之后,需要处理的逻辑 * @param <E> 延迟消息类型 */ public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) { // 把延迟时间转换成时间戳(毫秒) long totalDelayMills = timeUnit.toMillis(delay); // 根据延迟时间计算结束时间 Calendar instance = Calendar.getInstance(); instance.add(Calendar.MILLISECOND, (int)totalDelayMills); Date endTime = instance.getTime(); // 根据延迟时间匹配延迟等级(delay level) int delayLevel = getDelayLevel(endTime); long delayMillis = TIME_DELAY_LEVEL[delayLevel]; // 发送消息 DelayTable<E> delayTable = new DelayTable<>(); // 全局数据 delayTable.setContent(message); delayTable.setMessageHandler(handler); delayTable.setEndTime(endTime); delayTable.setTotalDelayTime(delay); delayTable.setTotalDelayTimeUnit(timeUnit); // 当前延迟等级数据 delayTable.setCurrentDelayCount(1); delayTable.setCurrentDelayLevel(delayLevel); delayTable.setCurrentDelayMillis(delayMillis); this.sendDelayMessage(delayTable); } /** * 计算延迟等级 * * @param targetTime 延迟截止时间 * @return Rocket MQ延迟消息等级 */ private static int getDelayLevel(Date targetTime) { long currentTime = System.currentTimeMillis(); long delayMillis = targetTime.getTime() - currentTime; if (delayMillis <= 0) { // 不延迟,即延迟等级为 0 return 0; } // 判断处于哪个延迟等级 // 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h for (int i = 1; i <= 18; i++) { long delayLevelTime = TIME_DELAY_LEVEL[i]; if (delayMillis < delayLevelTime) { return i - 1; } else if (delayMillis == delayLevelTime) { return i; } } // 最大延迟等级为 18 return 18; } /** * 发送延迟消息 * * @param delayTable 延迟对象,可以循环使用 */ @SneakyThrows private <E> void sendDelayMessage(DelayTable<E> delayTable) { // 消息序列化 Message<DelayTable<E>> message = MessageBuilder .withPayload(delayTable) .build(); // 设置\发送延迟消息 int delayLevel = delayTable.getCurrentDelayLevel(); rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message , 3000, delayLevel); log.debug("delay count: {}, delay level: {}, time: {} milliseconds", delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]); } /** * 延迟回调接口 * * 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法 */ public interface DelayMessageHandler extends Serializable { long serialVersionUID = 2405172041950251807L; /** * 回调函数 */ void handle(); } }
测试代码:
-
/* * Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved. */ package com.vevor.bmp.crm.io.controller; import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils; import com.vevor.common.pojo.vo.ResponseResult; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * @version :1.8.0 * @description :延迟队列测试 * @program :user-growth * @date :Created in 2023/5/22 4:54 下午 * @since :1.8.0 */ @Slf4j @RestController public class DelayQueueController { @Resource private RocketMQDelayQueueUtils rocketMQDelayQueueUtils; @GetMapping("/mq/delay") @SneakyThrows public ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) { // 获取延时队列 rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class); return ResponseResult.success(); } /** * @version : * @description : * @program :user-growth * @date :Created in 2023/5/23 2:11 下午 * @since : */ @Data public static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler { /** * 回调函数 */ @Override public void handle() { log.info("i am business logical! {}", System.currentTimeMillis()); } } }
优缺点:
- 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
- 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.
文章来源:https://blog.csdn.net/Andrew_Chenwq/article/details/135045877
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!