我們繼續(xù)分析上一節(jié)提到的 ??WebHandler?
?。加入 Spring Cloud Sleuth 以及 Prometheus 相關(guān)依賴(lài)之后, Spring Cloud Gateway 的處理流程如下所示:
Spring Cloud Gateway 入口 -> WebFlux 的 DefaultWebFilterChain
Spring Cloud Gateway 是基于 Spring WebFlux 開(kāi)發(fā)的異步響應(yīng)式網(wǎng)關(guān),異步響應(yīng)式代碼比較難以理解和閱讀,我這里給大家分享一種方法去理解,通過(guò)這個(gè)流程來(lái)理解 Spring Cloud Gateway 的工作流程以及底層原理。其實(shí)可以理解為,上圖這個(gè)流程,就是拼出來(lái)一個(gè)完整的 Mono(或者 Flux)流,最后 subscribe 執(zhí)行。
當(dāng)收到一個(gè)請(qǐng)求的時(shí)候,會(huì)經(jīng)過(guò) ??org.springframework.web.server.handler.DefaultWebFilterChain?
?,這是 WebFilter 的調(diào)用鏈,這個(gè)鏈路包括三個(gè) WebFilter:
- ?
?org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter?
?:添加 Prometheus 相關(guān)依賴(lài)之后,會(huì)有這個(gè) MetricsWebFilter,用于記錄請(qǐng)求處理耗時(shí),采集相關(guān)指標(biāo)。 - ?
?org.springframework.cloud.sleuth.instrument.web.TraceWebFilter?
?:添加 Spring Cloud Sleuth 相關(guān)依賴(lài)之后,會(huì)有這個(gè) TraceWebFilter。 - ?
?org.springframework.cloud.gateway.handler.predicate.WeightCalculatorWebFilter?
?:Spring Cloud Gateway 路由權(quán)重相關(guān)配置功能相關(guān)實(shí)現(xiàn)類(lèi),這個(gè)我們這里不關(guān)心。
在這個(gè) ??DefaultWebFilterChain?
? 會(huì)形成這樣一個(gè) Mono,我們依次將他們標(biāo)記出來(lái),首先是入口代碼 ??org.springframework.web.server.handler.DefaultWebFilterChain#filter?
?:
public Monofilter(ServerWebExchange exchange) {
return Mono.defer(() ->
// this.currentFilter != null 代表 WebFilter 鏈還沒(méi)有結(jié)束
// this.chain != null 代表 WebFilter 鏈不為空
this.currentFilter != null && this.chain != null ?
//在 WebFilter 鏈沒(méi)有結(jié)束的情況下,調(diào)用 WebFilter
invokeFilter(this.currentFilter, this.chain, exchange) :
//在 WebFilter 結(jié)束的情況下,調(diào)用 handler
this.handler.handle(exchange));
}
對(duì)于我們這里的 WebFilter 鏈的第一個(gè) ??MetricsWebFilter?
?,假設(shè)啟用了對(duì)應(yīng)的采集統(tǒng)計(jì)的話(huà),這時(shí)候生成的 Mono 就是:
return Mono.defer(() ->
chain.filter(exchange).transformDeferred((call) -> {
long start = System.nanoTime();
return call
//成功時(shí),記錄響應(yīng)時(shí)間
.doOnSuccess((done) -> MetricsWebFilter.this.onSuccess(exchange, start))
//失敗時(shí),記錄響應(yīng)時(shí)間和異常
.doOnError((cause) -> MetricsWebFilter.this.onError(exchange, start, cause));
});
);
這里為了方便,我們對(duì)代碼做了簡(jiǎn)化,由于我們要將整個(gè)鏈路的所有 Mono 和 Flux 拼接在一起行程完整鏈路,所以原本是 ??MetricsWebFilter?
?中的 ??onSuccess(exchange, start)?
?方法,被改成了 ??MetricsWebFilter.this.onSuccess(exchange, start)?
? 這種偽代碼。
接著,根據(jù)??DefaultWebFilterChain?
? 的源碼分析,??chain.filter(exchange)?
? 會(huì)繼續(xù) WebFilter 鏈路,到達(dá)下一個(gè) WebFilter,即 ??TraceWebFilter?
?。經(jīng)過(guò) ??TraceWebFilter?
?,Mono 就會(huì)變成:
return Mono.defer(() ->
new MonoWebFilterTrace(source, chain.filter(exchange), TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
可以看出,在 ??TraceWebFilter?
? 中,整個(gè)內(nèi)部 Mono (??chain.filter(exchange)?
? 后續(xù)的結(jié)果)都被封裝成了一個(gè) ??MonoWebFilterTrace?
?,這也是保持鏈路追蹤信息的關(guān)鍵實(shí)現(xiàn)。
繼續(xù) WebFilter 鏈路,經(jīng)過(guò)最后一個(gè) WebFilter ??WeightCalculatorWebFilter?
?; 這個(gè) WebFilter 我們不關(guān)心,里面對(duì)路由權(quán)重做了一些計(jì)算操作,我們這里直接忽略即可。這樣我們就走完了所有 WebFilter 鏈路,來(lái)到了最后的調(diào)用 ??DefaultWebFilterChain.this.handler?
?,這個(gè) handler 就是 ??org.springframework.web.reactive.DispatcherHandler?
?。在 DispatcherHandler 中,我們會(huì)計(jì)算出路由并發(fā)送請(qǐng)求到符合條件的 GatewayFilter。經(jīng)過(guò) DispatcherHandler,Mono 會(huì)變成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
Flux.fromIterable(DispatcherHandler.this.handlerMappings) //讀取所有的 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange)) //按順序調(diào)用所有的 handlerMappings 的 getHandler 方法,如果有對(duì)應(yīng)的 Handler 會(huì)返回,否則返回 Mono.empty();
.next() //找到第一個(gè)返回不是 Mono.empty() 的 Handler
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //調(diào)用對(duì)應(yīng)的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //處理結(jié)果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
handlerMappings 包括:
- ?
?org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndPointHandlerMapping?
?:由于我們項(xiàng)目中添加了 Actuator 相關(guān)依賴(lài),所以這里有這個(gè) HandlerMapping。Actuator 相關(guān)路徑映射,不是我們這里關(guān)心的。但是可以看出,Actuator 相關(guān)路徑優(yōu)先于 Spring Cloud Gateway 配置路由 - ?
?org.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMapping?
?:由于我們項(xiàng)目中添加了 Actuator 相關(guān)依賴(lài),所以這里有這個(gè) HandlerMapping。使用 @ControllerEndpoint 或者 @RestControllerEndpoint 注解標(biāo)注的 Actuator 相關(guān)路徑映射,不是我們這里關(guān)心的。 - ?
?org.springframework.web.reactive.function.server.support.RouterFunctionMapping?
?:在 Spring-WebFlux 中,你可以定義很多不同的 RouterFunction 來(lái)控制路徑路由,但這也不是我們這里關(guān)心的。但是可以看出,自定義的 RouterFunction 會(huì)優(yōu)先于 Spring Cloud Gateway 配置路由 - ?
?org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping?
?:針對(duì) @RequestMapping 注解的路徑的 HandlerMapping,不是我們這里關(guān)心的。但是可以看出,如果你在 Spring Cloud Gateway 中指定 RequestMapping 路徑,會(huì)優(yōu)先于 Spring Cloud Gateway 配置路由。 - ?
?org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping?
?:這個(gè)是 Spring Cloud Gateway 的 HandlerMapping,會(huì)讀取 Spring Cloud Gateway 配置并生成路由。這個(gè)是我們這里要詳細(xì)分析的。
其實(shí)這些 handlerMappings,我們這里肯定走的是 ??RoutePredicateHandlerMapping?
? 的相關(guān)邏輯,所以我們的 Mono 又可以簡(jiǎn)化成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandler(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //調(diào)用對(duì)應(yīng)的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //處理結(jié)果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
我們來(lái)看 ??RoutePredicateHandlerMapping?
?,首先這些 handlerMapping 都是繼承了抽象類(lèi) ??org.springframework.web.reactive.handler.AbstractHandlerMapping?
?, 前面我們拼接的 Mono 里面的 getHandler 的實(shí)現(xiàn)其實(shí)就在這個(gè)抽象類(lèi)中:
public Mono
可以看出,其實(shí)核心就是每個(gè)實(shí)現(xiàn)類(lèi)的 ??getHandlerInternal(exchange)?
?方法,所以在我們拼接的 Mono 中,我們會(huì)忽略抽象類(lèi)中的針對(duì) handler 之后的 map 處理。
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandlerInternal(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //調(diào)用對(duì)應(yīng)的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //處理結(jié)果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
接下來(lái)經(jīng)過(guò) ??RoutePredicateHandlerMapping?
? 的 ??getHandlerInternal(exchange)?
? 方法,我們的 Mono 變成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據(jù)請(qǐng)求尋找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,后面我們還會(huì)用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果為 Mono.empty(),也就是沒(méi)找到路由
Mono.empty() //返回 Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,記錄日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //調(diào)用對(duì)應(yīng)的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //處理結(jié)果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
??RoutePredicateHandlerMapping.this.lookupRoute(exchange)?
? 根據(jù)請(qǐng)求尋找路由,這個(gè)我們就不詳細(xì)展開(kāi)了,其實(shí)就是根據(jù)你的 Spring Cloud Gateway 配置,找到合適的路由。接下來(lái)我們來(lái)看調(diào)用對(duì)應(yīng)的 Handler,即 FilteringWebHandler。??DispatcherHandler.this.invokeHandler(exchange, handler)?
? 我們這里也不詳細(xì)展開(kāi),我們知道其實(shí)就是調(diào)用 Handler 的 handle 方法,即 FilteringWebHandler 的 handle 方法,所以 我們的 Mono 變成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據(jù)請(qǐng)求尋找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,后面我們還會(huì)用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果為 Mono.empty(),也就是沒(méi)找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,記錄日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty())) //調(diào)用對(duì)應(yīng)的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //處理結(jié)果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
由于調(diào)用對(duì)應(yīng)的 Handler,最后返回的是 ??Mono.empty()?
?,所以后面的 flatMap 其實(shí)不會(huì)執(zhí)行了。所以我們可以將最后的處理結(jié)果這一步去掉。所以我們的 Mono 就變成了:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據(jù)請(qǐng)求尋找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,后面我們還會(huì)用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果為 Mono.empty(),也就是沒(méi)找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,記錄日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty()))), //調(diào)用對(duì)應(yīng)的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
??FilteringWebHandler.this.handle(exchange)?
? 其實(shí)就是從 Attributes 中取出路由,從路由中取出對(duì)應(yīng)的 GatewayFilters,與全局 GatewayFilters 放到同一個(gè) List 中,并按照這些 GatewayFilter 的順序排序(可以通過(guò)實(shí)現(xiàn) ??org.springframework.core.Ordered?
? 接口來(lái)制定順序),然后生成 ??DefaultGatewayFilterChain?
? 即 GatewayFilter 鏈路。對(duì)應(yīng)的源碼是:
public Monohandle(ServerWebExchange exchange) {
//從 Attributes 中取出路由,從路由中取出對(duì)應(yīng)的 GatewayFilters
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
ListgatewayFilters = route.getFilters();
//與全局 GatewayFilters 放到同一個(gè) List 中
Listcombined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
//按照這些 GatewayFilter 的順序排序(可以通過(guò)實(shí)現(xiàn) `org.springframework.core.Ordered` 接口來(lái)制定順序)
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
//生成調(diào)用鏈
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
這個(gè) GatewayFilter 調(diào)用鏈和 WebFilter 調(diào)用鏈類(lèi)似,參考 ??DefaultGatewayFilterChain?
? 的源碼:
public Monofilter(ServerWebExchange exchange) {
return Mono.defer(() -> {
//如果鏈路沒(méi)有結(jié)束,則繼續(xù)鏈路
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
//這里將 index + 1,也就是調(diào)用鏈路中的下一個(gè) GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
//每個(gè) filter 中如果想要繼續(xù)鏈路,則會(huì)調(diào)用 chain.filter(exchange),這也是我們開(kāi)發(fā) GatewayFilter 的時(shí)候的使用方式
return filter.filter(exchange, chain);
}
else {
//到達(dá)末尾,鏈路結(jié)束
return Mono.empty(); // complete
}
});
}
所以,經(jīng)過(guò) ??DefaultGatewayFilterChain?
? 后,我們的 Mono 就會(huì)變成:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據(jù)請(qǐng)求尋找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,后面我們還會(huì)用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果為 Mono.empty(),也就是沒(méi)找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,記錄日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.then(new DefaultGatewayFilterChain(combined).filter(exchange).then(Mono.empty()))), //調(diào)用對(duì)應(yīng)的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
再繼續(xù)展開(kāi) ??DefaultGatewayFilterChain?
? 的鏈路調(diào)用,可以得到:
return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據(jù)請(qǐng)求尋找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,后面我們還會(huì)用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果為 Mono.empty(),也就是沒(méi)找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,記錄日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒(méi)有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
.then(
Mono.defer(() -> {
//如果鏈路沒(méi)有結(jié)束,則繼續(xù)鏈路
if (DefaultGatewayFilterChain.this.index < DefaultGatewayFilterChain.this.filters.size()) {
GatewayFilter filter = DefaultGatewayFilterChain.this.filters.get(DefaultGatewayFilterChain.this.index);
//這里將 index + 1,也就是調(diào)用鏈路中的下一個(gè) GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(DefaultGatewayFilterChain.this, DefaultGatewayFilterChain.this.index + 1);
//每個(gè) filter 中如果想要繼續(xù)鏈路,則會(huì)調(diào)用 chain.filter(exchange),這也是我們開(kāi)發(fā) GatewayFilter 的時(shí)候的使用方式
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); //鏈路完成
}
})
.then(Mono.empty()))
), //調(diào)用對(duì)應(yīng)的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相關(guān)的處理,在前面的代碼中給出了,這里省略
});
);
這樣,就形成了 Spring Cloud Gateway 針對(duì)路由請(qǐng)求的完整 Mono 調(diào)用鏈。
微信搜索“我的編程喵”關(guān)注公眾號(hào),每日一刷,輕松提升技術(shù),斬獲各種offer: