异步消息原理

2023-12-23 10:37:19
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

在日常开发中,偶尔需要在主业务逻辑之外做一些附加操作,比如下单成功后通知商家、课程报名成功后通知老师、简历投递成功后通知HR。一般来讲,这些业务是不适合放在主线程中的:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {

    @Test
    public void testAsyncNotify() throws InterruptedException {

        long start = System.currentTimeMillis();

        // 投递简历,插入投递记录
        TimeUnit.SECONDS.sleep(2);
        log.info("插入投递记录完毕...");

        // 发送短信通知HR,并留存发送记录
        notifyHR("mx", "叉车师傅");
        writeMsg("mx", "叉车师傅");

        log.info("耗时:{}毫秒", System.currentTimeMillis() - start);
    }

    public void notifyHR(String username, String jobName) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
    }

    public void writeMsg(String username, String jobName) {
        // 留存消息发送记录
        log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
    }

}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[main] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

[main] INFO - com.bravo.happy.AsyncNotifyTest - 耗时:3019毫秒

消息通知等附加操作为什么不适合放在主流程呢?

  • 首先,消息通知相对没那么重要,即使发送失败了,一般还有发送记录,重新发送或者只要能追溯即可
  • 其次,在主流程中加入消息通知会减慢响应速度
  • 最后,万一消息发送失败,还可能导致事务回滚,但系统本身其实是没有问题的

多线程异步消息

一个解决办法是使用多线程,把消息发送的逻辑单独放在一个异步线程中执行,主流程处理完毕直接返回即可。为了尽可能简单,这里就不配置线程池或使用@Async了,换CompletableFuture做演示:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {

    @Test
    public void testAsyncNotify() throws InterruptedException {

        long start = System.currentTimeMillis();

        // 投递简历,插入投递记录
        TimeUnit.SECONDS.sleep(2);
        log.info("插入投递记录完毕...");

        // 异步发送短信通知HR,并留存发送记录
        CompletableFuture.runAsync(() -> {
            try {
                notifyHR("bravo1988", "叉车师傅");
                writeMsg("bravo1988", "叉车师傅");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        log.info("耗时:{}毫秒", System.currentTimeMillis() - start);

        // 为了观察到异步线程里的打印信息,主线程sleep一会儿
        TimeUnit.SECONDS.sleep(2);
    }

    public void notifyHR(String username, String jobName) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1);
        log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
    }

    public void writeMsg(String username, String jobName) {
        // 留存消息发送记录
        log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
    }

}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 耗时:2145毫秒

[ForkJoinPool.commonPool-worker-9] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[ForkJoinPool.commonPool-worker-9] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

可以看到,主线程耗时变成了2秒,如此一来用户整个投递简历的响应时间缩短了。

Spring事件监听机制

具体请参考Spring事件监听机制,本质上和多线程异步消息是一样的。

Redis实现消息队列

上面多线程版本的异步消息其实已经挺不错了,但小概率的情况下可能会出现消息丢失(虽然当前情境下无所谓):

  • 情况1:消息过多,线程数不够触发拒绝策略
  • 情况2:异步线程宕机了,消息丢失(类似于消费者挂了)

此时可以考虑使用Redis做一个简单的消息队列,数据类型可以选择List。

对于Redis的List,如果使用lpush+rpop即可实现先进先出的简单队列,而如果配合brpop则可实现阻塞队列。所谓的brpop,其实就是blocking right pop,即阻塞等待队列中的消息,一旦有消息被push进队列就从右边取出消费。

注意,rpop和brpop的区别是,rpop发现队列为空直接返回null,不会等待,也不能设置等待时间 :

lpush和brpop反映到Java代码里,可以使用RedisTemplate或StringRedisTemplate实现。

public interface RedisService {

    /**
     * 向队列插入消息
     *
     * @param queue 自定义队列名称
     * @param obj   要存入的消息
     */
    void pushQueue(String queue, Object obj);

    /**
     * 从队列取出消息
     *
     * @param queue    自定义队列名称
     * @param timeout  最长阻塞等待时间
     * @param timeUnit 时间单位
     * @return
     */
    Object popQueue(String queue, long timeout, TimeUnit timeUnit);
}
@Component
public class RedisServiceImpl implements RedisService {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @Override
    public void pushQueue(String queue, Object obj) {
        try {
            redisTemplate.opsForList().leftPush(queue, objectMapper.writeValueAsString(obj));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Object popQueue(String queue, long timeout, TimeUnit timeUnit) {
        return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);
    }
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisQueueTest {

    @Autowired
    private RedisService redisService;

    public static final String ORDER_MESSAGE = "order_message";

    @Test
    public void testRedisBlockingQueue() throws InterruptedException {
        // 订单服务
        orderService("bravo1988", 10086L);

        // 启动消费者,取出消息,逐一发送
        new Thread(this::consumeMsg).start();

        // 10秒后再发一条消息,模拟第二次下单
        TimeUnit.SECONDS.sleep(10);
        orderService("bravo2020", 99999L);

        // 等待一会儿,观察第二条消息
        TimeUnit.SECONDS.sleep(10);
    }

    public void orderService(String username, Long orderId) {
        // 1.操作数据库,插入订单

        // 2.其他操作

        // 3.发送消息
        redisService.pushQueue(ORDER_MESSAGE, new Order(username, orderId));
    }

    public void consumeMsg() {
        for (; ; ) {
            Object order = redisService.popQueue(ORDER_MESSAGE, 5, TimeUnit.SECONDS);
            log.info("每隔5秒循环获取,期间for循环阻塞");
            if (order != null) {
                log.info("order:{}", order.toString());
            }
        }
    }

    @Data
    @AllArgsConstructor
    static class Order {
        private String username;
        private Long resumeId;
    }

}

Redis实现消息队列的好处是,可以把消息存起来慢慢消费,而且项目挂了不影响已经存入的消息,重新启动后仍可继续消费:

可能有同学不禁要问:如果消息还没发送到队列中就丢失了呢?发送方也无法感知(没有应答机制),所以Redis作为消息队列还是存在很多问题的。

那为什么要在这一章节安排Redis实现消息队列呢?

首先,就我个人的感受而言,入行后有很长一段时间我都对Redis很抵触、很畏惧,导致自己一直停滞不前,其实Redis并没有我们想的那么难,只要你敢动手去敲,就会迅速熟悉起来(其他技术也是如此)。

其次,实际开发中一些小项目还是有人会用,主要是从系统复杂性考虑,不轻易引入MQ,所以仍有学习的必要。尤其是对于一些消息通知,丢了就丢了,影响不是特别大,而订单操作就不适合用Redis这么简陋的消息队列了。

上面的demo仅仅用作学习,大家有兴趣可以自行拓展,Redis还提供了消息的发布/订阅模式:

用过MQ的同学会觉得上面的模式很熟悉!

小坑

我在Redis实现消息通知的代码中,留了一个小坑,是大家平时可能会不小心犯的。

提示:和SpringBoot定时任务故事中我同事遇到的类似的坑。

在执行上面的代码时可能遇到:

nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisException: Connection closed

这是因为主线程结束导致Redis断开,而两个for循环还在操作队列。实际生产环境一般不会有问题,因为线程是一直跑着的。

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

文章来源:https://blog.csdn.net/smart_an/article/details/135165030
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。