Skip to content

Commit

Permalink
replace ForkJoinPool.commonPool() with Executors.newCachedThreadPool(…
Browse files Browse the repository at this point in the history
…) to avoid CPU-count-based parallelism
  • Loading branch information
andrewazores committed Nov 6, 2023
1 parent 6f7bcca commit 61bb4d8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 75 deletions.
4 changes: 2 additions & 2 deletions src/test/java/itest/CustomTargetsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -51,7 +51,7 @@
@TestMethodOrder(OrderAnnotation.class)
public class CustomTargetsIT extends StandardSelfTest {

private final ExecutorService worker = ForkJoinPool.commonPool();
private final ExecutorService worker = Executors.newCachedThreadPool();
static final Map<String, String> NULL_RESULT = new HashMap<>();
private String itestJvmId;
private static StoredCredential storedCredential;
Expand Down
143 changes: 70 additions & 73 deletions src/test/java/itest/bases/StandardSelfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -53,6 +54,7 @@
public abstract class StandardSelfTest {

private static final String SELFTEST_ALIAS = "selftest";
private static final ExecutorService WORKER = Executors.newCachedThreadPool();
public static final Logger logger = Logger.getLogger(StandardSelfTest.class);
public static final ObjectMapper mapper = new ObjectMapper();
public static final int REQUEST_TIMEOUT_SECONDS = 30;
Expand All @@ -65,24 +67,23 @@ public static void waitForDiscovery() {
while (!found && System.nanoTime() < deadline) {
logger.infov("Waiting for discovery to see at least one target...");
CompletableFuture<Boolean> queryFound = new CompletableFuture<>();
ForkJoinPool.commonPool()
.submit(
() -> {
webClient
.get("/api/v3/targets")
.basicAuthentication("user", "pass")
.as(BodyCodec.jsonArray())
.timeout(500)
.send(
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
JsonArray arr = ar.result().body();
queryFound.complete(arr.size() >= 1);
});
});
WORKER.submit(
() -> {
webClient
.get("/api/v3/targets")
.basicAuthentication("user", "pass")
.as(BodyCodec.jsonArray())
.timeout(500)
.send(
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
JsonArray arr = ar.result().body();
queryFound.complete(arr.size() >= 1);
});
});
try {
found |= queryFound.get(500, TimeUnit.MILLISECONDS);
if (!found) {
Expand All @@ -108,67 +109,63 @@ private static void tryDefineSelfCustomTarget() {
"service:jmx:rmi:///jndi/rmi://localhost:0/jmxrmi",
"alias",
SELFTEST_ALIAS));
ForkJoinPool.commonPool()
.submit(
() -> {
webClient
.post("/api/v2/targets")
.basicAuthentication("user", "pass")
.timeout(500)
.sendJson(
self,
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
HttpResponse<Buffer> resp = ar.result();
logger.infov(
"HTTP {0} {1}: {2} [{3}]",
resp.statusCode(),
resp.statusMessage(),
resp.bodyAsString(),
resp.headers());
});
});
WORKER.submit(
() -> {
webClient
.post("/api/v2/targets")
.basicAuthentication("user", "pass")
.timeout(500)
.sendJson(
self,
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
HttpResponse<Buffer> resp = ar.result();
logger.infov(
"HTTP {0} {1}: {2} [{3}]",
resp.statusCode(),
resp.statusMessage(),
resp.bodyAsString(),
resp.headers());
});
});
} catch (Exception e) {
logger.warn(e);
}
}

public static String getSelfReferenceConnectUrl() {
CompletableFuture<JsonObject> future = new CompletableFuture<>();
ForkJoinPool.commonPool()
.submit(
() -> {
webClient
.get("/api/v3/targets")
.basicAuthentication("user", "pass")
.as(BodyCodec.jsonArray())
.timeout(5000)
.send(
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
JsonArray arr = ar.result().body();
boolean found = false;
for (int i = 0; i < arr.size(); i++) {
JsonObject obj = arr.getJsonObject(i);
if (SELFTEST_ALIAS.equals(
obj.getString("alias"))) {
future.complete(obj);
found = true;
break;
}
}
if (!found) {
future.completeExceptionally(
new NotFoundException());
}
});
});
WORKER.submit(
() -> {
webClient
.get("/api/v3/targets")
.basicAuthentication("user", "pass")
.as(BodyCodec.jsonArray())
.timeout(5000)
.send(
ar -> {
if (ar.failed()) {
logger.error(ar.cause());
return;
}
JsonArray arr = ar.result().body();
boolean found = false;
for (int i = 0; i < arr.size(); i++) {
JsonObject obj = arr.getJsonObject(i);
if (SELFTEST_ALIAS.equals(obj.getString("alias"))) {
future.complete(obj);
found = true;
break;
}
}
if (!found) {
future.completeExceptionally(new NotFoundException());
}
});
});
try {
JsonObject obj = future.get(5000, TimeUnit.MILLISECONDS);
return obj.getString("connectUrl");
Expand Down

0 comments on commit 61bb4d8

Please sign in to comment.