diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java index f005acefa7a52..5be7efcef15d9 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java @@ -1,15 +1,19 @@ package io.quarkus.vertx; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.util.List; +import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.RequestScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -18,6 +22,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.arc.Arc; +import io.quarkus.runtime.BlockingOperationControl; import io.quarkus.runtime.StartupEvent; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; @@ -46,25 +51,41 @@ public void testRunner() throws InterruptedException { @Singleton public static class Alpha { + @Inject + Bravo bravo; + String val; final List vals = new CopyOnWriteArrayList<>(); final CountDownLatch latch = new CountDownLatch(1); void onStart(@Observes StartupEvent event) { + // Request context is active but duplicated context is not used + String bravoId = bravo.getId(); Supplier> uniSupplier = new Supplier>() { @Override public Uni get() { assertTrue(VertxContext.isOnDuplicatedContext()); VertxContextSafetyToggle.validateContextIfExists("Error", "Error"); assertTrue(Arc.container().requestContext().isActive()); - return Uni.createFrom().item("foo"); + // New duplicated contex -> new request context + String asyncBravoId = bravo.getId(); + assertNotEquals(bravoId, asyncBravoId); + + return VertxContextSupport.executeBlocking(() -> { + assertTrue(BlockingOperationControl.isBlockingAllowed()); + assertTrue(VertxContext.isOnDuplicatedContext()); + assertTrue(Arc.container().requestContext().isActive()); + // Duplicated context is propagated -> the same request context + assertEquals(asyncBravoId, bravo.getId()); + return "foo"; + }); } }; try { val = VertxContextSupport.subscribeAndAwait(uniSupplier); } catch (Throwable e) { - fail(); + fail(e); } Supplier> multiSupplier = new Supplier>() { @@ -80,4 +101,20 @@ public Multi get() { } } + @RequestScoped + public static class Bravo { + + private String id; + + @PostConstruct + void init() { + this.id = UUID.randomUUID().toString(); + } + + public String getId() { + return id; + } + + } + } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java index f349be8d0f8f8..340a9d1f65646 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java @@ -1,5 +1,6 @@ package io.quarkus.vertx; +import java.util.concurrent.Callable; import java.util.function.Consumer; import java.util.function.Supplier; @@ -34,7 +35,7 @@ private VertxContextSupport() { * @throws IllegalStateException If called on an event loop thread. */ public static T subscribeAndAwait(Supplier> uniSupplier) throws Throwable { - Context context = getContext(); + Context context = getContext(false); VertxContextSafetyToggle.setContextSafe(context, true); return Uni.createFrom(). emitter(e -> { context.runOnContext(new Handler() { @@ -69,7 +70,7 @@ public void handle(Void event) { * @param subscribeConsumer */ public static void subscribe(Supplier> multiSupplier, Consumer> subscribeConsumer) { - Context context = getContext(); + Context context = getContext(false); VertxContextSafetyToggle.setContextSafe(context, true); context.runOnContext(new Handler() { @@ -96,14 +97,44 @@ public void accept(MultiSubscribe ms) { }); } - private static Context getContext() { + /** + * Executes the supplied blocking {@link Callable} on a Vertx duplicated context; does not block the current thread. + *

+ * If necessary, the CDI request context is activated during execution of the blocking code. + * + * @param + * @param callable + * @return the produced {@link Uni} + * @see VertxContext#getOrCreateDuplicatedContext(Vertx) + */ + public static Uni executeBlocking(Callable callable) { + Context context = getContext(true); + return Uni.createFrom().completionStage(() -> { + return context.executeBlocking(() -> { + ManagedContext requestContext = Arc.container().requestContext(); + boolean terminate = requestContext.isActive() ? false : true; + if (terminate) { + requestContext.activate(); + } + try { + return callable.call(); + } finally { + if (terminate) { + requestContext.terminate(); + } + } + }, false).toCompletionStage(); + }); + } + + private static Context getContext(boolean blocking) { Context context = Vertx.currentContext(); if (context == null) { Vertx vertx = VertxCoreRecorder.getVertx().get(); context = VertxContext.getOrCreateDuplicatedContext(vertx); } else { // Executed on a vertx thread... - if (Context.isOnEventLoopThread()) { + if (!blocking && Context.isOnEventLoopThread()) { throw new IllegalStateException("VertxContextSupport#subscribeAndAwait() must not be called on an event loop!"); } context = VertxContext.getOrCreateDuplicatedContext(context); diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/upgrade/BlockingHttpUpgradeCheckTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/upgrade/BlockingHttpUpgradeCheckTest.java new file mode 100644 index 0000000000000..27eb5365addc0 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/upgrade/BlockingHttpUpgradeCheckTest.java @@ -0,0 +1,82 @@ +package io.quarkus.websockets.next.test.upgrade; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.runtime.BlockingOperationControl; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.vertx.VertxContextSupport; +import io.quarkus.websockets.next.HttpUpgradeCheck; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Vertx; + +public class BlockingHttpUpgradeCheckTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(BlockingHttpUpgradeCheck.class, Endpoint.class, WSClient.class)); + + @TestHTTPResource("/end") + URI endUri; + + @Inject + Vertx vertx; + + @Test + public void testBlockingCheck() { + try (WSClient client = new WSClient(vertx)) { + client.connect(endUri); + client.waitForMessages(1); + assertEquals("ok", client.getMessages().get(0).toString()); + assertTrue(BlockingHttpUpgradeCheck.PERFORMED.get()); + } + } + + @WebSocket(path = "/end") + public static class Endpoint { + + @OnOpen + String open() { + return "ok"; + } + + } + + @Singleton + public static class BlockingHttpUpgradeCheck implements HttpUpgradeCheck { + + static final AtomicBoolean PERFORMED = new AtomicBoolean(); + + @Override + public Uni perform(HttpUpgradeContext context) { + return VertxContextSupport.executeBlocking(new Callable() { + + @Override + public CheckResult call() throws Exception { + assertTrue(BlockingOperationControl.isBlockingAllowed()); + assertTrue(VertxContext.isOnDuplicatedContext()); + assertTrue(Arc.container().requestContext().isActive()); + PERFORMED.set(true); + return CheckResult.permitUpgradeSync(); + } + }); + } + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/HttpUpgradeCheck.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/HttpUpgradeCheck.java index 937e0fb319049..2e342aad4d00f 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/HttpUpgradeCheck.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/HttpUpgradeCheck.java @@ -6,6 +6,7 @@ import java.util.stream.Stream; import io.quarkus.security.identity.SecurityIdentity; +import io.quarkus.vertx.VertxContextSupport; import io.smallrye.mutiny.Uni; import io.vertx.core.http.HttpServerRequest; @@ -23,6 +24,9 @@ public interface HttpUpgradeCheck { /** * This method inspects HTTP Upgrade context and either allows or denies upgrade to a WebSocket connection. + *

+ * Use {@link VertxContextSupport#executeBlocking(java.util.concurrent.Callable)} in order to execute some blocking code in + * the check. * * @param context {@link HttpUpgradeContext} * @return check result; must never be null