Skip to content

Commit

Permalink
Remove concatMap in lookupRoute to improve throughput
Browse files Browse the repository at this point in the history
Fixes gh-3291
  • Loading branch information
spencergibb committed Oct 31, 2024
1 parent 4cbd756 commit 3f62607
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,17 @@ private String getExchangeDesc(ServerWebExchange exchange) {
}

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
return this.routeLocator.getRoutes().filterWhen(route -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, route.getId());
try {
return route.getPredicate().apply(exchange);
}
catch (Exception e) {
logger.error("Error applying predicate for route: " + route.getId(), e);
}
return Mono.just(false);
})
.next()
// TODO: error handling
.map(route -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void lookupRouteFromAsyncPredicates(CapturedOutput capturedOutput) {
Route routeError = Route.async()
.id("routeError")
.uri("http://localhost")
.asyncPredicate(swe -> Mono.error(new IllegalStateException("boom1")))
.asyncPredicate(swe -> Mono.just(boom1()))
.build();
Route routeFail = Route.async().id("routeFail").uri("http://localhost").asyncPredicate(swe -> {
throw new IllegalStateException("boom2");
Expand All @@ -96,4 +96,8 @@ public void lookupRouteFromAsyncPredicates(CapturedOutput capturedOutput) {
Assertions.assertTrue(capturedOutput.getOut().contains("java.lang.IllegalStateException: boom2"));
}

boolean boom1() {
throw new IllegalStateException("boom1");
}

}

0 comments on commit 3f62607

Please sign in to comment.