Skip to content

Commit

Permalink
Add option to dispatch activity tasks locally (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkolodezny authored Dec 4, 2020
1 parent cd2f6bf commit e33ed95
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 35 deletions.
11 changes: 11 additions & 0 deletions src/main/java/com/uber/cadence/internal/metrics/MetricsType.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.uber.cadence.internal.metrics;

public class MetricsType {

public static final String CADENCE_METRICS_PREFIX = "cadence-";
public static final String WORKFLOW_START_COUNTER = CADENCE_METRICS_PREFIX + "workflow-start";
public static final String WORKFLOW_COMPLETED_COUNTER =
Expand Down Expand Up @@ -119,6 +120,16 @@ public class MetricsType {
CADENCE_METRICS_PREFIX + "local-activity-panic";
public static final String LOCAL_ACTIVITY_EXECUTION_LATENCY =
CADENCE_METRICS_PREFIX + "local-activity-execution-latency";
public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_TOTAL_COUNTER =
CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-total";
public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_NO_TASK_COUNTER =
CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-no-task";
public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_SUCCEED_COUNTER =
CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-succeed";
public static final String ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER =
CADENCE_METRICS_PREFIX + "activity-local-dispatch-failed";
public static final String ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER =
CADENCE_METRICS_PREFIX + "activity-local-dispatch-succeed";
public static final String WORKER_PANIC_COUNTER = CADENCE_METRICS_PREFIX + "worker-panic";

public static final String TASK_LIST_QUEUE_LATENCY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.uber.cadence.internal.replay.ReplayDecisionTaskHandler;
import com.uber.cadence.internal.worker.DecisionTaskHandler;
import com.uber.cadence.internal.worker.LocalActivityWorker;
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.internal.worker.SuspendableWorker;
import com.uber.cadence.internal.worker.WorkflowWorker;
Expand All @@ -49,10 +50,14 @@ public class SyncWorkflowWorker

private final WorkflowWorker workflowWorker;
private final LocalActivityWorker laWorker;
private final LocallyDispatchedActivityWorker ldaWorker;

private final POJOWorkflowImplementationFactory factory;
private final DataConverter dataConverter;
private final POJOActivityTaskHandler laTaskHandler;
private final POJOActivityTaskHandler ldaTaskHandler;
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);

public SyncWorkflowWorker(
IWorkflowService service,
Expand All @@ -61,6 +66,7 @@ public SyncWorkflowWorker(
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
SingleWorkerOptions workflowOptions,
SingleWorkerOptions localActivityOptions,
SingleWorkerOptions locallyDispatchedActivityOptions,
DeciderCache cache,
String stickyTaskListName,
Duration stickyDecisionScheduleToStartTimeout,
Expand Down Expand Up @@ -91,10 +97,25 @@ public SyncWorkflowWorker(
stickyDecisionScheduleToStartTimeout,
service,
laWorker.getLocalActivityTaskPoller());
ldaTaskHandler =
new POJOActivityTaskHandler(
service,
domain,
locallyDispatchedActivityOptions.getDataConverter(),
ldaHeartbeatExecutor);
ldaWorker =
new LocallyDispatchedActivityWorker(
service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);

workflowWorker =
new WorkflowWorker(
service, domain, taskList, workflowOptions, taskHandler, stickyTaskListName);
service,
domain,
taskList,
workflowOptions,
taskHandler,
ldaWorker.getLocallyDispatchedActivityTaskPoller(),
stickyTaskListName);
}

public void setWorkflowImplementationTypes(
Expand All @@ -115,64 +136,80 @@ public void setLocalActivitiesImplementation(Object... activitiesImplementation)
this.laTaskHandler.setLocalActivitiesImplementation(activitiesImplementation);
}

public void setActivitiesImplementationToDispatchLocally(Object... activitiesImplementation) {
this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation);
}

@Override
public void start() {
workflowWorker.start();
// workflowWorker doesn't start if no types are registered with it. In that case we don't need
// to start LocalActivity Worker.
if (workflowWorker.isStarted()) {
laWorker.start();
ldaWorker.start();
}
}

@Override
public boolean isStarted() {
return workflowWorker.isStarted() && laWorker.isStarted();
return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted();
}

@Override
public boolean isShutdown() {
return workflowWorker.isShutdown() && laWorker.isShutdown();
return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown();
}

@Override
public boolean isTerminated() {
return workflowWorker.isTerminated() && laWorker.isTerminated();
return workflowWorker.isTerminated()
&& laWorker.isTerminated()
&& ldaHeartbeatExecutor.isTerminated()
&& ldaWorker.isTerminated();
}

@Override
public void shutdown() {
laWorker.shutdown();
ldaHeartbeatExecutor.shutdown();
ldaWorker.shutdown();
workflowWorker.shutdown();
}

@Override
public void shutdownNow() {
laWorker.shutdownNow();
ldaHeartbeatExecutor.shutdownNow();
ldaWorker.shutdownNow();
workflowWorker.shutdownNow();
}

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout));
timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis);
timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
}

