Skip to content

Commit

Permalink
Merge pull request #43444 from mkouba/ws-next-blocking-http-upgrade-c…
Browse files Browse the repository at this point in the history
…heck

Add VertxContextSupport#executeBlocking()
  • Loading branch information
mkouba authored Sep 24, 2024
2 parents 3baeed1 + d211835 commit cb31fbb
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package io.quarkus.vertx;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -18,6 +22,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
Expand Down Expand Up @@ -46,25 +51,41 @@ public void testRunner() throws InterruptedException {
@Singleton
public static class Alpha {

@Inject
Bravo bravo;

String val;

final List<Integer> vals = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);

void onStart(@Observes StartupEvent event) {
// Request context is active but duplicated context is not used
String bravoId = bravo.getId();
Supplier<Uni<String>> uniSupplier = new Supplier<Uni<String>>() {
@Override
public Uni<String> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Error", "Error");
assertTrue(Arc.container().requestContext().isActive());
return Uni.createFrom().item("foo");
// New duplicated contex -> new request context
String asyncBravoId = bravo.getId();
assertNotEquals(bravoId, asyncBravoId);

return VertxContextSupport.executeBlocking(() -> {
assertTrue(BlockingOperationControl.isBlockingAllowed());
assertTrue(VertxContext.isOnDuplicatedContext());
assertTrue(Arc.container().requestContext().isActive());
// Duplicated context is propagated -> the same request context
assertEquals(asyncBravoId, bravo.getId());
return "foo";
});
}
};
try {
val = VertxContextSupport.subscribeAndAwait(uniSupplier);
} catch (Throwable e) {
fail();
fail(e);
}

Supplier<Multi<Integer>> multiSupplier = new Supplier<Multi<Integer>>() {
Expand All @@ -80,4 +101,20 @@ public Multi<Integer> get() {
}
}

@RequestScoped
public static class Bravo {

private String id;

@PostConstruct
void init() {
this.id = UUID.randomUUID().toString();
}

public String getId() {
return id;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.vertx;

import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -34,7 +35,7 @@ private VertxContextSupport() {
* @throws IllegalStateException If called on an event loop thread.
*/
public static <T> T subscribeAndAwait(Supplier<Uni<T>> uniSupplier) throws Throwable {
Context context = getContext();
Context context = getContext(false);
VertxContextSafetyToggle.setContextSafe(context, true);
return Uni.createFrom().<T> emitter(e -> {
context.runOnContext(new Handler<Void>() {
Expand Down Expand Up @@ -69,7 +70,7 @@ public void handle(Void event) {
* @param subscribeConsumer
*/
public static <T> void subscribe(Supplier<Multi<T>> multiSupplier, Consumer<MultiSubscribe<T>> subscribeConsumer) {
Context context = getContext();
Context context = getContext(false);
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {

Expand All @@ -96,14 +97,44 @@ public void accept(MultiSubscribe<T> ms) {
});
}

private static Context getContext() {
/**
* Executes the supplied blocking {@link Callable} on a Vertx duplicated context; does not block the current thread.
* <p>
* If necessary, the CDI request context is activated during execution of the blocking code.
*
* @param <T>
* @param callable
* @return the produced {@link Uni}
* @see VertxContext#getOrCreateDuplicatedContext(Vertx)
*/
public static <T> Uni<T> executeBlocking(Callable<T> callable) {
Context context = getContext(true);
return Uni.createFrom().completionStage(() -> {
return context.executeBlocking(() -> {
ManagedContext requestContext = Arc.container().requestContext();
boolean terminate = requestContext.isActive() ? false : true;
if (terminate) {
requestContext.activate();
}
try {
return callable.call();
} finally {
if (terminate) {
requestContext.terminate();
}
}
}, false).toCompletionStage();
});
}

private static Context getContext(boolean blocking) {
Context context = Vertx.currentContext();
if (context == null) {
Vertx vertx = VertxCoreRecorder.getVertx().get();
context = VertxContext.getOrCreateDuplicatedContext(vertx);
} else {
// Executed on a vertx thread...
if (Context.isOnEventLoopThread()) {
if (!blocking && Context.isOnEventLoopThread()) {
throw new IllegalStateException("VertxContextSupport#subscribeAndAwait() must not be called on an event loop!");
}
context = VertxContext.getOrCreateDuplicatedContext(context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.quarkus.websockets.next.test.upgrade;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.vertx.VertxContextSupport;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;

public class BlockingHttpUpgradeCheckTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(BlockingHttpUpgradeCheck.class, Endpoint.class, WSClient.class));

@TestHTTPResource("/end")
URI endUri;

@Inject
Vertx vertx;

@Test
public void testBlockingCheck() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
client.waitForMessages(1);
assertEquals("ok", client.getMessages().get(0).toString());
assertTrue(BlockingHttpUpgradeCheck.PERFORMED.get());
}
}

@WebSocket(path = "/end")
public static class Endpoint {

@OnOpen
String open() {
return "ok";
}

}

@Singleton
public static class BlockingHttpUpgradeCheck implements HttpUpgradeCheck {

static final AtomicBoolean PERFORMED = new AtomicBoolean();

@Override
public Uni<CheckResult> perform(HttpUpgradeContext context) {
return VertxContextSupport.executeBlocking(new Callable<CheckResult>() {

@Override
public CheckResult call() throws Exception {
assertTrue(BlockingOperationControl.isBlockingAllowed());
assertTrue(VertxContext.isOnDuplicatedContext());
assertTrue(Arc.container().requestContext().isActive());
PERFORMED.set(true);
return CheckResult.permitUpgradeSync();
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.stream.Stream;

import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.VertxContextSupport;
import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpServerRequest;

Expand All @@ -23,6 +24,9 @@ public interface HttpUpgradeCheck {

/**
* This method inspects HTTP Upgrade context and either allows or denies upgrade to a WebSocket connection.
* <p>
* Use {@link VertxContextSupport#executeBlocking(java.util.concurrent.Callable)} in order to execute some blocking code in
* the check.
*
* @param context {@link HttpUpgradeContext}
* @return check result; must never be null
Expand Down

0 comments on commit cb31fbb

Please sign in to comment.