分布式事务seata使用示例及注意事项

2023-12-15 17:50:10

示例说明

  • 有两个微服务A、B。微服务A连接本地数据库ai_vs_remote_demo,并向表xl中插入一条数据;微服务B连接远程数据库hrm_db,并向表job_inf中插入一条数据;数据库ai_vs_remote_demo与hrm_db在不同的物理机、不同的地方。
  • 示例模拟这样一种情况:微服务A成功插入一条数据,微服务B在插入数据的业务过程中抛出异常,以此来验证分布式事务的回滚:微服务B的数据插入失败,同时微服务A插入的数据会被回滚!

代码

调用方(客户端):微服务A , 服务提供方(服务器端):微服务B

调用方(微服务A)

  • controller
@RestController
public class ClassForTest {
	Logger logger = LoggerFactory.getLogger(this.getClass());

	@Autowired
	private XlServiceImpl xlServiceImpl;

	@GetMapping("/dosth")
	public CheckListPo doSth() throws Exception {
		String ne = xlServiceImpl.doInsert();
		return null;
	}

}
  • service
@Service
@Transactional(rollbackFor = Exception.class)
public class XlServiceImpl {
	Logger logger = LoggerFactory.getLogger(this.getClass());
	@Autowired
	private XlMapper xlMapper;

	@Autowired
	private XlFeignClient xlFeignClient;


	@GlobalTransactional(rollbackFor = Exception.class)
	public String doInsert() throws Exception {
		// 演示如何获取seata的xid
		String xid = RootContext.getXID();
		logger.info("++++++++xid="+xid);
		// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
		xlMapper.insert2();
		// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
		Integer ne = xlFeignClient.invokeBusi();
		return "";
	}
}
  • feigclient
@FeignClient(contextId = "xl20231108", name = "checklist-service")
public interface XlFeignClient {	
	@PostMapping("/checklistservice/xl/test")
	Integer invokeBusi() throws Exception;
}

服务方(微服务B)

  • controller
@RestController
public class XlContoller {

	@Autowired
	private XlBusiServiceImpl XlBusiServiceImpl;

	@PostMapping("/checklistservice/xl/test")
	public Integer invokeBusi() throws Exception {
		XlBusiServiceImpl.test();
		return 3;
	}
}
  • service ,这里会抛出异常:以测试分布式事务的回滚
@Service
@Transactional(rollbackFor = Exception.class)
public class XlBusiServiceImpl {
	
	@Autowired
	private XlBusiMapper xlBusiMapper;
	
	public String test() {
		xlBusiMapper.insert1();
		int x = 0;
		int y = 3 / x ;
		return "";
	}
	
}

测试

测试一 ,seata发挥作用,成功回滚!

  • 先看下测试前数据库的数据
    在这里插入图片描述
    在这里插入图片描述
  • 运行项目:http://localhost:8088/dosth
  1. 微服务A中的数据库操作语句xlMapper.insert2();执行成功!
    在这里插入图片描述

  2. 微服务B中抛出异常!
    在这里插入图片描述
    以上两点符合预期

  3. 微服务A中同样也抛出了异常,但是异常信息并不是微服务A传递过来的, 这是个疑点!后面会解释!

feign.codec.DecodeException: Could not extract response: no suitable HttpMessageConverter found for response type [class java.lang.Integer] and content type [text/html;charset=UTF-8]
	at feign.AsyncResponseHandler.decode(AsyncResponseHandler.java:119) ~[feign-core-10.12.jar:?]
	at feign.AsyncResponseHandler.handleResponse(AsyncResponseHandler.java:87) ~[feign-core-10.12.jar:?]
	at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:138) ~[feign-core-10.12.jar:?]
	at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89) ~[feign-core-10.12.jar:?]
	at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100) ~[feign-core-10.12.jar:?]
	at com.sun.proxy.$Proxy236.invokeBusi(Unknown Source) ~[?:?]
	at com.omomcam.service.impl.xl.XlServiceImpl.doInsert(XlServiceImpl.java:43) ~[classes/:?]
  1. 最终数据库的结果,因为微服务A抛出了异常,所以自然会回滚插入的数据!
    在这里插入图片描述

