分布式微服务架构日志调用链路跟踪-traceId
分布式微服务架构日志调用链路跟踪-traceId
在ELK日志集成平台里(日志的写入,收集,跟踪,搜索,分析)
背景知识
在xxx(博主之前的公司),每个前端请求里面,都会在request的header区携带一个traceId 随机数值,用来跟踪在后端的调用链路栈打印.通过ES收集的日志数据,在ELK日志集成平台里,用traceId就能得到用户请求的完整链路,排查问题的效率非常高.那么具体是怎么设计的?
同样的,每个请求的用户会话信息该怎么传递?
traceId,UserId,facilityId,tenantId.
遇到的场景
有几个难题需要思考: 1.客户端调用方法中,遇到了rpc该怎么传递traceId? 2.遇到了feign接口该怎么传递traceId? 3.遇到了mq该怎么传递traceId? 4.遇到了线程池里的新线程该怎么传递traceId? 下面针对各个场景,逐一分析xxx的实现方案:
在gateway中对sid进行校验和初步存储(下文代码中的sid就是前面所说的traceId). 对应的类: gateway-GlobalFilter-SIDCheckedFilter.java. gateway的原理: 由一系列路由(route)和过滤器(filter)组成. 过滤器的常见用途:
- IP白名单和黑名单控制
- SID检测
- 请求签名检测,防止恶意构建请求
- 用户会话检测,防止非系统用户请求
private static String getSID(ServerHttpRequest request) { String sid = request.getHeaders().getFirst(Constants.HEADER_SID); // 将sid放到日志的变量中 MDC.put(Constants.HEADER_SID,sid); return sid; } @Override public Mono<Void> execute(ServerHttpRequest request, ServerWebExchange exchange, GatewayFilterChain chain) { String sid = getSID(request); if(logger.isInfoEnabled()) { String url = StringUtils.defaultIfBlank(request.getPath().value(), ""); String ip = IpUtils.getIp(request); MDC.put(MDC_SID,sid); MDC.put(MDC_URI,url); MDC.put(MDC_IP,ip); MDC.put(MDC_USER,null); logger.info("网关请求.."); } if (validSid) { if (StringUtils.isBlank(sid)) { logger.info("{} SID为空", this.getRequestUri(request)); // 构造返回错误信息 ApiResponse<String> responseMap = ApiResponse.failResp(RequestCode.SID_ISNULL); return returnError(exchange, HttpStatus.BAD_REQUEST, responseMap); } // 解决高并发状态的多次重复请求问题 String redisSidKey = GatewayRedisKeyConstants.GATEWAY_SID_KEY_PREFIX + sid; boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisSidKey, GATEWAY_SID_KEY_DEF_VAL, GATEWAY_SID_KEY_EXPIRE_TIME, TimeUnit.MINUTES); if (!isSuccess) { logger.info("SID已存在,重复请求:{}", sid); ApiResponse<String> responseMap = ApiResponse.failResp(RequestCode.SID_REPEATED_REQUESTS); return returnError(exchange, HttpStatus.BAD_REQUEST, responseMap); } } exchange = afterValidHandler(exchange);//验证过后处理器 return chain.filter(exchange); } /** * 验证过后处理器 * * @param exchange * @return */ @Override public ServerWebExchange afterValidHandler(ServerWebExchange exchange) { ServerHttpRequest request = exchange.getRequest(); String sid = request.getHeaders().getFirst(Constants.HEADER_SID); //不要验证SID的情况下,如果SI为空,默认生成一个SID,供下游系统使用 if (StringUtils.isBlank(sid)) { //生成sid String gatewaySid = "gateway_" + DateTimeUtils.format(LocalDateTime.now(), DateTimeUtils.FORMAT_1) + RandomUtil.randomNumbers(6); //构造新的ServerHttpRequest ServerHttpRequest.Builder builder = request.mutate() //往header中添加网关生成的SID 
.header(com.ess.framework.commons.constant.Constants.HEADER_SID, gatewaySid); // 将sid放到日志的变量中 MDC.put(Constants.HEADER_SID,gatewaySid); exchange = exchange.mutate().request(builder.build()).build(); } return exchange; }
定义个framework-boot的依赖包
这个依赖包内使用interceptor拦截器对web请求做拦截,拿到traceId,通过threadLocal对象放入一个Context对象中.每个请求对应一个ThreadLocal对象. 每个服务的全局拦截器@RefreshScope public abstract class BootWebConfigurer implements WebMvcConfigurer { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RequestMappingHandlerAdapter requestMappingHandlerAdapter; @Bean @RefreshScope public GatewayInterceptor getGatewayInterceptor() { return new GatewayInterceptor(); } @Bean @RefreshScope public SIDInterceptor getSIDInterceptor() { return new SIDInterceptor(); } @Bean @RefreshScope public InnerInterceptor getInnerInterceptor() { return new InnerInterceptor(); } @Bean public TokenInterceptor getTokenInterceptor() { return new TokenInterceptor(); } @Bean public LogMdcInterceptor getLogMdcInterceptor() { return new LogMdcInterceptor(); } @Bean public UnionInterceptor getUnionInterceptor() { return new UnionInterceptor(); } @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(getSIDInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); registry.addInterceptor(getGatewayInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); registry.addInterceptor(getInnerInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); registry.addInterceptor(getTokenInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); registry.addInterceptor(getLogMdcInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); // 添加联盟拦截器 registry.addInterceptor(getUnionInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE); } /** * 初始化SIDReturnValueHandler 对所有的响应返回SID字段 */ @PostConstruct public void initSidHandler() { final List<HandlerMethodReturnValueHandler> originalHandlers = new 
ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers()); final int deferredPos = obtainValueHandlerPosition(originalHandlers, DeferredResultMethodReturnValueHandler.class); SIDReturnValueHandler decorator = null; for (HandlerMethodReturnValueHandler handler : originalHandlers) { if (handler instanceof RequestResponseBodyMethodProcessor) { decorator = new SIDReturnValueHandler((RequestResponseBodyMethodProcessor) handler); break; } } originalHandlers.add(deferredPos + 1, decorator); requestMappingHandlerAdapter.setReturnValueHandlers(originalHandlers); } private int obtainValueHandlerPosition(final List<HandlerMethodReturnValueHandler> originalHandlers, Class<?> handlerClass) { for (int i = 0; i < originalHandlers.size(); i++) { final HandlerMethodReturnValueHandler valueHandler = originalHandlers.get(i); if (handlerClass.isAssignableFrom(valueHandler.getClass())) { return i; } } return -1; } @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("/"); registry.addResourceHandler("/favicon.ico").addResourceLocations("classpath:/favicon.ico"); registry.addResourceHandler("app.html").addResourceLocations("classpath:/"); registry.addResourceHandler("/error"); // Swagger2配置 registry.addResourceHandler("/v2/**"); registry.addResourceHandler("/swagger-resources/**"); registry.addResourceHandler("/csrf"); registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/"); registry.addResourceHandler("/swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/"); registry.addResourceHandler("/doc.html").addResourceLocations("classpath:/META-INF/resources/"); } /** * 自定义扩展消息转换器 * * @param converters */ @Override public void extendMessageConverters(List<HttpMessageConverter<?>> converters) { //定义fastjson转换消息对象 FastJsonHttpMessageConverter fastJsonConverter = new FastJsonHttpMessageConverter(); //添加fastjson全局配置 FastJsonConfig fastJsonConfig = new FastJsonConfig(); 
fastJsonConfig.setSerializerFeatures( // 排序配置 SerializerFeature.SortField, SerializerFeature.MapSortField, // 避免对象重复引用 SerializerFeature.DisableCircularReferenceDetect, // 格式化输出 SerializerFeature.PrettyFormat ); fastJsonConfig.setCharset(Charset.forName("UTF-8")); fastJsonConfig.setDateFormat("yyyy-MM-dd HH:mm:ss"); //默认json会对属性里的json字符串值进行排序,加了这个Feature.OrderedField则会禁止排序 fastJsonConfig.setFeatures(Feature.OrderedField); fastJsonConverter.setFastJsonConfig(fastJsonConfig); //添加支持的MediaType类型 fastJsonConverter.setSupportedMediaTypes(Lists.newArrayList(MediaType.APPLICATION_JSON)); converters.set(0, fastJsonConverter); //BigDecimal格式化 SerializeConfig serializeConfig = SerializeConfig.getGlobalInstance(); serializeConfig.put(BigDecimal.class, BigDecimalConfigure.instance); fastJsonConfig.setSerializeConfig(serializeConfig); //字节数组消息转换器(供文件下载使用) converters.add(new ByteArrayHttpMessageConverter()); Map<String, HttpMessageConverter<?>> convertersMap = new LinkedHashMap<>(); //转换器去重,并设置所有转换器的默认编码 for (HttpMessageConverter converter : converters) { String name = converter.getClass().getSimpleName(); if (converter instanceof StringHttpMessageConverter) { //设置StringHttpMessageConverter的默认编码为:UTF-8 ((StringHttpMessageConverter) converter).setDefaultCharset(Charset.forName("UTF-8")); } if (!convertersMap.containsKey(name)) { convertersMap.put(name, converter); } } converters.clear(); converters.addAll(Lists.newArrayList(convertersMap.values())); } @Bean public Converter<String, LocalDate> localDateConverter() { return new Converter<String, LocalDate>() { @Override public LocalDate convert(String source) { return LocalDate.parse(source, Constants.DATE_FORMATTER); } }; } @Bean public Converter<String, LocalDateTime> localDateTimeConverter() { return new Converter<String, LocalDateTime>() { @Override public LocalDateTime convert(String source) { return LocalDateTime.parse(source, Constants.DATE_TIME_FORMATTER); } }; } /** * Java常用时间类型序列化和反序列化 * 
LocalDateTime、LocalDate、LocalTime、java.util.Date、java.sql.Date、Calendar、Timestamp * 解决通过Feign调用时,客户端是以上类型会报错问题。 */ @Bean public ObjectMapper objectMapper() { ObjectMapper objectMapper = new ObjectMapper(); JavaTimeModule javaTimeModule = new JavaTimeModule(); //定义序列化日期时间类型转换器 javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeUtils.DEFAULT_FORMATTER)); javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeUtils.YEAR_MONTH_DAY_FORMATTER)); javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeUtils.HOUR_MINUTE_SECOND_FORMATTER)); javaTimeModule.addSerializer(Date.class, new DateSerializer(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT))); javaTimeModule.addSerializer(java.sql.Date.class, new SqlDateSerializer().withFormat(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT))); javaTimeModule.addSerializer(Calendar.class, new CalendarSerializer(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT))); //定义反序列化日期时间类型转换器 javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeUtils.DEFAULT_FORMATTER)); javaTimeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeUtils.YEAR_MONTH_DAY_FORMATTER)); javaTimeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeUtils.HOUR_MINUTE_SECOND_FORMATTER)); javaTimeModule.addDeserializer(Date.class, new DateDeserializers.DateDeserializer(DateDeserializers.DateDeserializer.instance, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT)); javaTimeModule.addDeserializer(java.sql.Date.class, new DateDeserializers.SqlDateDeserializer(new DateDeserializers.SqlDateDeserializer(), new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT)); javaTimeModule.addDeserializer(Calendar.class, new DateDeserializers.CalendarDeserializer(new DateDeserializers.CalendarDeserializer(), new 
SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT)); javaTimeModule.addDeserializer(Timestamp.class, new DateDeserializers.TimestampDeserializer(new DateDeserializers.TimestampDeserializer(), new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT)); objectMapper.registerModule(javaTimeModule); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); return objectMapper; } }
sid拦截器
package com.ess.framework.boot.interceptor; import com.ess.framework.commons.constant.Constants; import com.ess.framework.commons.response.RequestCode; import com.ess.framework.commons.utils.EssContextHolder; import com.ess.framework.commons.utils.ExceptionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * SID 拦截器 ,验证SID是否为空 */ @RefreshScope public class SIDInterceptor implements HandlerInterceptor { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 是否验证SID */ @Value("${valid.sid:false}") private boolean validSid = false; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String sid = request.getHeader(Constants.HEADER_SID); // 验证sid是否为空 if (validSid && StringUtils.isBlank(sid)) { logger.warn("{} {}", request.getRequestURI(), RequestCode.SID_ISNULL.message()); ExceptionUtils.throwBusiness(RequestCode.SID_ISNULL); } // 设置SID到线程变量 这是传输的关键 if (StringUtils.isNotBlank(sid)) { EssContextHolder.setSID(sid); } else { EssContextHolder.setSID(null); } return true; } }
feign接口如何传递traceId
package com.ess.framework.boot.interceptor; import com.ess.framework.commons.constant.Constants; import com.ess.framework.commons.utils.EssContextHolder; import com.ess.framework.commons.utils.IpUtils; import feign.RequestInterceptor; import feign.RequestTemplate; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.slf4j.MDC; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; /** * @ClassName FeignRequestInterceptor * @Description feign拦截器 * @Date 2021/5/27 21:56 * @Version */ @Slf4j public class FeignRequestInterceptor implements RequestInterceptor { private final static String HEADER_ESS_FEIGN_IP = "ess-feign-ip"; private final static String HEADER_IP = "ip"; @Override public void apply(RequestTemplate requestTemplate) { try { String ip = null; String sid = null; String token = null; String unionId = null; /* ExecuteTaskUtils 线程池处理方案**/ String threadName = Thread.currentThread().getName(); if (StringUtils.startsWith(threadName, "ess-task-pool") || StringUtils.startsWith(threadName, "ess-async-pool")) { sid = MDC.get(Constants.HEADER_SID); ip = MDC.get(HEADER_IP); unionId = EssContextHolder.getUnionId(); token = EssContextHolder.getToken(); } else { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if (attributes == null) { return; } HttpServletRequest request = attributes.getRequest(); token = request.getHeader(Constants.HEADER_TOKEN); sid = request.getHeader(Constants.HEADER_SID); ip = IpUtils.getIPAddress(request); unionId = request.getHeader(Constants.HEADER_UNIONID); } requestTemplate.header(Constants.HEADER_TOKEN, token); requestTemplate.header(Constants.HEADER_SID, sid); requestTemplate.header(Constants.HEADER_UNIONID, unionId); requestTemplate.header(HEADER_ESS_FEIGN_IP, ip); // feign调用都设置网关标识设置为true,防止被拦截 
requestTemplate.header(Constants.PASS_GATEWAY, "true"); } catch (Exception e) { log.error("拦截器异常", e); } } }
由此可见,feign请求的本质就是requestTemplate,而原理上,就是把sid重新塞到request的header里面.
线程池sid的传递
那么怎么传递到另外的线程里呢? ==答案就是threadLocal作为管道存储对象.
package com.ess.framework.commons.utils;

/**
 * Per-thread context holder for the request-scoped values (SID, token, union code, union id).
 *
 * <p>Each value lives in its own {@link ThreadLocal}, so a value set on one thread is not
 * visible on another; code that hands work to a different thread has to copy the values
 * explicitly. Threads that are reused (servlet containers, executors) should call
 * {@link #clear()} when a unit of work ends so values do not outlive it.
 */
public class EssContextHolder {

    private EssContextHolder() {
    }

    /** sid (trace id) */
    private static final ThreadLocal<String> SID = new ThreadLocal<>();

    /** token */
    private static final ThreadLocal<String> TOKEN = new ThreadLocal<>();

    /** union code */
    private static final ThreadLocal<String> UNION_CODE = new ThreadLocal<>();

    /** union id */
    private static final ThreadLocal<String> UNION_ID = new ThreadLocal<>();

    /**
     * Binds the SID to the current thread.
     *
     * @param sid the SID, or null to blank it
     */
    public static void setSID(String sid) {
        SID.set(sid);
    }

    /**
     * @return the SID bound to the current thread, or null when none is bound
     */
    public static String getSID() {
        return SID.get();
    }

    /**
     * Binds the token to the current thread.
     *
     * @param token the token, or null to blank it
     */
    public static void setToken(String token) {
        TOKEN.set(token);
    }

    /**
     * @return the token bound to the current thread, or null when none is bound
     */
    public static String getToken() {
        return TOKEN.get();
    }

    /**
     * Binds the union code to the current thread.
     *
     * @param unionCode the union code, or null to blank it
     */
    public static void setUnionCode(String unionCode) {
        UNION_CODE.set(unionCode);
    }

    /**
     * @return the union code bound to the current thread, or null when none is bound
     */
    public static String getUnionCode() {
        return UNION_CODE.get();
    }

    /**
     * Binds the union id to the current thread.
     *
     * @param unionId the union id, or null to blank it
     */
    public static void setUnionId(String unionId) {
        UNION_ID.set(unionId);
    }

    /**
     * @return the union id bound to the current thread, or null when none is bound
     */
    public static String getUnionId() {
        return UNION_ID.get();
    }

    /**
     * Removes every value bound to the current thread. After this call all getters return null
     * on this thread until a setter is called again.
     */
    public static void clear() {
        SID.remove();
        TOKEN.remove();
        UNION_CODE.remove();
        UNION_ID.remove();
    }
}
重写(override)线程池的任务提交方法,这样springboot在使用自定义线程池时,提交任务的那一刻就会捕获当前线程的sid等会话状态,并把它们传递到工作线程的ThreadLocal里.
package com.ess.framework.boot.asynctask;

import com.ess.framework.commons.utils.EssContextHolder;
import org.slf4j.MDC;

import java.util.concurrent.*;

/**
 * {@link ThreadPoolExecutor} that captures the submitting thread's context (SID, token, union id
 * and a copy of the MDC map) when a task is handed over, and passes it to
 * {@code ThreadMdcUtil.wrap} together with the task.
 *
 * <p>Only {@link #execute(Runnable)} is overridden. The inherited {@code submit},
 * {@code invokeAll} and {@code invokeAny} implementations of {@link AbstractExecutorService}
 * build a future and pass it to {@code execute}, so every task is wrapped exactly once.
 * (Overriding {@code submit} as well would wrap each submitted task twice.)
 *
 * @author shengfq
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    /**
     * Creates the pool from the async-task configuration, using a bounded
     * {@link ArrayBlockingQueue} of the configured capacity.
     */
    public ThreadPoolExecutorMdcWrapper(AsyncTaskThreadPoolConfig config, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        this(config.getCorePoolSize(),
                config.getMaxPoolSize(),
                config.getKeepAliveSecond(),
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(config.getQueueCapacity()),
                threadFactory,
                handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Captures the caller's context on the calling thread and executes the wrapped task.
     *
     * @param task the task to run on a pool thread
     */
    @Override
    public void execute(Runnable task) {
        // Must be read here, on the submitting thread; the worker thread has its own ThreadLocals.
        String sid = EssContextHolder.getSID();
        String token = EssContextHolder.getToken();
        String unionId = EssContextHolder.getUnionId();
        super.execute(ThreadMdcUtil.wrap(sid, token, unionId, task, MDC.getCopyOfContextMap()));
    }
}
mq消息的sid传递
如何传递给mq的消费者呢?实际上有了EssContextHolder对象,在Thread里获取sid很方便,如何通过mq传递sid,可以把sid放入message的header区,在消费端取出来,存入消费端线程的ThreadLocal对象中,就能起到传递的作用.
消息发送 / 消息接收(以下为消费端的消息处理代码)
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.hzero.core.exception.OptimisticLockException;
import org.hzero.core.message.MessageAccessor;
import org.springframework.amqp.core.Message;

import com.rabbitmq.client.Channel;
import com..mom.me.common.constants.MeBaseConstants;
import com..mom.me.common.mq.MessageHandler;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.choerodon.core.exception.CommonException;
import io.choerodon.core.oauth.CustomUserDetails;
import io.choerodon.core.oauth.DetailsHelper;
import lombok.extern.slf4j.Slf4j;

/**
 * Static helpers for consuming MQ messages: decode the body as UTF-8, pass it to a callback and
 * apply one of several error-handling policies (swallow, nack, rethrow). Two variants first
 * restore the user session carried in the message headers.
 */
@Slf4j
public abstract class AbstractMessageHandlers implements MessageHandler {

    private AbstractMessageHandlers() {}

    /**
     * Handles a message and swallows failures: business and system exceptions are only logged.
     * An {@link OptimisticLockException} is rethrown so that a retry is triggered.
     *
     * @param msg     the received message
     * @param headers the message headers (only logged here)
     * @param channel the channel the message arrived on (not used by this variant)
     * @param fn      callback receiving the message body as a UTF-8 string
     */
    public static void handleMessage(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {
        String consumerQueue = msg.getMessageProperties().getConsumerQueue();
        try {
            log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);
            fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));
        } catch (CommonException e) {
            log.error("消息队列[{}], 消息处理失败业务异常->{}", consumerQueue,
                    MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc());
        } catch (IllegalArgumentException e) {
            log.error("消息队列[{}], 消息处理失败业务异常: {}", consumerQueue, e.getMessage());
        } catch (OptimisticLockException e) {
            // Optimistic-lock conflicts are rethrown to trigger a retry.
            log.warn("消息队列[{}], 数据处理乐观锁异常触发重试", consumerQueue);
            throw e;
        } catch (Exception e) {
            log.error("消息队列[{}], 消息处理失败系统异常", consumerQueue, e);
        }
    }

    /**
     * Restores the user session from the message headers, then delegates to
     * {@link #handleMessage}. If the session cannot be restored the failure is logged and the
     * message is not processed.
     *
     * <p>Only meant for messages whose headers carry the user session (per the original
     * comments, those sent through AbstractMessageSender); do not use it for messages of other
     * systems or modules.
     *
     * @param msg     the received message
     * @param headers the message headers, expected to contain the user session JSON
     * @param channel the channel the message arrived on
     * @param fn      callback receiving the message body as a UTF-8 string
     */
    public static void handleWithUser(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {
        try {
            bindUserDetails(headers);
        } catch (Exception e) {
            // Restoring the user session failed.
            log.error("消息队列[{}], 消息处理失败", msg.getMessageProperties().getConsumerQueue(), e);
            return;
        }
        handleMessage(msg, headers, channel, fn);
    }

    /**
     * Handles a message with manual acknowledgement: ack on success, nack (with requeue) on any
     * exception.
     *
     * @param msg     the received message
     * @param headers the message headers (only logged here)
     * @param channel the channel used for ack/nack
     * @param fn      callback receiving the message body as a UTF-8 string
     */
    public static void handleWithNack(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {
        String consumerQueue = msg.getMessageProperties().getConsumerQueue();
        try {
            log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);
            fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));
            ack(msg, channel);
        } catch (CommonException e) {
            log.error("消息队列[{}], 消息处理失败业务异常->{}", consumerQueue,
                    MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc());
            nack(msg, channel);
        } catch (IllegalArgumentException e) {
            log.error("消息队列[{}], 消息处理失败业务异常: {}", consumerQueue, e.getMessage());
            nack(msg, channel);
        } catch (Exception e) {
            // The trailing Throwable is logged as the exception, so it must not have its own "{}".
            log.error("消息队列[{}], 消息处理失败系统异常", consumerQueue, e);
            nack(msg, channel);
        }
    }

    /**
     * Restores the user session from the message headers and handles the message; every
     * exception is logged and rethrown to the caller.
     *
     * @param msg     the received message
     * @param headers the message headers, expected to contain the user session JSON
     * @param channel the channel the message arrived on (not used by this variant)
     * @param fn      callback receiving the message body as a UTF-8 string
     */
    public static void handleWithUserThrowEx(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {
        String consumerQueue = msg.getMessageProperties().getConsumerQueue();
        try {
            bindUserDetails(headers);
            log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);
            fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));
        } catch (CommonException e) {
            log.error("消息队列[{}], 消息处理失败业务异常->{}, 详细信息:", consumerQueue,
                    MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc(), e);
            throw e;
        } catch (Exception e) {
            log.error("消息队列[{}], 消息处理失败系统异常", consumerQueue, e);
            throw e;
        }
    }

    /**
     * Reads the user session JSON from the {@code MeBaseConstants.USER_DETAILS_KEY} header and
     * installs it through {@link DetailsHelper#setCustomUserDetails}.
     *
     * @throws CommonException when the header is missing
     */
    private static void bindUserDetails(Map<String, Object> headers) {
        Object userDetails = headers.get(MeBaseConstants.USER_DETAILS_KEY);
        if (ObjectUtil.isNull(userDetails)) {
            throw new CommonException("MQ消息头获取用户会话信息失败!!!");
        }
        JSONObject jsonObject = JSONUtil.parseObj(userDetails);
        log.trace("设置用户会话信息,传入JSONObject信息: {}", jsonObject);
        CustomUserDetails details = new CustomUserDetails(jsonObject.getStr("username"), MeBaseConstants.DEFAULT);
        details.setTenantId(jsonObject.getLong("tenantId"));
        details.setUserId(jsonObject.getLong("userId"));
        details.setOrganizationId(jsonObject.getLong("organizationId"));
        details.setRealName(jsonObject.getStr("realName"));
        details.setLanguage(jsonObject.getStr("language"));
        details.setAdditionInfo(jsonObject.getJSONObject("additionInfo"));
        DetailsHelper.setCustomUserDetails(details);
    }

    /** Acknowledges the single message; an I/O failure is logged, not propagated. */
    private static void ack(Message msg, Channel channel) {
        try {
            long deliverTag = msg.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliverTag, false);
        } catch (IOException ioe) {
            log.error("消息处理ack异常{}", ioe.getMessage(), ioe);
        }
    }

    /** Rejects the single message and asks the broker to requeue it; an I/O failure is logged, not propagated. */
    private static void nack(Message msg, Channel channel) {
        try {
            long deliverTag = msg.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliverTag, false, true);
        } catch (IOException ioe) {
            log.error("消息处理nack异常{}", ioe.getMessage(), ioe);
        }
    }
}
总结
综上所述,sid的设置,拦截,传递,存储都有对应的机制.那么在日志系统中的存储则是通过org.slf4j.MDC;这个对象来实现的.
这个对象的实现机制不在本文中展开细节,后续日志的入库,检索,都是通过slf4j这个日志框架和ELK日志系统进行.本文着重讲解了在应用服务中一个跟踪请求的链路id是如何传递,存储的.
属于抛砖引玉的内容,内容较为主观,希望能对自己和其他开发者在分布式日志调用链路上有点启发.谢谢
参考
SpringBoot + MDC 实现全链路调用日志跟踪
微服务-网关Spring Gateway进阶-日志跟踪唯一ID
日志全链路追踪之MDC
秒杀 : 做一个完善的全链路日志实现方案有多简单
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!