diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index 2dedc5d9f8..1956b72f5e 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -32,6 +32,7 @@ dependencies { api(projects.micronautJacksonDatabind) api(projects.micronautRouter) api(projects.micronautRuntime) + api(projects.micronautCoreReactive) api(platform(libs.test.boms.micronaut.validation)) api(libs.managed.reactor) diff --git a/benchmarks/src/jmh/java/io/micronaut/http/server/stack/ControllersBenchmark.java b/benchmarks/src/jmh/java/io/micronaut/http/server/stack/ControllersBenchmark.java index 5c4df2647b..bf332a9776 100644 --- a/benchmarks/src/jmh/java/io/micronaut/http/server/stack/ControllersBenchmark.java +++ b/benchmarks/src/jmh/java/io/micronaut/http/server/stack/ControllersBenchmark.java @@ -3,6 +3,7 @@ import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Requires; import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.async.annotation.SingleResult; import io.micronaut.http.MediaType; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; @@ -22,6 +23,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import jakarta.inject.Inject; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Mode; @@ -31,14 +33,19 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.infra.Blackhole; -import org.openjdk.jmh.profile.AsyncProfiler; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** @@ -122,7 +129,7 @@ public enum Request { TFB_LIKE { @Override FullHttpRequest request() { - FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike"); + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/bytes"); request.headers().add(HttpHeaderNames.ACCEPT, "text/plain,text/html;q=0.9,application/xhtml+xml;q=0.9,application/xml;q=0.8,*/*;q=0.7"); return request; } @@ -136,6 +143,150 @@ void verifyResponse(FullHttpResponse response) { Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); } }, + TFB_STRING { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/string"); + request.headers().add(HttpHeaderNames.ACCEPT, "text/plain,text/html;q=0.9,application/xhtml+xml;q=0.9,application/xml;q=0.8,*/*;q=0.7"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("text/plain", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = "Hello, World!"; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, + TFB_LIKE_BEANS1 { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/beans1"); + request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = """ +[{"id":1,"message":"A"},{"id":2,"message":"B"},{"id":3,"message":"C"}]"""; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, + TFB_LIKE_BEANS2 { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/beans2"); + request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = """ +[{"id":1,"randomNumber":123},{"id":2,"randomNumber":456},{"id":3,"randomNumber":789}]"""; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, + TFB_LIKE_ASYNC_BEANS1 { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/async/beans1"); + request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = """ +[{"id":1,"message":"A"},{"id":2,"message":"B"},{"id":3,"message":"C"}]"""; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, + TFB_LIKE_ASYNC_BEANS2 { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/async/beans2"); + request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = """ +[{"id":1,"randomNumber":123},{"id":2,"randomNumber":456},{"id":3,"randomNumber":789}]"""; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, +// Type pollution because of the Reactor +// TFB_LIKE_REACTIVE_BEANS1 { +// @Override +// FullHttpRequest request() { +// FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/reactive/beans1"); +// request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); +// return request; +// } +// +// @Override +// void verifyResponse(FullHttpResponse response) { +// Assertions.assertEquals(HttpResponseStatus.OK, response.status()); +// Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); +// String expectedResponseBody = """ +//[{"id":1,"message":"A"},{"id":2,"message":"B"},{"id":3,"message":"C"}]"""; +// Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); +// Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); +// } +// }, +// TFB_LIKE_REACTIVE_BEANS2 { +// @Override +// FullHttpRequest request() { +// FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/reactive/beans2"); +// request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); +// return request; +// } +// +// @Override +// void verifyResponse(FullHttpResponse response) { +// Assertions.assertEquals(HttpResponseStatus.OK, response.status()); +// Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); +// String expectedResponseBody = """ +//[{"id":1,"randomNumber":123},{"id":2,"randomNumber":456},{"id":3,"randomNumber":789}]"""; +// Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); +// Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); +// } +// }, + TFB_LIKE_MAP { + @Override + FullHttpRequest request() { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/tfblike/map"); + request.headers().add(HttpHeaderNames.ACCEPT, "application/json"); + return request; + } + + @Override + void verifyResponse(FullHttpResponse response) { + Assertions.assertEquals(HttpResponseStatus.OK, response.status()); + Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + String expectedResponseBody = """ +{"message":"Hello, World!"}"""; + Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); + Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); + } + }, MISSING_QUERY_PARAMETER { @Override FullHttpRequest request() { @@ -152,8 +303,7 @@ void verifyResponse(FullHttpResponse response) { Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8)); Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH)); } - } - ; + }; abstract FullHttpRequest request(); @@ -164,11 +314,64 @@ void verifyResponse(FullHttpResponse response) { @Requires(property = "spec.name", value = "ControllersBenchmark") static class TfbLikeController { - private static final byte[] TEXT = "Hello, World!".getBytes(StandardCharsets.UTF_8); + public static final String STRING = "Hello, World!"; + private static final byte[] BYTES = STRING.getBytes(StandardCharsets.UTF_8); + public static final List<@NotNull SomeBean1> BEANS1 = List.of( + new SomeBean1(1, "A"), + new SomeBean1(2, "B"), + new SomeBean1(3, "C") + ); + public static final List<@NotNull SomeBean2> BEANS2 = List.of( + new SomeBean2(1, 123), + new SomeBean2(2, 456), + new SomeBean2(3, 789) + ); + + @Get(value = "/bytes", produces = MediaType.TEXT_PLAIN) + public byte[] bytes() { + return BYTES; + } + @Get(value = "/string", produces = MediaType.TEXT_PLAIN) + public String string() { + return STRING; + } + + @Get("/beans1") + public List beans1() { + return BEANS1; + } - @Get(value = "/", produces = MediaType.TEXT_PLAIN) - public byte[] getPlainText() { - return TEXT; + @Get("/beans2") + public List beans2() { + return BEANS2; + } + + @Get("/async/beans1") + public CompletionStage> asyncBeans1() { + return CompletableFuture.completedFuture(BEANS1); + } + + @Get("/async/beans2") + public CompletionStage> asyncBeans2() { + return CompletableFuture.completedFuture(BEANS2); + } + + @SingleResult + @Get("/reactive/beans1") + public Publisher> publisherBeans1() { + return Mono.just(BEANS1); + } + + @Get("/reactive/beans2") + public Mono> publisherBeans2() { + return Mono.just(BEANS2); + } + + @Get("/map") + public Map getJson() { + final Map map = new HashMap<>(); + map.put("message", STRING); + return map; } } @@ -188,4 +391,36 @@ String echoMissingParameter(String text, return text; } } + + public record SomeBean1(int id, String message) { + } + + public static class SomeBean2 { + private int id; + private int randomNumber; + + public SomeBean2() { + } + + public SomeBean2(int id, int randomNumber) { + this.id = id; + this.randomNumber = randomNumber; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getRandomNumber() { + return randomNumber; + } + + public void setRandomNumber(int randomNumber) { + this.randomNumber = randomNumber; + } + } } diff --git a/core-reactive/src/main/java/io/micronaut/core/async/propagation/ReactivePropagation.java b/core-reactive/src/main/java/io/micronaut/core/async/propagation/ReactivePropagation.java index 13d6a2e057..7ab14717e5 100644 --- a/core-reactive/src/main/java/io/micronaut/core/async/propagation/ReactivePropagation.java +++ b/core-reactive/src/main/java/io/micronaut/core/async/propagation/ReactivePropagation.java @@ -46,27 +46,8 @@ private ReactivePropagation() { * @return propagation aware publisher */ public static Publisher propagate(PropagatedContext propagatedContext, Publisher actual) { - if (actual instanceof CorePublisher) { - return new CorePublisher<>() { - @Override - public void subscribe(@NonNull CoreSubscriber subscriber) { - CorePublisher actualCorePublisher = (CorePublisher) actual; - try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { - actualCorePublisher.subscribe(propagate(propagatedContext, subscriber)); - } - } - - @Override - public void subscribe(Subscriber subscriber) { - if (subscriber instanceof CoreSubscriber coreSubscriber) { - subscribe(coreSubscriber); - return; - } - try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { - actual.subscribe(propagate(propagatedContext, subscriber)); - } - } - }; + if (actual instanceof CorePublisher corePublisher) { + return propagate(propagatedContext, corePublisher); } return subscriber -> { try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { @@ -74,6 +55,36 @@ public void subscribe(Subscriber subscriber) { } }; } + /** + * Creates propagation context aware {@link Publisher}. + * + * @param propagatedContext The context + * @param actual The publisher + * @param The publisher element type + * @return propagation aware publisher + * @since 4.8 + */ + public static Publisher propagate(PropagatedContext propagatedContext, CorePublisher actual) { + return new CorePublisher<>() { + @Override + public void subscribe(@NonNull CoreSubscriber subscriber) { + try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { + actual.subscribe(propagate(propagatedContext, subscriber)); + } + } + + @Override + public void subscribe(Subscriber subscriber) { + if (subscriber instanceof CoreSubscriber coreSubscriber) { + subscribe(coreSubscriber); + return; + } + try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { + actual.subscribe(propagate(propagatedContext, subscriber)); + } + } + }; + } /** * Creates propagation context aware {@link Subscriber}. diff --git a/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java b/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java index 750a538c0f..5a1202f2c5 100644 --- a/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java +++ b/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java @@ -328,7 +328,9 @@ public ImperativeExecutionFlow tryComplete() { static Mono toMono(ExecutionFlow next) { if (next instanceof ReactorExecutionFlowImpl reactiveFlowImpl) { return reactiveFlowImpl.value; - } else if (next instanceof ImperativeExecutionFlow imperativeFlow) { + } + ImperativeExecutionFlow imperativeFlow = next.tryComplete(); + if (imperativeFlow != null) { Mono m; if (imperativeFlow.getError() != null) { m = Mono.error(imperativeFlow.getError()); @@ -347,9 +349,8 @@ static Mono toMono(ExecutionFlow next) { }); } return m; - } else { - return new FlowAsMono<>(next); } + return new FlowAsMono<>(next); } static Mono toMono(Supplier> next) {