前端带你学后端系列 ①【RocketMQ】
前端带你学后端系列 ①【RocketMQ】
消息队列:我们可以简单理解成
队列
,用各种不同类型的传递方式
,传递各种不同类型消息
Ⅰ 我们为什么要用RocketMQ?这个中间件有啥作用?
限流削峰
,利用队列的特性,让其限流削峰。异步解耦
,上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则会解决这些问题。大数据传递
,分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。
Ⅱ RocketMQ 的组成元素
- 消息(Message),系统所传输信息的
【物理载体】
。 - 消息标识:RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。
- 队列(Message Queue),存储消息的
【物理实体】
。一个【队列Queue】中存储着【多个Message】 - 主题(Topic),首先Topic是
【逻辑概念】
。一个Topic中可以包含多个Queue,一个Topic的Queue也被称为一个Topic中消息的分区(Partition) 。Topic是RocketMQ进行消息订阅的基本单位。消费者订阅主题。 - 标签(Tag),标签是
【逻辑概念】
同一主题下区分不同类型的消息。用于消息分类。可以理解为 Topic是消息的一级分类,Tag是消息的二级分类。 - 分组(Group):对消费者与生产者的分组
- offset:用来管理每个消费队列的不同消费组的消费进度。就是所谓的偏移量。
我们用一张图来显示,会更清晰
Ⅲ RocketMQ 的系统架构
- Producer,RocketMQ中的消息生产者都是以
生产者组(Producer Group)
的形式出现的。 - Consumer,RocketMQ中的消息消费者都是以
消费者组(Consumer Group)
的形式出现的。 - NameServer是一个Broker与Topic路由的
注册中心
,支持Broker的动态注册与发现。 - Broker充当着消息中转角色,负责
存储消息、转发消息
。
Ⅳ 消息是怎么发送的?又是怎么存储的?又是如何拉取的?
消息发送
代码
指定选择策略
,选择Queue发送。
- Producer发送消息之前,会先向NameServer发出
【获取消息Topic的路由信息的请求】
- NameServer返回该Topic的路由表及Broker列表
- Producer根据
代码中指定的Queue【选择策略】
,从Queue列表中选出一个队列,用于后续存储消息 - Produer对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
- Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到【选择】出的Queue
Topic的路由表:Topic的路由表的key为Topic名称,value则为所有涉及该Topic的BrokerName列表。
Queue选择算法:轮询算法、最小投递延迟算.我们可以在代码中指定。
消息存储
将
消息、偏移量
等信息组成消息单元,并存储到目录中。
RocketMQ中的消息存储在本地文件系统中,相关文件默认在当前用户主目录下的store目录
中。
- Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset
- 将queueId、queueOffset等数据,与消息一起封装为
消息单元
- 将消息单元写入到
commitlog
- 同时,形成消息索引条目将消息索引条目分发到相应的consumequeue
消息拉取
消息拉取
Queue、消息offset及消息Tag
- Consumer获取到其要消费
消息所在Queue的消费偏移量offset
,计算出其要消费消息的消息offset - Consumer向Broker发送拉取请求,其中会包含其要
拉取
消息的Queue、消息offset及消息Tag
。 - Broker计算在该consumequeue中的queueOffset。
- 从对应commitlog offset中读取消息单元,并发送给Consumer
Ⅴ 消费者接受消息
消费者消费消息:
- 拉取式消费。Consumer主动从Broker中拉取消息,主动权由Consumer控制
- 推送式消费。该模式下Broker收到数据后会主动推送给Consumer。(发布-订阅模式)
Ⅵ 生产者生产消息
一 发送消息的方式
①同步发送:Producer发出?条消息后,会在收到MQ返回的ACK之后
才发下?条消息。该方式的消息可靠性最高,但消息发送效率太低
。
②异步发送:异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下?条消息
。
③单向发送:单向发送消息是指,Producer仅负责发送消息
,不等待、不处理MQ的ACK。
二 发送顺序消息
顺序消息指的是,严格按照消息的
发送顺序
进行消费的消息(FIFO)。
例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态: 未支付、已支付、发货中、发货成功、发货失败。
根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败
三 延时消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
在分布式定时
调度触发
、任务超时处理
等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
四 事务消息
要么都成功,要么都失败。
五 批量消息
Ⅶ 最佳实践
一 订阅关系的一致性
订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的
Topic
与Tag
及对消息的处理逻辑
必须完全一致
。
否则,消息消费的逻辑就会混乱,甚至导致消息丢失。
二 消费幂等
【消费幂等】的概念:当出现消费者对某条消息【重复消费】的情况时,
【重复消费】
的结果与【消费一次
】的【结果是相同的】
,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
常见的出现重复消费的地方
发送
时消息重复。当一条消息已被成功发送到Broker并完成持久化,此时出现了网络闪断
,从而导致Broker对Producer应答失败。消费
时消息重复。消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络闪断
,Broker没有接收到消费成功响应。- Rebalance时消息重复。当Consumer Group中的
Consumer数量发生变化
时,或其订阅的Topic的Queue数量发生变化
时,会触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。
常见的解决幂等性的方法:
幂等解决方案的设计中涉及到两项要素:【幂等令牌】,与【唯一性处理】。
- 【幂等令牌】:是生产者和消费者两者中的
既定协议
,通常指具备唯?业务标识的字符串
。例如,订单号、流水号。一般由Producer随着消息一同发送来的。 - 【唯一性处理】:
服务端
通过采用?定的算法策略
,保证同?个业务逻辑不会被重复执行成功多次
。例如,对同一笔订单的多次支付操作,只会成功一次。
幂等性操作的通用性解决方案具体:
- 首先通过
缓存中查找
,如果在缓存中如果已经存在
了某幂等令牌,则说明本次操作是重复性操作
;若缓存没有
命中,则进入下一步
; - 在唯一性处理之前,先在
数据库
中查询幂等令牌作为索引的数据是否
存在,则说明本次操作为重复性操作;若不存在,则进入下一步; - 唯一性处理后,将幂等令牌
写入到缓存
,并将幂等令牌作为唯一索引的数据写入到DB
中;
为啥第一步过了以后,需要第二,三步
【缓存中数据的有效期会出现过期的情况】
以支付场景为例(多次支付幂等问题):
-
①当支付请求到达后,首先在Redis缓存中却获取key为支付流水号的缓存value
②若value不空,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;若value为空,则进入下一步操作 -
①到DBMS中根据支付流水号查询是否存在相应实例若存在,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;
②若不存在,则说明本次操作是首次操作,进入下一步完成唯一性处理; -
①完成支付任务
②将当前支付流水号作为key,任意字符串作为value,通过set(key, value, expireTime)将数据写入到Redis缓存
③将当前支付流水号作为主键,与其它相关数据共同写入到DBMS
三 消息堆积与消费延迟的处理
①消息处理流程中,如果Consumer的消费速度
跟不上Producer的发送速度
,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息。
②消息出现堆积
进而会造成消息的消费延迟
。
问题的解决:
为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在【前期设计阶段】对整个业务逻辑进行完善的排查和梳理。其中最重要的就是梳理消息的【消费耗时】和设置【消息消费的并发度】。
【重点】关注以下内容
-
①消息
消费逻辑的计算复杂度
是否过高,代码是否存在无限循环和递归等缺陷。
②消息消费逻辑中的I/O
操作是否是必须的,能否用本地缓存等方案规避。
③消费逻辑中的复杂耗时的操作
是否可以做异步化处理。如果可以,是否会造成逻辑错乱。 -
设置消费
并发度
①逐步调大单个Consumer【节点的线程数】
②根据上下游链路的流量峰值计算出需要设置的【节点数】
Ⅷ 功能特性
一 消息的清理
消息被消费过后会被清理掉吗?不会的。
commitlog文件存在一个过期时间,默认为72小时,即三天。
当然commitlog也会被自动清理
二 消息过滤
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤。
对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。
三 消费重试与发送重试
1.消费重试
消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复。
2.发送重试
Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!