Seata中AT模式的实现原理04-GlobalLock的作用

2023-12-25 23:01:25

前言

在AT模式下全局事务的隔离性是读未提交,即RM一阶段写入的数据,在二阶段提交前就能被其他session读到
为了解决脏读和脏写的问题,Seata提供了GlobalLock注解。
如果客户端要避免脏读,在业务方法上标注GlobalLock走Seata代理逻辑,此外使用select for update作为查询语句走Seata代理数据源;
如果客户端要避免脏写,在业务方法上标注GlobalLock走Seata代理逻辑,所有update、insert、delete都会走Seata代理数据源。

源码解析

在@GlobalLock注解方法执行前,GlobalTransactionalInterceptor拦截并在上下文中标注启用GlobalLock(RootContext.bindGlobalLockFlag),然后执行业务方法。
GlobalTransactionalInterceptor

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                // GlobalTransactional...
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}

private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
    return globalLockTemplate.execute(new GlobalLockExecutor() {
        @Override
        public Object execute() throws Throwable {
            return methodInvocation.proceed();
        }

        @Override
        public GlobalLockConfig getGlobalLockConfig() {
            GlobalLockConfig config = new GlobalLockConfig();
            config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
            config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
            return config;
        }
    });
}

public class GlobalLockTemplate {
    public Object execute(GlobalLockExecutor executor) throws Throwable {
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        if (!alreadyInGlobalLock) {
            // 设置启用GlobalLock标志
            RootContext.bindGlobalLockFlag();
        }
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        // 绑定GlobalLock配置
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);

        try {
            // 执行业务方法
            return executor.execute();
        } finally {
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }

            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }
}

脏读场景处理

对于脏读场景,客户端业务sql要使用select for update语句,seata会代理这个查询sql,通过SelectForUpdateExecutor执行:
1.设置连接非自动提交
2.执行业务select for update语句,获得本地锁
3.查询业务sql涉及的所有id主键列,构建lockKeys
4.发送GlobalLockQueryRequest给TC,如果TC发现资源+主键id被其他全局事务锁住(lock_table),这里会抛出异常
SelectForUpdateExecutor

public T doExecute(Object... args) throws Throwable {
    Connection conn = statementProxy.getConnection();
    DatabaseMetaData dbmd = conn.getMetaData();
    T rs;
    Savepoint sp = null;
    boolean originalAutoCommit = conn.getAutoCommit();
    try {
        // 1. 设置非自动提交
        if (originalAutoCommit) {
            conn.setAutoCommit(false);
        } else if (dbmd.supportsSavepoints()) {
            sp = conn.setSavepoint();
        } else {
            throw new SQLException("not support savepoint. please check your db version");
        }

        LockRetryController lockRetryController = new LockRetryController();
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        String selectPKSQL = buildSelectSQL(paramAppenderList);
        while (true) {
            try {
                // 2. 执行业务查询方法 select for update 获取本地锁
                rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

                // 3. 查询需要获取全局锁的主键id集合
                TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                String lockKeys = buildLockKey(selectPKRows);
                if (StringUtils.isNullOrEmpty(lockKeys)) {
                    break;
                }

                // 4. 发送GlobalLockQueryRequest给TC,如果抛出异常则代表结果集有未提交数据
                if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    statementProxy.getConnectionProxy().checkLock(lockKeys);
                } else {
                    throw new RuntimeException("Unknown situation!");
                }
                break;
            } catch (LockConflictException lce) {
                if (sp != null) {
                    conn.rollback(sp);
                } else {
                    conn.rollback();
                }
                lockRetryController.sleep(lce);
            }
        }
    } finally {
        // ...
    }
    return rs;
}

脏写场景处理

对于脏写场景,发送GlobalLockQueryRequest发生在commit阶段,在这之前已经在例如UpdateExecutor中获取了本地锁。

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

private void processLocalCommitWithGlobalLocks() throws SQLException {
    // 发送GlobalLockQueryRequest给TC,如果抛出异常则代表结果集有未提交数据
    checkLock(context.buildLockKeys());
    try {
        targetConnection.commit();
    } catch (Throwable ex) {
        throw new SQLException(ex);
    }
    context.reset();
}

DataSourceManager

public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException {
    GlobalLockQueryRequest request = new GlobalLockQueryRequest();
    request.setXid(xid);
    request.setLockKey(lockKeys);
    request.setResourceId(resourceId);
    try {
        GlobalLockQueryResponse response;
        if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
            response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } else {
            throw new RuntimeException("unknow situation!");
        }

        if (response.getResultCode() == ResultCode.Failed) {
            throw new TransactionException(response.getTransactionExceptionCode(),
                "Response[" + response.getMsg() + "]");
        }
        return response.isLockable();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
    }
}

如果TC用db存储,底层和RM一阶段获取全局锁差不多。执行select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified,status from lock_table where row_key in ( ? ) order by status desc,如果结果xid与当前全局事务不一致,则返回isLockable=false,代表锁被其他全局事务争用,GlobalLock作用的数据对于客户端不可见,实现全局事务读已提交隔离级别。

总结

Seata(AT 模式)的默认全局隔离级别是 读未提交
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
在这里插入图片描述
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

文章来源:https://blog.csdn.net/qq_41956309/article/details/135103207
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。