diff --git a/src/main/java/com/uber/cadence/client/WorkflowClient.java b/src/main/java/com/uber/cadence/client/WorkflowClient.java index 412650c4e..19b1038e3 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowClient.java +++ b/src/main/java/com/uber/cadence/client/WorkflowClient.java @@ -107,77 +107,29 @@ public interface WorkflowClient { /** Replays workflow to the current state and returns empty result or error if replay failed. */ String QUERY_TYPE_REPLAY_ONLY = "__replay_only"; - /** - * Creates worker that connects to the local instance of the Cadence Service that listens on a - * default port (7933). - * - * @param domain domain that worker uses to poll. - */ - static WorkflowClient newInstance(String domain) { - return WorkflowClientInternal.newInstance(domain); - } - - /** - * Creates worker that connects to the local instance of the Cadence Service that listens on a - * default port (7933). - * - * @param domain domain that worker uses to poll. - * @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for - * configuring client. - */ - static WorkflowClient newInstance(String domain, WorkflowClientOptions options) { - return WorkflowClientInternal.newInstance(domain, options); - } - - /** - * Creates client that connects to an instance of the Cadence Service. - * - * @param host of the Cadence Service endpoint - * @param port of the Cadence Service endpoint - * @param domain domain that worker uses to poll. - */ - static WorkflowClient newInstance(String host, int port, String domain) { - return WorkflowClientInternal.newInstance(host, port, domain); - } - - /** - * Creates client that connects to an instance of the Cadence Service. - * - * @param host of the Cadence Service endpoint - * @param port of the Cadence Service endpoint - * @param domain domain that worker uses to poll. - * @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for - * configuring client. - */ - static WorkflowClient newInstance( - String host, int port, String domain, WorkflowClientOptions options) { - return WorkflowClientInternal.newInstance(host, port, domain, options); - } - /** * Creates client that connects to an instance of the Cadence Service. * * @param service client to the Cadence Service endpoint. - * @param domain domain that worker uses to poll. */ - static WorkflowClient newInstance(IWorkflowService service, String domain) { - return WorkflowClientInternal.newInstance(service, domain); + static WorkflowClient newInstance(IWorkflowService service) { + return WorkflowClientInternal.newInstance(service, WorkflowClientOptions.defaultInstance()); } /** * Creates client that connects to an instance of the Cadence Service. * * @param service client to the Cadence Service endpoint. - * @param domain domain that worker uses to poll. * @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for * configuring client. */ - static WorkflowClient newInstance( - IWorkflowService service, String domain, WorkflowClientOptions options) { - return WorkflowClientInternal.newInstance(service, domain, options); + static WorkflowClient newInstance(IWorkflowService service, WorkflowClientOptions options) { + return WorkflowClientInternal.newInstance(service, options); } - String getDomain(); + WorkflowClientOptions getOptions(); + + IWorkflowService getService(); /** * Creates workflow client stub that can be used to start a single workflow execution. The first @@ -728,7 +680,4 @@ static CompletableFuture execute( A6 arg6) { return WorkflowClientInternal.execute(workflow, arg1, arg2, arg3, arg4, arg5, arg6); } - - /** Closes the workflow client and the underlying IWorkflowService when this method is called. */ - void close(); } diff --git a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java index e25d541f6..93dd2e8ff 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java @@ -25,21 +25,47 @@ /** Options for WorkflowClient configuration. */ public final class WorkflowClientOptions { + private static final String DEFAULT_DOMAIN = "default"; + private static final WorkflowClientOptions DEFAULT_INSTANCE; + private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY = + new WorkflowClientInterceptor[0]; - public static final class Builder { + static { + DEFAULT_INSTANCE = new Builder().build(); + } + + public static WorkflowClientOptions defaultInstance() { + return DEFAULT_INSTANCE; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(WorkflowClientOptions options) { + return new Builder(options); + } + public static final class Builder { + private String domain = DEFAULT_DOMAIN; private DataConverter dataConverter = JsonDataConverter.getInstance(); private WorkflowClientInterceptor[] interceptors = EMPTY_INTERCEPTOR_ARRAY; - private Scope metricsScope; + private Scope metricsScope = NoopScope.getInstance(); - public Builder() {} + private Builder() {} - public Builder(WorkflowClientOptions options) { + private Builder(WorkflowClientOptions options) { + domain = options.getDomain(); dataConverter = options.getDataConverter(); interceptors = options.getInterceptors(); metricsScope = options.getMetricsScope(); } + public Builder setDomain(String domain) { + this.domain = domain; + return this; + } + /** * Used to override default (JSON) data converter implementation. * @@ -72,28 +98,30 @@ public Builder setMetricsScope(Scope metricsScope) { } public WorkflowClientOptions build() { - if (metricsScope == null) { - metricsScope = NoopScope.getInstance(); - } - return new WorkflowClientOptions(dataConverter, interceptors, metricsScope); + return new WorkflowClientOptions(domain, dataConverter, interceptors, metricsScope); } } - private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY = - new WorkflowClientInterceptor[0]; + private final String domain; private final DataConverter dataConverter; - private final WorkflowClientInterceptor[] interceptors; - private final Scope metricsScope; private WorkflowClientOptions( - DataConverter dataConverter, WorkflowClientInterceptor[] interceptors, Scope metricsScope) { + String domain, + DataConverter dataConverter, + WorkflowClientInterceptor[] interceptors, + Scope metricsScope) { + this.domain = domain; this.dataConverter = dataConverter; this.interceptors = interceptors; this.metricsScope = metricsScope; } + public String getDomain() { + return domain; + } + public DataConverter getDataConverter() { return dataConverter; } diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientFactoryImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientFactoryImpl.java index 12b67dbd9..3697d1e7b 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientFactoryImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientFactoryImpl.java @@ -24,6 +24,7 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; import java.util.Map; +import java.util.Objects; public class ManualActivityCompletionClientFactoryImpl extends ManualActivityCompletionClientFactory { @@ -35,9 +36,9 @@ public class ManualActivityCompletionClientFactoryImpl public ManualActivityCompletionClientFactoryImpl( IWorkflowService service, String domain, DataConverter dataConverter, Scope metricsScope) { - this.service = service; - this.domain = domain; - this.dataConverter = dataConverter; + this.service = Objects.requireNonNull(service); + this.domain = Objects.requireNonNull(domain); + this.dataConverter = Objects.requireNonNull(dataConverter); Map tags = new ImmutableMap.Builder(1).put(MetricsTag.DOMAIN, domain).build(); @@ -54,16 +55,11 @@ public DataConverter getDataConverter() { @Override public ManualActivityCompletionClient getClient(byte[] taskToken) { - if (service == null) { - throw new IllegalStateException("required property service is null"); - } - if (dataConverter == null) { - throw new IllegalStateException("required property dataConverter is null"); - } if (taskToken == null || taskToken.length == 0) { throw new IllegalArgumentException("null or empty task token"); } - return new ManualActivityCompletionClientImpl(service, taskToken, dataConverter, metricsScope); + return new ManualActivityCompletionClientImpl( + service, domain, taskToken, dataConverter, metricsScope); } @Override diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java index 84a8cf98c..ab30159bc 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java @@ -57,11 +57,15 @@ class ManualActivityCompletionClientImpl extends ManualActivityCompletionClient private final Scope metricsScope; ManualActivityCompletionClientImpl( - IWorkflowService service, byte[] taskToken, DataConverter dataConverter, Scope metricsScope) { + IWorkflowService service, + String domain, + byte[] taskToken, + DataConverter dataConverter, + Scope metricsScope) { this.service = service; this.taskToken = taskToken; this.dataConverter = dataConverter; - this.domain = null; + this.domain = domain; this.execution = null; this.activityId = null; this.metricsScope = metricsScope; diff --git a/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java b/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java index 5a12b0a7d..f63b93b61 100644 --- a/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java @@ -273,6 +273,7 @@ public void runUntilAllBlocked() throws Throwable { if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) { nextWakeUpTime = 0; } + } finally { inRunUntilAllBlocked = false; // Close was requested while running diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 33479e6b8..30dcb206d 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -74,7 +74,7 @@ public TestActivityEnvironmentInternal(TestEnvironmentOptions options) { activityTaskHandler = new POJOActivityTaskHandler( new WorkflowServiceWrapper(workflowService), - testEnvironmentOptions.getDomain(), + testEnvironmentOptions.getWorkflowClientOptions().getDomain(), testEnvironmentOptions.getDataConverter(), heartbeatExecutor); } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 32e1a9504..a8388108f 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -17,7 +17,76 @@ package com.uber.cadence.internal.sync; -import com.uber.cadence.*; +import com.uber.cadence.BadRequestError; +import com.uber.cadence.CancellationAlreadyRequestedError; +import com.uber.cadence.ClientVersionNotSupportedError; +import com.uber.cadence.ClusterInfo; +import com.uber.cadence.CountWorkflowExecutionsRequest; +import com.uber.cadence.CountWorkflowExecutionsResponse; +import com.uber.cadence.DeprecateDomainRequest; +import com.uber.cadence.DescribeDomainRequest; +import com.uber.cadence.DescribeDomainResponse; +import com.uber.cadence.DescribeTaskListRequest; +import com.uber.cadence.DescribeTaskListResponse; +import com.uber.cadence.DescribeWorkflowExecutionRequest; +import com.uber.cadence.DescribeWorkflowExecutionResponse; +import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; +import com.uber.cadence.EntityNotExistsError; +import com.uber.cadence.GetSearchAttributesResponse; +import com.uber.cadence.GetWorkflowExecutionHistoryRequest; +import com.uber.cadence.GetWorkflowExecutionHistoryResponse; +import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; +import com.uber.cadence.ListArchivedWorkflowExecutionsRequest; +import com.uber.cadence.ListArchivedWorkflowExecutionsResponse; +import com.uber.cadence.ListClosedWorkflowExecutionsRequest; +import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; +import com.uber.cadence.ListOpenWorkflowExecutionsRequest; +import com.uber.cadence.ListOpenWorkflowExecutionsResponse; +import com.uber.cadence.ListTaskListPartitionsRequest; +import com.uber.cadence.ListTaskListPartitionsResponse; +import com.uber.cadence.ListWorkflowExecutionsRequest; +import com.uber.cadence.ListWorkflowExecutionsResponse; +import com.uber.cadence.PollForActivityTaskRequest; +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.PollForDecisionTaskRequest; +import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.QueryFailedError; +import com.uber.cadence.QueryRejectCondition; +import com.uber.cadence.QueryWorkflowRequest; +import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; +import com.uber.cadence.RecordActivityTaskHeartbeatRequest; +import com.uber.cadence.RecordActivityTaskHeartbeatResponse; +import com.uber.cadence.RegisterDomainRequest; +import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; +import com.uber.cadence.ResetWorkflowExecutionRequest; +import com.uber.cadence.ResetWorkflowExecutionResponse; +import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; +import com.uber.cadence.RespondActivityTaskCanceledRequest; +import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; +import com.uber.cadence.RespondActivityTaskCompletedRequest; +import com.uber.cadence.RespondActivityTaskFailedByIDRequest; +import com.uber.cadence.RespondActivityTaskFailedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; +import com.uber.cadence.RespondDecisionTaskFailedRequest; +import com.uber.cadence.RespondQueryTaskCompletedRequest; +import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; +import com.uber.cadence.SignalWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionResponse; +import com.uber.cadence.TerminateWorkflowExecutionRequest; +import com.uber.cadence.UpdateDomainRequest; +import com.uber.cadence.UpdateDomainResponse; +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.client.ActivityCompletionClient; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientInterceptor; @@ -56,8 +125,10 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) { } service = new WorkflowServiceWrapper(); service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor"); + WorkflowClient client = + WorkflowClient.newInstance(service, testEnvironmentOptions.getWorkflowClientOptions()); workerFactory = - new Worker.Factory(service, options.getDomain(), options.getWorkerFactoryOptions()); + Worker.Factory.newInstance(client, testEnvironmentOptions.getWorkerFactoryOptions()); } @Override @@ -71,25 +142,25 @@ public Worker newWorker( WorkerOptions.Builder builder = new WorkerOptions.Builder() .setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory()) - .setMetricsScope(testEnvironmentOptions.getMetricsScope()) + .setMetricsScope(testEnvironmentOptions.getWorkflowClientOptions().getMetricsScope()) .setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay()); if (testEnvironmentOptions.getDataConverter() != null) { builder.setDataConverter(testEnvironmentOptions.getDataConverter()); } builder = overrideOptions.apply(builder); - Worker result = workerFactory.newWorker(taskList, builder.build()); - return result; + return workerFactory.newWorker(taskList, builder.build()); } @Override public WorkflowClient newWorkflowClient() { WorkflowClientOptions options = - new WorkflowClientOptions.Builder() + WorkflowClientOptions.newBuilder() .setDataConverter(testEnvironmentOptions.getDataConverter()) .setInterceptors(new TimeLockingInterceptor(service)) - .setMetricsScope(testEnvironmentOptions.getMetricsScope()) + .setMetricsScope(testEnvironmentOptions.getWorkflowClientOptions().getMetricsScope()) + .setDomain(testEnvironmentOptions.getWorkflowClientOptions().getDomain()) .build(); - return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options); + return WorkflowClientInternal.newInstance(service, options); } @Override @@ -100,9 +171,8 @@ public WorkflowClient newWorkflowClient(WorkflowClientOptions options) { System.arraycopy(existingInterceptors, 0, interceptors, 0, existingInterceptors.length); interceptors[interceptors.length - 1] = new TimeLockingInterceptor(service); WorkflowClientOptions newOptions = - new WorkflowClientOptions.Builder(options).setInterceptors(interceptors).build(); - return WorkflowClientInternal.newInstance( - service, testEnvironmentOptions.getDomain(), newOptions); + WorkflowClientOptions.newBuilder(options).setInterceptors(interceptors).build(); + return WorkflowClientInternal.newInstance(service, newOptions); } @Override @@ -127,7 +197,7 @@ public IWorkflowService getWorkflowService() { @Override public String getDomain() { - return testEnvironmentOptions.getDomain(); + return testEnvironmentOptions.getWorkflowClientOptions().getDomain(); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java index 8f0f098f5..4f417340a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java @@ -33,7 +33,6 @@ import com.uber.cadence.internal.external.ManualActivityCompletionClientFactoryImpl; import com.uber.cadence.internal.sync.WorkflowInvocationHandler.InvocationType; import com.uber.cadence.serviceclient.IWorkflowService; -import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.workflow.Functions; import com.uber.cadence.workflow.QueryMethod; import com.uber.cadence.workflow.WorkflowMethod; @@ -41,6 +40,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -51,104 +51,48 @@ public final class WorkflowClientInternal implements WorkflowClient { private final DataConverter dataConverter; private final WorkflowClientInterceptor[] interceptors; private final IWorkflowService workflowService; - - /** - * Creates worker that connects to the local instance of the Cadence Service that listens on a - * default port (7933). - * - * @param domain domain that worker uses to poll. - */ - public static WorkflowClient newInstance(String domain) { - return new WorkflowClientInternal( - new WorkflowServiceTChannel(), domain, new WorkflowClientOptions.Builder().build()); - } - - /** - * Creates worker that connects to the local instance of the Cadence Service that listens on a - * default port (7933). - * - * @param domain domain that worker uses to poll. - * @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for - * configuring client. - */ - public static WorkflowClient newInstance(String domain, WorkflowClientOptions options) { - return new WorkflowClientInternal(new WorkflowServiceTChannel(), domain, options); - } - - /** - * Creates client that connects to an instance of the Cadence Service. - * - * @param host of the Cadence Service endpoint - * @param port of the Cadence Service endpoint - * @param domain domain that worker uses to poll. - */ - public static WorkflowClient newInstance(String host, int port, String domain) { - return new WorkflowClientInternal( - new WorkflowServiceTChannel(host, port), - domain, - new WorkflowClientOptions.Builder().build()); - } - - /** - * Creates client that connects to an instance of the Cadence Service. - * - * @param host of the Cadence Service endpoint - * @param port of the Cadence Service endpoint - * @param domain domain that worker uses to poll. - * @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for - * configuring client. - */ - public static WorkflowClient newInstance( - String host, int port, String domain, WorkflowClientOptions options) { - return new WorkflowClientInternal(new WorkflowServiceTChannel(host, port), domain, options); - } - - /** - * Creates client that connects to an instance of the Cadence Service. - * - * @param service client to the Cadence Service endpoint. - * @param domain domain that worker uses to poll. - */ - public static WorkflowClient newInstance(IWorkflowService service, String domain) { - return new WorkflowClientInternal(service, domain, null); - } + private final WorkflowClientOptions options; /** * Creates client that connects to an instance of the Cadence Service. * * @param service client to the Cadence Service endpoint. - * @param domain domain that worker uses to poll. * @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for * configuring client. */ public static WorkflowClient newInstance( - IWorkflowService service, String domain, WorkflowClientOptions options) { - return new WorkflowClientInternal(service, domain, options); + IWorkflowService service, WorkflowClientOptions options) { + Objects.requireNonNull(service); + Objects.requireNonNull(options); + return new WorkflowClientInternal(service, options); } - private WorkflowClientInternal( - IWorkflowService service, String domain, WorkflowClientOptions options) { - if (options == null) { - options = new WorkflowClientOptions.Builder().build(); - } + private WorkflowClientInternal(IWorkflowService service, WorkflowClientOptions options) { + this.options = options; this.workflowService = service; this.genericClient = - new GenericWorkflowClientExternalImpl(service, domain, options.getMetricsScope()); + new GenericWorkflowClientExternalImpl( + service, options.getDomain(), options.getMetricsScope()); this.dataConverter = options.getDataConverter(); this.interceptors = options.getInterceptors(); this.manualActivityCompletionClientFactory = new ManualActivityCompletionClientFactoryImpl( - service, domain, dataConverter, options.getMetricsScope()); + service, options.getDomain(), dataConverter, options.getMetricsScope()); } @Override - public T newWorkflowStub(Class workflowInterface) { - return newWorkflowStub(workflowInterface, (WorkflowOptions) null); + public WorkflowClientOptions getOptions() { + return options; } @Override - public String getDomain() { - return genericClient.getDomain(); + public IWorkflowService getService() { + return workflowService; + } + + @Override + public T newWorkflowStub(Class workflowInterface) { + return newWorkflowStub(workflowInterface, (WorkflowOptions) null); } @Override @@ -451,9 +395,4 @@ public static CompletableFuture execute( A6 arg6) { return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6)); } - - @Override - public void close() { - this.workflowService.close(); - } } diff --git a/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java b/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java index e288fac7e..886d48034 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java @@ -76,7 +76,6 @@ public void process(PollForDecisionTaskResponse t) { try { service.RespondDecisionTaskFailed(request); - } catch (Exception e) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e); } diff --git a/src/main/java/com/uber/cadence/internal/worker/Poller.java b/src/main/java/com/uber/cadence/internal/worker/Poller.java index 157c8c23a..03a0dfbda 100644 --- a/src/main/java/com/uber/cadence/internal/worker/Poller.java +++ b/src/main/java/com/uber/cadence/internal/worker/Poller.java @@ -90,8 +90,8 @@ public Poller( @Override public void start() { - if (log.isInfoEnabled()) { - log.info("start(): " + toString()); + if (log.isDebugEnabled()) { + log.debug("start(): " + toString()); } if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) { pollRateThrottler = @@ -143,7 +143,7 @@ public boolean isTerminated() { @Override public void shutdown() { - log.info("shutdown"); + log.debug("shutdown"); if (!isStarted()) { return; } @@ -159,8 +159,8 @@ public void shutdown() { @Override public void shutdownNow() { - if (log.isInfoEnabled()) { - log.info("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix()); + if (log.isDebugEnabled()) { + log.debug("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix()); } if (!isStarted()) { return; diff --git a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java new file mode 100644 index 000000000..b8781f861 --- /dev/null +++ b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java @@ -0,0 +1,316 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017-2020 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.serviceclient; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.uber.cadence.internal.metrics.NoopScope; +import com.uber.m3.tally.Scope; +import java.util.Map; + +public class ClientOptions { + private static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933; + + private static final String LOCALHOST = "127.0.0.1"; + + /** Default RPC timeout used for all non long poll calls. */ + private static final long DEFAULT_RPC_TIMEOUT_MILLIS = 1000; + /** Default RPC timeout used for all long poll calls. */ + private static final long DEFAULT_POLL_RPC_TIMEOUT_MILLIS = 125 * 1000; + + /** Default RPC timeout for QueryWorkflow */ + private static final long DEFAULT_QUERY_RPC_TIMEOUT_MILLIS = 10 * 1000; + + /** Default RPC timeout for ListArchivedWorkflow */ + private static final long DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS = 180 * 1000; + + private static final String DEFAULT_CLIENT_APP_NAME = "cadence-client"; + + /** Name of the Cadence service front end as required by TChannel. */ + private static final String DEFAULT_SERVICE_NAME = "cadence-frontend"; + + private final String host; + private final int port; + + /** The tChannel timeout in milliseconds */ + private final long rpcTimeoutMillis; + + /** The tChannel timeout for long poll calls in milliseconds */ + private final long rpcLongPollTimeoutMillis; + + /** The tChannel timeout for query workflow call in milliseconds */ + private final long rpcQueryTimeoutMillis; + + /** The tChannel timeout for list archived workflow call in milliseconds */ + private final long rpcListArchivedWorkflowTimeoutMillis; + + /** TChannel service name that the Cadence service was started with. */ + private final String serviceName; + + /** Name of the service using the cadence-client. */ + private final String clientAppName; + + /** Client for metrics reporting. */ + private final Scope metricsScope; + + /** Optional TChannel transport headers */ + private final Map transportHeaders; + + /** Optional TChannel headers */ + private final Map headers; + + private static final ClientOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = new Builder().build(); + } + + public static ClientOptions defaultInstance() { + return DEFAULT_INSTANCE; + } + + public static Builder newBuilder() { + return new Builder(); + } + + private ClientOptions(Builder builder) { + if (Strings.isNullOrEmpty(builder.host)) { + host = + Strings.isNullOrEmpty(System.getenv("CADENCE_SEEDS")) + ? LOCALHOST + : System.getenv("CADENCE_SEEDS"); + } else { + host = builder.host; + } + + this.port = builder.port; + this.rpcTimeoutMillis = builder.rpcTimeoutMillis; + if (builder.clientAppName == null) { + this.clientAppName = DEFAULT_CLIENT_APP_NAME; + } else { + this.clientAppName = builder.clientAppName; + } + if (builder.serviceName == null) { + this.serviceName = DEFAULT_SERVICE_NAME; + } else { + this.serviceName = builder.serviceName; + } + this.rpcLongPollTimeoutMillis = builder.rpcLongPollTimeoutMillis; + this.rpcQueryTimeoutMillis = builder.rpcQueryTimeoutMillis; + this.rpcListArchivedWorkflowTimeoutMillis = builder.rpcListArchivedWorkflowTimeoutMillis; + if (builder.metricsScope == null) { + builder.metricsScope = NoopScope.getInstance(); + } + this.metricsScope = builder.metricsScope; + if (builder.transportHeaders != null) { + this.transportHeaders = ImmutableMap.copyOf(builder.transportHeaders); + } else { + this.transportHeaders = ImmutableMap.of(); + } + + if (builder.headers != null) { + this.headers = ImmutableMap.copyOf(builder.headers); + } else { + this.headers = ImmutableMap.of(); + } + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + /** @return Returns the rpc timeout value in millis. */ + public long getRpcTimeoutMillis() { + return rpcTimeoutMillis; + } + + /** @return Returns the rpc timout for long poll requests in millis. */ + public long getRpcLongPollTimeoutMillis() { + return rpcLongPollTimeoutMillis; + } + + /** @return Returns the rpc timout for query workflow requests in millis. */ + public long getRpcQueryTimeoutMillis() { + return rpcQueryTimeoutMillis; + } + + /** @return Returns the rpc timout for list archived workflow requests in millis. */ + public long getRpcListArchivedWorkflowTimeoutMillis() { + return rpcListArchivedWorkflowTimeoutMillis; + } + + /** Returns the client application name. */ + public String getClientAppName() { + return this.clientAppName; + } + + public String getServiceName() { + return serviceName; + } + + public Scope getMetricsScope() { + return metricsScope; + } + + public Map getTransportHeaders() { + return transportHeaders; + } + + public Map getHeaders() { + return headers; + } + + /** + * Builder is the builder for ClientOptions. + * + * @author venkat + */ + public static class Builder { + private String host; + private int port = DEFAULT_LOCAL_CADENCE_SERVER_PORT; + private String clientAppName = DEFAULT_CLIENT_APP_NAME; + private long rpcTimeoutMillis = DEFAULT_RPC_TIMEOUT_MILLIS; + private long rpcLongPollTimeoutMillis = DEFAULT_POLL_RPC_TIMEOUT_MILLIS; + private long rpcQueryTimeoutMillis = DEFAULT_QUERY_RPC_TIMEOUT_MILLIS; + private long rpcListArchivedWorkflowTimeoutMillis = + DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS; + private String serviceName; + private Scope metricsScope; + private Map transportHeaders; + private Map headers; + + private Builder() {} + + public Builder setHost(String host) { + this.host = host; + return this; + } + + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * Sets the rpc timeout value for non query and non long poll calls. Default is 1000. + * + * @param timeoutMillis timeout, in millis. + */ + public Builder setRpcTimeout(long timeoutMillis) { + this.rpcTimeoutMillis = timeoutMillis; + return this; + } + + /** + * Sets the rpc timeout value for the following long poll based operations: PollForDecisionTask, + * PollForActivityTask, GetWorkflowExecutionHistory. Should never be below 60000 as this is + * server side timeout for the long poll. Default is 61000. + * + * @param timeoutMillis timeout, in millis. + */ + public Builder setRpcLongPollTimeout(long timeoutMillis) { + this.rpcLongPollTimeoutMillis = timeoutMillis; + return this; + } + + /** + * Sets the rpc timeout value for query calls. Default is 10000. + * + * @param timeoutMillis timeout, in millis. + */ + public Builder setQueryRpcTimeout(long timeoutMillis) { + this.rpcQueryTimeoutMillis = timeoutMillis; + return this; + } + + /** + * Sets the rpc timeout value for query calls. Default is 180000. + * + * @param timeoutMillis timeout, in millis. + */ + public Builder setListArchivedWorkflowRpcTimeout(long timeoutMillis) { + this.rpcListArchivedWorkflowTimeoutMillis = timeoutMillis; + return this; + } + + /** + * Sets the client application name. + * + *

This name will be used as the tchannel client service name. It will also be reported as a + * tag along with metrics emitted to m3. + * + * @param clientAppName String representing the client application name. + * @return Builder for ClentOptions + */ + public Builder setClientAppName(String clientAppName) { + this.clientAppName = clientAppName; + return this; + } + + /** + * Sets the service name that Cadence service was started with. + * + * @param serviceName String representing the service name + * @return Builder for ClentOptions + */ + public Builder setServiceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + /** + * Sets the metrics scope to be used for metrics reporting. + * + * @param metricsScope + * @return Builder for ClentOptions + */ + public Builder setMetricsScope(Scope metricsScope) { + this.metricsScope = metricsScope; + return this; + } + + /** + * Sets additional transport headers for tchannel client. + * + * @param transportHeaders Map with additional transport headers + * @return Builder for ClentOptions + */ + public Builder setTransportHeaders(Map transportHeaders) { + this.transportHeaders = transportHeaders; + return this; + } + + public Builder setHeaders(Map headers) { + this.headers = headers; + return this; + } + + /** + * Builds and returns a ClientOptions object. + * + * @return ClientOptions object with the specified params. + */ + public ClientOptions build() { + return new ClientOptions(this); + } + } +} diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 0c7180dab..2044dff5a 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -17,7 +17,6 @@ package com.uber.cadence.serviceclient; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.uber.cadence.BadRequestError; import com.uber.cadence.ClientVersionNotSupportedError; @@ -93,7 +92,6 @@ import com.uber.cadence.internal.common.CheckedExceptionWrapper; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsType; -import com.uber.cadence.internal.metrics.NoopScope; import com.uber.cadence.internal.metrics.ServiceMethod; import com.uber.m3.tally.Scope; import com.uber.m3.tally.Stopwatch; @@ -108,7 +106,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.thrift.TException; @@ -118,255 +119,8 @@ import org.slf4j.LoggerFactory; public class WorkflowServiceTChannel implements IWorkflowService { - - private static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933; - - private static final String LOCALHOST = "127.0.0.1"; - - /** Default RPC timeout used for all non long poll calls. */ - private static final long DEFAULT_RPC_TIMEOUT_MILLIS = 1000; - /** Default RPC timeout used for all long poll calls. */ - private static final long DEFAULT_POLL_RPC_TIMEOUT_MILLIS = 121 * 1000; - - /** Default RPC timeout for QueryWorkflow */ - private static final long DEFAULT_QUERY_RPC_TIMEOUT_MILLIS = 10 * 1000; - - /** Default RPC timeout for ListArchivedWorkflow */ - private static final long DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS = 180 * 1000; - - private static final String DEFAULT_CLIENT_APP_NAME = "cadence-client"; - - /** Name of the Cadence service front end as required by TChannel. */ - private static final String DEFAULT_SERVICE_NAME = "cadence-frontend"; - private static final Logger log = LoggerFactory.getLogger(WorkflowServiceTChannel.class); - public static class ClientOptions { - - /** The tChannel timeout in milliseconds */ - private final long rpcTimeoutMillis; - - /** The tChannel timeout for long poll calls in milliseconds */ - private final long rpcLongPollTimeoutMillis; - - /** The tChannel timeout for query workflow call in milliseconds */ - private final long rpcQueryTimeoutMillis; - - /** The tChannel timeout for list archived workflow call in milliseconds */ - private final long rpcListArchivedWorkflowTimeoutMillis; - - /** TChannel service name that the Cadence service was started with. */ - private final String serviceName; - - /** Name of the service using the cadence-client. */ - private final String clientAppName; - - /** Client for metrics reporting. */ - private final Scope metricsScope; - - /** Optional TChannel transport headers */ - private final Map transportHeaders; - - /** Optional TChannel headers */ - private final Map headers; - - private ClientOptions(Builder builder) { - this.rpcTimeoutMillis = builder.rpcTimeoutMillis; - if (builder.clientAppName == null) { - this.clientAppName = DEFAULT_CLIENT_APP_NAME; - } else { - this.clientAppName = builder.clientAppName; - } - if (builder.serviceName == null) { - this.serviceName = DEFAULT_SERVICE_NAME; - } else { - this.serviceName = builder.serviceName; - } - this.rpcLongPollTimeoutMillis = builder.rpcLongPollTimeoutMillis; - this.rpcQueryTimeoutMillis = builder.rpcQueryTimeoutMillis; - this.rpcListArchivedWorkflowTimeoutMillis = builder.rpcListArchivedWorkflowTimeoutMillis; - if (builder.metricsScope == null) { - builder.metricsScope = NoopScope.getInstance(); - } - this.metricsScope = builder.metricsScope; - if (builder.transportHeaders != null) { - this.transportHeaders = ImmutableMap.copyOf(builder.transportHeaders); - } else { - this.transportHeaders = ImmutableMap.of(); - } - - if (builder.headers != null) { - this.headers = ImmutableMap.copyOf(builder.headers); - } else { - this.headers = ImmutableMap.of(); - } - } - - /** @return Returns the rpc timeout value in millis. */ - public long getRpcTimeoutMillis() { - return rpcTimeoutMillis; - } - - /** @return Returns the rpc timout for long poll requests in millis. */ - public long getRpcLongPollTimeoutMillis() { - return rpcLongPollTimeoutMillis; - } - - /** @return Returns the rpc timout for query workflow requests in millis. */ - public long getRpcQueryTimeoutMillis() { - return rpcQueryTimeoutMillis; - } - - /** @return Returns the rpc timout for list archived workflow requests in millis. */ - public long getRpcListArchivedWorkflowTimeoutMillis() { - return rpcListArchivedWorkflowTimeoutMillis; - } - - /** Returns the client application name. */ - public String getClientAppName() { - return this.clientAppName; - } - - public String getServiceName() { - return serviceName; - } - - public Scope getMetricsScope() { - return metricsScope; - } - - public Map getTransportHeaders() { - return transportHeaders; - } - - public Map getHeaders() { - return headers; - } - - /** - * Builder is the builder for ClientOptions. - * - * @author venkat - */ - public static class Builder { - - private String clientAppName = DEFAULT_CLIENT_APP_NAME; - // private MetricsClient metricsClient = new DefaultMetricsClient(); - private long rpcTimeoutMillis = DEFAULT_RPC_TIMEOUT_MILLIS; - private long rpcLongPollTimeoutMillis = DEFAULT_POLL_RPC_TIMEOUT_MILLIS; - public long rpcQueryTimeoutMillis = DEFAULT_QUERY_RPC_TIMEOUT_MILLIS; - public long rpcListArchivedWorkflowTimeoutMillis = - DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS; - public String serviceName; - private Scope metricsScope; - private Map transportHeaders; - private Map headers; - - /** - * Sets the rpc timeout value for non query and non long poll calls. Default is 1000. - * - * @param timeoutMillis timeout, in millis. - */ - public Builder setRpcTimeout(long timeoutMillis) { - this.rpcTimeoutMillis = timeoutMillis; - return this; - } - - /** - * Sets the rpc timeout value for the following long poll based operations: - * PollForDecisionTask, PollForActivityTask, GetWorkflowExecutionHistory. Should never be - * below 60000 as this is server side timeout for the long poll. Default is 61000. - * - * @param timeoutMillis timeout, in millis. - */ - public Builder setRpcLongPollTimeout(long timeoutMillis) { - this.rpcLongPollTimeoutMillis = timeoutMillis; - return this; - } - - /** - * Sets the rpc timeout value for query calls. Default is 10000. - * - * @param timeoutMillis timeout, in millis. - */ - public Builder setQueryRpcTimeout(long timeoutMillis) { - this.rpcQueryTimeoutMillis = timeoutMillis; - return this; - } - - /** - * Sets the rpc timeout value for query calls. Default is 180000. - * - * @param timeoutMillis timeout, in millis. - */ - public Builder setListArchivedWorkflowRpcTimeout(long timeoutMillis) { - this.rpcListArchivedWorkflowTimeoutMillis = timeoutMillis; - return this; - } - - /** - * Sets the client application name. - * - *

This name will be used as the tchannel client service name. It will also be reported as - * a tag along with metrics emitted to m3. - * - * @param clientAppName String representing the client application name. - * @return Builder for ClentOptions - */ - public Builder setClientAppName(String clientAppName) { - this.clientAppName = clientAppName; - return this; - } - - /** - * Sets the service name that Cadence service was started with. - * - * @param serviceName String representing the service name - * @return Builder for ClentOptions - */ - public Builder setServiceName(String serviceName) { - this.serviceName = serviceName; - return this; - } - - /** - * Sets the metrics scope to be used for metrics reporting. - * - * @param metricsScope - * @return Builder for ClentOptions - */ - public Builder setMetricsScope(Scope metricsScope) { - this.metricsScope = metricsScope; - return this; - } - - /** - * Sets additional transport headers for tchannel client. - * - * @param transportHeaders Map with additional transport headers - * @return Builder for ClentOptions - */ - public Builder setTransportHeaders(Map transportHeaders) { - this.transportHeaders = transportHeaders; - return this; - } - - public Builder setHeaders(Map headers) { - this.headers = headers; - return this; - } - - /** - * Builds and returns a ClientOptions object. - * - * @return ClientOptions object with the specified params. - */ - public ClientOptions build() { - return new ClientOptions(this); - } - } - } - private static final String INTERFACE_NAME = "WorkflowService"; private final ClientOptions options; @@ -374,65 +128,34 @@ public ClientOptions build() { private final TChannel tChannel; private final SubChannel subChannel; - /** - * Creates Cadence client that connects to the local instance of the Cadence Service that listens - * on a default port (7933). - */ - public WorkflowServiceTChannel() { - this( - Strings.isNullOrEmpty(System.getenv("CADENCE_SEEDS")) - ? LOCALHOST - : System.getenv("CADENCE_SEEDS"), - DEFAULT_LOCAL_CADENCE_SERVER_PORT, - new ClientOptions.Builder().build()); - } - - /** - * Creates Cadence client that connects to the specified host and port using default options. - * - * @param host host to connect - * @param port port to connect - */ - public WorkflowServiceTChannel(String host, int port) { - this(host, port, new ClientOptions.Builder().build()); - } - /** * Creates Cadence client that connects to the specified host and port using specified options. * - * @param host host to connect - * @param port port to connect * @param options configuration options like rpc timeouts. */ - public WorkflowServiceTChannel(String host, int port, ClientOptions options) { - if (host == null) { - throw new IllegalArgumentException("null host"); - } - if (port <= 0) { - throw new IllegalArgumentException("0 or negative port"); - } + public WorkflowServiceTChannel(ClientOptions options) { this.options = options; this.thriftHeaders = getThriftHeaders(options); - // this.metricsReporter = new MetricsReporter(options.getMetricsClient()); - // Need to create tChannel last in order to prevent leaking when an exception is thrown this.tChannel = new TChannel.Builder(options.getClientAppName()).build(); + InetAddress address; try { - InetAddress address = InetAddress.getByName(host); - ArrayList peers = new ArrayList<>(); - peers.add(new InetSocketAddress(address, port)); - this.subChannel = tChannel.makeSubChannel(options.getServiceName()).setPeers(peers); - log.info( - "Initialized TChannel for service " - + this.subChannel.getServiceName() - + ", LibraryVersion: " - + Version.LIBRARY_VERSION - + ", FeatureVersion: " - + Version.FEATURE_VERSION); + address = InetAddress.getByName(options.getHost()); } catch (UnknownHostException e) { tChannel.shutdown(); - throw new RuntimeException("Unable to get name of host " + host, e); + throw new RuntimeException("Unable to get name of host " + options.getHost(), e); } + + ArrayList peers = new ArrayList<>(); + peers.add(new InetSocketAddress(address, options.getPort())); + this.subChannel = tChannel.makeSubChannel(options.getServiceName()).setPeers(peers); + log.info( + "Initialized TChannel for service " + + this.subChannel.getServiceName() + + ", LibraryVersion: " + + Version.LIBRARY_VERSION + + ", FeatureVersion: " + + Version.FEATURE_VERSION); } /** @@ -444,7 +167,6 @@ public WorkflowServiceTChannel(String host, int port, ClientOptions options) { public WorkflowServiceTChannel(SubChannel subChannel, ClientOptions options) { this.options = options; this.thriftHeaders = getThriftHeaders(options); - // this.metricsReporter = new MetricsReporter(options.getMetricsClient()); this.tChannel = null; this.subChannel = subChannel; } @@ -466,8 +188,8 @@ private static Map getThriftHeaders(ClientOptions options) { .put("cadence-client-feature-version", Version.FEATURE_VERSION) .put("cadence-client-name", "uber-java"); - if (options.headers != null) { - for (Map.Entry entry : options.headers.entrySet()) { + if (options.getHeaders() != null) { + for (Map.Entry entry : options.getHeaders().entrySet()) { builder.put(entry.getKey(), entry.getValue()); } } diff --git a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java index 3db435e01..54c549a64 100644 --- a/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java +++ b/src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java @@ -18,12 +18,11 @@ package com.uber.cadence.testing; import com.google.common.annotations.VisibleForTesting; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.converter.JsonDataConverter; -import com.uber.cadence.internal.metrics.NoopScope; import com.uber.cadence.worker.Worker; import com.uber.cadence.workflow.WorkflowInterceptor; -import com.uber.m3.tally.Scope; import java.util.Objects; import java.util.function.Function; @@ -34,25 +33,22 @@ public static final class Builder { private DataConverter dataConverter = JsonDataConverter.getInstance(); - private String domain = "unit-test"; - private Function interceptorFactory = (n) -> n; - private Scope metricsScope; - private boolean enableLoggingInReplay; private Worker.FactoryOptions factoryOptions; - /** Sets data converter to use for unit-tests. Default is {@link JsonDataConverter}. */ - public Builder setDataConverter(DataConverter dataConverter) { - this.dataConverter = Objects.requireNonNull(dataConverter); + private WorkflowClientOptions workflowClientOptions = WorkflowClientOptions.defaultInstance(); + + public Builder setWorkflowClientOptions(WorkflowClientOptions workflowClientOptions) { + this.workflowClientOptions = workflowClientOptions; return this; } - /** Set domain to use for test workflows. Optional. Default is "unit-test" */ - public Builder setDomain(String domain) { - this.domain = Objects.requireNonNull(domain); + /** Sets data converter to use for unit-tests. Default is {@link JsonDataConverter}. */ + public Builder setDataConverter(DataConverter dataConverter) { + this.dataConverter = Objects.requireNonNull(dataConverter); return this; } @@ -67,16 +63,8 @@ public Builder setInterceptorFactory( return this; } - /** - * Set scope to use for metrics reporting. Optional. Default is noop scope that skips reporting. - */ - public Builder setMetricsScope(Scope metricsScope) { - this.metricsScope = metricsScope; - return this; - } - /** Set factoryOptions for worker factory used to create workers. */ - public Builder setFactoryOptions(Worker.FactoryOptions options) { + public Builder setWorkerFactoryOptions(Worker.FactoryOptions options) { this.factoryOptions = options; return this; } @@ -88,10 +76,6 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { } public TestEnvironmentOptions build() { - if (metricsScope == null) { - metricsScope = NoopScope.getInstance(); - } - if (factoryOptions == null) { factoryOptions = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); @@ -99,33 +83,29 @@ public TestEnvironmentOptions build() { return new TestEnvironmentOptions( dataConverter, - domain, interceptorFactory, - metricsScope, factoryOptions, + workflowClientOptions, enableLoggingInReplay); } } private final DataConverter dataConverter; - private final String domain; private final Function interceptorFactory; - private final Scope metricsScope; private final boolean enableLoggingInReplay; private final Worker.FactoryOptions workerFactoryOptions; + private final WorkflowClientOptions workflowClientOptions; private TestEnvironmentOptions( DataConverter dataConverter, - String domain, Function interceptorFactory, - Scope metricsScope, Worker.FactoryOptions options, + WorkflowClientOptions workflowClientOptions, boolean enableLoggingInReplay) { this.dataConverter = dataConverter; - this.domain = domain; this.interceptorFactory = interceptorFactory; - this.metricsScope = metricsScope; this.workerFactoryOptions = options; + this.workflowClientOptions = workflowClientOptions; this.enableLoggingInReplay = enableLoggingInReplay; } @@ -133,18 +113,10 @@ public DataConverter getDataConverter() { return dataConverter; } - public String getDomain() { - return domain; - } - public Function getInterceptorFactory() { return interceptorFactory; } - public Scope getMetricsScope() { - return metricsScope; - } - public boolean isLoggingEnabledInReplay() { return enableLoggingInReplay; } @@ -153,14 +125,17 @@ public Worker.FactoryOptions getWorkerFactoryOptions() { return workerFactoryOptions; } + public WorkflowClientOptions getWorkflowClientOptions() { + return workflowClientOptions; + } + @Override public String toString() { return "TestEnvironmentOptions{" + "dataConverter=" + dataConverter - + ", domain='" - + domain - + '\'' + + ", workflowClientOptions=" + + workflowClientOptions + '}'; } } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 00fe4d340..a9e68b3d8 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -28,7 +28,6 @@ import com.uber.cadence.converter.DataConverter; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsTag; -import com.uber.cadence.internal.metrics.NoopScope; import com.uber.cadence.internal.replay.DeciderCache; import com.uber.cadence.internal.sync.SyncActivityWorker; import com.uber.cadence.internal.sync.SyncWorkflowWorker; @@ -39,7 +38,6 @@ import com.uber.cadence.internal.worker.Suspendable; import com.uber.cadence.internal.worker.WorkflowPollTaskFactory; import com.uber.cadence.serviceclient.IWorkflowService; -import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.workflow.Functions.Func; import com.uber.cadence.workflow.WorkflowMethod; import com.uber.m3.tally.Scope; @@ -52,7 +50,6 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -394,14 +391,19 @@ public boolean isSuspended() { /** Maintains worker creation and lifecycle. */ public static final class Factory { - private final List workers = new ArrayList<>(); - private final IWorkflowService workflowService; - /** Indicates if factory owns the service. An owned service is closed on shutdown. */ - private final boolean closeServiceOnShutdown; - private final String domain; - private final UUID id = - UUID.randomUUID(); // Guarantee uniqueness for stickyTaskListName when multiple factories + public static Factory newInstance(WorkflowClient workflowClient) { + return Factory.newInstance(workflowClient, FactoryOptions.defaultInstance()); + } + + public static Factory newInstance(WorkflowClient workflowClient, FactoryOptions options) { + return new Factory(workflowClient, options); + } + + private final List workers = new ArrayList<>(); + private final WorkflowClient workflowClient; + // Guarantee uniqueness for stickyTaskListName when multiple factories + private final UUID id = UUID.randomUUID(); private final ThreadPoolExecutor workflowThreadPool; private final AtomicInteger workflowThreadCounter = new AtomicInteger(); private final FactoryOptions factoryOptions; @@ -415,85 +417,16 @@ public static final class Factory { private final String statusErrorMessage = "attempted to %s while in %s state. Acceptable States: %s"; private static final Logger log = LoggerFactory.getLogger(Factory.class); - /** - * Creates a factory. Workers will be connected to a local deployment of cadence-server - * - * @param domain Domain used by workers to poll for workflows. - */ - public Factory(String domain) { - this(new WorkflowServiceTChannel(), true, domain, null); - } - - /** - * Creates a factory. Workers will be connected to the cadence-server at the specific host and - * port. - * - * @param host host used by the underlying workflowServiceClient to connect to. - * @param port port used by the underlying workflowServiceClient to connect to. - * @param domain Domain used by workers to poll for workflows. - */ - public Factory(String host, int port, String domain) { - this(new WorkflowServiceTChannel(host, port), true, domain, null); - } - - /** - * Creates a factory connected to a local deployment of cadence-server. - * - * @param domain Domain used by workers to poll for workflows. - * @param factoryOptions Options used to configure factory settings - */ - public Factory(String domain, FactoryOptions factoryOptions) { - this(new WorkflowServiceTChannel(), true, domain, factoryOptions); - } - - /** - * Creates a factory. Workers will be connected to the cadence-server at the specific host and - * port. - * - * @param host host used by the underlying workflowServiceClient to connect to. - * @param port port used by the underlying workflowServiceClient to connect to. - * @param domain Domain used by workers to poll for workflows. - * @param factoryOptions Options used to configure factory settings - */ - public Factory(String host, int port, String domain, FactoryOptions factoryOptions) { - this(new WorkflowServiceTChannel(host, port), true, domain, factoryOptions); - } /** * Creates a factory. Workers will be connect to the cadence-server using the workflowService * client passed in. * - * @param workflowService client to the Cadence Service endpoint. - * @param domain Domain used by workers to poll for workflows. - */ - public Factory(IWorkflowService workflowService, String domain) { - this(workflowService, false, domain, null); - } - - /** - * Creates a factory. Workers will be connect to the cadence-server using the workflowService - * client passed in. - * - * @param workflowService client to the Cadence Service endpoint. - * @param domain Domain used by workers to poll for workflows. + * @param workflowClient client to the Cadence Service endpoint. * @param factoryOptions Options used to configure factory settings */ - public Factory(IWorkflowService workflowService, String domain, FactoryOptions factoryOptions) { - this(workflowService, false, domain, factoryOptions); - } - - private Factory( - IWorkflowService workflowService, - boolean closeServiceOnShutdown, - String domain, - FactoryOptions factoryOptions) { - Preconditions.checkArgument( - !Strings.isNullOrEmpty(domain), "domain should not be an empty string"); - - this.domain = domain; - this.workflowService = - Objects.requireNonNull(workflowService, "workflowService should not be null"); - this.closeServiceOnShutdown = closeServiceOnShutdown; + public Factory(WorkflowClient workflowClient, FactoryOptions factoryOptions) { + this.workflowClient = Objects.requireNonNull(workflowClient); factoryOptions = factoryOptions == null ? new FactoryOptions.Builder().build() : factoryOptions; this.factoryOptions = factoryOptions; @@ -513,20 +446,27 @@ private Factory( } Scope metricsScope = - this.factoryOptions.metricsScope.tagged( - new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, domain) - .put(MetricsTag.TASK_LIST, getHostName()) - .build()); + workflowClient + .getOptions() + .getMetricsScope() + .tagged( + new ImmutableMap.Builder(2) + .put(MetricsTag.DOMAIN, workflowClient.getOptions().getDomain()) + .put(MetricsTag.TASK_LIST, getHostName()) + .build()); this.cache = new DeciderCache(this.factoryOptions.cacheMaximumSize, metricsScope); - dispatcher = new PollDecisionTaskDispatcher(workflowService); + dispatcher = new PollDecisionTaskDispatcher(workflowClient.getService()); stickyPoller = new Poller<>( id.toString(), new WorkflowPollTaskFactory( - workflowService, domain, getStickyTaskListName(), metricsScope, id.toString()) + workflowClient.getService(), + workflowClient.getOptions().getDomain(), + getStickyTaskListName(), + metricsScope, + id.toString()) .get(), dispatcher, this.factoryOptions.stickyWorkflowPollerOptions, @@ -565,8 +505,8 @@ public synchronized Worker newWorker(String taskList, WorkerOptions options) { statusErrorMessage, "create new worker", state.name(), State.Initial.name())); Worker worker = new Worker( - workflowService, - domain, + workflowClient.getService(), + workflowClient.getOptions().getDomain(), taskList, options, cache, @@ -636,9 +576,9 @@ public synchronized boolean isTerminated() { return true; } - /** @return instance of the cadence client that this worker uses. */ - public IWorkflowService getWorkflowService() { - return workflowService; + /** @return instance of the Cadence client that this worker uses. */ + public WorkflowClient getWorkflowClient() { + return workflowClient; } /** @@ -660,24 +600,6 @@ public synchronized void shutdown() { for (Worker worker : workers) { worker.shutdown(); } - closeServiceWhenTerminated(); - } - - /** - * Closes Cadence client object. It should be closed only after all tasks have completed - * execution as tasks use it to report completion. - */ - private void closeServiceWhenTerminated() { - if (closeServiceOnShutdown) { - ForkJoinPool.commonPool() - .execute( - () -> { - // Service is used to report task completions. - awaitTermination(1, TimeUnit.HOURS); - log.info("Closing workflow service client"); - workflowService.close(); - }); - } } /** @@ -701,7 +623,6 @@ public synchronized void shutdownNow() { for (Worker worker : workers) { worker.shutdownNow(); } - closeServiceWhenTerminated(); } /** @@ -780,13 +701,22 @@ enum State { } public static class FactoryOptions { + private static final FactoryOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = new Builder().build(); + } + + static FactoryOptions defaultInstance() { + return DEFAULT_INSTANCE; + } + public static class Builder { private boolean disableStickyExecution; private int stickyDecisionScheduleToStartTimeoutInSeconds = 5; private int cacheMaximumSize = 600; private int maxWorkflowThreadCount = 600; private PollerOptions stickyWorkflowPollerOptions; - private Scope metricScope; private List contextPropagators; /** @@ -837,11 +767,6 @@ public Builder setStickyWorkflowPollerOptions(PollerOptions stickyWorkflowPoller return this; } - public Builder setMetricScope(Scope metricScope) { - this.metricScope = metricScope; - return this; - } - public Builder setContextPropagators(List contextPropagators) { this.contextPropagators = contextPropagators; return this; @@ -854,7 +779,6 @@ public FactoryOptions build() { maxWorkflowThreadCount, stickyDecisionScheduleToStartTimeoutInSeconds, stickyWorkflowPollerOptions, - metricScope, contextPropagators); } } @@ -864,7 +788,6 @@ public FactoryOptions build() { private final int maxWorkflowThreadCount; private final int stickyDecisionScheduleToStartTimeoutInSeconds; private final PollerOptions stickyWorkflowPollerOptions; - private final Scope metricsScope; private List contextPropagators; private FactoryOptions( @@ -873,7 +796,6 @@ private FactoryOptions( int maxWorkflowThreadCount, int stickyDecisionScheduleToStartTimeoutInSeconds, PollerOptions stickyWorkflowPollerOptions, - Scope metricsScope, List contextPropagators) { Preconditions.checkArgument( cacheMaximumSize > 0, "cacheMaximumSize should be greater than 0"); @@ -900,12 +822,6 @@ private FactoryOptions( this.stickyWorkflowPollerOptions = stickyWorkflowPollerOptions; } - if (metricsScope == null) { - this.metricsScope = NoopScope.getInstance(); - } else { - this.metricsScope = metricsScope; - } - if (contextPropagators != null) { this.contextPropagators = contextPropagators; } else { diff --git a/src/test/java/com/uber/cadence/RegisterTestDomain.java b/src/test/java/com/uber/cadence/RegisterTestDomain.java index c115a9dbf..5fda8d16b 100644 --- a/src/test/java/com/uber/cadence/RegisterTestDomain.java +++ b/src/test/java/com/uber/cadence/RegisterTestDomain.java @@ -2,6 +2,7 @@ import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import org.apache.thrift.TException; @@ -11,12 +12,12 @@ public class RegisterTestDomain { private static final boolean useDockerService = Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); - public static void main(String[] args) throws TException, InterruptedException { + public static void main(String[] args) throws InterruptedException { if (!useDockerService) { return; } - IWorkflowService service = new WorkflowServiceTChannel(); + IWorkflowService service = new WorkflowServiceTChannel(ClientOptions.defaultInstance()); RegisterDomainRequest request = new RegisterDomainRequest().setName(DOMAIN).setWorkflowExecutionRetentionPeriodInDays(1); while (true) { diff --git a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java index 62fa3723c..1929a8621 100644 --- a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java +++ b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java @@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientOptions; -import com.uber.cadence.client.WorkflowClientOptions.Builder; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -69,7 +68,7 @@ public WorkflowClient runTestCadenceEnv(boolean propagationEnabled) { testEnv = TestWorkflowEnvironment.newInstance( new TestEnvironmentOptions.Builder() - .setFactoryOptions( + .setWorkerFactoryOptions( new FactoryOptions.Builder() .setContextPropagators( propagationEnabled ? PROPAGATORS : Collections.emptyList()) @@ -82,10 +81,8 @@ public WorkflowClient runTestCadenceEnv(boolean propagationEnabled) { LocalActivityContextPropagationWorkflowImpl::new); worker.registerActivitiesImplementations(localActivityContextPropagation); - - WorkflowClientOptions workflowClientOptions = new Builder().build(); - WorkflowClient workflowClient = testEnv.newWorkflowClient(workflowClientOptions); - + WorkflowClient workflowClient = + testEnv.newWorkflowClient(WorkflowClientOptions.defaultInstance()); testEnv.start(); return workflowClient; diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index 73d5fbb62..7e6fce326 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -101,7 +101,7 @@ protected void failed(Throwable e, Description description) { public void setUp() { TestEnvironmentOptions options = new TestEnvironmentOptions.Builder() - .setFactoryOptions( + .setWorkerFactoryOptions( new Worker.FactoryOptions.Builder() .setContextPropagators(Collections.singletonList(new TestContextPropagator())) .build()) @@ -580,7 +580,7 @@ public void testTimerCancellation() throws TException { .GetWorkflowExecutionHistory( new GetWorkflowExecutionHistoryRequest() .setExecution(new WorkflowExecution().setWorkflowId(workflowID)) - .setDomain(client.getDomain())) + .setDomain(client.getOptions().getDomain())) .getHistory(); List historyEvents = history.getEvents(); assertTrue( diff --git a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java index f2730c2d3..fa173a112 100644 --- a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java +++ b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java @@ -30,7 +30,9 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.client.ActivityWorkerShutdownException; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -79,7 +81,7 @@ public static Object[] data() { @Before public void setUp() { if (useExternalService) { - service = new WorkflowServiceTChannel(); + service = new WorkflowServiceTChannel(ClientOptions.defaultInstance()); } } @@ -135,16 +137,18 @@ public void testShutdown() throws ExecutionException, InterruptedException, TExc Worker.Factory workerFactory = null; TestWorkflowEnvironment testEnvironment = null; CompletableFuture started = new CompletableFuture<>(); + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); if (useExternalService) { - workerFactory = new Worker.Factory(service, DOMAIN); + workflowClient = WorkflowClient.newInstance(service, clientOptions); + workerFactory = Worker.Factory.newInstance(workflowClient); Worker worker = workerFactory.newWorker(taskList); - workflowClient = WorkflowClient.newInstance(service, DOMAIN); worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); worker.registerActivitiesImplementations(new ActivitiesImpl(started)); workerFactory.start(); } else { TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder().setDomain(DOMAIN).build(); + new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); service = testEnvironment.getWorkflowService(); Worker worker = testEnvironment.newWorker(taskList); @@ -187,16 +191,18 @@ public void testShutdownNow() throws ExecutionException, InterruptedException, T Worker.Factory workerFactory = null; TestWorkflowEnvironment testEnvironment = null; CompletableFuture started = new CompletableFuture<>(); + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); if (useExternalService) { - workerFactory = new Worker.Factory(service, DOMAIN); + workflowClient = WorkflowClient.newInstance(service, clientOptions); + workerFactory = Worker.Factory.newInstance(workflowClient); Worker worker = workerFactory.newWorker(taskList); - workflowClient = WorkflowClient.newInstance(service, DOMAIN); worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); worker.registerActivitiesImplementations(new ActivitiesImpl(started)); workerFactory.start(); } else { TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder().setDomain(DOMAIN).build(); + new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); service = testEnvironment.getWorkflowService(); Worker worker = testEnvironment.newWorker(taskList); @@ -266,16 +272,18 @@ public void testShutdownHeartbeatingActivity() Worker.Factory workerFactory = null; TestWorkflowEnvironment testEnvironment = null; CompletableFuture started = new CompletableFuture<>(); + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); if (useExternalService) { - workerFactory = new Worker.Factory(service, DOMAIN); + workflowClient = WorkflowClient.newInstance(service, clientOptions); + workerFactory = Worker.Factory.newInstance(workflowClient); Worker worker = workerFactory.newWorker(taskList); - workflowClient = WorkflowClient.newInstance(service, DOMAIN); worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); worker.registerActivitiesImplementations(new HeartbeatingActivitiesImpl(started)); workerFactory.start(); } else { TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder().setDomain(DOMAIN).build(); + new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); service = testEnvironment.getWorkflowService(); Worker worker = testEnvironment.newWorker(taskList); diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index 3e3d2e052..dc5240738 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -31,10 +31,13 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.cadence.internal.metrics.NoopScope; import com.uber.cadence.internal.replay.DeciderCache; +import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; @@ -93,7 +96,7 @@ public static Object[] data() { @BeforeClass public static void setUp() { if (useDockerService) { - service = new WorkflowServiceTChannel(); + service = new WorkflowServiceTChannel(ClientOptions.defaultInstance()); } } @@ -115,12 +118,9 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); - TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder() - .setDisableStickyExecution(false) - .setMetricScope(scope) - .build()); + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); + TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper(factoryOptions, scope); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -173,14 +173,13 @@ public void workflowCacheEvictionDueToThreads() { .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); - TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder() - .setDisableStickyExecution(false) - .setMetricScope(scope) - .setMaxWorkflowThreadCount(10) - .setCacheMaximumSize(100) - .build()); + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMaxWorkflowThreadCount(10) + .setCacheMaximumSize(100) + .build(); + TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper(factoryOptions, scope); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker( @@ -231,12 +230,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); - TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder() - .setDisableStickyExecution(false) - .setMetricScope(scope) - .build()); + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); + + TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper(factoryOptions, scope); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); @@ -289,12 +286,9 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); - TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder() - .setDisableStickyExecution(false) - .setMetricScope(scope) - .build()); + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); + TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper(factoryOptions, scope); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes( @@ -340,12 +334,9 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedMutableSideEffect() throws .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); - TestEnvironmentWrapper wrapper = - new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder() - .setDisableStickyExecution(false) - .setMetricScope(scope) - .build()); + Worker.FactoryOptions factoryOptions = + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); + TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper(factoryOptions, scope); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build()); worker.registerWorkflowImplementationTypes(TestMutableSideEffectWorkflowImpl.class); @@ -394,7 +385,8 @@ public void whenStickyIsNotEnabledThenTheWorkflowIsNotCached() { String taskListName = "notCachedStickyTest"; TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).build()); + new Worker.FactoryOptions.Builder().setDisableStickyExecution(true).build(), + NoopScope.getInstance()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -427,7 +419,8 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception { String taskListName = "evictedStickyTest"; TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(), + NoopScope.getInstance()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -467,7 +460,8 @@ public void workflowsCanBeQueried() throws Exception { String taskListName = "queryStickyTest"; TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(), + NoopScope.getInstance()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -508,7 +502,8 @@ public void workflowsCanBeQueriedAfterEviction() throws Exception { String taskListName = "queryEvictionStickyTest"; TestEnvironmentWrapper wrapper = new TestEnvironmentWrapper( - new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build()); + new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(), + NoopScope.getInstance()); Worker.Factory factory = wrapper.getWorkerFactory(); Worker worker = factory.newWorker(taskListName); worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class); @@ -550,18 +545,22 @@ private class TestEnvironmentWrapper { private TestWorkflowEnvironment testEnv; private Worker.Factory factory; + private final Scope scope; - public TestEnvironmentWrapper(Worker.FactoryOptions options) { + public TestEnvironmentWrapper(Worker.FactoryOptions options, Scope scope) { + this.scope = scope; if (options == null) { options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); } if (useExternalService) { - factory = new Worker.Factory(service, DOMAIN, options); + factory = Worker.Factory.newInstance(getWorkflowClient(), options); } else { + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).setMetricsScope(scope).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder() - .setDomain(DOMAIN) - .setFactoryOptions(options) + .setWorkflowClientOptions(clientOptions) + .setWorkerFactoryOptions(options) .build(); testEnv = TestWorkflowEnvironment.newInstance(testOptions); } @@ -572,8 +571,10 @@ private Worker.Factory getWorkerFactory() { } private WorkflowClient getWorkflowClient() { + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).setMetricsScope(scope).build(); return useExternalService - ? WorkflowClient.newInstance(service, DOMAIN) + ? WorkflowClient.newInstance(service, clientOptions) : testEnv.newWorkflowClient(); } diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 1cb8bd6d3..b2b5cee18 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -23,8 +23,12 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.client.WorkflowStub; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.workflow.Async; @@ -174,10 +178,20 @@ public TestEnvironmentWrapper(Worker.FactoryOptions options) { if (options == null) { options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build(); } - factory = new Worker.Factory(DOMAIN, options); - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder().setDomain(DOMAIN).setFactoryOptions(options).build(); - testEnv = TestWorkflowEnvironment.newInstance(testOptions); + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); + if (useDockerService) { + IWorkflowService service = new WorkflowServiceTChannel(ClientOptions.defaultInstance()); + WorkflowClient client = WorkflowClient.newInstance(service, clientOptions); + factory = Worker.Factory.newInstance(client, options); + } else { + TestEnvironmentOptions testOptions = + new TestEnvironmentOptions.Builder() + .setWorkflowClientOptions(clientOptions) + .setWorkerFactoryOptions(options) + .build(); + testEnv = TestWorkflowEnvironment.newInstance(testOptions); + } } private Worker.Factory getWorkerFactory() { @@ -185,9 +199,7 @@ private Worker.Factory getWorkerFactory() { } private WorkflowClient getWorkflowClient() { - return useExternalService - ? WorkflowClient.newInstance(factory.getWorkflowService(), DOMAIN) - : testEnv.newWorkflowClient(); + return useExternalService ? factory.getWorkflowClient() : testEnv.newWorkflowClient(); } private void close() { diff --git a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java index bfd1b7f6d..942f554e1 100644 --- a/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java +++ b/src/test/java/com/uber/cadence/workerFactory/WorkerFactoryTests.java @@ -20,11 +20,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.worker.Worker; import java.util.concurrent.TimeUnit; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; public class WorkerFactoryTests { @@ -36,9 +38,24 @@ public static void beforeClass() { Assume.assumeTrue(useDockerService); } + private IWorkflowService service; + private WorkflowClient client; + private Worker.Factory factory; + + @Before + public void setUp() { + service = new WorkflowServiceTChannel(ClientOptions.defaultInstance()); + client = WorkflowClient.newInstance(service); + factory = Worker.Factory.newInstance(client); + } + + @After + public void tearDown() { + service.close(); + } + @Test public void whenAFactoryIsStartedAllWorkersStart() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.newWorker("task2"); @@ -50,7 +67,6 @@ public void whenAFactoryIsStartedAllWorkersStart() { @Test public void whenAFactoryIsShutdownAllWorkersAreShutdown() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.newWorker("task2"); @@ -69,8 +85,6 @@ public void whenAFactoryIsShutdownAllWorkersAreShutdown() { @Test public void aFactoryCanBeStartedMoreThanOnce() { - Worker.Factory factory = new Worker.Factory("domain"); - factory.start(); factory.start(); factory.shutdown(); @@ -79,7 +93,6 @@ public void aFactoryCanBeStartedMoreThanOnce() { @Test(expected = IllegalStateException.class) public void aFactoryCannotBeStartedAfterShutdown() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.shutdown(); @@ -89,7 +102,6 @@ public void aFactoryCannotBeStartedAfterShutdown() { @Test(expected = IllegalStateException.class) public void workersCannotBeCreatedAfterFactoryHasStarted() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.start(); @@ -104,7 +116,6 @@ public void workersCannotBeCreatedAfterFactoryHasStarted() { @Test(expected = IllegalStateException.class) public void workersCannotBeCreatedAfterFactoryIsShutdown() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.shutdown(); @@ -119,7 +130,6 @@ public void workersCannotBeCreatedAfterFactoryIsShutdown() { @Test public void factoryCanOnlyBeShutdownMoreThanOnce() { - Worker.Factory factory = new Worker.Factory("domain"); factory.newWorker("task1"); factory.shutdown(); diff --git a/src/test/java/com/uber/cadence/workflow/LoggerTest.java b/src/test/java/com/uber/cadence/workflow/LoggerTest.java index 1b1ec7bc0..9e08704dc 100644 --- a/src/test/java/com/uber/cadence/workflow/LoggerTest.java +++ b/src/test/java/com/uber/cadence/workflow/LoggerTest.java @@ -24,6 +24,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -86,9 +87,11 @@ public void executeChild(String id) { @Test public void testWorkflowLogger() { + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(WorkflowTest.DOMAIN).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder() - .setDomain(WorkflowTest.DOMAIN) + .setWorkflowClientOptions(clientOptions) .setEnableLoggingInReplay(false) .build(); TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); diff --git a/src/test/java/com/uber/cadence/workflow/MetricsTest.java b/src/test/java/com/uber/cadence/workflow/MetricsTest.java index fe2523f2d..21417847a 100644 --- a/src/test/java/com/uber/cadence/workflow/MetricsTest.java +++ b/src/test/java/com/uber/cadence/workflow/MetricsTest.java @@ -24,6 +24,7 @@ import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.internal.metrics.MetricsTag; @@ -195,11 +196,13 @@ public void setUp(com.uber.m3.util.Duration reportingFrequecy) { reporter = mock(StatsReporter.class); Scope scope = new RootScopeBuilder().reporter(reporter).reportEvery(reportingFrequecy); - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder() + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder() .setDomain(WorkflowTest.DOMAIN) .setMetricsScope(scope) .build(); + TestEnvironmentOptions testOptions = + new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 5c1c94a5e..4763680af 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -67,6 +67,8 @@ import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.sync.DeterministicRunnerTest; import com.uber.cadence.internal.worker.PollerOptions; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.serviceclient.WorkflowServiceTChannel; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; @@ -207,11 +209,11 @@ protected void failed(Throwable e, Description description) { private Worker worker; private TestActivitiesImpl activitiesImpl; private WorkflowClient workflowClient; - private WorkflowClient workflowClientWithOptions; private TestWorkflowEnvironment testEnvironment; private ScheduledExecutorService scheduledExecutor; private List> delayedCallbacks = new ArrayList<>(); - private static final WorkflowServiceTChannel service = new WorkflowServiceTChannel(); + private static final IWorkflowService service = + new WorkflowServiceTChannel(ClientOptions.defaultInstance()); @AfterClass public static void closeService() { @@ -278,12 +280,15 @@ public void setUp() { } tracer = new TracingWorkflowInterceptorFactory(); // TODO: Create a version of TestWorkflowEnvironment that runs against a real service. + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); if (useExternalService) { + workflowClient = WorkflowClient.newInstance(service, clientOptions); Worker.FactoryOptions factoryOptions = new Worker.FactoryOptions.Builder() .setDisableStickyExecution(disableStickyExecution) .build(); - workerFactory = new Worker.Factory(service, DOMAIN, factoryOptions); + workerFactory = new Worker.Factory(workflowClient, factoryOptions); WorkerOptions workerOptions = new WorkerOptions.Builder() .setActivityPollerOptions(new PollerOptions.Builder().setPollThreadCount(5).build()) @@ -291,19 +296,13 @@ public void setUp() { .setInterceptorFactory(tracer) .build(); worker = workerFactory.newWorker(taskList, workerOptions); - workflowClient = WorkflowClient.newInstance(service, DOMAIN); - WorkflowClientOptions clientOptions = - new WorkflowClientOptions.Builder() - .setDataConverter(JsonDataConverter.getInstance()) - .build(); - workflowClientWithOptions = WorkflowClient.newInstance(service, DOMAIN, clientOptions); scheduledExecutor = new ScheduledThreadPoolExecutor(1); } else { TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder() - .setDomain(DOMAIN) + .setWorkflowClientOptions(clientOptions) .setInterceptorFactory(tracer) - .setFactoryOptions( + .setWorkerFactoryOptions( new Worker.FactoryOptions.Builder() .setDisableStickyExecution(disableStickyExecution) .build()) @@ -311,7 +310,6 @@ public void setUp() { testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); worker = testEnvironment.newWorker(taskList); workflowClient = testEnvironment.newWorkflowClient(); - workflowClientWithOptions = testEnvironment.newWorkflowClient(); } ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient(); @@ -1448,7 +1446,7 @@ public void testStart() { } // Check that duplicated start is not allowed for AllowDuplicate IdReusePolicy TestMultiargsWorkflowsFunc2 stubF2 = - workflowClientWithOptions.newWorkflowStub( + workflowClient.newWorkflowStub( TestMultiargsWorkflowsFunc2.class, newWorkflowOptionsBuilder(taskList) .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.AllowDuplicate) @@ -1474,32 +1472,25 @@ public void testStart() { assertResult("123456", WorkflowClient.start(stubF6::func6, "1", 2, 3, 4, 5, 6)); TestMultiargsWorkflowsProc stubP = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc.class, workflowOptions); waitForProc(WorkflowClient.start(stubP::proc)); TestMultiargsWorkflowsProc1 stubP1 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc1.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc1.class, workflowOptions); waitForProc(WorkflowClient.start(stubP1::proc1, "1")); TestMultiargsWorkflowsProc2 stubP2 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc2.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc2.class, workflowOptions); waitForProc(WorkflowClient.start(stubP2::proc2, "1", 2)); TestMultiargsWorkflowsProc3 stubP3 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc3.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc3.class, workflowOptions); waitForProc(WorkflowClient.start(stubP3::proc3, "1", 2, 3)); TestMultiargsWorkflowsProc4 stubP4 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc4.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc4.class, workflowOptions); waitForProc(WorkflowClient.start(stubP4::proc4, "1", 2, 3, 4)); TestMultiargsWorkflowsProc5 stubP5 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc5.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc5.class, workflowOptions); waitForProc(WorkflowClient.start(stubP5::proc5, "1", 2, 3, 4, 5)); TestMultiargsWorkflowsProc6 stubP6 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc6.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc6.class, workflowOptions); waitForProc(WorkflowClient.start(stubP6::proc6, "1", 2, 3, 4, 5, 6)); assertEquals("proc", stubP.query()); @@ -1643,32 +1634,25 @@ public void testExecute() throws ExecutionException, InterruptedException { assertEquals("123456", WorkflowClient.execute(stubF6::func6, "1", 2, 3, 4, 5, 6).get()); TestMultiargsWorkflowsProc stubP = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc.class, workflowOptions); WorkflowClient.execute(stubP::proc).get(); TestMultiargsWorkflowsProc1 stubP1 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc1.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc1.class, workflowOptions); WorkflowClient.execute(stubP1::proc1, "1").get(); TestMultiargsWorkflowsProc2 stubP2 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc2.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc2.class, workflowOptions); WorkflowClient.execute(stubP2::proc2, "1", 2).get(); TestMultiargsWorkflowsProc3 stubP3 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc3.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc3.class, workflowOptions); WorkflowClient.execute(stubP3::proc3, "1", 2, 3).get(); TestMultiargsWorkflowsProc4 stubP4 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc4.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc4.class, workflowOptions); WorkflowClient.execute(stubP4::proc4, "1", 2, 3, 4).get(); TestMultiargsWorkflowsProc5 stubP5 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc5.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc5.class, workflowOptions); WorkflowClient.execute(stubP5::proc5, "1", 2, 3, 4, 5).get(); TestMultiargsWorkflowsProc6 stubP6 = - workflowClientWithOptions.newWorkflowStub( - TestMultiargsWorkflowsProc6.class, workflowOptions); + workflowClient.newWorkflowStub(TestMultiargsWorkflowsProc6.class, workflowOptions); WorkflowClient.execute(stubP6::proc6, "1", 2, 3, 4, 5, 6).get(); assertEquals("proc", stubP.query()); @@ -2418,11 +2402,11 @@ public void mySignal(String value) { } @Test - public void testSignal() throws Exception { + public void testSignal() { // Test getTrace through replay by a local worker. Worker queryWorker; if (useExternalService) { - Worker.Factory workerFactory = new Worker.Factory(service, DOMAIN); + Worker.Factory workerFactory = Worker.Factory.newInstance(workflowClient); queryWorker = workerFactory.newWorker(taskList); } else { queryWorker = testEnvironment.newWorker(taskList); @@ -2492,11 +2476,11 @@ public void mySignal(String value) { } @Test - public void testSignalWithStart() throws Exception { + public void testSignalWithStart() { // Test getTrace through replay by a local worker. Worker queryWorker; if (useExternalService) { - Worker.Factory workerFactory = new Worker.Factory(service, DOMAIN); + Worker.Factory workerFactory = Worker.Factory.newInstance(workflowClient); queryWorker = workerFactory.newWorker(taskList); } else { queryWorker = testEnvironment.newWorker(taskList); @@ -2996,7 +2980,7 @@ public void testChildWorkflowRetry() { options.setTaskList(taskList); AtomicReference capturedWorkflowType = new AtomicReference<>(); WorkflowClientOptions clientOptions = - new WorkflowClientOptions.Builder() + WorkflowClientOptions.newBuilder() .setInterceptors( new WorkflowClientInterceptorBase() { @Override @@ -3006,10 +2990,11 @@ public WorkflowStub newUntypedWorkflowStub( return next; } }) + .setDomain(DOMAIN) .build(); WorkflowClient wc; if (useExternalService) { - wc = WorkflowClient.newInstance(service, DOMAIN, clientOptions); + wc = WorkflowClient.newInstance(service, clientOptions); } else { wc = testEnvironment.newWorkflowClient(clientOptions); } @@ -3072,7 +3057,7 @@ public void testChildWorkflowExecutionPromiseHandler() { options.setTaskList(taskList); WorkflowClient wc; if (useExternalService) { - wc = WorkflowClient.newInstance(service, DOMAIN); + wc = workflowClient; } else { wc = testEnvironment.newWorkflowClient(); }