前言:
当前你们对“apachepost未转发”大体比较看重,同学们都想要学习一些“apachepost未转发”的相关资讯。那么小编同时在网摘上收集了一些有关“apachepost未转发””的相关知识,希望看官们能喜欢,我们一起来了解一下吧!我们的网关使用的是 Spring Cloud Gateway,并且加入了 spring-cloud-sleuth 的依赖,用于链路追踪。并且通过 log4j2 的配置,将链路信息输出到日志中,相关的占位符是:
%X{traceId},%X{spanId}
但是最近发现,日志中链路信息出现丢失的情况,这是怎么回事呢?
Spring Cloud Gateway 的基本流程与实现
首先简单介绍一下 Spring Cloud Gateway 的基本结构,以及 Spring Cloud Sleuth 是如何在其中嵌入链路追踪相关代码的。加入 Spring Cloud Sleuth 以及 Prometheus 相关依赖之后, Spring Cloud Gateway 的处理流程如下所示:
Spring Cloud Gateway 是基于 Spring WebFlux 开发的异步响应式网关,异步响应式代码比较难以理解和阅读,我这里给大家分享一种方法去理解,通过这个流程来理解 Spring Cloud Gateway 的工作流程以及底层原理。其实可以理解为,上图这个流程,就是拼出来一个完整的 Mono(或者 Flux)流,最后 subscribe 执行。
当收到一个请求的时候,会经过 org.springframework.web.server.handler.DefaultWebFilterChain,这是 WebFilter 的调用链,这个链路包括三个 WebFilter:
org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter:添加 Prometheus 相关依赖之后,会有这个 MetricsWebFilter,用于记录请求处理耗时,采集相关指标。org.springframework.cloud.sleuth.instrument.web.TraceWebFilter:添加 Spring Cloud Sleuth 相关依赖之后,会有这个 TraceWebFilter。org.springframework.cloud.gateway.handler.predicate.WeightCalculatorWebFilter:Spring Cloud Gateway 路由权重相关配置功能相关实现类,这个我们这里不关心。
在这个 DefaultWebFilterChain 会形成这样一个 Mono,我们依次将他们标记出来,首先是入口代码 org.springframework.web.server.handler.DefaultWebFilterChain#filter:
public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> // this.currentFilter != null 代表 WebFilter 链还没有结束 // this.chain != null 代表 WebFilter 链不为空 this.currentFilter != null && this.chain != null ? //在 WebFilter 链没有结束的情况下,调用 WebFilter invokeFilter(this.currentFilter, this.chain, exchange) : //在 WebFilter 结束的情况下,调用 handler this.handler.handle(exchange));}
对于我们这里的 WebFilter 链的第一个 MetricsWebFilter,假设启用了对应的采集统计的话,这时候生成的 Mono 就是:
return Mono.defer(() -> chain.filter(exchange).transformDeferred((call) -> { long start = System.nanoTime(); return call //成功时,记录响应时间 .doOnSuccess((done) -> MetricsWebFilter.this.onSuccess(exchange, start)) //失败时,记录响应时间和异常 .doOnError((cause) -> MetricsWebFilter.this.onError(exchange, start, cause)); }););
这里为了方便,我们对代码做了简化,由于我们要将整个链路的所有 Mono 和 Flux 拼接在一起行程完整链路,所以原本是 MetricsWebFilter中的 onSuccess(exchange, start)方法,被改成了 MetricsWebFilter.this.onSuccess(exchange, start) 这种伪代码。
接着,根据DefaultWebFilterChain 的源码分析,chain.filter(exchange) 会继续 WebFilter 链路,到达下一个 WebFilter,即 TraceWebFilter。经过 TraceWebFilter,Mono 就会变成:
return Mono.defer(() -> new MonoWebFilterTrace(source, chain.filter(exchange), TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
可以看出,在 TraceWebFilter 中,整个内部 Mono (chain.filter(exchange) 后续的结果)都被封装成了一个 MonoWebFilterTrace,这也是保持链路追踪信息的关键实现。
继续 WebFilter 链路,经过最后一个 WebFilter WeightCalculatorWebFilter; 这个 WebFilter 我们不关心,里面对路由权重做了一些计算操作,我们这里直接忽略即可。这样我们就走完了所有 WebFilter 链路,来到了最后的调用 DefaultWebFilterChain.this.handler,这个 handler 就是 org.springframework.web.reactive.DispatcherHandler。在 DispatcherHandler 中,我们会计算出路由并发送请求到符合条件的 GatewayFilter。经过 DispatcherHandler,Mono 会变成:
return Mono.defer(() -> new MonoWebFilterTrace(source, Flux.fromIterable(DispatcherHandler.this.handlerMappings) //读取所有的 handlerMappings .concatMap(mapping -> mapping.getHandler(exchange)) //按顺序调用所有的 handlerMappings 的 getHandler 方法,如果有对应的 Handler 会返回,否则返回 Mono.empty(); .next() //找到第一个返回不是 Mono.empty() 的 Handler .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler .flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果 TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
handlerMappings 包括:
org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndPointHandlerMapping:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。Actuator 相关路径映射,不是我们这里关心的。但是可以看出,Actuator 相关路径优先于 Spring Cloud Gateway 配置路由org.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMapping:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。使用 @ControllerEndpoint 或者 @RestControllerEndpoint 注解标注的 Actuator 相关路径映射,不是我们这里关心的。org.springframework.web.reactive.function.server.support.RouterFunctionMapping:在 Spring-WebFlux 中,你可以定义很多不同的 RouterFunction 来控制路径路由,但这也不是我们这里关心的。但是可以看出,自定义的 RouterFunction 会优先于 Spring Cloud Gateway 配置路由org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping:针对 @RequestMapping 注解的路径的 HandlerMapping,不是我们这里关心的。但是可以看出,如果你在 Spring Cloud Gateway 中指定 RequestMapping 路径,会优先于 Spring Cloud Gateway 配置路由。org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping:这个是 Spring Cloud Gateway 的 HandlerMapping,会读取 Spring Cloud Gateway 配置并生成路由。这个是我们这里要详细分析的。
其实这些 handlerMappings,我们这里肯定走的是 RoutePredicateHandlerMapping 的相关逻辑,所以我们的 Mono 又可以简化成:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.getHandler(exchange) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler .flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果 TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
我们来看 RoutePredicateHandlerMapping,首先这些 handlerMapping 都是继承了抽象类 org.springframework.web.reactive.handler.AbstractHandlerMapping, 前面我们拼接的 Mono 里面的 getHandler 的实现其实就在这个抽象类中:
public Mono<Object> getHandler(ServerWebExchange exchange) { //调用抽象方法 getHandlerInternal 获取真正的 Handler return getHandlerInternal(exchange).map(handler -> { //这里针对 handler 做一些日志记录 if (logger.isDebugEnabled()) { logger.debug(exchange.getLogPrefix() + "Mapped to " + handler); } // 跨域处理 ServerHttpRequest request = exchange.getRequest(); if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) { CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null); CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange); config = (config != null ? config.combine(handlerConfig) : handlerConfig); if (config != null) { config.validateAllowCredentials(); } if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) { return NO_OP_HANDLER; } } return handler; });}
可以看出,其实核心就是每个实现类的 getHandlerInternal(exchange) 方法,所以在我们拼接的 Mono 中,我们会忽略抽象类中的针对 handler 之后的 map 处理。
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.getHandlerInternal(exchange) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler .flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果 TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
接下来经过 RoutePredicateHandlerMapping 的 getHandlerInternal(exchange) 方法,我们的 Mono 变成了:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() //返回 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler .flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果 TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
RoutePredicateHandlerMapping.this.lookupRoute(exchange) 根据请求寻找路由,这个我们就不详细展开了,其实就是根据你的 Spring Cloud Gateway 配置,找到合适的路由。接下来我们来看调用对应的 Handler,即 FilteringWebHandler。DispatcherHandler.this.invokeHandler(exchange, handler)我们这里也不详细展开,我们知道其实就是调用 Handler 的 handle 方法,即 FilteringWebHandler 的 handle 方法,所以 我们的 Mono 变成了:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then(FilteringWebHandler.this.handle(exchange).then(Mono.empty())) //调用对应的 Handler .flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果 TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
由于调用对应的 Handler,最后返回的是 Mono.empty(),所以后面的 flatMap 其实不会执行了。所以我们可以将最后的处理结果这一步去掉。所以我们的 Mono 就变成了:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then(FilteringWebHandler.this.handle(exchange).then(Mono.empty()))), //调用对应的 Handler TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
FilteringWebHandler.this.handle(exchange) 其实就是从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters,与全局 GatewayFilters 放到同一个 List 中,并按照这些 GatewayFilter 的顺序排序(可以通过实现 org.springframework.core.Ordered 接口来制定顺序),然后生成 DefaultGatewayFilterChain 即 GatewayFilter 链路。对应的源码是:
public Mono<Void> handle(ServerWebExchange exchange) { //从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); //与全局 GatewayFilters 放到同一个 List 中 List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters); //按照这些 GatewayFilter 的顺序排序(可以通过实现 `org.springframework.core.Ordered` 接口来制定顺序) AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } //生成调用链 return new DefaultGatewayFilterChain(combined).filter(exchange);}
这个 GatewayFilter 调用链和 WebFilter 调用链类似,参考 DefaultGatewayFilterChain 的源码:
public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { //如果链路没有结束,则继续链路 if (this.index < filters.size()) { GatewayFilter filter = filters.get(this.index); //这里将 index + 1,也就是调用链路中的下一个 GatewayFilter DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); //每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式 return filter.filter(exchange, chain); } else { //到达末尾,链路结束 return Mono.empty(); // complete } });}
所以,经过 DefaultGatewayFilterChain 后,我们的 Mono 就会变成:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then(new DefaultGatewayFilterChain(combined).filter(exchange).then(Mono.empty()))), //调用对应的 Handler TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
再继续展开 DefaultGatewayFilterChain 的链路调用,可以得到:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then( Mono.defer(() -> { //如果链路没有结束,则继续链路 if (DefaultGatewayFilterChain.this.index < DefaultGatewayFilterChain.this.filters.size()) { GatewayFilter filter = DefaultGatewayFilterChain.this.filters.get(DefaultGatewayFilterChain.this.index); //这里将 index + 1,也就是调用链路中的下一个 GatewayFilter DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(DefaultGatewayFilterChain.this, DefaultGatewayFilterChain.this.index + 1); //每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式 return filter.filter(exchange, chain); } else { return Mono.empty(); //链路完成 } }) .then(Mono.empty())) ), //调用对应的 Handler TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
这样,就形成了 Spring Cloud Gateway 针对路由请求的完整 Mono 调用链。
Spring Cloud Sleuth 是如何增加链路信息
通过之前的源码分析,我们知道,在最开始的 TraceWebFilter,我们将 Mono 封装成了一个 MonoWebFilterTrace,它的核心源码是:
@Overridepublic void subscribe(CoreSubscriber<? super Void> subscriber) { Context context = contextWithoutInitialSpan(subscriber.currentContext()); Span span = findOrCreateSpan(context); //将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC //日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map //简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息 try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { //将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this)); } //在 scope.close() 之后,会将链路信息从 ThreadLocal 的 Map 中剔除}@Overridepublic Object scanUnsafe(Attr key) { if (key == Attr.RUN_STYLE) { //执行的方式必须是不能切换线程,也就是同步的 //因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了 return Attr.RunStyle.SYNC; } return super.scanUnsafe(key);}
WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。
经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。
我们应用中丢失链路信息的地方
通过查看日志我们发现,启用 RequestBody 缓存的地方,都有链路缺失。这个 RequestBody 缓存我们使用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter,其核心源码是:
private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest, Function<ServerHttpRequest, Mono<T>> function) { ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); return //读取 Body,由于 TCP 拆包,所以需要他们拼接到一起 DataBufferUtils.join(exchange.getRequest().getBody()) //如果没有 Body,则直接返回空 DataBuffer .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator()))) //decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body //之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路 .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest)) .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);}
为何会使用这个 AdaptCachedBodyGlobalFilter 呢?获取请求 Body 是通过 exchange.getRequest().getBody() 获取的,其结果是一个 Flux<DataBuffer>.请求的 Body 是一次性的,如果你需要请求重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 已经结束。所以,对于需要重复调用,例如重试,一对多路由转发的情况,需要将请求 Body 缓存起来,就是经过这个 GatewayFilter。但是经过这个 GatewayFilter 之后,链路信息就没了,可以通过以下这个简单项目进行复现(项目地址):
引入依赖:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.6</version></parent><dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置--> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>${disruptor.version}</version> </dependency></dependencies><dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2020.0.3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies></dependencyManagement>
对所有路径开启 AdaptCachedBodyGlobalFilter:
@Configuration(proxyBeanMethods = false)public class ApiGatewayConfiguration { @Autowired private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter; @Autowired private GatewayProperties gatewayProperties; @PostConstruct public void init() { gatewayProperties.getRoutes().forEach(routeDefinition -> { //对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId()); adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent); }); }}
配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):
server: port: 8181spring: application: name: apiGateway cloud: gateway: httpclient: connect-timeout: 500 response-timeout: 60000 routes: - id: first_route uri: predicates: - Path=/httpbin/** filters: - StripPrefix=1
添加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter 之前,一个在 AdaptCachedBodyGlobalFilter 之后。这两个 Filter 非常简单,只是打一行日志。
@Log4j2@Componentpublic class PreLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("before AdaptCachedBodyGlobalFilter"); return chain.filter(exchange); } @Override public int getOrder() { return ORDER; }}@Log4j2@Componentpublic class PostLogFilter implements GlobalFilter, Ordered { public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("after AdaptCachedBodyGlobalFilter"); return chain.filter(exchange); } @Override public int getOrder() { return ORDER; }}
最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。
启动这个应用,之后访问 ,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:
2021-09-08 06:32:35.457 INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter2021-09-08 06:32:35.474 INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter为何链路信息会丢失
我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:
return Mono.defer(() -> new MonoWebFilterTrace(source, RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由 .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到 return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由 Mono.empty() .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志 if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))) .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404 .then( Mono.defer(() -> { //省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套 //读取 Body,由于 TCP 拆包,所以需要他们拼接到一起 DataBufferUtils.join(exchange.getRequest().getBody()) //如果没有 Body,则直接返回空 DataBuffer .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator()))) //decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body //之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路 .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest)) .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function); }) .then(Mono.empty())) ), //调用对应的 Handler TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> { //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略 }););
其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下:
//首先我们创建一个新的 SpanSpan span = tracer.newTrace();//声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperatorclass MonoWebFilterTrace<T> extends MonoOperator<T, T> { protected MonoWebFilterTrace(Mono<? extends T> source) { super(source); } @Override public void subscribe(CoreSubscriber<? super T> actual) { //将 subscribe 用 span 包裹 try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) { source.subscribe(actual); //在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志 log.info("stopped"); } }}Mono.defer(() -> new MonoWebFilterTrace( Mono.fromRunnable(() -> { log.info("first"); }) //模拟 FluxReceive .then(Mono.delay(Duration.ofSeconds(1)) .doOnSuccess(longSignal -> log.info(longSignal))))).subscribe(aLong -> log.info(aLong));
Mono.delay 和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:
2021-09-08 07:12:45.236 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first2021-09-08 07:12:45.240 INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped2021-09-08 07:12:46.241 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()2021-09-08 07:12:46.242 INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。
Spring Cloud Gateway 其他的可能丢失链路信息的点
经过前面的分析,我们可以看出,不止这里,还有其他地方会导致 Spring Cloud Sleuth 的链路追踪信息消失,这里举几个大家常见的例子:
1.在 GatewayFilter 中指定了异步执行某些任务,由于线程切换了,并且这时候可能 Span 已经结束了,所以没有链路信息,例如:
@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> { //这里就没有链路信息了 log.info("success"); });}
2.将 GatewayFilter 中继续链路的 chain.filter(exchange) 放到了异步任务中执行,上面的 AdaptCachedBodyGlobalFilter 就属于这种情况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:
@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));}Java 并发编程模型与 Project Reactor 编程模型的冲突思考
Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:
日志框架中的 MDC,一般都是 ThreadLocal 实现。所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来唯一标识谁获取到了锁的。分布式锁等数据结构,也是通过 Thread 的属性来唯一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。
但是放到 Project Reactor 编程模型,这就显得格格不入了,因为 Project Reactor 异步响应式编程就是不固定线程,没法保证提交任务和回调能在同一个线程,所以 ThreadLocal 的语义在这里很难成立。Project Reactor 虽然提供了对标 ThreadLocal 的 Context,但是主流框架还没有兼容这个 Context,所以给 Spring Cloud Sleuth 粘合这些链路追踪带来了很大困难,因为 MDC 是一个 ThreadLocal 的 Map 实现,而不是基于 Context 的 Map。这就需要 Spring Cloud Sleuth 在订阅一开始,就需要将链路信息放入 MDC,同时还需要保证运行时不切换线程。
运行不切换线程,这样其实限制了 Project Reactor 的灵活调度,是有一些性能损失的。我们其实想尽量就算加入了链路追踪信息,也不用强制运行不切换线程。但是 Spring Cloud Sleuth 是非侵入式设计,很难实现这一点。但是对于我们自己业务的使用,我们可以定制一些编程规范,来保证大家写的代码不丢失链路信息。
改进我们的编程规范
首先,我们自定义 Mono 和 Flux 的工厂
公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。
public class TracedCoreSubscriber<T> implements Subscriber<T>{ private final Subscriber<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void onSubscribe(Subscription s) { executeWithinScope(() -> { delegate.onSubscribe(s); }); } @Override public void onError(Throwable t) { executeWithinScope(() -> { delegate.onError(t); }); } @Override public void onComplete() { executeWithinScope(() -> { delegate.onComplete(); }); } @Override public void onNext(T o) { executeWithinScope(() -> { delegate.onNext(o); }); } private void executeWithinScope(Runnable runnable) { //如果当前没有链路信息,强制包裹 if (tracer.currentSpan() == null) { try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) { runnable.run(); } } else { //如果当前已有链路信息,则直接执行 runnable.run(); } }}
之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:
public class TracedFlux<T> extends Flux<T> { private final Flux<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void subscribe(CoreSubscriber<? super T> actual) { delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span)); }}public class TracedMono<T> extends Mono<T> { private final Mono<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void subscribe(CoreSubscriber<? super T> actual) { delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span)); }}
定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。
@Componentpublic class TracedPublisherFactory { protected static final String TRACE_REQUEST_ATTR = Span.class.getName(); @Autowired private Tracer tracer; @Autowired private CurrentTraceContext currentTraceContext; public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) { return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR)); } public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) { return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR)); }}
然后,我们规定:1. 所有的 GatewayFilter,需要继承我们自定义的抽象类,这个抽象类仅仅是把 filter 的结果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:
public abstract class AbstractTracedFilter implements GlobalFilter { @Autowired protected TracedPublisherFactory tracedPublisherFactory; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange); } protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);}
2. GatewayFilter 中新生成的 Flux 或者 Mono,统一使用 TracedPublisherFactory 再封装一层。
3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路丢失,我向社区提了一个 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以参考。也可以在这个 Filter 之前自己将 Request Body 使用 TracedPublisherFactory 进行封装解决。
标签: #apachepost未转发