Java在SpringCloud中自定义Gateway负载均衡策略
Java在SpringCloud中自定义Gateway负载均衡策略
一、前言
spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。
背景:大文件上传做切片文件上传;
流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;
我的版本如下:
<spring-boot.version>2.7.3</spring-boot.version>
? ? ? ? <spring-cloud.version>2021.0.4</spring-cloud.version>
? ? ? ? <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>
二、参考默认实现
springCloud原生默认的负载均衡策略是这个类:
org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer
我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:
在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。
三、实现代码
原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可
package com.wondertek.gateway.loadBalancer;
import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
// 应该小于LoadBalancerClientFilter的顺序值
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());
// 存入Reactor上下文
String resultCode = clientDeviceUniqueCode;
return chain.filter(exchange)
.contextWrite(context -> {
if (ObjectUtil.isNotEmpty(resultCode)) {
log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);
return context.put("identification", resultCode);
} else {
// 或者根据需求进行其他处理
return context;
}
});
}
}
package com.wondertek.gateway.loadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
//在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"
return Mono.deferContextual(contextView -> {
String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");
log.info("上下游获取到的identification的值为:{}", identification);
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));
});
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {
Response<ServiceInstance> serviceInstanceResponse;
if (Objects.isNull(identification)) {
serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);
} else {
serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);
}
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + this.serviceId);
}
return new EmptyResponse();
} else {
int index = ThreadLocalRandom.current().nextInt(instances.size());
ServiceInstance instance = (ServiceInstance) instances.get(index);
return new DefaultResponse(instance);
}
}
private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {
if (instances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
} else if (instances.size() == 1) {
log.info("只有一个服务实例,直接返回这个实例");
return new DefaultResponse(instances.get(0));
} else {
//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
// 现在对新列表进行排序,保持原始列表的顺序不变
Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
//log.info("获取到的实例个数的值为:{}", sortedInstances.size());
sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
//log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");
// 使用排序后的列表来找到实例
int ipHashCode = Math.abs(identification.hashCode());
//log.info("identificationHashCode的值为:{}", ipHashCode);
int instanceIndex = ipHashCode % sortedInstances.size();
//log.info("instanceIndex的值为:{}", instanceIndex);
ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);
//log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());
log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());
return new DefaultResponse(instanceToReturn);
}
}
}
package com.wondertek.gateway.loadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
final AtomicInteger position;
public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.position = position;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
// 现在对新列表进行排序,保持原始列表的顺序不变
Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
//log.info("获取到的实例个数的值为:{}", sortedInstances.size());
sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
int pos = Math.abs(this.position.incrementAndGet());
//log.info("默认轮循机制,pos递加后的值为:{}", pos);
int positionIndex = pos % instances.size();
//log.info("取余后的positionIndex的值为:{}", positionIndex);
ServiceInstance instance = instances.get(positionIndex);
//log.info("instance.getUri()的值为:{}", instance.getUri());
log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());
return new DefaultResponse(instance);
}
}
package com.wondertek.gateway.loadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.concurrent.atomic.AtomicInteger;
@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({
@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),
@LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),
@LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),
@LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),
@LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),
@LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {
// 定义一个Bean来提供AtomicInteger的实例
@Bean
public AtomicInteger positionTracker() {
// 这将在应用上下文中只初始化一次
return new AtomicInteger(0);
}
//自定义优先级负载均衡器
@Bean
public ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
Environment environment,AtomicInteger positionTracker) {
String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
//目的为解决文件上传切片文件分散上传的问题
if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){
//log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);
return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);
}
//log.info("服务名称:serviceId:{},走默认负载模式", serviceId);
return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);
}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!