测试二:处理feignclient接口的返回类型从Integer变成String,其他的环境及代码完全一样,但是结果是“数据没有回滚:一致性未能保证”

  • 同样运行项目并访问:http://localhost:8088/dosth。
  • 微服务B抛出同样的异常
    在这里插入图片描述
  • 但是,微服务A没有抛出异常!
    在这里插入图片描述
    也就是说,在微服务A中的如下代码一切正常执行完毕!
	@GlobalTransactional(rollbackFor = Exception.class)
	public String doInsert() throws Exception {
		// 演示如何获取seata的xid
		String xid = RootContext.getXID();
		logger.info("++++++++xid="+xid);
		// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
		xlMapper.insert2();
		// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
		String ne = xlFeignClient.invokeBusi();
		return "";
	}

既然,程序正常执行完毕,那当然也不会存在数据的回滚了

  • 数据库验证:数据是否回滚
    刚刚访问http://localhost:8088/dosth 了两次
    在这里插入图片描述
  • 未回滚原因分析

上面已经提到了,因为微服务A中代码正常执行完成并没有抛出异常,所以是不会触发回滚机制的:

那为什么(对比“测试一”)将feignclient的中接口的返回类型从Integer改为String,调用方(微服务A)就不报错了呢?

原因参考:openfeign客户端A调用服务B,服务B抛出异常时,客户端A接收的几种情况

结论一:要保证事务自动回滚,则需要有对应的异常抛出,才会触发自动回滚机制

测试三,针对如下场景:服务端(微服务B)抛出了异常,但是业务上又不能直接抛出,需要try…catch…捕获后并处理再返回。 在这种场景下如何保证分布式事务的回滚?—手动回滚

  • 服务端抛出的异常被捕获了,而调用端又必须要有异常抛出才会回滚,这是相互矛盾的。那么要解决这个矛盾就只有手动进行回滚!

  • 手动回滚代码

 			try {
				GlobalTransactionContext.reload(RootContext.getXID()).rollback();
			} catch (TransactionException e1) {
				e1.printStackTrace();
			}

手动回滚的时机/位置:是在服务端还是调用端呢?
经测试手动回滚的代码,只能放在调用端,放在服务端无效!

  • 调用端示例代码
    特别说明:手动回滚时,不能同时使用io.seata.spring.annotation.GlobalTransactional注解和org.springframework.transaction.annotation.Transactional注解因为这里使用的是全局事务@GlobalTransactional的手动回滚功能,所以不能有@Transactional注解,至少要保证在@GlobalTransactional的方法上不能有@Transactional注解的作用(注意:这个时候也不能在类上加@Transactional了,因为其会作用于所有的方法)
package com.omomcam.service.impl.xl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.omomcam.dao.xl.XlMapper;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.feign.XlFeignClient;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
//注释掉本注解,如果其他非@GlobalTransactional的方法上需要@Transactional注解,可单独加在方法上
//@Transactional(rollbackFor = Exception.class) 
public class XlServiceImpl {
	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	@Autowired
	private XlMapper xlMapper;

	@Autowired
	private XlFeignClient xlFeignClient;

	public void fb() {
		xlMapper.insert1();
		int y = 0;
		int x = 3/y;
	}

	@GlobalTransactional(rollbackFor = Exception.class)
	public String doInsert() throws Exception {
		// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
		xlMapper.insert2();
		// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
		ResponseData<String> rd = xlFeignClient.invokeBusi();
		if (rd.getRespCode() != 200) {// 非200(失败)时,手动回滚事务,可根据具体业务自己灵活定义
			try {
				GlobalTransactionContext.reload(RootContext.getXID()).rollback();
			} catch (TransactionException e1) {
				e1.printStackTrace();
			}
		}
		return "";
	}

}
  • 服务端代码

抛出异常,但是会捕获并处理返回业务信息

抛出异常的代码:

package com.omomcam.checklistservice.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.omomcam.checklistservice.dao.XlBusiMapper;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
@Transactional(rollbackFor = Exception.class) 
public class XlBusiServiceImpl {
	
