diff --git a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java index 5a3205c4e..afda23569 100644 --- a/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java +++ b/src/main/java/com/uber/cadence/internal/metrics/MetricsType.java @@ -18,6 +18,7 @@ package com.uber.cadence.internal.metrics; public class MetricsType { + public static final String CADENCE_METRICS_PREFIX = "cadence-"; public static final String WORKFLOW_START_COUNTER = CADENCE_METRICS_PREFIX + "workflow-start"; public static final String WORKFLOW_COMPLETED_COUNTER = @@ -119,6 +120,16 @@ public class MetricsType { CADENCE_METRICS_PREFIX + "local-activity-panic"; public static final String LOCAL_ACTIVITY_EXECUTION_LATENCY = CADENCE_METRICS_PREFIX + "local-activity-execution-latency"; + public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_TOTAL_COUNTER = + CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-total"; + public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_NO_TASK_COUNTER = + CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-no-task"; + public static final String LOCALLY_DISPATCHED_ACTIVITY_POLL_SUCCEED_COUNTER = + CADENCE_METRICS_PREFIX + "locally-dispatched-activity-poll-succeed"; + public static final String ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER = + CADENCE_METRICS_PREFIX + "activity-local-dispatch-failed"; + public static final String ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER = + CADENCE_METRICS_PREFIX + "activity-local-dispatch-succeed"; public static final String WORKER_PANIC_COUNTER = CADENCE_METRICS_PREFIX + "worker-panic"; public static final String TASK_LIST_QUEUE_LATENCY = diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java index 64386f41e..b19ac1a55 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java @@ -26,6 +26,7 @@ import com.uber.cadence.internal.replay.ReplayDecisionTaskHandler; import com.uber.cadence.internal.worker.DecisionTaskHandler; import com.uber.cadence.internal.worker.LocalActivityWorker; +import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker; import com.uber.cadence.internal.worker.SingleWorkerOptions; import com.uber.cadence.internal.worker.SuspendableWorker; import com.uber.cadence.internal.worker.WorkflowWorker; @@ -49,10 +50,14 @@ public class SyncWorkflowWorker private final WorkflowWorker workflowWorker; private final LocalActivityWorker laWorker; + private final LocallyDispatchedActivityWorker ldaWorker; + private final POJOWorkflowImplementationFactory factory; private final DataConverter dataConverter; private final POJOActivityTaskHandler laTaskHandler; + private final POJOActivityTaskHandler ldaTaskHandler; private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4); + private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4); public SyncWorkflowWorker( IWorkflowService service, @@ -61,6 +66,7 @@ public SyncWorkflowWorker( Function interceptorFactory, SingleWorkerOptions workflowOptions, SingleWorkerOptions localActivityOptions, + SingleWorkerOptions locallyDispatchedActivityOptions, DeciderCache cache, String stickyTaskListName, Duration stickyDecisionScheduleToStartTimeout, @@ -91,10 +97,25 @@ public SyncWorkflowWorker( stickyDecisionScheduleToStartTimeout, service, laWorker.getLocalActivityTaskPoller()); + ldaTaskHandler = + new POJOActivityTaskHandler( + service, + domain, + locallyDispatchedActivityOptions.getDataConverter(), + ldaHeartbeatExecutor); + ldaWorker = + new LocallyDispatchedActivityWorker( + service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler); workflowWorker = new WorkflowWorker( - service, domain, taskList, workflowOptions, taskHandler, stickyTaskListName); + service, + domain, + taskList, + workflowOptions, + taskHandler, + ldaWorker.getLocallyDispatchedActivityTaskPoller(), + stickyTaskListName); } public void setWorkflowImplementationTypes( @@ -115,6 +136,10 @@ public void setLocalActivitiesImplementation(Object... activitiesImplementation) this.laTaskHandler.setLocalActivitiesImplementation(activitiesImplementation); } + public void setActivitiesImplementationToDispatchLocally(Object... activitiesImplementation) { + this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation); + } + @Override public void start() { workflowWorker.start(); @@ -122,39 +147,49 @@ public void start() { // to start LocalActivity Worker. if (workflowWorker.isStarted()) { laWorker.start(); + ldaWorker.start(); } } @Override public boolean isStarted() { - return workflowWorker.isStarted() && laWorker.isStarted(); + return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted(); } @Override public boolean isShutdown() { - return workflowWorker.isShutdown() && laWorker.isShutdown(); + return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown(); } @Override public boolean isTerminated() { - return workflowWorker.isTerminated() && laWorker.isTerminated(); + return workflowWorker.isTerminated() + && laWorker.isTerminated() + && ldaHeartbeatExecutor.isTerminated() + && ldaWorker.isTerminated(); } @Override public void shutdown() { laWorker.shutdown(); + ldaHeartbeatExecutor.shutdown(); + ldaWorker.shutdown(); workflowWorker.shutdown(); } @Override public void shutdownNow() { laWorker.shutdownNow(); + ldaHeartbeatExecutor.shutdownNow(); + ldaWorker.shutdownNow(); workflowWorker.shutdownNow(); } @Override public void awaitTermination(long timeout, TimeUnit unit) { long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout)); + timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis); + timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis); InternalUtils.awaitTermination(workflowWorker, timeoutMillis); } @@ -162,17 +197,19 @@ public void awaitTermination(long timeout, TimeUnit unit) { public void suspendPolling() { workflowWorker.suspendPolling(); laWorker.suspendPolling(); + ldaWorker.suspendPolling(); } @Override public void resumePolling() { workflowWorker.resumePolling(); laWorker.resumePolling(); + ldaWorker.resumePolling(); } @Override public boolean isSuspended() { - return workflowWorker.isSuspended() && laWorker.isSuspended(); + return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended(); } public R queryWorkflowExecution( diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java index ca0e46a7d..927fef782 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java @@ -26,33 +26,29 @@ import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Stopwatch; -import com.uber.m3.util.Duration; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ActivityPollTask implements Poller.PollTask { +final class ActivityPollTask extends ActivityPollTaskBase { + private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class); private final IWorkflowService service; private final String domain; private final String taskList; - private final SingleWorkerOptions options; - private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class); public ActivityPollTask( IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) { - + super(options); this.service = service; this.domain = domain; this.taskList = taskList; - this.options = options; } @Override - public PollForActivityTaskResponse poll() throws TException { + protected PollForActivityTaskResponse pollTask() throws TException { options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1); Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_POLL_LATENCY).start(); - PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest(); pollRequest.setDomain(domain); pollRequest.setIdentity(options.getIdentity()); @@ -89,14 +85,6 @@ public PollForActivityTaskResponse poll() throws TException { if (log.isTraceEnabled()) { log.trace("poll request returned " + result); } - - options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1); - options - .getMetricsScope() - .timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY) - .record( - Duration.ofNanos( - result.getStartedTimestamp() - result.getScheduledTimestampOfThisAttempt())); sw.stop(); return result; } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTaskBase.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTaskBase.java new file mode 100644 index 000000000..89e4967ec --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTaskBase.java @@ -0,0 +1,50 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker; + +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.m3.util.Duration; +import org.apache.thrift.TException; + +abstract class ActivityPollTaskBase implements Poller.PollTask { + + protected final SingleWorkerOptions options; + + public ActivityPollTaskBase(SingleWorkerOptions options) { + this.options = options; + } + + public PollForActivityTaskResponse poll() throws TException { + + PollForActivityTaskResponse result = pollTask(); + if (result == null || result.getTaskToken() == null) { + return null; + } + options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1); + options + .getMetricsScope() + .timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY) + .record( + Duration.ofNanos( + result.getStartedTimestamp() - result.getScheduledTimestampOfThisAttempt())); + return result; + } + + protected abstract PollForActivityTaskResponse pollTask() throws TException; +} diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index a0a86014a..f1a100fb3 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -29,6 +29,7 @@ import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.worker.ActivityTaskHandler.Result; +import com.uber.cadence.internal.worker.Poller.PollTask; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Scope; import com.uber.m3.tally.Stopwatch; @@ -43,15 +44,13 @@ import org.apache.thrift.TException; import org.slf4j.MDC; -public final class ActivityWorker extends SuspendableWorkerBase { - - private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskList="; +public class ActivityWorker extends SuspendableWorkerBase { + protected final SingleWorkerOptions options; private final ActivityTaskHandler handler; private final IWorkflowService service; private final String domain; private final String taskList; - private final SingleWorkerOptions options; public ActivityWorker( IWorkflowService service, @@ -59,6 +58,16 @@ public ActivityWorker( String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) { + this(service, domain, taskList, options, handler, "Activity Poller taskList="); + } + + public ActivityWorker( + IWorkflowService service, + String domain, + String taskList, + SingleWorkerOptions options, + ActivityTaskHandler handler, + String pollThreadNamePrefix) { this.service = Objects.requireNonNull(service); this.domain = Objects.requireNonNull(domain); this.taskList = Objects.requireNonNull(taskList); @@ -69,7 +78,7 @@ public ActivityWorker( pollerOptions = PollerOptions.newBuilder(pollerOptions) .setPollThreadNamePrefix( - POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"") + pollThreadNamePrefix + "\"" + taskList + "\", domain=\"" + domain + "\"") .build(); } this.options = SingleWorkerOptions.newBuilder(options).setPollerOptions(pollerOptions).build(); @@ -81,7 +90,7 @@ public void start() { SuspendableWorker poller = new Poller<>( options.getIdentity(), - new ActivityPollTask(service, domain, taskList, options), + getOrCreateActivityPollTask(), new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)), options.getPollerOptions(), options.getMetricsScope()); @@ -91,6 +100,10 @@ public void start() { } } + protected PollTask getOrCreateActivityPollTask() { + return new ActivityPollTask(service, domain, taskList, options); + } + private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler { diff --git a/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTask.java new file mode 100644 index 000000000..cc57f3f19 --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTask.java @@ -0,0 +1,88 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker; + +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task; +import java.util.concurrent.SynchronousQueue; +import java.util.function.Function; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class LocallyDispatchedActivityPollTask extends ActivityPollTaskBase + implements Function { + + private static final Logger log = + LoggerFactory.getLogger(LocallyDispatchedActivityPollTask.class); + private final SynchronousQueue pendingTasks = new SynchronousQueue<>(); + + public LocallyDispatchedActivityPollTask(SingleWorkerOptions options) { + super(options); + } + + @Override + protected PollForActivityTaskResponse pollTask() throws TException { + Task task; + try { + task = pendingTasks.take(); + } catch (InterruptedException e) { + throw new RuntimeException("locally dispatch activity poll task interrupted", e); + } + try { + if (!task.await()) { + options + .getMetricsScope() + .counter(MetricsType.LOCALLY_DISPATCHED_ACTIVITY_POLL_NO_TASK_COUNTER) + .inc(1); + return null; + } + } catch (InterruptedException e) { + throw new RuntimeException("locally dispatch activity await task interrupted", e); + } + options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1); + options + .getMetricsScope() + .counter(MetricsType.LOCALLY_DISPATCHED_ACTIVITY_POLL_SUCCEED_COUNTER) + .inc(1); + PollForActivityTaskResponse result = new PollForActivityTaskResponse(); + result.activityId = task.activityId; + result.activityType = task.activityType; + result.header = task.header; + result.input = task.input; + result.workflowExecution = task.workflowExecution; + result.scheduledTimestampOfThisAttempt = task.scheduledTimestampOfThisAttempt; + result.scheduledTimestamp = task.scheduledTimestamp; + result.scheduleToCloseTimeoutSeconds = task.scheduleToCloseTimeoutSeconds; + result.startedTimestamp = task.startedTimestamp; + result.startToCloseTimeoutSeconds = task.startToCloseTimeoutSeconds; + result.heartbeatTimeoutSeconds = task.heartbeatTimeoutSeconds; + result.taskToken = task.taskToken; + result.workflowType = task.workflowType; + result.workflowDomain = task.workflowDomain; + result.attempt = 0; + return result; + } + + @Override + public Boolean apply(Task task) { + // non blocking put to the unbuffered queue + return pendingTasks.offer(task); + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityWorker.java new file mode 100644 index 000000000..cb9c462b6 --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityWorker.java @@ -0,0 +1,112 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker; + +import com.uber.cadence.ActivityType; +import com.uber.cadence.Header; +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowType; +import com.uber.cadence.internal.worker.Poller.PollTask; +import com.uber.cadence.serviceclient.IWorkflowService; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; + +public final class LocallyDispatchedActivityWorker extends ActivityWorker { + + private LocallyDispatchedActivityPollTask ldaPollTask; + + public LocallyDispatchedActivityWorker( + IWorkflowService service, + String domain, + String taskList, + SingleWorkerOptions options, + ActivityTaskHandler handler) { + super( + service, + domain, + taskList, + options, + handler, + "Locally Dispatched Activity Poller taskList="); + ldaPollTask = new LocallyDispatchedActivityPollTask(options); + } + + protected PollTask getOrCreateActivityPollTask() { + return ldaPollTask; + } + + public Function getLocallyDispatchedActivityTaskPoller() { + return ldaPollTask; + } + + public static class Task { + + protected final WorkflowExecution workflowExecution; + protected final String activityId; + protected final ActivityType activityType; + protected final ByteBuffer input; + protected final int scheduleToCloseTimeoutSeconds; + protected final int startToCloseTimeoutSeconds; + protected final int heartbeatTimeoutSeconds; + protected final WorkflowType workflowType; + protected final String workflowDomain; + protected final Header header; + // used to notify the poller the response from server is completed and the task is ready + private final CountDownLatch latch = new CountDownLatch(1); + protected long scheduledTimestamp; + protected long scheduledTimestampOfThisAttempt; + protected long startedTimestamp; + protected ByteBuffer taskToken; + private volatile boolean ready; + + public Task( + String activityId, + ActivityType activityType, + ByteBuffer input, + int scheduleToCloseTimeoutSeconds, + int startToCloseTimeoutSeconds, + int heartbeatTimeoutSeconds, + WorkflowType workflowType, + String workflowDomain, + Header header, + WorkflowExecution workflowExecution) { + this.workflowExecution = workflowExecution; + this.activityId = activityId; + this.activityType = activityType; + this.input = input; + this.scheduleToCloseTimeoutSeconds = scheduleToCloseTimeoutSeconds; + this.startToCloseTimeoutSeconds = startToCloseTimeoutSeconds; + this.heartbeatTimeoutSeconds = heartbeatTimeoutSeconds; + this.workflowType = workflowType; + this.workflowDomain = workflowDomain; + this.header = header; + } + + protected boolean await() throws InterruptedException { + latch.await(); + return ready; + } + + public void notify(boolean ready) { + this.ready = ready; + latch.countDown(); + } + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index d6a7bbeee..f914b1aac 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -18,13 +18,17 @@ package com.uber.cadence.internal.worker; import com.google.common.base.Strings; +import com.uber.cadence.ActivityLocalDispatchInfo; +import com.uber.cadence.Decision; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; import com.uber.cadence.PollForDecisionTaskResponse; import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; +import com.uber.cadence.ScheduleActivityTaskDecisionAttributes; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.WorkflowExecutionStartedEventAttributes; import com.uber.cadence.WorkflowQuery; @@ -35,15 +39,18 @@ import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; +import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Scope; import com.uber.m3.tally.Stopwatch; import com.uber.m3.util.ImmutableMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.thrift.TException; import org.slf4j.MDC; @@ -51,8 +58,6 @@ public final class WorkflowWorker extends SuspendableWorkerBase implements Consumer { private static final String POLL_THREAD_NAME_PREFIX = "Workflow Poller taskList="; - - private PollTaskExecutor pollTaskExecutor; private final DecisionTaskHandler handler; private final IWorkflowService service; private final String domain; @@ -60,6 +65,8 @@ public final class WorkflowWorker extends SuspendableWorkerBase private final SingleWorkerOptions options; private final String stickyTaskListName; private final WorkflowRunLockManager runLocks = new WorkflowRunLockManager(); + private final Function ldaTaskPoller; + private PollTaskExecutor pollTaskExecutor; public WorkflowWorker( IWorkflowService service, @@ -67,11 +74,13 @@ public WorkflowWorker( String taskList, SingleWorkerOptions options, DecisionTaskHandler handler, + Function ldaTaskPoller, String stickyTaskListName) { this.service = Objects.requireNonNull(service); this.domain = Objects.requireNonNull(domain); this.taskList = Objects.requireNonNull(taskList); this.handler = handler; + this.ldaTaskPoller = ldaTaskPoller; this.stickyTaskListName = stickyTaskListName; PollerOptions pollerOptions = options.getPollerOptions(); @@ -204,7 +213,7 @@ public void handle(PollForDecisionTaskResponse task) throws Exception { sw.stop(); sw = metricsScope.timer(MetricsType.DECISION_RESPONSE_LATENCY).start(); - sendReply(service, task.getTaskToken(), response); + sendReply(service, task, response); sw.stop(); metricsScope.counter(MetricsType.DECISION_TASK_COMPLETED_COUNTER).inc(1); @@ -231,23 +240,90 @@ public Throwable wrapFailure(PollForDecisionTaskResponse task, Throwable failure } private void sendReply( - IWorkflowService service, byte[] taskToken, DecisionTaskHandler.Result response) + IWorkflowService service, + PollForDecisionTaskResponse task, + DecisionTaskHandler.Result response) throws TException { RespondDecisionTaskCompletedRequest taskCompleted = response.getTaskCompleted(); if (taskCompleted != null) { taskCompleted.setIdentity(options.getIdentity()); - taskCompleted.setTaskToken(taskToken); - RpcRetryer.retry(() -> service.RespondDecisionTaskCompleted(taskCompleted)); + taskCompleted.setTaskToken(task.getTaskToken()); + RpcRetryer.retry( + () -> { + RespondDecisionTaskCompletedResponse taskCompletedResponse = null; + List activityTasks = new ArrayList<>(); + try { + for (Decision decision : taskCompleted.getDecisions()) { + ScheduleActivityTaskDecisionAttributes attr = + decision.getScheduleActivityTaskDecisionAttributes(); + if (attr != null && taskList.equals(attr.getTaskList().getName())) { + // assume the activity type is in registry otherwise the activity would be + // failed and retried from server + Task activityTask = + new Task( + attr.getActivityId(), + attr.getActivityType(), + attr.bufferForInput(), + attr.getScheduleToCloseTimeoutSeconds(), + attr.getStartToCloseTimeoutSeconds(), + attr.getHeartbeatTimeoutSeconds(), + task.getWorkflowType(), + domain, + attr.getHeader(), + task.getWorkflowExecution()); + if (ldaTaskPoller.apply(activityTask)) { + options + .getMetricsScope() + .counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER) + .inc(1); + decision + .getScheduleActivityTaskDecisionAttributes() + .setRequestLocalDispatch(true); + activityTasks.add(activityTask); + } else { + // all pollers are busy - no room to optimize + options + .getMetricsScope() + .counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER) + .inc(1); + } + } + } + taskCompletedResponse = service.RespondDecisionTaskCompleted(taskCompleted); + } finally { + for (Task activityTask : activityTasks) { + boolean started = false; + if (taskCompletedResponse != null + && taskCompletedResponse.getActivitiesToDispatchLocally() != null) { + ActivityLocalDispatchInfo activityLocalDispatchInfo = + taskCompletedResponse + .getActivitiesToDispatchLocally() + .getOrDefault(activityTask.activityId, null); + if (activityLocalDispatchInfo != null) { + activityTask.scheduledTimestamp = + activityLocalDispatchInfo.getScheduledTimestamp(); + activityTask.startedTimestamp = + activityLocalDispatchInfo.getStartedTimestamp(); + activityTask.scheduledTimestampOfThisAttempt = + activityLocalDispatchInfo.getScheduledTimestampOfThisAttempt(); + activityTask.taskToken = activityLocalDispatchInfo.bufferForTaskToken(); + started = true; + } + } + activityTask.notify(started); + } + } + }); } else { RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed(); if (taskFailed != null) { taskFailed.setIdentity(options.getIdentity()); - taskFailed.setTaskToken(taskToken); + taskFailed.setTaskToken(task.getTaskToken()); RpcRetryer.retry(() -> service.RespondDecisionTaskFailed(taskFailed)); } else { RespondQueryTaskCompletedRequest queryCompleted = response.getQueryCompleted(); if (queryCompleted != null) { - queryCompleted.setTaskToken(taskToken); + queryCompleted.setTaskToken(task.getTaskToken()); // Do not retry query response. service.RespondQueryTaskCompleted(queryCompleted); } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 1dd9eb79e..2bbe25358 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -124,6 +124,7 @@ public final class Worker implements Suspendable { this.options.getInterceptorFactory(), workflowOptions, localActivityOptions, + activityOptions, cache, stickyTaskListName, stickyDecisionScheduleToStartTimeout, @@ -234,6 +235,7 @@ public void registerActivitiesImplementations(Object... activityImplementations) if (activityWorker != null) { activityWorker.setActivitiesImplementation(activityImplementations); + workflowWorker.setActivitiesImplementationToDispatchLocally(activityImplementations); } workflowWorker.setLocalActivitiesImplementation(activityImplementations);