Java小案例-RocketMQ的11种消息类型,你知道几种?(事务消息)

2023-12-15 21:33:03

前言

上一节给大家讲了Rocket的延迟消息,这一节和大家聊一下事务消息,关于延迟消息大家可以点下面这个链接直接看。

事务消息

事务消息是RocketMQ提供的一种类似X/Open XA的分布式事务功能。通过RocketMQ的事务消息,可以达到分布式事务的最终一致。

事务消息用起来也比较简单,如下所示:

public?class?TransactionMessageDemo?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????TransactionMQProducer?transactionMQProducer?=?new?TransactionMQProducer("sanyouProducer");
????????transactionMQProducer.setNamesrvAddr("192.168.200.143:9876");

????????//设置事务监听器
????????transactionMQProducer.setTransactionListener(new?TransactionListener()?{

????????????@Override
????????????public?LocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{
????????????????//处理本次事务
????????????????return?LocalTransactionState.COMMIT_MESSAGE;
????????????}

????????????@Override
????????????public?LocalTransactionState?checkLocalTransaction(MessageExt?msg)?{
????????????????//检查本地事务
????????????????return?LocalTransactionState.COMMIT_MESSAGE;
????????????}
????????});

????????transactionMQProducer.start();

????????Message?message?=?new?Message("sanyouTopic",?"java日记".getBytes());

????????//发送消息
????????transactionMQProducer.sendMessageInTransaction(message,?new?Object());
????}

}

事务消息发送相对于前面的例子主要有以下不同:

  • 将前面的DefaultMQProducer换成TransactionMQProducer

  • 需要设置事务的监听器TransactionListener,来执行本地事务

  • 发送方法改成 sendMessageInTransaction

为什么要这么改,接下来我们来讲讲背后的实现原理

上一节在说延迟消息的时候提到,RocketMQ使用到了SCHEDULE_TOPIC_XXXX这个中转Topic,来偷梁换柱实现延迟消息

不仅仅是延迟消息,事务消息其实也是这么干的,它也会进行偷梁换柱,将消息先存在RMQ_SYS_TRANS_HALF_TOPIC这个Topic下,同时也会将消息真正的Topic和队列id存到额外信息中,操作都是一样滴

由于消息不在真正目标的Topic下,所以这条消息消费者也是消费不到滴

当消息成功存储之后,服务端会向生产者响应,告诉生产者我消息存储成功了,你可以执行本地事务了

之后生产者就会执行本地执行事务,也就是执行如下方法

TransactionListener#executeLocalTransaction

当本地事务执行完之后,会将执行的结果发送给服务端

服务端会根据事务的执行状态来执行对应的处理结果

  • commit:提交事务消息,跟延迟消息一样,重新构建一条消息,Topic和队列id都设置成消息真正的Topic和队列id,然后重新存到CommitLog文件,这样消费者就可以消费到消息了

  • rollback:回滚消息,其实并没有实际的操作,因为消息本身就不在真正的Topic下,所以消费者压根就消费不到,什么都不做就可以了

  • unknown:本地事务执行异常时就是这个状态,这个状态下会干一些事,咱们后面再说

所以在正常情况下,事务消息整个运行流程如下图所示

既然有正常情况下,那么就有非正常情况下

比如前面提到的抛异常导致unknown,又或者什么乱七八糟的原因,导致无法正常提交本地事务的执行状态,那么此时该怎么办呢?

RocketMQ当然也想到了,他有自己的一套补偿机制

RocketMQ内部会起动一个线程,默认每隔1分钟去检查没有被commit或者rollback的事务消息

RocketMQ内部有一套机制,可以找出哪些事务消息没有commit或者rollback,这里就不细说了

当发现这条消息超过6s没有提交事务状态,那么此时就会向生产者发送一个请求,让生产者去检查一下本地的事务执行的状态,就是执行下面这行代码

TransactionListener#checkLocalTransaction

之后会将这个方法返回的事务状态提交给服务端,服务端就可以知道事务的执行状态了

这里有一个细节需要注意,事务消息检查次数不是无限的,默认最大为15次,一旦超过15次,那么就不会再被检查了,而是会直接把这个消息存到TRANS_CHECK_MAX_TIME_TOPIC

所以你可以从这个Topic读取那些无法正常提交事务的消息

这就是RocketMQ事务消息的原理

总结

RocketMQ事务消息的实现主要是先将消息存到RMQ_SYS_TRANS_HALF_TOPIC这个中间Topic,有些资料会把这个消息称为半消息(half消息),这是因为这个消息不能被消费

之后会执行本地的事务,提交本地事务的执行状态

RocketMQ会根据事务的执行状态去判断commit或者是rollback消息,也就是是不是可以让消费者消费这条消息的意思

在一些异常情况下,生产者无法及时正确提交事务执行状态

RocketMQ会向生产者发送消息,让生产者去检查本地的事务,之后再提交事务状态

当然,这个检查次数默认不超过15次,如果超过15次还未成功提交事务状态,RocketMQ就会直接把这个消息存到TRANS_CHECK_MAX_TIME_TOPIC

联系方式

关于文章中大家有任何疑问可以通过关注公众号《编程乐学》进行留言,同时,公众号还有更多有趣的项目以及关于学习编程的笔记资料大家可以看看,欢迎大家进行留言。

文章来源:https://blog.csdn.net/weixin_54449574/article/details/135025037
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。