Seata客户端启动流程

2023-12-16 00:40:16

自动装配

Springboot启动的时候会将下面这几个类进行自动装配
在这里插入图片描述
在这里插入图片描述

SeataRestTemplateAutoConfiguration(装载拦截器)

这里会装配 SeataRestTemplateInterceptor (Seata的拦截器)

/**
 * Configuration class that contributes the two beans used to hook Seata into
 * {@code RestTemplate}: the interceptor itself and the initializer that
 * installs it.
 */
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {

	/**
	 * Exposes the Seata {@code RestTemplate} interceptor as a bean.
	 *
	 * @return a new {@link SeataRestTemplateInterceptor}
	 */
	@Bean
	public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
		final SeataRestTemplateInterceptor interceptor = new SeataRestTemplateInterceptor();
		return interceptor;
	}

	/**
	 * Exposes the initializer bean that attaches the interceptor to the
	 * available {@code RestTemplate} instances.
	 *
	 * @return a new {@link SeataRestTemplateInterceptorAfterPropertiesSet}
	 */
	@Bean
	public SeataRestTemplateInterceptorAfterPropertiesSet seataRestTemplateInterceptorConfiguration() {
		final SeataRestTemplateInterceptorAfterPropertiesSet installer =
				new SeataRestTemplateInterceptorAfterPropertiesSet();
		return installer;
	}

}

在bean初始化的时候会将拦截器装配到 RestTemplate 中

/**
 * Installs the Seata interceptor on every injected {@code RestTemplate} once
 * this bean's properties have been set.
 */
public class SeataRestTemplateInterceptorAfterPropertiesSet implements InitializingBean {

	/** Injected {@code RestTemplate} instances; optional ({@code required = false}), so it may be {@code null}. */
	@Autowired(required = false)
	private Collection<RestTemplate> restTemplates;

	/** The interceptor appended to each template. */
	@Autowired
	private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

	/**
	 * Appends the Seata interceptor to the end of each template's interceptor
	 * list. Does nothing when no templates were injected.
	 */
	@Override
	public void afterPropertiesSet() {
		if (this.restTemplates == null) {
			return;
		}
		for (RestTemplate template : this.restTemplates) {
			// Work on a copy of the current list, then hand the extended list back.
			List<ClientHttpRequestInterceptor> chain = new ArrayList<>(template.getInterceptors());
			chain.add(this.seataRestTemplateInterceptor);
			template.setInterceptors(chain);
		}
	}

}

这样每次调用rest接口的时候都会走到拦截器中进行逻辑处理

	/**
	 * Adds the current global transaction id (xid), if any, to the outgoing
	 * request headers and then executes the request.
	 *
	 * @param httpRequest the outgoing request
	 * @param bytes the request body
	 * @param clientHttpRequestExecution the execution used to continue the call
	 * @return the response produced by the execution
	 * @throws IOException if the execution fails with an I/O error
	 */
	@Override
	public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
			ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
		// 1. Wrap the original request; headers are added through the wrapper.
		final HttpRequestWrapper wrapped = new HttpRequestWrapper(httpRequest);

		// 2. Read the global transaction xid from RootContext.
		final String globalXid = RootContext.getXID();
		final boolean hasXid = !StringUtils.isEmpty(globalXid);
		if (hasXid) {
			// Propagate the xid to the callee via the request header.
			wrapped.getHeaders().add(RootContext.KEY_XID, globalXid);
		}

		// 3. Execute the HTTP request.
		return clientHttpRequestExecution.execute(wrapped, bytes);
	}

SeataHandlerInterceptorConfiguration

启动的时候添加一个拦截器 拦截针对的路径为:/**

/**
 * Web MVC configuration that registers a {@link SeataHandlerInterceptor} for
 * every request path.
 */
@ConditionalOnWebApplication
public class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {

	/**
	 * Registers a new {@link SeataHandlerInterceptor} under the path pattern
	 * {@code /**}.
	 *
	 * @param registry the registry the interceptor is added to
	 */
	@Override
	public void addInterceptors(InterceptorRegistry registry) {
		final SeataHandlerInterceptor xidInterceptor = new SeataHandlerInterceptor();
		registry.addInterceptor(xidInterceptor).addPathPatterns("/**");
	}

}

SeataHandlerInterceptor
XID是全局事务统一的标识

