前端带你学后端系列 ①【RocketMQ】

2023-12-14 16:41:26

消息队列:我们可以简单理解成队列,用各种不同类型的传递方式,传递各种不同类型消息

Ⅰ 我们为什么要用RocketMQ?这个中间件有啥作用?

  1. 限流削峰,利用队列的特性,让其限流削峰。
  2. 异步解耦,上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则会解决这些问题。
  3. 大数据传递,分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。

Ⅱ RocketMQ 的组成元素

  1. 消息(Message),系统所传输信息的【物理载体】
  2. 消息标识:RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。
  3. 队列(Message Queue),存储消息的【物理实体】。一个【队列Queue】中存储着【多个Message】
  4. 主题(Topic),首先Topic是【逻辑概念】。一个Topic中可以包含多个Queue,一个Topic的Queue也被称为一个Topic中消息的分区(Partition) 。Topic是RocketMQ进行消息订阅的基本单位。消费者订阅主题。
  5. 标签(Tag),标签是【逻辑概念】同一主题下区分不同类型的消息。用于消息分类。可以理解为 Topic是消息的一级分类,Tag是消息的二级分类。
  6. 分组(Group):对消费者与生产者的分组
  7. offset:用来管理每个消费队列的不同消费组的消费进度。就是所谓的偏移量。

我们用一张图来显示,会更清晰
在这里插入图片描述

Ⅲ RocketMQ 的系统架构

  1. Producer,RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。
  2. Consumer,RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。
  3. NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
  4. Broker充当着消息中转角色,负责存储消息、转发消息
    在这里插入图片描述

Ⅳ 消息是怎么发送的?又是怎么存储的?又是如何拉取的?

消息发送

代码指定选择策略,选择Queue发送。

  1. Producer发送消息之前,会先向NameServer发出【获取消息Topic的路由信息的请求】
  2. NameServer返回该Topic的路由表及Broker列表
  3. Producer根据代码中指定的Queue【选择策略】,从Queue列表中选出一个队列,用于后续存储消息
  4. Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
  5. Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到【选择】出的Queue

Topic的路由表:Topic的路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。
Queue选择算法:轮询算法、最小投递延迟算.我们可以在代码中指定。

消息存储

消息、偏移量等信息组成消息单元,并存储到目录中。

RocketMQ中的消息存储在本地文件系统中,相关文件默认在当前用户主目录下的store目录中。

  1. Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset
  2. 将queueId、queueOffset等数据,与消息一起封装为消息单元
  3. 将消息单元写入到commitlog
  4. 同时,形成消息索引条目将消息索引条目分发到相应的consumequeue

消息拉取

消息拉取Queue、消息offset及消息Tag

  1. Consumer获取到其要消费消息所在Queue的消费偏移量offset ,计算出其要消费消息的消息offset
  2. Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
  3. Broker计算在该consumequeue中的queueOffset。
  4. 从对应commitlog offset中读取消息单元,并发送给Consumer

Ⅴ 消费者接受消息

消费者消费消息:

  1. 拉取式消费。Consumer主动从Broker中拉取消息,主动权由Consumer控制
  2. 推送式消费。该模式下Broker收到数据后会主动推送给Consumer。(发布-订阅模式)

Ⅵ 生产者生产消息

一 发送消息的方式

①同步发送:Producer发出?条消息后,会在收到MQ返回的ACK之后才发下?条消息。该方式的消息可靠性最高,但消息发送效率太低

②异步发送:异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下?条消息

③单向发送:单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。


二 发送顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态: 未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败

在这里插入图片描述



三 延时消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

在分布式定时调度触发任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。



四 事务消息

要么都成功,要么都失败。

在这里插入图片描述

在这里插入图片描述



五 批量消息

在这里插入图片描述



Ⅶ 最佳实践

一 订阅关系的一致性

订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的TopicTag及对消息的处理逻辑 必须完全一致
否则,消息消费的逻辑就会混乱,甚至导致消息丢失。

二 消费幂等

【消费幂等】的概念:当出现消费者对某条消息【重复消费】的情况时,【重复消费】的结果与【消费一次】的【结果是相同的】,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。

常见的出现重复消费的地方

  1. 发送时消息重复。当一条消息已被成功发送到Broker并完成持久化,此时出现了网络闪断,从而导致Broker对Producer应答失败。
  2. 消费时消息重复。消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络闪断,Broker没有接收到消费成功响应。
  3. Rebalance时消息重复。当Consumer Group中的Consumer数量发生变化时,或其订阅的Topic的Queue数量发生变化时,会触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

常见的解决幂等性的方法:

幂等解决方案的设计中涉及到两项要素:【幂等令牌】,与【唯一性处理】。

  1. 【幂等令牌】:是生产者和消费者两者中的既定协议,通常指具备唯?业务标识的字符串。例如,订单号、流水号。一般由Producer随着消息一同发送来的。
  2. 【唯一性处理】:服务端通过采用?定的算法策略,保证同?个业务逻辑不会被重复执行成功多次。例如,对同一笔订单的多次支付操作,只会成功一次。

幂等性操作的通用性解决方案具体:

  1. 首先通过缓存中查找,如果在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步
  2. 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在,则说明本次操作为重复性操作;若不存在,则进入下一步;
  3. 唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到DB中;

为啥第一步过了以后,需要第二,三步
【缓存中数据的有效期会出现过期的情况】

以支付场景为例(多次支付幂等问题):

  1. ①当支付请求到达后,首先在Redis缓存中却获取key为支付流水号的缓存value
    ②若value不空,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;若value为空,则进入下一步操作

  2. ①到DBMS中根据支付流水号查询是否存在相应实例若存在,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;
    ②若不存在,则说明本次操作是首次操作,进入下一步完成唯一性处理;

  3. ①完成支付任务
    ②将当前支付流水号作为key,任意字符串作为value,通过set(key, value, expireTime)将数据写入到Redis缓存
    ③将当前支付流水号作为主键,与其它相关数据共同写入到DBMS

三 消息堆积与消费延迟的处理

①消息处理流程中,如果Consumer的消费速度 跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息。
②消息出现堆积进而会造成消息的消费延迟

问题的解决:

为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在【前期设计阶段】对整个业务逻辑进行完善的排查和梳理。其中最重要的就是梳理消息的【消费耗时】和设置【消息消费的并发度】。

【重点】关注以下内容

  1. ①消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
    ②消息消费逻辑中的I/O操作是否是必须的,能否用本地缓存等方案规避。
    ③消费逻辑中的复杂耗时的操作是否可以做异步化处理。如果可以,是否会造成逻辑错乱。

  2. 设置消费并发度
    ①逐步调大单个Consumer【节点的线程数】
    ②根据上下游链路的流量峰值计算出需要设置的【节点数】

Ⅷ 功能特性

一 消息的清理

消息被消费过后会被清理掉吗?不会的。

commitlog文件存在一个过期时间,默认为72小时,即三天。
当然commitlog也会被自动清理

二 消息过滤

消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤。

对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

三 消费重试与发送重试

1.消费重试

消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复。

>

在这里插入图片描述

2.发送重试

Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

在这里插入图片描述

文章来源:https://blog.csdn.net/sugerfle/article/details/134993642
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。