	@Autowired
	private XlBusiMapper xlBusiMapper;
	
	public String test() {
		xlBusiMapper.insert1();
		int x = 0;
		int y = 3 / x ;
		return "";
	}
}

捕获异常并返回自定义信息给调用端,代码如下:

package com.omomcam.checklistservice.controller;

import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import com.omomcam.checklistservice.service.impl.XlBusiServiceImpl;
import com.omomcam.entity.common.ResponseData;

@RestController
public class XlContoller {

	@Autowired
	private XlBusiServiceImpl XlBusiServiceImpl;

	@PostMapping("/checklistservice/xl/test")
	public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
		@SuppressWarnings("unchecked")
		ResponseData<String> rd = (ResponseData<String>) ResponseData.init();
		try {
			XlBusiServiceImpl.test();
		} catch (Exception e) {
			rd.setRespCode(400);
			rd.setRespMsg("服务端发生了异常");
			rd.setData("其他信息。。。");
		}
		return rd;
	}
}
  • 测试

说明:尽管服务端的异常被自己捕获了,返回了自定的相关信息,即是调用端没有收到任何异常。但是,调用端采用了手动回滚的方式,所以,调用端插入的数据仍然会被回滚!

运行前数据库中的数据:
在这里插入图片描述
执行过程
在这里插入图片描述
执行后数据库中的数据:
在这里插入图片描述
对比前后数据库及执行过程可知:数据回滚了!

结论二: 业务上或交互上不允许抛出异常时,可采用上面的手动回滚的方式实现分布式事务

测试三 考虑这样一种场景:服务A调用服务B,服务B调用服务C ; 即是 A—>B—>C的情况;B即是服务端也是调用端

如果B发生异常,则需要回滚A!上面的 测试二 已经测试了此种情况

如果C发生异常,则需要回滚B以及A!下面测试此种情况

  • 测试 (在上面“测试二”的基础上修改代码)

因为这里只有两个微服务A和B ,所以A处理是自己以外还扮演C的角色。即是调用链为 A——>B——>A。详细解释即是:调用端A——>服务端B 调用端B——>服务端A

  • 修改服务B的代码
    因为服务B既作为服务端有作为调用端,故需要保证服务端B中的数据库操作正常,以验证服务端A抛出异常后服务端B的数据可以回滚并且调用端A的数据库操作同样能够回滚!

保证XlBusiServiceImpl.test();调用正常:注释掉一下两行代码

package com.omomcam.checklistservice.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.omomcam.checklistservice.dao.XlBusiMapper;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
@Transactional(rollbackFor = Exception.class) 
public class XlBusiServiceImpl {
	
	@Autowired
	private XlBusiMapper xlBusiMapper;
	
	public String test() {
		xlBusiMapper.insert1();
//		int x = 0;
//		int y = 3 / x ;
		return "";
	}
	
}

服务端B及调用端B的完整代码如下,因为B这里又作为调用端所以需要加上 手动回滚 的代码以及注解 @GlobalTransactional(rollbackFor = Exception.class)

package com.omomcam.checklistservice.controller;

import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import com.omomcam.checklistservice.feign.OnlineReadingRestFeignClient;
import com.omomcam.checklistservice.service.impl.XlBusiServiceImpl;
import com.omomcam.entity.common.ResponseData;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@RestController
public class XlContoller {

	@Autowired
	private XlBusiServiceImpl XlBusiServiceImpl;

	@Autowired
	private OnlineReadingRestFeignClient onlineReadingRestFeignClient;