preHandle方法:执行请求之前,从本地事务上下文(线程本地变量ThreadLocal)中获取全局事务xid;从请求头中获取全局事务xid(rpcXid);如果xid不存在,但是rpcXid存在,则将rpcXid作为xid绑定到全局事务上下文(线程本地变量ThreadLocal)上;
afterCompletion方法:请求执行完毕之后,如果请求头中rpcXid不存在(为空),则直接返回,不对全局事务上下文中的xid做任何处理;
如果请求头中包含rpcXid,则将xid从全局事务上下文中解绑(unbind);如果解绑得到的xid与rpcXid不相等(忽略大小写比较),则记录告警日志,并且在解绑得到的xid不为null时,将解绑得到的xid重新绑定回全局事务上下文中。

    
	/**
	 * Binds the xid carried in the request header to {@code RootContext} when
	 * the context does not already hold one.
	 *
	 * @param request the incoming request; the xid is read from the
	 *                {@code RootContext.KEY_XID} header
	 * @param response the response (unused)
	 * @param handler the chosen handler (unused)
	 * @return always {@code true}
	 */
	@Override
	public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
			Object handler) {

		final String localXid = RootContext.getXID();
		final String headerXid = request.getHeader(RootContext.KEY_XID);
		if (log.isDebugEnabled()) {
			log.debug("xid in RootContext {} xid in RpcContext {}", localXid, headerXid);
		}

		// Nothing to bind when the context already has an xid or the header carries none.
		if (localXid != null || headerXid == null) {
			return true;
		}

		RootContext.bind(headerXid);
		if (log.isDebugEnabled()) {
			log.debug("bind {} to RootContext", headerXid);
		}
		return true;
	}


	/**
	 * Unbinds the xid from {@code RootContext} after the request completes,
	 * but only when the request carried an xid header.
	 *
	 * <p>If the unbound xid differs from the header value (compared ignoring
	 * case), a warning is logged and a non-null unbound xid is bound back.
	 *
	 * @param request the completed request
	 * @param response the response (unused)
	 * @param handler the handler that ran (unused)
	 * @param e the exception raised during handling, if any (unused)
	 */
	@Override
	public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
			Object handler, Exception e) {

		final String headerXid = request.getHeader(RootContext.KEY_XID);
		if (StringUtils.isEmpty(headerXid)) {
			// The request did not carry an xid, so the context is left untouched.
			return;
		}

		final String removedXid = RootContext.unbind();
		if (log.isDebugEnabled()) {
			log.debug("unbind {} from RootContext", removedXid);
		}

		if (headerXid.equalsIgnoreCase(removedXid)) {
			return;
		}

		// The xid in the context is not the one that came with the request.
		log.warn("xid in change during RPC from {} to {}", headerXid, removedXid);
		if (removedXid == null) {
			return;
		}
		RootContext.bind(removedXid);
		log.warn("bind {} back to RootContext", removedXid);
	}

SeataFeignClientAutoConfiguration

处理Feign调用
在这里插入图片描述
这三个build最后都是build一个SeataFeignClient用来处理
在通过OpenFeign执行请求之前,也会从事务上下文中获取xid,然后将其放到请求头中;

	/**
	 * Executes the request through the delegate client after passing it
	 * through {@code getModifyRequest}.
	 *
	 * @param request the original Feign request
	 * @param options the request options handed to the delegate
	 * @return the delegate's response
	 * @throws IOException if the delegate fails with an I/O error
	 */
	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
		return this.delegate.execute(getModifyRequest(request), options);
	}

	/**
	 * Returns a request that carries the current global transaction id in its
	 * headers.
	 *
	 * @param request the original request
	 * @return {@code request} itself when no xid is present; otherwise a new
	 *         request with the same method, url, body and charset, whose
	 *         headers are a copy of the original ones plus the xid header
	 */
	private Request getModifyRequest(Request request) {
		// Read the global transaction id; without one the request is returned unchanged.
		final String xid = RootContext.getXID();
		if (StringUtils.isEmpty(xid)) {
			return request;
		}

		// Single-element value list for the xid header.
		final List<String> xidValues = new ArrayList<>();
		xidValues.add(xid);

		// Copy the original headers, then set (or replace) the xid entry.
		final Map<String, Collection<String>> mergedHeaders = new HashMap<>(MAP_SIZE);
		mergedHeaders.putAll(request.headers());
		mergedHeaders.put(RootContext.KEY_XID, xidValues);

		// Assemble the new request with the xid placed in its headers.
		return Request.create(request.method(), request.url(), mergedHeaders, request.body(),
				request.charset());
	}

