异步消息原理
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
在日常开发中,偶尔需要在主业务逻辑之外做一些附加操作,比如下单成功后通知商家、课程报名成功后通知老师、简历投递成功后通知HR。一般来讲,这些业务是不适合放在主线程中的:
@Slf4j
@SpringBootTest
public class AsyncNotifyTest {
@Test
public void testAsyncNotify() throws InterruptedException {
long start = System.currentTimeMillis();
// 投递简历,插入投递记录
TimeUnit.SECONDS.sleep(2);
log.info("插入投递记录完毕...");
// 发送短信通知HR,并留存发送记录
notifyHR("mx", "叉车师傅");
writeMsg("mx", "叉车师傅");
log.info("耗时:{}毫秒", System.currentTimeMillis() - start);
}
public void notifyHR(String username, String jobName) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
}
public void writeMsg(String username, String jobName) {
// 留存消息发送记录
log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
}
}
结果
[main] INFO - 插入投递记录完毕...
[main] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅
[main] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅
[main] INFO - com.bravo.happy.AsyncNotifyTest - 耗时:3019毫秒
消息通知等附加操作为什么不适合放在主流程呢?
- 首先,消息通知相对没那么重要,即使发送失败了,一般还有发送记录,重新发送或者只要能追溯即可
- 其次,在主流程中加入消息通知会减慢响应速度
- 最后,万一消息发送失败,还可能导致事务回滚,但系统本身其实是没有问题的
多线程异步消息
一个解决办法是使用多线程,把消息发送的逻辑单独放在一个异步线程中执行,主流程处理完毕直接返回即可。为了尽可能简单,这里就不配置线程池或使用@Async了,换CompletableFuture做演示:
@Slf4j
@SpringBootTest
public class AsyncNotifyTest {
@Test
public void testAsyncNotify() throws InterruptedException {
long start = System.currentTimeMillis();
// 投递简历,插入投递记录
TimeUnit.SECONDS.sleep(2);
log.info("插入投递记录完毕...");
// 异步发送短信通知HR,并留存发送记录
CompletableFuture.runAsync(() -> {
try {
notifyHR("bravo1988", "叉车师傅");
writeMsg("bravo1988", "叉车师傅");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.info("耗时:{}毫秒", System.currentTimeMillis() - start);
// 为了观察到异步线程里的打印信息,主线程sleep一会儿
TimeUnit.SECONDS.sleep(2);
}
public void notifyHR(String username, String jobName) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);
}
public void writeMsg(String username, String jobName) {
// 留存消息发送记录
log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);
}
}
结果
[main] INFO - 插入投递记录完毕...
[main] INFO - 耗时:2145毫秒
[ForkJoinPool.commonPool-worker-9] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅
[ForkJoinPool.commonPool-worker-9] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅
可以看到,主线程耗时变成了2秒,如此一来用户整个投递简历的响应时间缩短了。
Spring事件监听机制
具体请参考Spring事件监听机制,本质上和多线程异步消息是一样的。
Redis实现消息队列
上面多线程版本的异步消息其实已经挺不错了,但小概率的情况下可能会出现消息丢失(虽然当前情境下无所谓):
- 情况1:消息过多,线程数不够触发拒绝策略
- 情况2:异步线程宕机了,消息丢失(类似于消费者挂了)
此时可以考虑使用Redis做一个简单的消息队列,数据类型可以选择List。
对于Redis的List,如果使用lpush+rpop即可实现先进先出的简单队列,而如果配合brpop则可实现阻塞队列。所谓的brpop,其实就是blocking right pop,即阻塞等待队列中的消息,一旦有消息被push进队列就从右边取出消费。
注意,rpop和brpop的区别是,rpop发现队列为空直接返回null,不会等待,也不能设置等待时间 :
lpush和brpop反映到Java代码里,可以使用RedisTemplate或StringRedisTemplate实现。
public interface RedisService {
/**
* 向队列插入消息
*
* @param queue 自定义队列名称
* @param obj 要存入的消息
*/
void pushQueue(String queue, Object obj);
/**
* 从队列取出消息
*
* @param queue 自定义队列名称
* @param timeout 最长阻塞等待时间
* @param timeUnit 时间单位
* @return
*/
Object popQueue(String queue, long timeout, TimeUnit timeUnit);
}
@Component
public class RedisServiceImpl implements RedisService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ObjectMapper objectMapper;
@Override
public void pushQueue(String queue, Object obj) {
try {
redisTemplate.opsForList().leftPush(queue, objectMapper.writeValueAsString(obj));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
@Override
public Object popQueue(String queue, long timeout, TimeUnit timeUnit) {
return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);
}
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisQueueTest {
@Autowired
private RedisService redisService;
public static final String ORDER_MESSAGE = "order_message";
@Test
public void testRedisBlockingQueue() throws InterruptedException {
// 订单服务
orderService("bravo1988", 10086L);
// 启动消费者,取出消息,逐一发送
new Thread(this::consumeMsg).start();
// 10秒后再发一条消息,模拟第二次下单
TimeUnit.SECONDS.sleep(10);
orderService("bravo2020", 99999L);
// 等待一会儿,观察第二条消息
TimeUnit.SECONDS.sleep(10);
}
public void orderService(String username, Long orderId) {
// 1.操作数据库,插入订单
// 2.其他操作
// 3.发送消息
redisService.pushQueue(ORDER_MESSAGE, new Order(username, orderId));
}
public void consumeMsg() {
for (; ; ) {
Object order = redisService.popQueue(ORDER_MESSAGE, 5, TimeUnit.SECONDS);
log.info("每隔5秒循环获取,期间for循环阻塞");
if (order != null) {
log.info("order:{}", order.toString());
}
}
}
@Data
@AllArgsConstructor
static class Order {
private String username;
private Long resumeId;
}
}
Redis实现消息队列的好处是,可以把消息存起来慢慢消费,而且项目挂了不影响已经存入的消息,重新启动后仍可继续消费:
可能有同学不禁要问:如果消息还没发送到队列中就丢失了呢?发送方也无法感知(没有应答机制),所以Redis作为消息队列还是存在很多问题的。
那为什么要在这一章节安排Redis实现消息队列呢?
首先,就我个人的感受而言,入行后有很长一段时间我都对Redis很抵触、很畏惧,导致自己一直停滞不前,其实Redis并没有我们想的那么难,只要你敢动手去敲,就会迅速熟悉起来(其他技术也是如此)。
其次,实际开发中一些小项目还是有人会用,主要是从系统复杂性考虑,不轻易引入MQ,所以仍有学习的必要。尤其是对于一些消息通知,丢了就丢了,影响不是特别大,而订单操作就不适合用Redis这么简陋的消息队列了。
上面的demo仅仅用作学习,大家有兴趣可以自行拓展,Redis还提供了消息的发布/订阅模式:
用过MQ的同学会觉得上面的模式很熟悉!
小坑
我在Redis实现消息通知的代码中,留了一个小坑,是大家平时可能会不小心犯的。
提示:和SpringBoot定时任务故事中我同事遇到的类似的坑。
在执行上面的代码时可能遇到:
nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisException: Connection closed
这是因为主线程结束导致Redis断开,而两个for循环还在操作队列。实际生产环境一般不会有问题,因为线程是一直跑着的。
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
进群,大家一起学习,一起进步,一起对抗互联网寒冬
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!