龙空技术网

Spring Cloud Gateway 丢失了链路信息,怎么办

hashcon 55

前言:

当前你们对“apachepost未转发”大体比较看重,同学们都想要学习一些“apachepost未转发”的相关资讯。那么小编同时在网摘上收集了一些有关“apachepost未转发””的相关知识,希望看官们能喜欢,我们一起来了解一下吧!

我们的网关使用的是 Spring Cloud Gateway,并且加入了 spring-cloud-sleuth 的依赖,用于链路追踪。并且通过 log4j2 的配置,将链路信息输出到日志中,相关的占位符是:

%X{traceId},%X{spanId}

但是最近发现,日志中链路信息出现丢失的情况,这是怎么回事呢?

Spring Cloud Gateway 的基本流程与实现

首先简单介绍一下 Spring Cloud Gateway 的基本结构,以及 Spring Cloud Sleuth 是如何在其中嵌入链路追踪相关代码的。加入 Spring Cloud Sleuth 以及 Prometheus 相关依赖之后, Spring Cloud Gateway 的处理流程如下所示:

Spring Cloud Gateway 是基于 Spring WebFlux 开发的异步响应式网关,异步响应式代码比较难以理解和阅读,我这里给大家分享一种方法去理解,通过这个流程来理解 Spring Cloud Gateway 的工作流程以及底层原理。其实可以理解为,上图这个流程,就是拼出来一个完整的 Mono(或者 Flux)流,最后 subscribe 执行。

当收到一个请求的时候,会经过 org.springframework.web.server.handler.DefaultWebFilterChain,这是 WebFilter 的调用链,这个链路包括三个 WebFilter:

org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter:添加 Prometheus 相关依赖之后,会有这个 MetricsWebFilter,用于记录请求处理耗时,采集相关指标。org.springframework.cloud.sleuth.instrument.web.TraceWebFilter:添加 Spring Cloud Sleuth 相关依赖之后,会有这个 TraceWebFilter。org.springframework.cloud.gateway.handler.predicate.WeightCalculatorWebFilter:Spring Cloud Gateway 路由权重相关配置功能相关实现类,这个我们这里不关心。

在这个 DefaultWebFilterChain 会形成这样一个 Mono,我们依次将他们标记出来,首先是入口代码 org.springframework.web.server.handler.DefaultWebFilterChain#filter:

public Mono<Void> filter(ServerWebExchange exchange) {	return Mono.defer(() ->	        // this.currentFilter != null 代表 WebFilter 链还没有结束	        // this.chain != null 代表 WebFilter 链不为空			this.currentFilter != null && this.chain != null ?					//在 WebFilter 链没有结束的情况下,调用 WebFilter					invokeFilter(this.currentFilter, this.chain, exchange) :					//在 WebFilter 结束的情况下,调用 handler 					this.handler.handle(exchange));}

对于我们这里的 WebFilter 链的第一个 MetricsWebFilter,假设启用了对应的采集统计的话,这时候生成的 Mono 就是:

return Mono.defer(() ->	chain.filter(exchange).transformDeferred((call) -> {		long start = System.nanoTime();		return call				//成功时,记录响应时间				.doOnSuccess((done) -> MetricsWebFilter.this.onSuccess(exchange, start))				//失败时,记录响应时间和异常				.doOnError((cause) -> MetricsWebFilter.this.onError(exchange, start, cause));	}););

这里为了方便,我们对代码做了简化,由于我们要将整个链路的所有 Mono 和 Flux 拼接在一起行程完整链路,所以原本是 MetricsWebFilter中的 onSuccess(exchange, start)方法,被改成了 MetricsWebFilter.this.onSuccess(exchange, start) 这种伪代码。

接着,根据DefaultWebFilterChain 的源码分析,chain.filter(exchange) 会继续 WebFilter 链路,到达下一个 WebFilter,即 TraceWebFilter。经过 TraceWebFilter,Mono 就会变成:

return Mono.defer(() ->	new MonoWebFilterTrace(source, chain.filter(exchange), TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

可以看出,在 TraceWebFilter 中,整个内部 Mono (chain.filter(exchange) 后续的结果)都被封装成了一个 MonoWebFilterTrace,这也是保持链路追踪信息的关键实现。

继续 WebFilter 链路,经过最后一个 WebFilter WeightCalculatorWebFilter; 这个 WebFilter 我们不关心,里面对路由权重做了一些计算操作,我们这里直接忽略即可。这样我们就走完了所有 WebFilter 链路,来到了最后的调用 DefaultWebFilterChain.this.handler,这个 handler 就是 org.springframework.web.reactive.DispatcherHandler。在 DispatcherHandler 中,我们会计算出路由并发送请求到符合条件的 GatewayFilter。经过 DispatcherHandler,Mono 会变成:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		Flux.fromIterable(DispatcherHandler.this.handlerMappings) //读取所有的 handlerMappings			.concatMap(mapping -> mapping.getHandler(exchange)) //按顺序调用所有的 handlerMappings 的 getHandler 方法,如果有对应的 Handler 会返回,否则返回 Mono.empty();			.next() //找到第一个返回不是 Mono.empty() 的 Handler			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler			.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

handlerMappings 包括:

org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndPointHandlerMapping:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。Actuator 相关路径映射,不是我们这里关心的。但是可以看出,Actuator 相关路径优先于 Spring Cloud Gateway 配置路由org.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMapping:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。使用 @ControllerEndpoint 或者 @RestControllerEndpoint 注解标注的 Actuator 相关路径映射,不是我们这里关心的。org.springframework.web.reactive.function.server.support.RouterFunctionMapping:在 Spring-WebFlux 中,你可以定义很多不同的 RouterFunction 来控制路径路由,但这也不是我们这里关心的。但是可以看出,自定义的 RouterFunction 会优先于 Spring Cloud Gateway 配置路由org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping:针对 @RequestMapping 注解的路径的 HandlerMapping,不是我们这里关心的。但是可以看出,如果你在 Spring Cloud Gateway 中指定 RequestMapping 路径,会优先于 Spring Cloud Gateway 配置路由org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping:这个是 Spring Cloud Gateway 的 HandlerMapping,会读取 Spring Cloud Gateway 配置并生成路由。这个是我们这里要详细分析的。

其实这些 handlerMappings,我们这里肯定走的是 RoutePredicateHandlerMapping 的相关逻辑,所以我们的 Mono 又可以简化成:

 return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.getHandler(exchange)			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler			.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

我们来看 RoutePredicateHandlerMapping,首先这些 handlerMapping 都是继承了抽象类 org.springframework.web.reactive.handler.AbstractHandlerMapping, 前面我们拼接的 Mono 里面的 getHandler 的实现其实就在这个抽象类中:

 public Mono<Object> getHandler(ServerWebExchange exchange) {	//调用抽象方法 getHandlerInternal 获取真正的 Handler	return getHandlerInternal(exchange).map(handler -> {		//这里针对 handler 做一些日志记录		if (logger.isDebugEnabled()) {			logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);		}		// 跨域处理		ServerHttpRequest request = exchange.getRequest();		if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {			CorsConfiguration config = (this.corsConfigurationSource != null ?					this.corsConfigurationSource.getCorsConfiguration(exchange) : null);			CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);			config = (config != null ? config.combine(handlerConfig) : handlerConfig);			if (config != null) {				config.validateAllowCredentials();			}			if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {				return NO_OP_HANDLER;			}		}		return handler;	});}

可以看出,其实核心就是每个实现类的 getHandlerInternal(exchange) 方法,所以在我们拼接的 Mono 中,我们会忽略抽象类中的针对 handler 之后的 map 处理。

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.getHandlerInternal(exchange)			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler			.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

接下来经过 RoutePredicateHandlerMapping 的 getHandlerInternal(exchange) 方法,我们的 Mono 变成了:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() //返回 Mono.empty()					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler			.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

RoutePredicateHandlerMapping.this.lookupRoute(exchange) 根据请求寻找路由,这个我们就不详细展开了,其实就是根据你的 Spring Cloud Gateway 配置,找到合适的路由。接下来我们来看调用对应的 Handler,即 FilteringWebHandler。DispatcherHandler.this.invokeHandler(exchange, handler)我们这里也不详细展开,我们知道其实就是调用 Handler 的 handle 方法,即 FilteringWebHandler 的 handle 方法,所以 我们的 Mono 变成了:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() 					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty())) //调用对应的 Handler			.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

由于调用对应的 Handler,最后返回的是 Mono.empty(),所以后面的 flatMap 其实不会执行了。所以我们可以将最后的处理结果这一步去掉。所以我们的 Mono 就变成了:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() 					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty()))), //调用对应的 Handler	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

FilteringWebHandler.this.handle(exchange) 其实就是从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters,与全局 GatewayFilters 放到同一个 List 中,并按照这些 GatewayFilter 的顺序排序(可以通过实现 org.springframework.core.Ordered 接口来制定顺序),然后生成 DefaultGatewayFilterChain 即 GatewayFilter 链路。对应的源码是:

public Mono<Void> handle(ServerWebExchange exchange) {	//从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters	Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);	List<GatewayFilter> gatewayFilters = route.getFilters();	//与全局 GatewayFilters 放到同一个 List 中	List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);	combined.addAll(gatewayFilters);	//按照这些 GatewayFilter 的顺序排序(可以通过实现 `org.springframework.core.Ordered` 接口来制定顺序)	AnnotationAwareOrderComparator.sort(combined);		if (logger.isDebugEnabled()) {		logger.debug("Sorted gatewayFilterFactories: " + combined);	}	//生成调用链	return new DefaultGatewayFilterChain(combined).filter(exchange);}

这个 GatewayFilter 调用链和 WebFilter 调用链类似,参考 DefaultGatewayFilterChain 的源码:

public Mono<Void> filter(ServerWebExchange exchange) {	return Mono.defer(() -> {		//如果链路没有结束,则继续链路		if (this.index < filters.size()) {			GatewayFilter filter = filters.get(this.index);			//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter			DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);			//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式			return filter.filter(exchange, chain);		}		else {			//到达末尾,链路结束			return Mono.empty(); // complete		}	});}

所以,经过 DefaultGatewayFilterChain 后,我们的 Mono 就会变成:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() 					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.then(new DefaultGatewayFilterChain(combined).filter(exchange).then(Mono.empty()))), //调用对应的 Handler	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

再继续展开 DefaultGatewayFilterChain 的链路调用,可以得到:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() 					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.then(				Mono.defer(() -> {					//如果链路没有结束,则继续链路					if (DefaultGatewayFilterChain.this.index < DefaultGatewayFilterChain.this.filters.size()) {						GatewayFilter filter = DefaultGatewayFilterChain.this.filters.get(DefaultGatewayFilterChain.this.index);						//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter						DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(DefaultGatewayFilterChain.this, DefaultGatewayFilterChain.this.index + 1);						//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式						return filter.filter(exchange, chain);					}					else {						return Mono.empty(); //链路完成					}				})				.then(Mono.empty()))			), //调用对应的 Handler	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

这样,就形成了 Spring Cloud Gateway 针对路由请求的完整 Mono 调用链。

Spring Cloud Sleuth 是如何增加链路信息

通过之前的源码分析,我们知道,在最开始的 TraceWebFilter,我们将 Mono 封装成了一个 MonoWebFilterTrace,它的核心源码是:

@Overridepublic void subscribe(CoreSubscriber<? super Void> subscriber) {	Context context = contextWithoutInitialSpan(subscriber.currentContext());	Span span = findOrCreateSpan(context);	//将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC	//日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map	//简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息	try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {		//将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span		this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));	}	//在 scope.close() 之后,会将链路信息从  ThreadLocal 的 Map 中剔除}@Overridepublic Object scanUnsafe(Attr key) {	if (key == Attr.RUN_STYLE) {		//执行的方式必须是不能切换线程,也就是同步的		//因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了		return Attr.RunStyle.SYNC; 	}	return super.scanUnsafe(key);}

WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。

经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。

我们应用中丢失链路信息的地方

通过查看日志我们发现,启用 RequestBody 缓存的地方,都有链路缺失。这个 RequestBody 缓存我们使用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter,其核心源码是:

private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,		Function<ServerHttpRequest, Mono<T>> function) {	ServerHttpResponse response = exchange.getResponse();	NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();	return 		//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起		DataBufferUtils.join(exchange.getRequest().getBody())			//如果没有 Body,则直接返回空 DataBuffer			.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))			//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body			//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路			.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))			.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);}

为何会使用这个 AdaptCachedBodyGlobalFilter 呢?获取请求 Body 是通过 exchange.getRequest().getBody() 获取的,其结果是一个 Flux<DataBuffer>.请求的 Body 是一次性的,如果你需要请求重试的话,在第一次调用失败的之后,第二次重试的时候,Body 就读取不到了,因为 Flux 已经结束。所以,对于需要重复调用,例如重试,一对多路由转发的情况,需要将请求 Body 缓存起来,就是经过这个 GatewayFilter。但是经过这个 GatewayFilter 之后,链路信息就没了,可以通过以下这个简单项目进行复现(项目地址):

引入依赖:

<parent>	<groupId>org.springframework.boot</groupId>	<artifactId>spring-boot-starter-parent</artifactId>	<version>2.4.6</version></parent><dependencies>	<dependency>		<groupId>org.springframework.cloud</groupId>		<artifactId>spring-cloud-starter-gateway</artifactId>	</dependency>	<dependency>		<groupId>org.springframework.cloud</groupId>		<artifactId>spring-cloud-starter-sleuth</artifactId>	</dependency>	<dependency>		<groupId>org.springframework.boot</groupId>		<artifactId>spring-boot-starter-log4j2</artifactId>	</dependency>	<!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->	<dependency>		<groupId>com.lmax</groupId>		<artifactId>disruptor</artifactId>		<version>${disruptor.version}</version>	</dependency></dependencies><dependencyManagement>	<dependencies>		<dependency>			<groupId>org.springframework.cloud</groupId>			<artifactId>spring-cloud-dependencies</artifactId>			<version>2020.0.3</version>			<type>pom</type>			<scope>import</scope>		</dependency>	</dependencies></dependencyManagement>

对所有路径开启 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false)public class ApiGatewayConfiguration {    @Autowired    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;    @Autowired    private GatewayProperties gatewayProperties;    @PostConstruct    public void init() {        gatewayProperties.getRoutes().forEach(routeDefinition -> {			//对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter             EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);        });    }}

配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):

server:  port: 8181spring:  application:    name: apiGateway  cloud:    gateway:      httpclient:        connect-timeout: 500        response-timeout: 60000      routes:        - id: first_route          uri:           predicates:              - Path=/httpbin/**          filters:              - StripPrefix=1

添加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter 之前,一个在 AdaptCachedBodyGlobalFilter 之后。这两个 Filter 非常简单,只是打一行日志。

@Log4j2@Componentpublic class PreLogFilter implements GlobalFilter, Ordered {    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;    @Override    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        log.info("before AdaptCachedBodyGlobalFilter");        return chain.filter(exchange);    }    @Override    public int getOrder() {        return ORDER;    }}@Log4j2@Componentpublic class PostLogFilter implements GlobalFilter, Ordered {    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;    @Override    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        log.info("after AdaptCachedBodyGlobalFilter");        return chain.filter(exchange);    }    @Override    public int getOrder() {        return ORDER;    }}

最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。

启动这个应用,之后访问 ,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:

2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
为何链路信息会丢失

我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:

return Mono.defer(() ->	new MonoWebFilterTrace(source, 		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由				.flatMap((Function<Route, Mono<?>>) r -> {					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler				}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由					Mono.empty() 					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志						if (logger.isTraceEnabled()) {							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");					}				})))			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404			.then(				Mono.defer(() -> {					//省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套					//读取 Body,由于 TCP 拆包,所以需要他们拼接到一起					DataBufferUtils.join(exchange.getRequest().getBody())						//如果没有 Body,则直接返回空 DataBuffer						.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))						//decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body						//之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路						.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))						.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);				})				.then(Mono.empty()))			), //调用对应的 Handler	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {		//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略	}););

其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下:

//首先我们创建一个新的 SpanSpan span = tracer.newTrace();//声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperatorclass MonoWebFilterTrace<T> extends MonoOperator<T, T> {	protected MonoWebFilterTrace(Mono<? extends T> source) {		super(source);	}	@Override	public void subscribe(CoreSubscriber<? super T> actual) {		//将 subscribe 用 span 包裹		try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {			source.subscribe(actual);			//在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志			log.info("stopped");		}	}}Mono.defer(() -> new MonoWebFilterTrace(		Mono.fromRunnable(() -> {			log.info("first");		})		//模拟 FluxReceive		.then(Mono.delay(Duration.ofSeconds(1))		.doOnSuccess(longSignal -> log.info(longSignal))))).subscribe(aLong -> log.info(aLong));

Mono.delay 和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:

2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0

在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。

Spring Cloud Gateway 其他的可能丢失链路信息的点

经过前面的分析,我们可以看出,不止这里,还有其他地方会导致 Spring Cloud Sleuth 的链路追踪信息消失,这里举几个大家常见的例子:

1.在 GatewayFilter 中指定了异步执行某些任务,由于线程切换了,并且这时候可能 Span 已经结束了,所以没有链路信息,例如

@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {	return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {			//这里就没有链路信息了            log.info("success");	});}

2.将 GatewayFilter 中继续链路的 chain.filter(exchange) 放到了异步任务中执行,上面的 AdaptCachedBodyGlobalFilter 就属于这种情况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:

@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {	return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));}
Java 并发编程模型与 Project Reactor 编程模型的冲突思考

Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:

日志框架中的 MDC,一般都是 ThreadLocal 实现。所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来唯一标识谁获取到了锁的。分布式锁等数据结构,也是通过 Thread 的属性来唯一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。

但是放到 Project Reactor 编程模型,这就显得格格不入了,因为 Project Reactor 异步响应式编程就是不固定线程,没法保证提交任务和回调能在同一个线程,所以 ThreadLocal 的语义在这里很难成立。Project Reactor 虽然提供了对标 ThreadLocal 的 Context,但是主流框架还没有兼容这个 Context,所以给 Spring Cloud Sleuth 粘合这些链路追踪带来了很大困难,因为 MDC 是一个 ThreadLocal 的 Map 实现,而不是基于 Context 的 Map。这就需要 Spring Cloud Sleuth 在订阅一开始,就需要将链路信息放入 MDC,同时还需要保证运行时不切换线程。

运行不切换线程,这样其实限制了 Project Reactor 的灵活调度,是有一些性能损失的。我们其实想尽量就算加入了链路追踪信息,也不用强制运行不切换线程。但是 Spring Cloud Sleuth 是非侵入式设计,很难实现这一点。但是对于我们自己业务的使用,我们可以定制一些编程规范,来保证大家写的代码不丢失链路信息

改进我们的编程规范

首先,我们自定义 Mono 和 Flux 的工厂

公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{    private final Subscriber<T> delegate;    private final Tracer tracer;    private final CurrentTraceContext currentTraceContext;    private final Span span;    TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {        this.delegate = delegate;        this.tracer = tracer;        this.currentTraceContext = currentTraceContext;        this.span = span;    }    @Override    public void onSubscribe(Subscription s) {        executeWithinScope(() -> {            delegate.onSubscribe(s);        });    }    @Override    public void onError(Throwable t) {        executeWithinScope(() -> {            delegate.onError(t);        });    }    @Override    public void onComplete() {        executeWithinScope(() -> {            delegate.onComplete();        });    }    @Override    public void onNext(T o) {        executeWithinScope(() -> {            delegate.onNext(o);        });    }    private void executeWithinScope(Runnable runnable) {        //如果当前没有链路信息,强制包裹        if (tracer.currentSpan() == null) {            try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {                runnable.run();            }        } else {            //如果当前已有链路信息,则直接执行            runnable.run();        }    }}

之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {    private final Flux<T> delegate;    private final Tracer tracer;    private final CurrentTraceContext currentTraceContext;    private final Span span;    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {        this.delegate = delegate;        this.tracer = tracer;        this.currentTraceContext = currentTraceContext;        this.span = span;    }    @Override    public void subscribe(CoreSubscriber<? super T> actual) {        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));    }}public class TracedMono<T> extends Mono<T> {    private final Mono<T> delegate;    private final Tracer tracer;    private final CurrentTraceContext currentTraceContext;    private final Span span;    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {        this.delegate = delegate;        this.tracer = tracer;        this.currentTraceContext = currentTraceContext;        this.span = span;    }    @Override    public void subscribe(CoreSubscriber<? super T> actual) {        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));    }}

定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。

@Componentpublic class TracedPublisherFactory {    protected static final String TRACE_REQUEST_ATTR = Span.class.getName();    @Autowired    private Tracer tracer;    @Autowired    private CurrentTraceContext currentTraceContext;    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {        return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));    }    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {        return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));    }}

然后,我们规定:1. 所有的 GatewayFilter,需要继承我们自定义的抽象类,这个抽象类仅仅是把 filter 的结果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:

public abstract class AbstractTracedFilter implements GlobalFilter {    @Autowired    protected TracedPublisherFactory tracedPublisherFactory;    @Override    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {        return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);    }    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);}

2. GatewayFilter 中新生成的 Flux 或者 Mono,统一使用 TracedPublisherFactory 再封装一层

3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路丢失,我向社区提了一个 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以参考。也可以在这个 Filter 之前自己将 Request Body 使用 TracedPublisherFactory 进行封装解决。

标签: #apachepost未转发