SeataAutoConfiguration

    /**
     * Registers a {@link SpringApplicationContextProvider} unless a bean with
     * the same name already exists.
     *
     * @return a new provider instance
     */
    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider() {
        final SpringApplicationContextProvider provider = new SpringApplicationContextProvider();
        return provider;
    }
    /**
     * Failure handling: registers the default {@link FailureHandler} unless
     * one is already defined.
     *
     * @return a new {@link DefaultFailureHandlerImpl}
     */
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        final FailureHandler defaultHandler = new DefaultFailureHandlerImpl();
        return defaultHandler;
    }
    /**
     * Registers the {@link GlobalTransactionScanner} unless one is already
     * defined. The bean depends on the application-context provider and the
     * failure handler beans.
     *
     * @param seataProperties supplies the application id and the transaction service group
     * @param failureHandler the failure handler passed to the scanner
     * @return the scanner built from those three values
     */
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        final String applicationId = seataProperties.getApplicationId();
        final String txServiceGroup = seataProperties.getTxServiceGroup();
        return new GlobalTransactionScanner(applicationId, txServiceGroup, failureHandler);
    }
    /**
     * Seata data source proxying: registers the
     * {@link SeataAutoDataSourceProxyCreator} unless one is already defined.
     * Active when the auto-data-source-proxy property is {@code "true"} or
     * missing.
     *
     * @param seataProperties supplies the JDK-proxy flag and the exclusion list
     * @return the proxy creator built from those settings
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(
            seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying());
    }

初始化全局事务管理器

GlobalTransactionScanner 实现了 InitializingBean 接口,在bean初始化的时候会调用其 afterPropertiesSet 方法,在该方法中初始化Seata客户端(TM和RM),用来和Seata服务端建立连接。

    /**
     * Initializes the Seata clients unless global transactions are disabled,
     * in which case the fact is only logged.
     */
    @Override
    public void afterPropertiesSet() {
        if (!disableGlobalTransaction) {
            // Initialize the clients.
            initClient();
            return;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
    }
    /**
     * Initializes the TM client and then the RM client for this application,
     * and finally registers the Spring shutdown hook.
     *
     * @throws IllegalArgumentException if {@code applicationId} or
     *         {@code txServiceGroup} is null or empty
     */
    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }

        final boolean identityMissing =
            StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup);
        if (identityMissing) {
            final String detail = String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup);
            throw new IllegalArgumentException(detail);
        }

        // Initialize the TM (transaction manager) client.
        TMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        // Initialize the RM (resource manager) client.
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }

        // Register the shutdown hook.
        registerSpringShutdownHook();
    }

初始化TM(事务管理者)

    /**
     * Obtains the TM remoting client for the given application and
     * transaction service group, then initializes it.
     *
     * @param applicationId the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        // Obtain the TM client, then initialize and start it.
        TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup).init();
    }

创建TM

    /**
     * Returns the shared TM client after applying the given identity and
     * credentials to it.
     *
     * @param applicationId the application id
     * @param transactionServiceGroup the transaction service group
     * @param accessKey the access key
     * @param secretKey the secret key
     * @return the instance returned by {@link #getInstance()}, configured with these values
     */
    public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        final TmNettyRemotingClient client = getInstance();

        // Identity: application id and transaction service group.
        client.setApplicationId(applicationId);
        client.setTransactionServiceGroup(transactionServiceGroup);

        // Credentials.
        client.setAccessKey(accessKey);
        client.setSecretKey(secretKey);

        return client;
    }
    /**
     * Returns the shared {@code TmNettyRemotingClient}, creating it on first use.
     *
     * <p>Creation is guarded by a lock on {@code TmNettyRemotingClient.class}
     * with a null check before and after acquiring the lock.
     * NOTE(review): safe publication with this pattern relies on
     * {@code instance} being declared {@code volatile}; the field declaration
     * is not visible in this snippet — confirm.
     *
     * @return the shared client instance
     */
    public static TmNettyRemotingClient getInstance() {
        // First check without the lock: skip synchronization once an instance exists.
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                // Second check under the lock: another thread may have created it meanwhile.
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // Message executor: core and maximum pool size are both
                    // getClientWorkerThreads(); waiting tasks go to a queue
                    // bounded by MAX_QUEUE_SIZE; rejected tasks are handled by
                    // RejectedPolicies.runsOldestTaskPolicy().
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                            KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                            new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                                    nettyClientConfig.getClientWorkerThreads()),
                            RejectedPolicies.runsOldestTaskPolicy());
                    // NOTE(review): the meaning of the null second constructor
                    // argument is not visible here — check the constructor.
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

