Skip to content

Commit

Permalink
feat: Create Client with external ExecutorService (#1987)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Ivanov <[email protected]>
  • Loading branch information
0xivanov authored Sep 19, 2024
1 parent c21c03a commit dd86ea8
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 15 deletions.
104 changes: 94 additions & 10 deletions sdk/src/main/java/com/hedera/hashgraph/sdk/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,27 @@ static ExecutorService createExecutor() {
threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}

/**
*
* Construct a client given a set of nodes.
* It is the responsibility of the caller to ensure that all nodes in the map are part of the
* same Hedera network. Failure to do so will result in undefined behavior.
* The client will load balance all requests to Hedera using a simple round-robin scheme to
* chose nodes to send transactions to. For one transaction, at most 1/3 of the nodes will be tried.
*
* @param networkMap the map of node IDs to node addresses that make up the network.
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forNetwork(Map<String, AccountId> networkMap, ExecutorService executor) {
var network = Network.forNetwork(executor, networkMap);
var mirrorNetwork = MirrorNetwork.forNetwork(executor, new ArrayList<>());

return new Client(executor, network, mirrorNetwork, null, null);
}


/**
* Construct a client given a set of nodes.
*
Expand All @@ -158,10 +179,7 @@ static ExecutorService createExecutor() {
*/
public static Client forNetwork(Map<String, AccountId> networkMap) {
var executor = createExecutor();
var network = Network.forNetwork(executor, networkMap);
var mirrorNetwork = MirrorNetwork.forNetwork(executor, new ArrayList<>());

return new Client(executor, network, mirrorNetwork, null, null);
return forNetwork(networkMap, executor);
}

/**
Expand All @@ -183,10 +201,11 @@ public static Client forName(String name) {
* Construct a Hedera client pre-configured for <a
* href="https://docs.hedera.com/guides/mainnet/address-book#mainnet-address-book">Mainnet access</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forMainnet() {
var executor = createExecutor();
public static Client forMainnet(ExecutorService executor) {
var network = Network.forMainnet(executor);
var mirrorNetwork = MirrorNetwork.forMainnet(executor);

Expand All @@ -198,10 +217,11 @@ public static Client forMainnet() {
* Construct a Hedera client pre-configured for <a href="https://docs.hedera.com/guides/testnet/nodes">Testnet
* access</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forTestnet() {
var executor = createExecutor();
public static Client forTestnet(ExecutorService executor) {
var network = Network.forTestnet(executor);
var mirrorNetwork = MirrorNetwork.forTestnet(executor);

Expand All @@ -214,17 +234,53 @@ public static Client forTestnet() {
* href="https://docs.hedera.com/guides/testnet/testnet-nodes#previewnet-node-public-keys">Preview Testnet
* nodes</a>.
*
* @param executor runs the grpc requests asynchronously. Note that calling `close()` method on one of the
* clients will close the executor for all the other clients sharing this executor
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forPreviewnet() {
var executor = createExecutor();
public static Client forPreviewnet(ExecutorService executor) {
var network = Network.forPreviewnet(executor);
var mirrorNetwork = MirrorNetwork.forPreviewnet(executor);

return new Client(executor, network, mirrorNetwork, NETWORK_UPDATE_INITIAL_DELAY,
DEFAULT_NETWORK_UPDATE_PERIOD);
}


/**
* Construct a Hedera client pre-configured for <a
* href="https://docs.hedera.com/guides/mainnet/address-book#mainnet-address-book">Mainnet access</a>.
*
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forMainnet() {
var executor = createExecutor();
return forMainnet(executor);
}

/**
* Construct a Hedera client pre-configured for <a href="https://docs.hedera.com/guides/testnet/nodes">Testnet
* access</a>.
*
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forTestnet() {
var executor = createExecutor();
return forTestnet(executor);
}

/**
* Construct a Hedera client pre-configured for <a
* href="https://docs.hedera.com/guides/testnet/testnet-nodes#previewnet-node-public-keys">Preview Testnet
* nodes</a>.
*
* @return {@link com.hedera.hashgraph.sdk.Client}
*/
public static Client forPreviewnet() {
var executor = createExecutor();
return forPreviewnet(executor);
}

/**
* Configure a client based off the given JSON string.
*
Expand Down Expand Up @@ -1365,6 +1421,34 @@ public synchronized void close() throws TimeoutException {
close(closeTimeout);
}

/**
* Initiates an orderly shutdown of all channels (to the Hedera network),
* without closing the ExecutorService {@link #executor}
*
* @throws TimeoutException if the network doesn't close in time
*/
public synchronized void closeChannels() throws TimeoutException {
var closeDeadline = Instant.now().plus(closeTimeout);

networkUpdatePeriod = null;
cancelScheduledNetworkUpdate();
cancelAllSubscriptions();

network.beginClose();
mirrorNetwork.beginClose();

var networkError = network.awaitClose(closeDeadline, null);
var mirrorNetworkError = mirrorNetwork.awaitClose(closeDeadline, networkError);

if (mirrorNetworkError != null) {
if (mirrorNetworkError instanceof TimeoutException ex) {
throw ex;
} else {
throw new RuntimeException(mirrorNetworkError);
}
}
}

/**
* Initiates an orderly shutdown of all channels (to the Hedera network) in which preexisting transactions or
* queries continue but more would be immediately cancelled.
Expand Down
6 changes: 5 additions & 1 deletion sdk/src/main/java/com/hedera/hashgraph/sdk/Executable.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ private void delay(long delay) {
}
try {
if (delay > 0) {
if (logger.isEnabledForLevel(LogLevel.DEBUG)) {
logger.debug("Sleeping for: " + delay + " | Thread name: " + Thread.currentThread().getName());
}
Thread.sleep(delay);
}
} catch (InterruptedException e) {
Expand Down Expand Up @@ -829,8 +832,9 @@ ExecutionState getExecutionState(Status status, ResponseT response) {
switch (status) {
case PLATFORM_TRANSACTION_NOT_CREATED:
case PLATFORM_NOT_ACTIVE:
case BUSY:
return ExecutionState.SERVER_ERROR;
case BUSY:
return ExecutionState.RETRY;
case OK:
return ExecutionState.SUCCESS;
default:
Expand Down
33 changes: 33 additions & 0 deletions sdk/src/test/java/com/hedera/hashgraph/sdk/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
*/
package com.hedera.hashgraph.sdk;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -47,12 +50,42 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ClientTest {

@Test
@DisplayName("Can construct mainnet client")
void forMainnet() throws TimeoutException {
Client.forMainnet().close();
}

@Test
@DisplayName("Can construct mainnet client with executor")
void forMainnetWithExecutor() throws TimeoutException {
var executor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

Client.forMainnet(executor).close();
}

@Test
@DisplayName("Can construct testnet client with executor")
void forTestnetWithExecutor() throws TimeoutException {
var executor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

Client.forTestnet(executor).close();
}

@Test
@DisplayName("Can construct previewnet client with executor")
void forPreviewnetWithWithExecutor() throws TimeoutException {
var executor = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

Client.forPreviewnet(executor).close();
}

@Test
@DisplayName("Client.setMaxQueryPayment() negative")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ void shouldRetryReturnsCorrectStates() {
assertThat(tx.getExecutionState(Status.PLATFORM_TRANSACTION_NOT_CREATED, null)).isEqualTo(
ExecutionState.SERVER_ERROR);
assertThat(tx.getExecutionState(Status.PLATFORM_NOT_ACTIVE, null)).isEqualTo(ExecutionState.SERVER_ERROR);
assertThat(tx.getExecutionState(Status.BUSY, null)).isEqualTo(ExecutionState.SERVER_ERROR);
assertThat(tx.getExecutionState(Status.BUSY, null)).isEqualTo(ExecutionState.RETRY);
assertThat(tx.getExecutionState(Status.OK, null)).isEqualTo(ExecutionState.SUCCESS);
assertThat(tx.getExecutionState(Status.ACCOUNT_DELETED, null)).isEqualTo(ExecutionState.REQUEST_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import org.junit.jupiter.api.Assumptions;

public class IntegrationTestEnv {
private static final String LOCAL_CONSENSUS_NODE_ENDPOINT = "127.0.0.1:50211";
private static final String LOCAL_MIRROR_NODE_GRPC_ENDPOINT = "127.0.0.1:5600";
private static final AccountId LOCAL_CONSENSUS_NODE_ACCOUNT_ID = new AccountId(3);
static final String LOCAL_CONSENSUS_NODE_ENDPOINT = "127.0.0.1:50211";
static final String LOCAL_MIRROR_NODE_GRPC_ENDPOINT = "127.0.0.1:5600";
static final AccountId LOCAL_CONSENSUS_NODE_ACCOUNT_ID = new AccountId(3);
private final Client originalClient;
public Client client;
public PublicKey operatorKey;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*-
*
* Hedera Java SDK
*
* Copyright (C) 2020 - 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.hedera.hashgraph.sdk.test.integration;

import static org.junit.jupiter.api.Assertions.fail;

import com.hedera.hashgraph.sdk.AccountCreateTransaction;
import com.hedera.hashgraph.sdk.AccountId;
import com.hedera.hashgraph.sdk.Client;
import com.hedera.hashgraph.sdk.PrivateKey;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

class LoadIntegrationTest {

@Test
@DisplayName("Load test with multiple clients and single executor")
void loadTest() throws Exception {
var testEnv = new IntegrationTestEnv(1);
var operatorPrivateKey = PrivateKey.fromString(System.getProperty("OPERATOR_KEY"));
var operatorId = AccountId.fromString(System.getProperty("OPERATOR_ID"));

int nThreads = 10;
var clientExecutor = Executors.newFixedThreadPool(16);

var threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);

long startTime = System.currentTimeMillis();

System.out.println("Finished executing tasks:");
for (int i = 0; i < nThreads; i++) {
int finalI = i;
threadPoolExecutor.submit(() -> {
var client = Client.forNetwork(testEnv.client.getNetwork(), clientExecutor);
client.setOperator(operatorId, operatorPrivateKey);
client.setMaxAttempts(10);
try {
new AccountCreateTransaction()
.setKey(PrivateKey.generateED25519())
.execute(client)
.getReceipt(client);
System.out.println(finalI);
} catch (Exception e) {
fail("AccountCreateTransaction failed, " + e);
} finally {
try {
client.closeChannels();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
});
}

threadPoolExecutor.shutdown();

// Wait for all tasks to finish
try {
if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println();
System.out.println("Forcing shutdown");
threadPoolExecutor.shutdownNow();
}
} catch (InterruptedException e) {
threadPoolExecutor.shutdownNow();
}

long endTime = System.currentTimeMillis();
long executionTime = endTime - startTime;
System.out.println();
System.out.println("All tasks have finished execution in " + executionTime + "ms");
clientExecutor.shutdownNow();
testEnv.close();
}
}

0 comments on commit dd86ea8

Please sign in to comment.