	@GlobalTransactional(rollbackFor = Exception.class) // 需要加上 本注解
	@PostMapping("/checklistservice/xl/test")
	public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
		@SuppressWarnings("unchecked")
		ResponseData<String> rd = (ResponseData<String>) ResponseData.init();
		try {
			XlBusiServiceImpl.test();
		} catch (Exception e) {
			rd.setRespCode(400);
			rd.setRespMsg("服务端发生了异常");
			rd.setData("其他信息。。。");
			return rd; // 抛出异常后,直接返回
		}
		@SuppressWarnings("unchecked")
		ResponseData<Integer> resd = (ResponseData<Integer>) ResponseData.init();
		try {
			// 本"服务端"又反过去调用 "调用端" 即:本"服务端"现在扮演的是一个新的"调用端",原"调用端"变成了新的"服务端"
			resd = onlineReadingRestFeignClient.xlSeataTest();
			if (resd.getRespCode() != 200) { // 不等于200,说明发生了异常,则手动回滚
				try {
					GlobalTransactionContext.reload(RootContext.getXID()).rollback();
				} catch (TransactionException e1) {
					e1.printStackTrace();
				}
			}
		} catch (Exception e) {
			rd.setRespCode(resd.getRespCode());
			rd.setRespMsg(resd.getRespMsg());
			return rd;
		}
		return rd;
	}
}

  • 修改服务A的代码

增加提供服务的接口方法,同时要保证返回的转态码不等于200:使xlServiceImpl.fb();抛出异常;详细见如下代码:

	@PostMapping("/seata/xl/test")
	public ResponseData<Integer> xlSeataTest() {
		@SuppressWarnings("unchecked")
		ResponseData<Integer> rd = (ResponseData<Integer>) ResponseData.init();
		try {
			xlServiceImpl.fb();
		} catch (Exception e) {
			rd.setRespCode(500);
		}
		return rd;
	}

完整controller代码:

package com.omomcam.controller.xl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import com.omomcam.entity.CheckListPo;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.service.impl.xl.XlServiceImpl;

@RestController
public class ClassForTest {
	Logger logger = LoggerFactory.getLogger(this.getClass());

	@Autowired
	private XlServiceImpl xlServiceImpl;

	@GetMapping("/dosth")
	public CheckListPo doSth() throws Exception {
		String ne = xlServiceImpl.doInsert();
		return null;
	}
	
	@PostMapping("/seata/xl/test")
	public ResponseData<Integer> xlSeataTest() {
		@SuppressWarnings("unchecked")
		ResponseData<Integer> rd = (ResponseData<Integer>) ResponseData.init();
		try {
			xlServiceImpl.fb();
		} catch (Exception e) {
			rd.setRespCode(500);
		}
		return rd;
	}
}

完整service代码:

package com.omomcam.service.impl.xl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.omomcam.dao.xl.XlMapper;
import com.omomcam.entity.common.ResponseData;
import com.omomcam.feign.XlFeignClient;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
//注释掉本注解,如果其他非@GlobalTransactional的方法上需要@Transactional注解,可单独加在方法上
//@Transactional(rollbackFor = Exception.class) 
public class XlServiceImpl {
	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	@Autowired
	private XlMapper xlMapper;

	@Autowired
	private XlFeignClient xlFeignClient;
	
	@Transactional(rollbackFor = Exception.class) 
	public void fb() {
		xlMapper.insert1();
		int y = 0;
		int x = 3/y;
	}

	@GlobalTransactional(rollbackFor = Exception.class)
	public String doInsert() throws Exception {
		// 本地数据库插入一条数据(向数据库ai_vs_remote_demo的表xl中插入一条数据)
		xlMapper.insert2();
		// openfeign调用远程微服务B(在微服务B中会向数据库hrm_db的表job_inf中插入一条数据)
		ResponseData<String> rd = xlFeignClient.invokeBusi();
		if (rd.getRespCode() != 200) { // 非200(失败)时,手动回滚事务
			try {
				GlobalTransactionContext.reload(RootContext.getXID()).rollback();
			} catch (TransactionException e1) {
				e1.printStackTrace();
			}
		}
		return "";
	}

}

  • 测试

先看调用端A的执行情况:
在这里插入图片描述
再看服务端B执行情况
在这里插入图片描述
再看调用端B的执行情况:
在这里插入图片描述
执行前的数据库中数据的情况
在这里插入图片描述
再执行一遍
执行后的数据库中数据情况
在这里插入图片描述
截图说明,数据成功回滚: C异常回滚B以及回滚A

结论三: A——>B——>C,当C抛出异常时,可依次回滚! 注意,在调用端需加上@GlobalTransactional,如果对抛出的异常做了捕获处理则需要手动进行回滚!

