Skip to content

Commit

Permalink
Refactor options and worker dependencies (#551)
Browse files Browse the repository at this point in the history
This is the first part of a general options and worker dependencies cleanup. After this refactoring, user should follow the sequence below to setup worker and its dependencies:
- IWorkflowService is a client stub for communicating to Cadence Server
- WorkflowClient wraps IWorkflowService and provides higher level API to start, signal, query and wait for completion of workflows.
- WorkerFactory wraps WorkflowClient and can be used to create instances of Workers.
- Worker listens on a single task list and is created by a WorkerFactory.
  • Loading branch information
meiliang86 authored Oct 20, 2020
1 parent 47c9097 commit 3caeb97
Show file tree
Hide file tree
Showing 24 changed files with 720 additions and 785 deletions.
65 changes: 7 additions & 58 deletions src/main/java/com/uber/cadence/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,77 +107,29 @@ public interface WorkflowClient {
/** Replays workflow to the current state and returns empty result or error if replay failed. */
String QUERY_TYPE_REPLAY_ONLY = "__replay_only";

/**
* Creates worker that connects to the local instance of the Cadence Service that listens on a
* default port (7933).
*
* @param domain domain that worker uses to poll.
*/
static WorkflowClient newInstance(String domain) {
return WorkflowClientInternal.newInstance(domain);
}

/**
* Creates worker that connects to the local instance of the Cadence Service that listens on a
* default port (7933).
*
* @param domain domain that worker uses to poll.
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
* configuring client.
*/
static WorkflowClient newInstance(String domain, WorkflowClientOptions options) {
return WorkflowClientInternal.newInstance(domain, options);
}

/**
* Creates client that connects to an instance of the Cadence Service.
*
* @param host of the Cadence Service endpoint
* @param port of the Cadence Service endpoint
* @param domain domain that worker uses to poll.
*/
static WorkflowClient newInstance(String host, int port, String domain) {
return WorkflowClientInternal.newInstance(host, port, domain);
}

/**
* Creates client that connects to an instance of the Cadence Service.
*
* @param host of the Cadence Service endpoint
* @param port of the Cadence Service endpoint
* @param domain domain that worker uses to poll.
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
* configuring client.
*/
static WorkflowClient newInstance(
String host, int port, String domain, WorkflowClientOptions options) {
return WorkflowClientInternal.newInstance(host, port, domain, options);
}

/**
* Creates client that connects to an instance of the Cadence Service.
*
* @param service client to the Cadence Service endpoint.
* @param domain domain that worker uses to poll.
*/
static WorkflowClient newInstance(IWorkflowService service, String domain) {
return WorkflowClientInternal.newInstance(service, domain);
static WorkflowClient newInstance(IWorkflowService service) {
return WorkflowClientInternal.newInstance(service, WorkflowClientOptions.defaultInstance());
}

/**
* Creates client that connects to an instance of the Cadence Service.
*
* @param service client to the Cadence Service endpoint.
* @param domain domain that worker uses to poll.
* @param options Options (like {@link com.uber.cadence.converter.DataConverter}er override) for
* configuring client.
*/
static WorkflowClient newInstance(
IWorkflowService service, String domain, WorkflowClientOptions options) {
return WorkflowClientInternal.newInstance(service, domain, options);
static WorkflowClient newInstance(IWorkflowService service, WorkflowClientOptions options) {
return WorkflowClientInternal.newInstance(service, options);
}

String getDomain();
WorkflowClientOptions getOptions();

IWorkflowService getService();

/**
* Creates workflow client stub that can be used to start a single workflow execution. The first
Expand Down Expand Up @@ -728,7 +680,4 @@ static <A1, A2, A3, A4, A5, A6, R> CompletableFuture<R> execute(
A6 arg6) {
return WorkflowClientInternal.execute(workflow, arg1, arg2, arg3, arg4, arg5, arg6);
}

/** Closes the workflow client and the underlying IWorkflowService when this method is called. */
void close();
}
54 changes: 41 additions & 13 deletions src/main/java/com/uber/cadence/client/WorkflowClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,47 @@

/** Options for WorkflowClient configuration. */
public final class WorkflowClientOptions {
private static final String DEFAULT_DOMAIN = "default";
private static final WorkflowClientOptions DEFAULT_INSTANCE;
private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY =
new WorkflowClientInterceptor[0];

public static final class Builder {
static {
DEFAULT_INSTANCE = new Builder().build();
}

public static WorkflowClientOptions defaultInstance() {
return DEFAULT_INSTANCE;
}

public static Builder newBuilder() {
return new Builder();
}

public static Builder newBuilder(WorkflowClientOptions options) {
return new Builder(options);
}

public static final class Builder {
private String domain = DEFAULT_DOMAIN;
private DataConverter dataConverter = JsonDataConverter.getInstance();
private WorkflowClientInterceptor[] interceptors = EMPTY_INTERCEPTOR_ARRAY;
private Scope metricsScope;
private Scope metricsScope = NoopScope.getInstance();

public Builder() {}
private Builder() {}

public Builder(WorkflowClientOptions options) {
private Builder(WorkflowClientOptions options) {
domain = options.getDomain();
dataConverter = options.getDataConverter();
interceptors = options.getInterceptors();
metricsScope = options.getMetricsScope();
}

public Builder setDomain(String domain) {
this.domain = domain;
return this;
}

/**
* Used to override default (JSON) data converter implementation.
*
Expand Down Expand Up @@ -72,28 +98,30 @@ public Builder setMetricsScope(Scope metricsScope) {
}

public WorkflowClientOptions build() {
if (metricsScope == null) {
metricsScope = NoopScope.getInstance();
}
return new WorkflowClientOptions(dataConverter, interceptors, metricsScope);
return new WorkflowClientOptions(domain, dataConverter, interceptors, metricsScope);
}
}

private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY =
new WorkflowClientInterceptor[0];
private final String domain;
private final DataConverter dataConverter;

private final WorkflowClientInterceptor[] interceptors;

private final Scope metricsScope;

private WorkflowClientOptions(
DataConverter dataConverter, WorkflowClientInterceptor[] interceptors, Scope metricsScope) {
String domain,
DataConverter dataConverter,
WorkflowClientInterceptor[] interceptors,
Scope metricsScope) {
this.domain = domain;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
this.metricsScope = metricsScope;
}

public String getDomain() {
return domain;
}

public DataConverter getDataConverter() {
return dataConverter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import java.util.Map;
import java.util.Objects;

public class ManualActivityCompletionClientFactoryImpl
extends ManualActivityCompletionClientFactory {
Expand All @@ -35,9 +36,9 @@ public class ManualActivityCompletionClientFactoryImpl

public ManualActivityCompletionClientFactoryImpl(
IWorkflowService service, String domain, DataConverter dataConverter, Scope metricsScope) {
this.service = service;
this.domain = domain;
this.dataConverter = dataConverter;
this.service = Objects.requireNonNull(service);
this.domain = Objects.requireNonNull(domain);
this.dataConverter = Objects.requireNonNull(dataConverter);

Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1).put(MetricsTag.DOMAIN, domain).build();
Expand All @@ -54,16 +55,11 @@ public DataConverter getDataConverter() {

@Override
public ManualActivityCompletionClient getClient(byte[] taskToken) {
if (service == null) {
throw new IllegalStateException("required property service is null");
}
if (dataConverter == null) {
throw new IllegalStateException("required property dataConverter is null");
}
if (taskToken == null || taskToken.length == 0) {
throw new IllegalArgumentException("null or empty task token");
}
return new ManualActivityCompletionClientImpl(service, taskToken, dataConverter, metricsScope);
return new ManualActivityCompletionClientImpl(
service, domain, taskToken, dataConverter, metricsScope);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ class ManualActivityCompletionClientImpl extends ManualActivityCompletionClient
private final Scope metricsScope;

ManualActivityCompletionClientImpl(
IWorkflowService service, byte[] taskToken, DataConverter dataConverter, Scope metricsScope) {
IWorkflowService service,
String domain,
byte[] taskToken,
DataConverter dataConverter,
Scope metricsScope) {
this.service = service;
this.taskToken = taskToken;
this.dataConverter = dataConverter;
this.domain = null;
this.domain = domain;
this.execution = null;
this.activityId = null;
this.metricsScope = metricsScope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public void runUntilAllBlocked() throws Throwable {
if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) {
nextWakeUpTime = 0;
}

} finally {
inRunUntilAllBlocked = false;
// Close was requested while running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public TestActivityEnvironmentInternal(TestEnvironmentOptions options) {
activityTaskHandler =
new POJOActivityTaskHandler(
new WorkflowServiceWrapper(workflowService),
testEnvironmentOptions.getDomain(),
testEnvironmentOptions.getWorkflowClientOptions().getDomain(),
testEnvironmentOptions.getDataConverter(),
heartbeatExecutor);
}
Expand Down
Loading

0 comments on commit 3caeb97

Please sign in to comment.