下单链路分布式事务Seata&MQ可靠消息实战
概述
- Seata 需要注册中心支持;
- 将 DataSource 封装成基于 Seata 的 DataSource 可以解决分库分表逻辑表问题;
- Seata 有三个角色:TC(服务端)、TM(调用方)、RM(被调用方);
分布式事务
在微服务架构中,完成某一个业务功能可能需要横跨多个服务,操作多个数据库。这就涉及到到了分布式事务,需要操作的资源位于多个资源服务器上,而应用需要保证对于多个资源服务器的数据操作,要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同资源服务器的数据一致性。
电商项目下单链路中的分布式事务场景
- 用户下单冻结库存
- 支付成功后修改订单状态,异步扣减真实库存
常见分布式事务解决方案
2PC | TCC | 可靠消息 | 最大努力通知 | |
---|---|---|---|---|
一致性 | 强一致性 | 最终一致性 | 最终一致性 | 最终一致性 |
吞吐量 | 低 | 中 | 高 | 高 |
实现复杂度 | 易 | 难 | 中 | 易 |
电商项目中会结合下单的业务重点讲解两种分布式事务解决方案:
2PC 的方案:基于 Seata AT 实现;
MQ 可靠消息的方案:基于 RocketMQ 事务消息实现;
基于Seata实现用户下单冻结库存场景的分布式事务
Seata架构
在 Seata 的架构中,一共有三个角色:
- TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端(应用服务)。
在 Seata 中,一个分布式事务的生命周期如下:
- TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。XID 会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。
- RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。
- TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。
- TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。
整合Seata实战
Seata 分 TC、TM 和 RM 三个角色,TC(Server 端)为单独服务端部署,TM 和 RM(Client 端)由业务系统集成。
Seata的版本选择
注意:微服务组件整合的时候需要考虑兼容性问题。
电商项目选择的 Spring Cloud Alibaba 版本是 2.2.6.RELEASE
,所整合的 Seata 版本是 1.3.0。但是低版本的 Seata 环境搭建繁琐,而且 bug 多,所以整合的时候尽量选择更高的版本,比如1.5.x
。
<!--分布式事务 seata 依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.8.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.5.1</version>
</dependency>
Seata Server(TC)环境搭建
参考官网。
Seata接入微服务
1)引入依赖
spring-cloud-starter-alibaba-seata 内部集成了 Seata,并实现了 xid 传递。
<!--分布式事务 seata 依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.8.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.5.1</version>
</dependency>
2)微服务对应数据库中添加 undo_log 表(仅AT模式需要)
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
3)微服务 application.yml 中添加 Seata 配置
# seata 配置
seata:
application-id: mall-product
# seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
tx-service-group: order-group
registry:
# 指定nacos作为注册中心
type: nacos
nacos:
application: seata-server
server-addr: 192.168.65.103:8848
group: SEATA_GROUP
config:
# 指定nacos作为配置中心
type: nacos
nacos:
server-addr: 192.168.65.103:8848
namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
group: SEATA_GROUP
data-id: seataServer.properties
注意:请确保 client 与 server 的注册中心和配置中心 namespace 和 group 一致。
4)全局事务发起者开启全局事务配置
此处是本项目接入 Seata 最难的地方:原因在于订单表用了分库分表技术(ShardingSphere),Seata 不能对逻辑表进行解析。不能简单的在全局事务发起方使用@GlobalTransactional
// 此处不能使用@GlobalTransactional
@GlobalTransactional(name = "generateOrder", rollbackFor = Exception.class)
public CommonResult generateOrder(OrderParam orderParam, Long memberId) {
...
}
这个问题应该如何解决呢?
Apache ShardingSphere 分布式事务:
- 基于 XA 协议的两阶段事务
- 基于 Seata 的柔性事务
整合 Seata AT 事务时,需要将 TM、RM 和 TC 的模型融入 Apache ShardingSphere 的分布式事务生态中。 在数据库资源上,Seata 通过对接?DataSource?接口,让 JDBC 操作可以同 TC 进行远程通信。同样,Apache ShardingSphere 也是面向?DataSource?接口,对用户配置的数据源进行聚合。 因此,将?DataSource?封装为基于 Seata 的?DataSource?后,就可以将 Seata AT 事务融入到 Apache ShardingSphere 的分片生态中。
ShardingSphere整合Seata
1)引入依赖
<!--shardingsphere整合seata依赖-->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-transaction-base-seata-at</artifactId>
<version>4.1.1</version>
</dependency>
2)配置 seata.conf
包含 Seata 柔性事务的应用启动时,用户配置的数据源会根据?seata.conf?的配置,适配为 Seata 事务所需的?DataSourceProxy,并且注册至 RM 中。
client {
application.id = tulingmall-order-curr
transaction.service.group = tuling-order-group
}
3)开启全局事务配置
// 全局事务交给SeataATShardingTransactionManager管理
@ShardingTransactionType(TransactionType.BASE)
@Transactional
public CommonResult generateOrder(OrderParam orderParam, Long memberId) {
...
}
注意:GlobalTransactional 和 ShardingTransactionType 不能同时出现,此处不能使用@GlobalTransactional
。同时需要关闭数据源自动代理。
seata:
enable-auto-data-source-proxy: false #关闭数据源自动代理,交给sharding-jdbc那边
柔性事务
可靠消息最终一致性方案实现。
可靠消息最终一致性方案是指当事务发起执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
本地消息表方案
本地消息表这个方案最初是 eBay 提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。
下面以注册送优惠券为例来说明 :
共有两个微服务交互,会员服务和优惠券服务,用户服务负责添加用户,优惠券服务负责赠送优惠券。
交互流程如下:
1)用户注册
用户服务在本地事务新增用户和增加“优惠券消息日志”。(用户表和消息表通过本地事务保证一致)
下面是伪代码:
begin transaction;
// 1.新增用户
// 2.存储优惠券消息日志
commit transation;
这种情况下,本地数据库操作与存储优惠券消息日志处于同一事务中,本地数据库操作与记录消息日志操作具备原子性。
2)定时任务扫描日志
如何保证将消息发送给消息队列呢?
经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
3)消费消息
如何保证消费者一定能消费到消息呢?
这里可以使用 MQ 的 ack(即消息确认)机制,消费者监听 MQ,如果消费者接收到消息并且业务处理完成后向 MQ 发送 ack(即消息确认),此时说明消费者正常消费消息完成,MQ 将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。
优惠券服务接收到“赠送优惠券”消息,开始赠送用户优惠券,成功后消息中间件回应 ack,否则消息中间件将重复投递此消息。
由于消息会重复投递,优惠券服务的“赠送优惠券”功能需要实现幂等性。
RocketMQ事务消息实现
RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;
而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;
RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
在 RocketMQ 4.3 后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了 MQ 内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。
执行流程如下 :
为方便理解我们以注册送优惠券的例子来描述整个流程。
Producer 即 MQ 发送方,本例中是用户服务,负责新增用户。MQ 订阅方即消息消费方,本例中是优惠券服务,负责新增优惠券。
1)Producer 发送事务消息
Producer(MQ 发送方)发送事务消息至 MQ Server,MQ Server 将消息状态标记为 Prepared(预览状态),注意此时这条消息消费者(MQ 订阅方)是无法消费到的。
2)MQ Server 回应消息发送成功
MQ Server 接收到 Producer 发送给的消息则回应发送成功表示 MQ 已接收到消息。
3)Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
本例中,Producer 执行添加用户操作。
4)消息投递
若 Producer 本地事务执行成功则自动向 MQ Server 发送 commit 消息,MQ Server 接收到 commit 消息后将“增加优惠券消息”状态标记为可消费,此时 MQ 订阅方(优惠券服务)即正常消费消息;
若 Producer 本地事务执行失败则自动向 MQ Server 发送 rollback 消息,MQ Server 接收到 rollback 消息后将删除“增加优惠券消息”。
MQ 订阅方(优惠券服务)消费消息,消费成功则向 MQ 回应 ack,否则将重复接收消息。这里 ack 默认自动回应,即程序执行正常则自动回应 ack。
5)事务回查
如果执行 Producer 端本地事务过程中,执行端挂掉,或者超时,MQ Server 将会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server 会根据事务回查结果来决定是否投递消息。
以上主干流程已由 RocketMQ 实现,对用户则来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。
RocketMQ提供RocketMQLocalTransactionListener接口 :
public interface RocketMQLocalTransactionListener {
/**
发送prepare消息成功此方法被回调,该方法用于执行本地事务
@param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
@param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
/**
@param msg 通过获取transactionId来判断这条消息的本地事务执行状态
@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!