延伸 关于openfeign调用:服务端返回异常信息,调用端接收异常的说明

前面“测试一”中调用端接收到的异常是 服务端抛出的异常吗??

显示不是的!

服务端返回的是
/ by zero 异常
在这里插入图片描述

调用端接收到的是
HttpMessageConverter 转换异常

feign.codec.DecodeException: Could not extract response: no suitable HttpMessageConverter found for response type [class java.lang.Integer] and content type [text/html;charset=UTF-8]
	at feign.AsyncResponseHandler.decode(AsyncResponseHandler.java:119) ~[feign-core-10.12.jar:?]
	at feign.AsyncResponseHandler.handleResponse(AsyncResponseHandler.java:87) ~[feign-core-10.12.jar:?]
	at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:138) ~[feign-core-10.12.jar:?]
	at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89) ~[feign-core-10.12.jar:?]
	at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100) ~[feign-core-10.12.jar:?]
	at com.sun.proxy.$Proxy236.invokeBusi(Unknown Source) ~[?:?]
	at com.omomcam.service.impl.xl.XlServiceImpl.doInsert(XlServiceImpl.java:43) ~[classes/:?]

参考:openfeign客户端A调用服务B,服务B抛出异常时,客户端A接收的几种情况

openfeign调用如何接收到服务端返回的真实的异常信息呢?

参考:
在这里插入图片描述

摘自: openfeign集成sentinel实现服务降级

关于feign.codec.ErrorDecoder的使用及注意

1、实现ErrorDecoder接口

package com.omomcam.config;

import feign.Response;
import feign.Response.Body;
import feign.codec.ErrorDecoder;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.tm.api.GlobalTransactionContext;

public class CustomErrorDecoder implements ErrorDecoder {
	
	private final ErrorDecoder defaultErrorDecoder = new Default();

	@Override
	public Exception decode(String methodKey, Response response) {
        Body body = response.body();
        String bodyStr = body.toString();
		// 在这里进行自定义的异常处理逻辑
        // 例如,你可以根据 HTTP 状态码区分不同的异常类型
        if (response.status() == 404) {
            // 处理 404 错误
//            return new NotFoundException("Not Found");
        }
        
//        try {
//			GlobalTransactionContext.reload(RootContext.getXID()).rollback();
//		} catch (TransactionException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		}

        // 如果没有匹配到特定的异常,使用默认的 ErrorDecoder
        return defaultErrorDecoder.decode(methodKey, response);
	}
}

2、将实现类配置进spring环境

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import feign.codec.ErrorDecoder;

@Configuration
public class MyConfig {
 
    @Bean
    public ErrorDecoder errorDecoder() {
        return new CustomErrorDecoder();
    }
 
}

3, 特别注意!!并不是服务器端只要抛出了异常就会触发下面的方法执行,只有response的status不是200的时候才会触发

@Override
	public Exception decode(String methodKey, Response response) {
        Body body = response.body();
        String bodyStr = body.toString();
		// 在这里进行自定义的异常处理逻辑
        // 例如,你可以根据 HTTP 状态码区分不同的异常类型
        if (response.status() == 404) {
            // 处理 404 错误
//            return new NotFoundException("Not Found");
        }
        
//        try {
//			GlobalTransactionContext.reload(RootContext.getXID()).rollback();
//		} catch (TransactionException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		}

        // 如果没有匹配到特定的异常,使用默认的 ErrorDecoder
        return defaultErrorDecoder.decode(methodKey, response);
	}

什么情况response的status不是200?

  • openfeign自动返回非200的情况
  • 在程序中手动设置response的status的值
import javax.servlet.http.HttpServletResponse;

public ResponseData<String> invokeBusi(HttpServletResponse response) throws Exception {
		try {
		XlBusiServiceImpl.test();
		} catch (Exception e) {
		  	response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
	       // 或者直接设任意非200类型的值
	        response.setStatus(404);
			ne.setSs("报错了,报错信息:"+e.getMessage());
		}
}

文章来源:https://blog.csdn.net/qq_29025955/article/details/134990525
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。