初始化TM

    /**
     * Registers the processors on every call and runs the superclass
     * initialization only on the first call that flips {@code initialized}
     * from {@code false} to {@code true}.
     */
    @Override
    public void init() {
        // registry processor
        registerProcessor();

        final boolean firstInitialization = initialized.compareAndSet(false, true);
        if (firstInitialization) {
            super.init();
        }
    }
    /**
     * Starts the client: schedules the periodic reconnect task, optionally
     * starts the merged-send worker, runs the superclass initialization and
     * finally starts the Netty client bootstrap.
     */
    @Override
    public void init() {
        // Periodically reconnect to the Seata Server for this transaction service group.
        // The original author's note gives a 60 s initial delay and a 10 s period; the
        // constants SCHEDULE_DELAY_MILLS / SCHEDULE_INTERVAL_MILLS are not visible here.
        timerExecutor.scheduleAtFixedRate(
            () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
            SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

        // Batch sending of client requests, when enabled.
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            final LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
            final NamedThreadFactory mergeThreadFactory =
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD);
            mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                pendingTasks, mergeThreadFactory);
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }

        // Superclass initialization. Per the original author's note it schedules a
        // timeout check every 3 s; that code is not visible here.
        super.init();

        // Start the Netty client component.
        clientBootstrap.start();
    }

初始化RM(资源管理者)

    /**
     * Obtains the RM remoting client for the given application and
     * transaction service group, wires in the default resource manager and
     * the default RM handler, then initializes it.
     *
     * @param applicationId the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        // Obtain the RM client.
        final RmNettyRemotingClient rmClient =
            RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);

        rmClient.setResourceManager(DefaultResourceManager.get());
        rmClient.setTransactionMessageHandler(DefaultRMHandler.get());

        // Initialize and start the RM client.
        rmClient.init();
    }

初始化数据库连接

    /**
     * Seata data source proxying: registers the
     * {@link SeataAutoDataSourceProxyCreator} unless one is already defined.
     * Active when the auto-data-source-proxy property is {@code "true"} or
     * missing.
     *
     * @param seataProperties supplies the JDK-proxy flag and the exclusion list
     * @return the proxy creator built from those settings
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(
            seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying());
    }

SeataAutoDataSourceProxyCreator

 /** Advisor wrapping a {@code SeataAutoDataSourceProxyAdvice}; returned for every bean this creator proxies. */
 private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

    /**
     * Logs the bean being proxied and returns the single shared advisor.
     *
     * @param beanClass the class of the bean (unused)
     * @param beanName the bean name, used only for logging
     * @param customTargetSource the custom target source (unused)
     * @return a one-element array holding {@code advisor}
     * @throws BeansException declared by the overridden method; not thrown here
     */
    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of [{}]", beanName);
        }
        // The advisor carries the SeataAutoDataSourceProxyAdvice; per the original author's
        // note, calls on the proxied bean then go through that advice's invoke method.
        final Object[] interceptors = {advisor};
        return interceptors;
    }

SeataAutoDataSourceProxyAdvice

    /**
     * Redirects a call on the target data source to the matching method of
     * its {@link DataSourceProxy}, when {@code DataSourceProxy} declares one
     * with the same name and parameter types; otherwise proceeds with the
     * original invocation.
     *
     * @param invocation the intercepted method invocation
     * @return the result of the proxy method, or of the original invocation
     * @throws Throwable whatever the invoked method throws
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Look up (or register) the proxy for the target data source.
        final DataSourceProxy proxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
        final Method invoked = invocation.getMethod();
        final Object[] arguments = invocation.getArguments();

        final Method proxyMethod =
            BeanUtils.findDeclaredMethod(DataSourceProxy.class, invoked.getName(), invoked.getParameterTypes());
        if (proxyMethod == null) {
            return invocation.proceed();
        }
        return proxyMethod.invoke(proxy, arguments);
    }

这里会添加数据源信息:putDataSource 通过 computeIfAbsent,在该数据源尚未有对应代理时 new 一个 DataSourceProxy,进而进入 DataSourceProxy 的构造方法完成数据源的初始化。

    /**
     * Returns the proxy mapped to the given data source, creating and storing
     * a new {@link DataSourceProxy} when none is mapped yet.
     *
     * @param dataSource the data source to look up
     * @return the existing or newly created proxy
     */
    public DataSourceProxy putDataSource(DataSource dataSource) {
        final DataSourceProxy proxy =
            this.dataSourceProxyMap.computeIfAbsent(dataSource, DataSourceProxy::new);
        return proxy;
    }

