跨线程保持上下文
在大型微服务架构中管理跨服务的上下文共享是一项挑战性任务。本文介绍了使用Java和Webflux进行此操作的标准方式。构建一个大型的、准备就绪的无状态微服务架构时,我们总是面临一个共同的挑战,即如何保持跨服务和线程的请求上下文,包括将上下文传播到子线程中。
什么是上下文传播??
上下文传播意味着在分布式系统的不同组件或服务中传递上下文信息或状态,这些应用程序通常由运行在不同机器或容器上的多个服务组成。这些服务需要通信和协作以完成用户请求或执行业务流程。
在这样的分布式系统中,上下文传播变得至关重要,以确保在不同服务之间传递特定事务或操作的相关信息。这种上下文可能包括:
-
用户认证细节
-
请求标识符?
-
分布式追踪信息?
-
其他元数据(帮助理解请求的状态和起源)?
上下文传播的关键方面包括:
-
请求上下文:当用户发起请求时,它通常会触发跨多个服务的一系列交互。需要传播初始请求的上下文,包括用户身份、请求时间戳和唯一标识符等相关信息,以确保一致的行为和跟踪。?
-
分布式追踪和日志记录:上下文传播与分布式追踪和日志记录机制密切相关。通过传播上下文信息,可以更容易地追踪请求通过各种服务的流程,有助于调试、性能分析和监控。
-
一致性:在服务之间保持一致的上下文对于确保每个服务在处理请求时都有必要的信息来正确执行其任务至关重要。这有助于避免不一致性,确保分布式系统的协调行为。?
-
中间件和框架支持:许多中间件和框架为上下文传播提供内置支持。例如,在微服务架构中,像Spring Cloud、Istio或Zipkin这样的框架提供了管理和无缝传播上下文的工具。?
-
无状态性:在无状态架构中,上下文传播尤为重要,每个服务应该独立运行,不依赖共享状态。上下文有助于在不需要存储持久状态的情况下,为服务提供处理请求所需的必要信息。?
有效的上下文传播有助于提高分布式系统的整体可靠性、可观察性和可维护性,提供了在不同服务中移动的事务状态的统一视图。它还有助于减少代码。
使用案例?
假设你正在构建一个基于Springboot Webflux的微服务/应用程序,你需要确保用户的状态(会话标识符、请求标识符、登录状态等)和客户端(设备类型、客户端IP等)在源请求中传递的状态应在服务之间传递。
挑战?
-
服务到服务调用:对于内部服务到服务的调用,上下文传播不会自动发生。?
-
在类内传播上下文:要在服务和/或帮助类中引用上下文,你需要通过方法参数显式传递它。这可以通过创建一个带有静态方法的类来处理,该方法将上下文存储在ThreadLocal对象中。
-
Java 流操作:由于 Java 流函数在单独的执行器线程中运行,因此需要显式地通过ThreadLocal 向子线程传播上下文。
-
Webflux:与Java Stream 函数类似,Webflux ?中的上下文传播需要通过reactor Hooks来处理。
这里的想法是如何确保上下文传播在子线程中自动发生,并使用反应式 Web 客户端传播到内部调用的服务。对于非反应式代码也可以实现类似的模式。
解决方案
Core Java 提供了两个类:ThreadLocal 和 InheritableThreadLocal,来存储线程范围的值。
-
ThreadLocal允许创建线程本地变量,确保每个线程都有自己的变量副本。
-
一个限制 ThreadLocal是,如果在另一个线程的范围内生成一个新线程,则子线程不会ThreadLocal从其父线程继承变量的值。
public class ExampleThreadLocal {
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
public static void main(String[] args) {
threadLocal.set("Main Thread Value");
new Thread(() -> {
System.out.println("Child Thread: " + threadLocal.get()); // Outputs: Child Thread: null
}).start();
System.out.println("Main Thread: " + threadLocal.get()); // Outputs: Main Thread: Main Thread Value
}
}
另一方面,InheritableThreadLocal扩展ThreadLocal并为子线程提供从其父线程继承值的能力。
public class ExampleInheritableThreadLocal {
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
inheritableThreadLocal.set("Main Thread Value");
new Thread(() -> {
System.out.println("Child Thread: " + inheritableThreadLocal.get()); // Outputs: Child Thread: Main Thread Value
}).start();
System.out.println("Main Thread: " + inheritableThreadLocal.get()); // Outputs: Main Thread: Main Thread Value
}
}
因此,在需要确保上下文必须在父线程和子线程之间传播的情况下,我们可以使用应用程序范围的静态InheritableThreadLocal变量来保存上下文并在需要时获取它。
@Getter
@ToString
@Builder
public class RequestContext {
private String sessionId;
private String correlationId;
private String userStatus;
private String channel;
}
public class ContextAdapter {
final ThreadLocal<RequestContext> threadLocal = new InheritableThreadLocal<>();
public RequestContext getCurrentContext() {
return threadLocal.get();
}
public void setContext(tRequestContext requestContext) {
threadLocal.set(requestContext);
}
public void clear() {
threadLocal.remove();
}
}
public final class Context {
static ContextAdapter contextAdapter;
private Context() {}
static {
contextAdapter = new ContextAdapter();
}
public static void clear() {
if (contextAdapter == null) {
throw new IllegalStateException();
}
contextAdapter.clear();
}
public static RequestContext getContext() {
if (contextAdapter == null) {
throw new IllegalStateException();
}
return contextAdapter.getCurrentContext();
}
public static void setContext(RequestContext requestContext) {
if (cContextAdapter == null) {
throw new IllegalStateException();
}
contextAdapter.setContext(requestContext);
}
public static ContextAdapter getContextAdapter() {
return contextAdapter;
}
}
然后,我们可以通过在代码中需要的地方调用静态方法来引用上下文。
Context.getContext()
这解决了:
-
在类内传播上下文
-
Java流操作
-
网络通量
为了确保上下文通过 webclient 自动传播到外部调用,我们可以创建一个自定义来从Context.getContext() ExchangeFilterFunction读取上下文,然后根据需要将上下文添加到标头或查询参数中。
public class HeaderExchange implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(
ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
return Mono.deferContextual(Mono::just)
.flatMap(
context -> {
RequestContext currentContext = Context.getContext();
ClientRequest newRequest = ClientRequest.from(clientRequest)
.headers(httpHeaders ->{
httpHeaders.add("context-session-id",currentContext.getSessionId() );
httpHeaders.add("context-correlation-id",currentContext.getCorrelationId() );
}).build();
return exchangeFunction.exchange(newRequest);
});
}
}
将上下文初始化为 WebFilter 的一部分。
@Slf4j
@Component
public class RequestContextFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String sessionId = exchange.getRequest().getHeaders().getFirst("context-session-id");
String correlationId = exchange.getRequest().getHeaders().getFirst("context-correlation-id");
RequestContext requestContext = RequestContext.builder().sessionId(sessionId).correlationId(correlationId).build()
Context.setContext(requestContext);
return chain.filter(exchange);
}
}
作者:Ajay Joshi
更多技术干货请关注公号【云原生数据库】
squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。
irds.cn,多数据库管理平台(私有云)。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!