龙空技术网

Spring cloud gateway 如何在路由时进行负载均衡

极速星空4DO 494

前言:

目前大家对“netty如何做负载均衡”都比较关注,也需要了解一些与之相关的知识。小编在网上整理了一些有关“netty如何做负载均衡”的资料,希望大家喜欢,一起来学习一下吧!

1.spring cloud gateway 配置路由

在网关模块的配置文件中配置路由:

spring:
  cloud:
    gateway:
      routes:
        - id: user
          # "lb" selects the load-balanced scheme; "user-server" is the target service name.
          uri: lb://user-server
          predicates:
            # Front-end requests must carry this prefix, e.g. http://ip:port/api-web/...
            - Path=/api-web/**
          filters:
            # Strip the first path segment (/api-web) before forwarding. Required: otherwise the
            # backend path is not found, unless the backend itself is served under the same prefix.
            - StripPrefix=1

其中lb表示采用了负载均衡,user-server表示服务名

当后端有多个服务节点时,网关会以负载均衡的方式将请求分发到各个服务节点上;当某个服务节点关闭以后,后续的请求将不再发送到该节点。不过网关感知到节点下线存在一定的时间延迟(比如30秒左右),在这段时间内仍可能有请求被转发到已关闭的节点。

2.查看 GatewayLoadBalancerClientAutoConfiguration 的配置类

这个配置类会加载一个过滤器,使用这个过滤器可以实现负载均衡

@Configuration(    proxyBeanMethods = false)@ConditionalOnClass({LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class})@AutoConfigureAfter({RibbonAutoConfiguration.class})@EnableConfigurationProperties({LoadBalancerProperties.class})public class GatewayLoadBalancerClientAutoConfiguration {    public GatewayLoadBalancerClientAutoConfiguration() {    }    @Bean    @ConditionalOnBean({LoadBalancerClient.class})    @ConditionalOnMissingBean({LoadBalancerClientFilter.class, ReactiveLoadBalancerClientFilter.class})    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client, LoadBalancerProperties properties) {        // 会加载一个负载均衡的 过滤器 :LoadBalancerClientFilter        return new LoadBalancerClientFilter(client, properties);    }}
3.查看 LoadBalancerClientFilter过滤器的实现

查看该过滤器的实现

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {  public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;    private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);    protected final LoadBalancerClient loadBalancer;    private LoadBalancerProperties properties;    public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {        this.loadBalancer = loadBalancer;        this.properties = properties;    }    public int getOrder() {        return 10100;    }    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);            if (log.isTraceEnabled()) {                log.trace("LoadBalancerClientFilter url before: " + url);            }            ServiceInstance instance = this.choose(exchange);            if (instance == null) {                throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());            } else {                URI uri = exchange.getRequest().getURI();                String overrideScheme = instance.isSecure() ? 
"https" : "http";                if (schemePrefix != null) {                    overrideScheme = url.getScheme();                }                URI requestUrl = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);                if (log.isTraceEnabled()) {                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);                }                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);                return chain.filter(exchange);            }        } else {            return chain.filter(exchange);        }    }    protected ServiceInstance choose(ServerWebExchange exchange) {    // 该loadBalancer 为ribbon 配置的负载均衡器,会根据指定的规则进行负载均衡,默认是轮询                return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());    } }

NettyRoutingFilter 过滤器:

/**
 * Global filter that sends the request to the URL stored under
 * {@code ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR} through the injected
 * {@code HttpClient}, then copies the upstream status and filtered headers onto the
 * gateway response.
 *
 * <p>Exchanges that are already routed, or whose URL scheme is neither {@code http} nor
 * {@code https}, are passed down the chain untouched.
 */
public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);

    private final HttpClient httpClient;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    private final HttpClientProperties properties;

    /** Resolved from the provider on first access. */
    private volatile List<HttpHeadersFilter> headersFilters;

    public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider, HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFiltersProvider = headersFiltersProvider;
        this.properties = properties;
    }

    /**
     * Returns the header filters, asking the provider for them while none are cached.
     *
     * @return the header filters, or {@code null} if the provider has none available
     */
    public List<HttpHeadersFilter> getHeadersFilters() {
        if (this.headersFilters == null) {
            this.headersFilters = this.headersFiltersProvider.getIfAvailable();
        }
        return this.headersFilters;
    }

    /**
     * @return {@link Integer#MAX_VALUE}
     */
    public int getOrder() {
        return Integer.MAX_VALUE;
    }

    /**
     * Sends the request to the resolved URL and prepares the gateway response from the result.
     *
     * @param exchange current server exchange; must carry the request URL attribute
     * @param chain    remaining filter chain
     * @return completion signal of the upstream exchange followed by the rest of the chain
     */
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();

        // Nothing to do when another filter already routed the exchange or the scheme is not HTTP(S).
        if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || !("http".equals(scheme) || "https".equals(scheme))) {
            return chain.filter(exchange);
        }
        ServerWebExchangeUtils.setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();
        HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        String targetUrl = requestUrl.toASCIIString();

        HttpHeaders filteredRequestHeaders = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
        DefaultHttpHeaders outboundHeaders = new DefaultHttpHeaders();
        filteredRequestHeaders.forEach(outboundHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

        HttpClient client = this.getHttpClient(route, exchange).headers(headers -> {
            headers.add(outboundHeaders);
            // The Host header is dropped, then restored from the incoming request only on demand.
            headers.remove("Host");
            if (preserveHost) {
                String host = request.getHeaders().getFirst("Host");
                headers.add("Host", host);
            }
        });
        RequestSender sender = (RequestSender) client.request(method).uri(targetUrl);

        Flux<HttpClientResponse> responseFlux = sender.send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection ->
                        log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {
            // Expose the upstream response and its connection to later filters.
            exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst("Content-Type");
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put("original_response_content_type", contentTypeValue);
            }

            this.setResponseStatus(res, response);

            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);
            boolean hasTransferEncoding = filteredResponseHeaders.containsKey("Transfer-Encoding");
            if (!hasTransferEncoding && filteredResponseHeaders.containsKey("Content-Length")) {
                response.getHeaders().remove("Transfer-Encoding");
            }

            exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
            response.getHeaders().putAll(filteredResponseHeaders);
            return Mono.just(res);
        });

        Duration responseTimeout = this.getResponseTimeout(route);
        if (responseTimeout != null) {
            // A timeout surfaces to the client as 504 Gateway Timeout.
            responseFlux = responseFlux
                    .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

    /**
     * Converts a Spring {@code DataBuffer} into a Netty {@code ByteBuf}.
     *
     * @param dataBuffer buffer to convert
     * @return the native buffer of a {@code NettyDataBuffer}, or a wrapped view of a
     *         {@code DefaultDataBuffer}'s native buffer
     * @throws IllegalArgumentException for any other buffer type
     */
    protected ByteBuf getByteBuf(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            return ((NettyDataBuffer) dataBuffer).getNativeBuffer();
        }
        if (dataBuffer instanceof DefaultDataBuffer) {
            return Unpooled.wrappedBuffer(((DefaultDataBuffer) dataBuffer).getNativeBuffer());
        }
        throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
    }

    /**
     * Copies the upstream status code onto the gateway response.
     *
     * <p>When {@code HttpStatus} cannot resolve the code, decorators are unwrapped until an
     * {@code AbstractServerHttpResponse} is reached and the raw value is set on it.
     */
    private void setResponseStatus(HttpClientResponse clientResponse, ServerHttpResponse response) {
        HttpStatus status = HttpStatus.resolve(clientResponse.status().code());
        if (status != null) {
            response.setStatusCode(status);
            return;
        }

        ServerHttpResponse target = response;
        while (target instanceof ServerHttpResponseDecorator) {
            target = ((ServerHttpResponseDecorator) target).getDelegate();
        }
        if (!(target instanceof AbstractServerHttpResponse)) {
            throw new IllegalStateException("Unable to set status code " + clientResponse.status().code() + " on response of type " + target.getClass().getName());
        }
        ((AbstractServerHttpResponse) target).setStatusCodeValue(clientResponse.status().code());
    }

    /**
     * Returns the HTTP client to use for the route, applying the route's
     * {@code connect-timeout} metadata entry when present.
     */
    protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
        Object connectTimeoutAttr = route.getMetadata().get("connect-timeout");
        if (connectTimeoutAttr == null) {
            return this.httpClient;
        }
        Integer connectTimeout = getInteger(connectTimeoutAttr);
        return this.httpClient.tcpConfiguration(tcpClient ->
                tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout));
    }

    /**
     * Reads a metadata value as an {@code Integer}, parsing its string form when it is not
     * already an {@code Integer}.
     */
    static Integer getInteger(Object connectTimeoutAttr) {
        if (connectTimeoutAttr instanceof Integer) {
            return (Integer) connectTimeoutAttr;
        }
        return Integer.parseInt(connectTimeoutAttr.toString());
    }

    /**
     * Returns the route's {@code response-timeout} metadata entry as milliseconds, falling back
     * to the configured client properties when the entry is absent.
     */
    private Duration getResponseTimeout(Route route) {
        Object responseTimeoutAttr = route.getMetadata().get("response-timeout");
        if (responseTimeoutAttr == null) {
            return this.properties.getResponseTimeout();
        }

        Long millis;
        if (responseTimeoutAttr instanceof Number) {
            millis = ((Number) responseTimeoutAttr).longValue();
        } else {
            millis = Long.valueOf(responseTimeoutAttr.toString());
        }
        return Duration.ofMillis(millis);
    }
}

在NettyRoutingFilter中,会从exchange的GATEWAY_REQUEST_URL_ATTR属性中读取requestUrl,然后通过HttpClient向该地址发起实际的HTTP请求,并把后端返回的状态码和响应头写回网关的响应中。

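LoadBalancerClientFilter只作用于uri的scheme为lb的路由(例如 lb://user-server):它利用loadBalancer选出一个服务实例,构造出目标requestUrl,并设置到GATEWAY_REQUEST_URL_ATTR属性中,供NettyRoutingFilter使用。由于LoadBalancerClientFilter的order为10100,而NettyRoutingFilter的order为Integer.MAX_VALUE,所以前者一定先于后者执行。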

标签: #netty如何做负载均衡