diff --git a/build.gradle b/build.gradle index cb5d5094..f3809012 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ dependencies { implementation "org.slf4j:slf4j-api:${versions.slf4j}" - api 'io.orkes.conductor:orkes-conductor-common-protos:0.9.1' + api 'io.orkes.conductor:orkes-conductor-common-protos:0.9.2' //gRPC dependencies implementation ("io.grpc:grpc-netty:${versions.ioGRPC}") { diff --git a/gradle.properties b/gradle.properties index 98c86237..984ef0d5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=3.0.14 \ No newline at end of file +version=3.0.22 \ No newline at end of file diff --git a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java index f7bfcd07..4b1fc2ac 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java +++ b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java @@ -166,6 +166,9 @@ public void runAccumulatedRequests() { if (currentPending <= 0) { return; } + if(currentPending > 20) { + currentPending = 20; + } // Make GRPC call for these many // Observe for results, add them to local queue if (callAgain.get()) { diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java index 640dbf00..b8d41951 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java @@ -63,7 +63,11 @@ public List batchPollTasksInDomain( @Override public void updateTask(TaskResult taskResult) { - taskResourceApi.updateTask(taskResult); + if(apiClient.isUseGRPC()) { + grpcTaskClient.updateTask(taskResult); + } else { + taskResourceApi.updateTask(taskResult); + } } @Override diff --git a/src/test/java/io/orkes/conductor/client/LoadTestWorker.java b/src/test/java/io/orkes/conductor/client/LoadTestWorker.java index 91b5ee86..7267c71c 100644 --- a/src/test/java/io/orkes/conductor/client/LoadTestWorker.java +++ b/src/test/java/io/orkes/conductor/client/LoadTestWorker.java @@ -14,13 +14,11 @@ import java.security.SecureRandom; import java.util.UUID; -import java.util.concurrent.TimeUnit; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.google.common.util.concurrent.Uninterruptibles; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -44,7 +42,7 @@ public TaskResult execute(Task task) { log.info("Executing {} - {}", task.getTaskType(), task.getTaskId()); TaskResult result = new TaskResult(task); - Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS); + //Uninterruptibles.sleepUninterruptibly(10000, TimeUnit.MILLISECONDS); result.setStatus(TaskResult.Status.COMPLETED); int resultCount = Math.max(20, secureRandom.nextInt(keyCount)); diff --git a/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java b/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java index 6dcb3414..236e64d1 100644 --- a/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java +++ b/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java @@ -28,7 +28,7 @@ public class LocalWorkerTest { public static void main(String[] args) { ApiClient apiClient = new ApiClient("http://localhost:8080/api"); - //apiClient.setUseGRPC("localhost", 8090); + apiClient.setUseGRPC("localhost", 8090); OrkesClients clients = new OrkesClients(apiClient); TaskClient taskClient = clients.getTaskClient(); @@ -36,7 +36,10 @@ public static void main(String[] args) { List workers = new ArrayList<>(); Map taskThreadCount = new HashMap<>(); - for (int i = 1; i <= 2; i++) { + workers.add(new LoadTestWorker("x_test_worker_4")); + taskThreadCount.put("x_test_worker_4", 1000); + + for (int i = 0; i < 0; i++) { workers.add(new LoadTestWorker("x_test_worker_" + i)); taskThreadCount.put("x_test_worker_" + i, 100); }