From 06184cd5223ca57f8985ae34a89b456a1b6452c9 Mon Sep 17 00:00:00 2001 From: "Mark S. Lewis" Date: Sat, 2 Dec 2023 21:34:12 +0000 Subject: [PATCH] Use Executor in asset-transfer-events/application-gateway-java The default ForkJoinPool.commonPool may have limited capacity in some environments, risking deadlock. This implementation also better demonstrates handling of connection errors. Signed-off-by: Mark S. Lewis --- .../src/main/java/App.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/asset-transfer-events/application-gateway-java/src/main/java/App.java b/asset-transfer-events/application-gateway-java/src/main/java/App.java index c515cdcddd..e0d94f777e 100644 --- a/asset-transfer-events/application-gateway-java/src/main/java/App.java +++ b/asset-transfer-events/application-gateway-java/src/main/java/App.java @@ -7,6 +7,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; +import io.grpc.Status; import org.hyperledger.fabric.client.ChaincodeEvent; import org.hyperledger.fabric.client.CloseableIterator; import org.hyperledger.fabric.client.CommitException; @@ -14,15 +15,17 @@ import org.hyperledger.fabric.client.Contract; import org.hyperledger.fabric.client.EndorseException; import org.hyperledger.fabric.client.Gateway; +import org.hyperledger.fabric.client.GatewayRuntimeException; import org.hyperledger.fabric.client.Network; import org.hyperledger.fabric.client.SubmitException; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public final class App { +public final class App implements AutoCloseable { private static final String channelName = "mychannel"; private static final String chaincodeName = "events"; @@ -30,6 +33,7 @@ public final class App { private final Contract contract; private final String assetId = "asset" + Instant.now().toEpochMilli(); private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private final ExecutorService executor = Executors.newCachedThreadPool(); public static void main(final String[] args) throws Exception { var grpcChannel = Connections.newGrpcConnection(); @@ -42,8 +46,8 @@ public static void main(final String[] args) throws Exception { .submitOptions(options -> options.withDeadlineAfter(5, TimeUnit.SECONDS)) .commitStatusOptions(options -> options.withDeadlineAfter(1, TimeUnit.MINUTES)); - try (var gateway = builder.connect()) { - new App(gateway).run(); + try (var gateway = builder.connect(); var app = new App(gateway)) { + app.run(); } finally { grpcChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } @@ -71,15 +75,22 @@ private CloseableIterator startChaincodeEventListening() { System.out.println("\n*** Start chaincode event listening"); var eventIter = network.getChaincodeEvents(chaincodeName); + executor.execute(() -> readEvents(eventIter)); - CompletableFuture.runAsync(() -> { + return eventIter; + } + + private void readEvents(final CloseableIterator eventIter) { + try { eventIter.forEachRemaining(event -> { var payload = prettyJson(event.getPayload()); System.out.println("\n<-- Chaincode event received: " + event.getEventName() + " - " + payload); }); - }); - - return eventIter; + } catch (GatewayRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.CANCELLED) { + e.printStackTrace(); + } + } } private String prettyJson(final byte[] json) { @@ -154,4 +165,9 @@ private void replayChaincodeEvents(final long startBlock) { } } } + + @Override + public void close() throws Exception { + executor.shutdownNow(); + } }