RabbitMq异步请求+Redis轮询解决响应时间过长

2024-01-02 21:26:00

RabbitMq异步请求+Redis轮询解决响应时间过长

如果还没有学习Rabbit的建议去学一下我的另一篇RabbitMq的使用再来看这个实际的使用`

**当前问题:**处理时间超时导致前端页面响应超时是一个比较常见的问题。这可能由于后端执行任务时间过长、网络延迟、资源不足等原因引起。

解决的思路有:

解决方法步骤缺点
优化后端代码检查后端代码,看是否有可以优化的地方。可能存在一些复杂、低效或者重复的操作,通过优化这些代码可以提升后端执行效率。优化的效果不明显,当数据包的处理时间太长的时候处理时间是无法缩短的
设置合理的超时时间在前端与后端的通信中,设置合理的超时时间。如果后端处理任务时间较长,适当增加前端请求的超时时间,但也要避免设置过长的超时时间。前端响应时间太久,造成前端页面的假死
优化数据库的查询如果后端涉及数据库查询,确保数据库表的索引设置得当,优化查询语句,以提高查询效率。能解决查询的问题,但是数据处理的时间是解决不了的
异步处理+轮询查询如果后端执行的任务比较耗时,考虑使用异步处理方式。将一些耗时的任务交给异步任务队列或线程池来处理,从而不阻塞主线程。并且添加一个轮询查询的接口,轮询查询处理的情况。

逻辑流程图

普通处理逻辑

image-20231227223626458

当我们的处理的耗时非常久的时候,前端一直在等待加载,会造成假死的状态,并且有可能会断开连接。
在这里插入图片描述

异步处理+轮询

当异步处理的时候直接将消息丢入到消息队列中,挨个的由消费者进行处理。处理的时候使用Redis根据时间戳来记录每一个处理的状态,最后再用一个轮询的接口进行查询。


准备工作

导入依赖

<!-- Spring Boot 项目中更方便地集成 AMQP 的一种方式-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 普通的spring项目的mq客户端-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
<!--开启web客户端,方便进行调用演示-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!--        redis用来做轮询查询接口-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Yaml配置

spring:
  rabbitmq:
    addresses: 192.168.1.110
    port: 5672
    username: rabbit
    password: rabbit
    virtual-host: /test
#开启消息回执
    publisher-confirm-type: correlated
    publisher-returns: true
    # 设置消息的确认模式,默认的模式为auto自动确认,manual手动确认,所以需要channel去手动确认
    listener:
      simple:
        acknowledge-mode: manual

  redis:
    #Redis服务器地址
    host: localhost
    #端口
    port: 6379
    #使用几号数据库
    database: 0

配置Mq和Redis

Fanout交换机和消息队列

@Configuration
public class RabbitMqConfig {
    // 配置交换机
    @Bean
    public FanoutExchange buildFanoutExchange() {
        return ExchangeBuilder
                .fanoutExchange("fanoutExchange")
                .build();
    }
    // 配置消息队列
    @Bean
    public Queue buildFanoutQueue() {
        return QueueBuilder
                .durable("fanoutQueue")
                .build();
    }
    // 绑定交换机和消息队列
    @Bean
    public Binding buildFanoutBinding() {
        return BindingBuilder
                .bind(buildFanoutQueue())
                .to(buildFanoutExchange());
    }
}

这里我使用的是fanout(扇出)型交换机,这时一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。使用此类型

死信队列

消息队列中的数据,如果迟迟没有消费者来处理,那么就会一直占用消息队列的空间。比如我们模拟一下抢车票的场景,用户下单高铁票之后,会进行抢座,然后再进行付款,但是如果用户下单之后并没有及时的付款,这张票不可能一直让这个用户占用着,因为你不买别人还要买呢,所以会在一段时间后超时,让这张票可以继续被其他人购买。

这时,我们就可以使用死信队列,将那些用户超时未付款的或是用户主动取消的订单,进行进一步的处理,以下类型的消息都会被判定为死信:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

在这里插入图片描述

@Configuration
public class DeadMqConfig {
    // 死信交换机
    @Bean
    public Exchange dmExchange() {
        return ExchangeBuilder
                .fanoutExchange("dm.direct")
                .build();
    }
    // 死信队列
    @Bean
    public Queue dmQueue() {
        return QueueBuilder
                .nonDurable("dm-queue") // 消息队列的名称
                .build();
    }
    @Bean
    public Binding dmBinding() {
        return BindingBuilder
                .bind(dmQueue())
                .to(dmExchange())
                .with("dm-routing") // 交换机和消息队列的路由对应关系
                .noargs();
    }

}

redis配置

配置了Object和String的序列化方式

@Configuration
public class RedisConfig {
    //编写自定义redisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
            throws UnknownHostException {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        //Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value的序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;
    }


}

redis工具类

创建redis工具类,进行快速的进行数据存储和读取,最后用作轮询查询时候的标识处理数据的状态

@Component
public class RedisUtils {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

}

创建消费者

创建消费者(Listener)对消息队列进行监控,监听到消息以后进行消费,数据处理的各个阶段在Redis存储处理的状态,方便轮询查询。

注意:Redis需要设置过期时间(一周),防止内存泄漏

@Service
@Slf4j
public class MqConsumer {
    @Autowired
    RedisUtils redisUtils;
    /**
     * @param channel 用于确保消息被正确的处理
     * @param timeStamp 使用时间戳标识不同的消息请求,方便后期轮询查询
     * @param tag 每一个消息的独一无二的tag
     */
    @RabbitListener(queues = "fanoutQueue")
    public void BuidConsumer(Channel channel, String timeStamp, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {

        try {
            redisUtils.set(timeStamp, "消息开始处理......");
            log.info("开始处理编号为: " + timeStamp + " 的消息了");
            dealData(timeStamp);
            channel.basicAck(tag, false);    //  确认消息
            log.info("理编号为: " + timeStamp + " 的消息处理完成");
        } catch (Exception e) {
            try {
                log.info(e.getMessage(), e);
                channel.basicNack(tag, false, false); // 丢入死信队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    @RabbitListener(queues = "dm-queue")
    public void buildDeadConsumer(Channel channel, String timeStamp, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            log.info(timeStamp + "进入处理异常"); // 日志记录处理异常进入死信队列的消息
            channel.basicAck(tag, false);
            redisUtils.set(timeStamp, "异常处理完成 ...."); // 消息队列直接处理完成
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 耗时处理数据的方法
    public void dealData(String timeStamp) {
        redisUtils.set(timeStamp, "消息正在处理 ....");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            redisUtils.set(timeStamp, "消息处理发生异常");
            e.printStackTrace();
        }
        redisUtils.set(timeStamp, "消息处理完成....", 7*24*60*60L);  // 设置过期时间,防止内存泄漏
    }

}

创建Mvc层

创建服务层和消息回执

服务层直接把处理的消息放入交换机中,并且返回一个时间戳用来作为任务的唯一标识,用作后面的轮询查询的接口的参数。

**消息回执:**为了确保消息已将被成功的接受并且处理,也保证不会重复的传输数据,一旦消费者成功处理了消息并发送了回执,RabbitMQ 就知道这个消息已经被正确处理。这有助于防止消息被重复处理,确保消费者不会在某些故障情况下多次处理同一条消息。

Service层:

@Service
@Slf4j
public class UserService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void regCallback() {
        // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // 如果ack为true代表消息已经收到
                if (!ack) {
                    // 这里可能要进行其他的方式进行存储
                    log.error("MQ队列应答失败,失败原因是:" + cause.toString());
                    return;
                }
                log.info("消息已经成功发送 !");

            }
        });
    }

    public String dealData() {
        String timeStamp =Long.toString(System.currentTimeMillis());
        // 将消息丢进交换机
        rabbitTemplate.convertAndSend("fanoutExchange", "", timeStamp, new CorrelationData(timeStamp));
        return timeStamp;
    }

}

消息回执

image-20231227221940048

rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制:

  • 生产者确认机制
  • 消息持久化存储
  • 消费者确认机制
  • 失败重试机制

实现ConfirmCallback接口

public interface ConfirmCallback {
    void confirm(CorrelationData correlationData, boolean ack, String cause);
}

  • correlationData:与消息关联的数据,通常是一个唯一标识符。在发送消息时,可以通过 CorrelationData 设置关联数据,然后在确认回调中获取。
  • ack:表示消息是否被成功确认。如果为 true,表示消息已成功发送到 Exchange;如果为 false,表示消息发送失败。
  • cause:如果消息发送失败,cause 中包含了失败的原因。

Controller层:

@RestController()
@RequestMapping("/user")
public class UserController {

    @Autowired
    UserService userService;
    @Autowired
    RedisUtils redisUtils;
// 耗时久的处理数据接口
    @GetMapping("/deal")
    public String dealData() {
        return userService.dealData();
    }
// 轮询查询接口
    @GetMapping("/status/{timeStamp}")
    public String getStatus(@PathVariable String timeStamp) {
        return redisUtils.get(timeStamp).toString();
    }

}

创建轮询处理和轮询的接口

调用开始处理的接口然后调用服务层将消息放入交换机,然后给消费者进行处理。并且返回一个时间戳用作的轮询处理的状态的标识。

@RestController("/user")
public class UserController {
    @Autowired
    UserService userService;
    @Autowired
    RedisUtils redisUtils;
    
    @GetMapping("/deal")
    public String dealData() {
        return userService.dealData();
    }
    
    // 根据时间戳进行轮询查询
    @GetMapping("/status/{timeStamp)")
    public String getStatus(@PathVariable String timeStamp) {
        return redisUtils.get(timeStamp).toString();
    }

}

运行结果

目录

下面是我的目录结构,大家可以做一个参考:
在这里插入图片描述

结果

开始数据处理接口,接口马上响应成功,返回一个可以轮询查询的时间戳:

在这里插入图片描述

根据唯一标识时间戳,轮询查看处理过程:

在这里插入图片描述

IDEA控制台界面:连续的访问好几次RabbitMq接口,已经开始排队异步处理
在这里插入图片描述

?

文章来源:https://blog.csdn.net/asdfasaa/article/details/135257605
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。