DataSourceProxy

    /**
     * Creates a proxy around the given data source and initializes it via
     * {@code init}.
     *
     * @param targetDataSource the data source being proxied
     * @param resourceGroupId the resource group id stored on this proxy
     */
    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        super(targetDataSource);
        this.init(targetDataSource, resourceGroupId);
    }
    /**
     * Initializes this proxy: stores the resource group id, reads the JDBC
     * URL (and, for Oracle, the user name) from a connection's metadata,
     * registers this proxy as a resource, and optionally schedules a periodic
     * table-metadata refresh.
     *
     * @param dataSource the underlying data source to read connections from
     * @param resourceGroupId the resource group id to store
     * @throws IllegalStateException if obtaining a connection or reading its
     *         metadata fails with an {@code SQLException}
     */
    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        // Read the JDBC details from a short-lived connection (closed by try-with-resources).
        try (Connection connection = dataSource.getConnection()) {
            // JDBC URL
            jdbcUrl = connection.getMetaData().getURL();
            // Database type, derived from the URL
            dbType = JdbcUtils.getDbType(jdbcUrl);
            // The user name is only read for Oracle.
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        // Register this proxy as a resource (with the TC, per the original author's note).
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            // Refresh the cached table metadata for this resource at a fixed rate,
            // starting immediately, every TABLE_META_CHECKER_INTERVAL milliseconds.
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                    // Swallowed, presumably on purpose: with scheduleAtFixedRate an
                    // exception escaping the task suppresses all later runs.
                    // NOTE(review): failures are invisible here; consider logging them.
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

RM和TM的注册

在这里插入图片描述
可以看到在DataSource初始化完成之后会调用 DefaultResourceManager.get().registerResource(this); 去做资源的注册,RM 注册成功后会回调 RmNettyRemotingClient 的 onRegisterMsgSuccess 方法;
TM 则是在与 Seata Server 建立连接并注册成功后,回调 TmNettyRemotingClient 的 onRegisterMsgSuccess 方法。RM 和 TM 的注册回调分别如下:

RM注册

    /**
     * Callback for a successful RM registration: logs both versions,
     * registers the channel for the server address, and sends another
     * register message when the resource ids in the request are non-null and
     * differ from the current merged resource keys.
     *
     * @param serverAddress the address of the server that accepted the registration
     * @param channel the channel to that server
     * @param response the response, cast to {@code RegisterRMResponse}
     * @param requestMessage the request, cast to {@code RegisterRMRequest}
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        final RegisterRMRequest rmRequest = (RegisterRMRequest)requestMessage;
        final RegisterRMResponse rmResponse = (RegisterRMResponse)response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", rmRequest.getVersion(), rmResponse.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(serverAddress, channel);

        // Re-register when the request's resource ids no longer match the current keys.
        final String currentKeys = getMergedResourceKeys();
        if (rmRequest.getResourceIds() != null && !rmRequest.getResourceIds().equals(currentKeys)) {
            sendRegisterMessage(serverAddress, channel, currentKeys);
        }
    }

TM注册

    /**
     * Callback for a successful TM registration: logs both versions and
     * registers the channel for the server address.
     *
     * @param serverAddress the address of the server that accepted the registration
     * @param channel the channel to that server
     * @param response the response, cast to {@code RegisterTMResponse}
     * @param requestMessage the request, cast to {@code RegisterTMRequest}
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        final RegisterTMRequest tmRequest = (RegisterTMRequest)requestMessage;
        final RegisterTMResponse tmResponse = (RegisterTMResponse)response;

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", tmRequest.getVersion(), tmResponse.getVersion(), channel);
        }

        getClientChannelManager().registerChannel(serverAddress, channel);
    }

总结

1.客户端启动的时候会通过SpringBoot的自动装配机制加载配置类
2.设置RestTemplate的拦截器(拦截通过RestTemplate发出的请求)和Feign的客户端包装(拦截Feign请求),把全局事务ID(xid)放入请求头;同时注册Spring MVC的拦截器(路径/**),拦截所有进入的mvc请求,把请求头中的xid绑定到事务上下文
3.初始化TM(事务管理者) RM(资源管理者) 建立Netty客户端的通道并启动
4.初始化数据源信息
5.注册TM 和RM到SeataServer

文章来源:https://blog.csdn.net/qq_41956309/article/details/135015014
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。