RocketMQ高级应用
RocketMQ高级应用
1.RocketMQ的顺序消息
1.无序消息
无序消息指的是普通的消息,生产者发送消息,消费者消费消息,顺序没有保障,比如生产者依次发送的消息为1,2,3,消费者接到消息的顺序可能是3,2,1、2,1,3…默认的rocketmq就是这样的普通消息,但是由于队列都是先进先出的一般情况下这样的消息也都是有序的不过有些特殊的情况可能导致消息无序比如分区再均衡、消费者宕机、消费者多线程消费等。如果业务上要求我们的消息必须是严格有序的这样的消息就不太适用于我们的业务,比如排队买票、排队打车、先创建订单再支付订单等业务场景。
2.全局顺序
对于指定的topic所有消息必须严格按照先进先出的顺序发布和消费,比如生产者发送消息的顺序是1,2,3,那么消费者消费的顺序也一定是1,2,3。rocketmq实现全局有序只能用一个message queue,不过只能用一个分区性能是很低的。
3.局部顺序
指的是分区有序,也就是rocketmq中每一个message queue 中的消息是局部严格有序的。rocketmq保证消息严格有序指的就是这个局部有序,也就是每一个分区严格有序。
4.RocketMQ顺序消息的实现
1.message queue中保证消息投递有序
rocketmq中生产者默认投递到消息队列是轮询的投递方式这样无法保证消息有序。
我们需要把需要保证严格有序的消息投放到同一个message queue中这样才能保证消息有序。
2.保证消费者集群有序
消费者默认是多线程消费,比如一共队列中有2条消息message1和message2,message1第一次消费失败了,message2消费成功了,message1重试成功了顺序就变成先消费message2后消费message1了。
需要使用分布式锁,rocketmq采用了分段锁,锁住了单个message queue 保证消息局部有序消费者顺序消费。可以实现MessageListenerOrderly接口替换之前的实现MessageListenerConcurrently。
MessageListenerOrderly内部有分布式锁。
3.顺序消息的劣势
顺序消息性能相对较差,不支持广播模式,不支持延迟消息,不支持半事务消息,不支持异步发送,不支持单次发送,顺序消息必须同步发送。顺序消息其中一条消息消费失败会消息堆积。
4.队列选择器
默认的队列选择器是SelectMessageQueueByRandom随机投递,我们可以使用队列选择器改变消息投递的队列,有序的队列选择器可以用SelectMessageQueueByHash。SelectMessageQueueByHash根据传递的参数的hash值与队列数取余可以把相同业务的数据投递到同一个message queue中。
2.消息的投递策略
1.生产者投递策略
根据不同的队列选择器有不同的生产者消息投递策略。有特殊的业务需求我们也可以自己实现接口写自己的队列选择器指定业务需要的生产者投递策略。
2.消费者分配队列
1.广播消费
消费者messageModel设置为广播模式。
广播模式下一个消息会通知每一个消费者。
2.集群消费
消费者messageModel设置为集群模式。默认为集群消费中的平均分配算法,这个算法类似kafka的range分配算法。不希望分区和消费者的关系频繁变动可以用一致性hash分配算法它类似于kafka的黏性分配。
注意:这里同一个消费者组中不同消费者消费同一个主题消息使用相同的集群分配算法。如果改变分区规则后要一起启动和停止不然有可能存在一个分区被2个消费者订阅的情况。
3.RocketMQ消息保障
1.生产者保障
1.rocketmq发送消息的方式
1.sendOneWay
单次发送 没有状态返回 类似kafka的发送并忘记只负责投递到broker 效率最高消息可能丢失。适合应用于日志业务。
2.sendSync
同步发送就是指 producer 发送消息后,在接收到 broker 服务器响应后才会发送下一条,不过安全性还是必须结合broker的配置来看比如同步复制同步刷盘这样安全性最高性能最差,一般业务用的都是异步刷盘同步复制。适合应用于比较重要的消息并且不在意响应时间的业务场景。
3.sendASync
异步发送就是指 producer 发送消息后,不需要接收到 broker 服务器响应。会通过回调接口来接收broker的响应对结果进行处理。适合于应用于比较在意响应时间的业务场景。
2.发送状态
1.FLUSH_DISH_TIMEOUT
如果broker配置设置了同步刷盘SYNC_FLSUHbroker没有在默认时间(5s)完成刷盘broker会返回此状态码
2.FLUSH_SLAVE_TIMEOUT
如果配置了同步复制SYNC_MASTER,slave broker没有在设定时间完成同步会返回此状态码
3.SLAVE_NOT_AVAILABLE
设置为同步复制SYNC_MASTER,却没有slave broker 会返回此状态码
4.SEND_OK
发送成功,要严格保证消息没有丢失,broker服务器的配置中需要开启SYNC_MASTER 、SYNC_FLSUH
3.RocketMQ发送端重试保障
producer服务器向broker发送消息没有成功即生产者没有收到broker的ack时rocketmq会自动进行重试。写生产者方代码的时候可以手动设置最大重试次数默认重试2次。同步发送和异步发送都有重试,源码可见异步是递归的方式进行重试同步时是在一个for循环中,单次发送是没有重试的。
注意:代码中设置了超时时间,过了超时时间是不会重试的。
4.禁止自动创建topic
1.自动创建topic流程
生产者发送消息时携带一个topic,如果name server没有这个主题,broker接收到消息没有这个topic会用默认的主题去自动创建一个topic进行消息存储broker创建主题后是30s后才会上报到name server,如果我们有多个rocketmq broker集群在这30s如果消息只发送到了一个broker之后的所有消息都只能用这个broker来发送了,因为broker之间互相不会共享信息兵器不会拉取name server的信息broker只负责上报,这样集群资源得不到充分的利用所以生产环境一般是关闭自动创建主题的。
5.发送端规避
比如生产者发送一个topic到broker服务器,broker服务器有3个有4个messagequeue在这3台broker中,其中一台broker宕机了,因为生产者客户端每隔 30s更新一次路由所以生产者最快30s才能得知其中一台broker宕机了。rocketmq使用重试机制规避这样的问题比如第一次发送到broker1失败了重试的时候会规避掉broker1发送消息到broker2或者broker3。如果开启发送端规避下次发送消息会主动的规避掉broker1。
sendLatencyFaultEnable默认配置是false。可以将它设置为ture开启发送端规避策略这样会有一段时间不发送到宕机的broker节点。
2.消费者保障
1.幂等性处理
rocketmq遵循至少消费一次的的消息交付策略,这种交付策略和重复消费有着因果关系为了做到不丢失消息就无法避免消息重复的问题。我们是要自己维护消息的幂等性的通常我们的消息的key是要雪花算法生成的然后通过redis的特性做消息幂等的处理。
2.消息消费模式
1.集群消费
一条消息消费一次,分消费者组。消息消费失败不会进行重试,消费进度保存在Consumer端
2.广播消费
一条消息消费多次,不分消费者组。消息消费失败有机会进行重试,消费进度集中保存在Broker端。
3推模型和拉模型
rocketmq的推模型是模拟的拉模型不需要自己控制偏移量,可以自动提交偏移量使用起来比较方便通常都是使用rocketmq的推模型。
3.消息确认机制(ACK)
为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack,发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功,中途断电,抛出异常等都不会认为成功都会重新投递。
1.确认消费
回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS就代表消费完成,
2.异常消费
回调函数返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 就代表消费失败了
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker的重试队列(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到死信队列,应用可以监控死信队列来做人工干预。如果是顺序消息的消费场景会阻塞一直不断重试并且不会更新消费进度。
重试队列的消息是随着重试次数的增加重试间隔越长16次后进入死信队列。
抛出异常、返回null、return RECONSUMER_LATER 都会重试。如果不需要重试捕获异常时候的返回COMSUMER_SUCCESS就可以放弃重试一般不推荐使用。
4.RocketMQ死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息,存储死信消息的特殊队列称为死信队列
1.死信队列特点
1.不能被消费者正常消费
2.有效期与正常消息一样都为3天,3天后消息丢失。
3.一个死信队列对应一个消费者组,不是对应的topic
4.一个死信队列包含消费者组group id 对应的所有topic失败的消息
5.若一个消费者组没有产生过死信消息rocketmq不会为其创建死信队列。
5.RocketMQ延时消息
rocketmq支持18个等级的延迟消息,创建延迟消息的主题标签和队列,在生产者发送消息时指定消息的延迟等级就可以了。可以通过broker配置文件调整这18个等级的消息延时时间。rocketmq开源版本不知道任意等级的延迟消息,我们需要使用rocketmq商业版或者自己基于rocketmq源码的基础上修改源代码才有这样的功能。
6.RocketMQ批量消息
把消息全部放入到list中直接send就可以了。
4.RocketMQ消息存储
分布式队列有高可用要求所以数据需要持久化存储,rocketmq采用类似kafka的文件存储机制直接用磁盘文件来保存消息。
1.消息存储技术
目前的高性能磁盘顺序读写的内存速度、读速度、写速度要比随机读写的速度高几千上万倍。
1.顺序读写
读写一个大文件不需要频繁的寻址,通过文件预读可以加载后续内容,在大并发时处理速度更快,随机读写是无法文件预读的。
2.随机读写
读写多个小文件需要进行多次的寻址和旋转延迟速率远低于顺序读写。
2.零拷贝技术
linux系统分为用户态(用户代码可以操作的)、内核态(操作系统)。内核态帮我们操作磁盘、网卡、内存等硬件。java代码是不能直接操作硬件的我们需要与内核态交互才能操作硬件。 用户态与内核态的交互就是上下文切换。
rocketmq的主从数据同步就需要复制master broker的消息到slave broker这样如果用传统的技术会有多次上下文切换才能完成。
1.传统的文件拷贝
1.read :
磁盘文件 – DMA拷贝 – 内核态缓冲区
内核态缓冲区 – CPU拷贝 – 用户缓冲区
2.write:
用户缓冲区 – CPU拷贝 – socket缓冲区
socket缓冲区 – DMA拷贝 – 网卡
DMA拷贝效率是非常高的,cpu拷贝效率相对比较差
内核态与用户态交互需要上下文切换
2.零拷贝技术优化
零拷贝目标就是尽可能的减少上下文切换次数以及cpu拷贝的次数提升性能
1.mmap + write (rocketmq使用)
使用了内存映射技术,mmap 将用户缓冲区和内核缓冲区共享,读内核缓冲区相当于读用户缓冲区它们公用一块内存地址,这样复制流程简化为:
磁盘文件 – DMA拷贝 – 内核缓冲区
内核缓冲区 – CPU拷贝 – 内核socket缓冲区
内核socket缓冲区 – DMA拷贝 – 网卡
mmap内存映射技术适合用于传递比较小的文件,一次只能映射1.5 - 2g的文件到用户态的虚拟内存 这也是rocketmq默认单个commitlog日志数据文件为1G的原因创建commitlog的时候直接就是连续的1个G的磁盘空间。
2.sendfile (kafka使用)
sendfile适合传递比较大的文件
用sendfile函数替换了read和write,用mmap需要4次读写的上下文切换sendfile只有2次上下文切换。sendfile函数标明了从哪拷贝到哪以及拷贝的字节数。sendfile拷贝流程看上去跟mmap差不多只是用sendfile替代了内存映射技术。它完全不需要内存缓冲区了。
3.SG-DMA
如果网卡支持 SG-DMA,我们可以进一步减少通过 CPU 把内核缓冲区里的数据拷贝到 socket 缓冲区的过程 如果能支持SG-DMA那么流程将简化为
磁盘文件 – DMA拷贝 – 内核缓冲区
内核缓冲区 --SG-DMA拷贝 – 网卡
3.RocketMQ消息存储机制
rocketMQ的存储采用了单一日志文件,把同一个broker的所有topic所有queue全部写入到同一个commitlog文件里,避免了kafka分区副本过多多个文件产生的随机写的问题。
消息存入commitlog后由一个异步线程拉取commitlog中指定消息的标识信息到message queue消费队列中,消费者消费消息的时候会根据这个标识找到消息在commitlog中的位置。
1.commitlog
commitlog是消息的存储主题,producer发送的消息都会存储在commitlog中。文件位置在rocketmq的安装位置/store/commitlog中
文件长度20位 起始的偏移量是20个0代表了第一个文件的位置,第一个文件写满了默认是1个G,第二个文件为00000000001073741824,以此类推消息顺序写入日志文件,一个文件写满了写下一个文件。
2.consumeQueue
consumeQueue文件可以看做是基于commitlog的分区索引文件。文件位置在安装位置/store/consumeQueue中
每一个consumequeue条目存储了消息的关键信息commitog文件中的偏移量、消息长度、tag的hashcode值。
消费者消费消息时根据consumequeue中offset逻辑偏移量对应的实际消息的偏移量随机读取消息因为有索引所以效率也非常高。
3.indexFile
文件位置在安装位置/store/index中,这个文件以时间戳命名是个索引文件存储的是索引数据用来加快消息查询的速度,通过消息Key值查询消息真正的实体内容。
4.config
文件位置在安装位置/store/config中,config 文件夹中 存储着 Topic 和 Consumer 等相关信息。
5.过期文件删除
commitlog 默认的3天删除 指的是3天没有写入新的数据或者服务器磁盘空间不足才会删除。删除时不会关注文件中的消息是否消费了。
5.RocketMQ
消费进度
1.消费进度管理
客户端通过broker的消费进度确定自己需要拉取那些消息。
因为所有消息存储在broker的commitlog中,所以不可能每个消息消费完成删除哪个消息要以时间为维度去删除(默认3天),客户端拉取消息时broker会返回当前消费进度在consumerOffset.json文件中,客户端下次拉取消息时就从这里拉取。
消费者消费完成返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 才认为是消费成功,如果返回其他rocketmq会认为消费失败,失败的这一批消息从最小偏移量开始都会到失败队列中重试,默认重试16次,重试间隔时间随着重试次数递增,如果最后还是失败消息会投递到死信队列中,在死信队列的消息与rabbitmq的死信交换机死信队列是不同的,rocketmq的死信队列不可以消费的需要人工干预。
2.消息ACK机制
rocketmq是以consumer group + queue为单位来管理消费进度的。 以consumer offset标记在queue上的消费进度。每次消费成功后本地消费进度都会更新然后由定时任务定时同步到broker,以此持久化消费进度,每次记录一批消息最小的offset比如1到10个消息,6卡住了一直消费未成功阻塞了其他都成功了,消费进度offset就是6保证消息不丢失,重启时候消费者会从消费进度offset6开始消费,这样会重复消费7,8,9,10要注意幂等性处理。
6.RocketMQ文件刷盘机制
rocketmq消息是存储在磁盘上的保证断点后可以恢复存储的消息超出内存限制。
基于mmap内存映射机制尽可能的保证顺序写
1.同步刷盘
需要写到磁盘才ACK
消息持久化到磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应,同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。
2.异步刷盘
写到虚拟内存就ACK,由异步线程写到磁盘上
降低了读写延迟,提高了MQ的性能和吞吐量
3.主从复制
rocketmq broker中有一个master和一个slave,消息需要从master复制到slave
1.同步复制
等待master和slave都写入成功后才返回给生产者ACK响应。
可靠性高,master宕机后slave有全部备份数据
会降低系统吞吐量
2.异步复制
只要master写成功就返回给生产者ACK响应。然后由异步线程写入slave broker
若master宕机时如果有一部分消息还没来得及写入slave会存在消息丢失的问题。
延迟低系统吞吐量高。
4.应用
通常生产的rocketmq使用同步复制异步刷盘。
7.Dledger高可用集群
Dledger是 RocketMQ 4.5 引入的实现高可用集群的一项技术,该模式下集群会随机选出一个节点作为Master,当Master节点挂了后,会从Slave中自动选出一个节点升级成为Master。
Dledger会从集群中选举出Master 节点,完成Master 节点往 Slave 节点的消息同步,且接管Broker的 CommitLog 消息存储,Dledger是使用 Raft 算法来进行节点选举的。
用dledger高可用集群一个broker最少需要1主2从。
如果从普通集群升级为dledger集群模式要注意消息对齐,不然会有消息丢失的问题,其实如果原来已经有稳定运行的rocketmq broker最好不要切换。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!