@Override
public void suspendPolling() {
workflowWorker.suspendPolling();
laWorker.suspendPolling();
ldaWorker.suspendPolling();
}

@Override
public void resumePolling() {
workflowWorker.resumePolling();
laWorker.resumePolling();
ldaWorker.resumePolling();
}

@Override
public boolean isSuspended() {
return workflowWorker.isSuspended() && laWorker.isSuspended();
return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
}

public <R> R queryWorkflowExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,29 @@
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.Duration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ActivityPollTask implements Poller.PollTask<PollForActivityTaskResponse> {
final class ActivityPollTask extends ActivityPollTaskBase {

private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
private final IWorkflowService service;
private final String domain;
private final String taskList;
private final SingleWorkerOptions options;
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);

public ActivityPollTask(
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {

super(options);
this.service = service;
this.domain = domain;
this.taskList = taskList;
this.options = options;
}

@Override
public PollForActivityTaskResponse poll() throws TException {
protected PollForActivityTaskResponse pollTask() throws TException {
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1);
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_POLL_LATENCY).start();

PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest();
pollRequest.setDomain(domain);
pollRequest.setIdentity(options.getIdentity());
Expand Down Expand Up @@ -89,14 +85,6 @@ public PollForActivityTaskResponse poll() throws TException {
if (log.isTraceEnabled()) {
log.trace("poll request returned " + result);
}

options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1);
options
.getMetricsScope()
.timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY)
.record(
Duration.ofNanos(
result.getStartedTimestamp() - result.getScheduledTimestampOfThisAttempt()));
sw.stop();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.worker;

import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.m3.util.Duration;
import org.apache.thrift.TException;

abstract class ActivityPollTaskBase implements Poller.PollTask<PollForActivityTaskResponse> {

protected final SingleWorkerOptions options;

public ActivityPollTaskBase(SingleWorkerOptions options) {
this.options = options;
}

public PollForActivityTaskResponse poll() throws TException {

PollForActivityTaskResponse result = pollTask();
if (result == null || result.getTaskToken() == null) {
return null;
}
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1);
options
.getMetricsScope()
.timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY)
.record(
Duration.ofNanos(
result.getStartedTimestamp() - result.getScheduledTimestampOfThisAttempt()));
return result;
}

protected abstract PollForActivityTaskResponse pollTask() throws TException;
}
25 changes: 19 additions & 6 deletions src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.internal.worker.ActivityTaskHandler.Result;
import com.uber.cadence.internal.worker.Poller.PollTask;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
Expand All @@ -43,22 +44,30 @@
import org.apache.thrift.TException;
import org.slf4j.MDC;

public final class ActivityWorker extends SuspendableWorkerBase {

private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskList=";
public class ActivityWorker extends SuspendableWorkerBase {

protected final SingleWorkerOptions options;
private final ActivityTaskHandler handler;
private final IWorkflowService service;
private final String domain;
private final String taskList;
private final SingleWorkerOptions options;

public ActivityWorker(
IWorkflowService service,
String domain,
String taskList,
SingleWorkerOptions options,
ActivityTaskHandler handler) {
this(service, domain, taskList, options, handler, "Activity Poller taskList=");
}

public ActivityWorker(
IWorkflowService service,
String domain,
String taskList,
SingleWorkerOptions options,
ActivityTaskHandler handler,
String pollThreadNamePrefix) {
this.service = Objects.requireNonNull(service);
this.domain = Objects.requireNonNull(domain);
this.taskList = Objects.requireNonNull(taskList);
Expand All @@ -69,7 +78,7 @@ public ActivityWorker(
pollerOptions =
PollerOptions.newBuilder(pollerOptions)
.setPollThreadNamePrefix(
POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
pollThreadNamePrefix + "\"" + taskList + "\", domain=\"" + domain + "\"")
.build();
}
this.options = SingleWorkerOptions.newBuilder(options).setPollerOptions(pollerOptions).build();
Expand All @@ -81,7 +90,7 @@ public void start() {
SuspendableWorker poller =
new Poller<>(
options.getIdentity(),
new ActivityPollTask(service, domain, taskList, options),
getOrCreateActivityPollTask(),
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
options.getPollerOptions(),
options.getMetricsScope());
Expand All @@ -91,6 +100,10 @@ public void start() {
}
}

protected PollTask<PollForActivityTaskResponse> getOrCreateActivityPollTask() {
return new ActivityPollTask(service, domain, taskList, options);
}

private class TaskHandlerImpl
implements PollTaskExecutor.TaskHandler<PollForActivityTaskResponse> {

Expand Down
Loading

0 comments on commit e33ed95

Please sign in to comment.