Skip to content

Commit

Permalink
[internal] Improve code coverage of internal_task_pollers.go (#1373)
Browse files Browse the repository at this point in the history
* [internal] Improve code coverage of internal_task_pollers.go
  • Loading branch information
3vilhamster authored Nov 1, 2024
1 parent f21e350 commit a71b70f
Show file tree
Hide file tree
Showing 6 changed files with 450 additions and 94 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc

# mockery is quite noisy so it's worth being kinda precise with the files.
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go internal/internal_public.go internal/internal_task_pollers.go $(BIN)/mockery
$Q $(BIN_PATH) go generate ./...
$Q touch $@

Expand Down
6 changes: 4 additions & 2 deletions internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
s "go.uber.org/cadence/.gen/go/shared"
)

//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --filename internal_workflow_task_handler_mock.go --boilerplate-file ../LICENSE

type (
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)

Expand Down Expand Up @@ -71,7 +73,7 @@ type (

// WorkflowTaskHandler represents decision task handlers.
WorkflowTaskHandler interface {
// Processes the workflow task
// ProcessWorkflowTask processes the workflow task
// The response could be:
// - RespondDecisionTaskCompletedRequest
// - RespondDecisionTaskFailedRequest
Expand All @@ -84,7 +86,7 @@ type (

// ActivityTaskHandler represents activity task handlers.
ActivityTaskHandler interface {
// Executes the activity task
// Execute executes the activity task
// The response is one of the types:
// - RespondActivityTaskCompletedRequest
// - RespondActivityTaskFailedRequest
Expand Down
203 changes: 112 additions & 91 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"sync"
"time"

"go.uber.org/yarpc"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
Expand All @@ -45,6 +47,8 @@ import (
"go.uber.org/cadence/internal/common/serializer"
)

//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --filename local_dispatcher_mock.go --boilerplate-file ../LICENSE

const (
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta

Expand Down Expand Up @@ -76,7 +80,7 @@ type (
identity string
service workflowserviceclient.Interface
taskHandler WorkflowTaskHandler
ldaTunnel *locallyDispatchedActivityTunnel
ldaTunnel localDispatcher
metricsScope *metrics.TaggedScope
logger *zap.Logger

Expand Down Expand Up @@ -159,6 +163,11 @@ type (
stopCh <-chan struct{}
metricsScope *metrics.TaggedScope
}

// LocalDispatcher is an interface to dispatch locally dispatched activities.
localDispatcher interface {
SendTask(task *locallyDispatchedActivityTask) bool
}
)

func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
Expand Down Expand Up @@ -214,7 +223,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit
}
}

func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
select {
case ldat.taskCh <- task:
return true
Expand Down Expand Up @@ -365,7 +374,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
if completedRequest == nil && err == nil {
return nil
}
if _, ok := err.(decisionHeartbeatError); ok {
if errors.As(err, new(*decisionHeartbeatError)) {
return err
}
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
Expand Down Expand Up @@ -398,7 +407,6 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
}

func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {

metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
if taskErr != nil {
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
Expand All @@ -416,7 +424,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))

responseStartTime := time.Now()
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
return
}
Expand All @@ -425,103 +433,116 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
return
}

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
ctx := context.Background()
// Respond task completion.
err = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
defer cancel()
var err1 error
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
})
}
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return response, err
}

func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
ctx, cancel, opts := newChannelContext(context.Background(), wtp.featureFlags)
defer cancel()
var (
err error
response *s.RespondDecisionTaskCompletedResponse
operation string
)
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
err = wtp.handleDecisionFailedRequest(ctx, task, request, opts...)
operation = "RespondDecisionTaskFailed"
case *s.RespondDecisionTaskCompletedRequest:
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request, opts...)
operation = "RespondDecisionTaskCompleted"
case *s.RespondQueryTaskCompletedRequest:
err = wtp.service.RespondQueryTaskCompleted(ctx, request, opts...)
operation = "RespondQueryTaskCompleted"
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
}

traceLog(func() {
if err != nil {
wtp.logger.Debug(fmt.Sprintf("%s failed.", operation), zap.Error(err))
}
})

return response, err
}

func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error {
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
return wtp.service.RespondDecisionTaskFailed(ctx, request, opts...)
}
return nil
}

func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (response *s.RespondDecisionTaskCompletedResponse, err error) {
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}

if wtp.ldaTunnel != nil {
var activityTasks []*locallyDispatchedActivityTask
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
case *s.RespondDecisionTaskCompletedRequest:
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
if wtp.ldaTunnel.SendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}
var activityTasks []*locallyDispatchedActivityTask
if wtp.ldaTunnel != nil {
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
if wtp.ldaTunnel.sendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
}
}
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err1 == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}
at.readyCh <- started
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}()
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
})
}

case *s.RespondQueryTaskCompletedRequest:
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
})
}
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
at.readyCh <- started
}
}()
}

return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return
return wtp.service.RespondDecisionTaskCompleted(ctx, request, opts...)
}

func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {
Expand Down
Loading

0 comments on commit a71b70f

Please sign in to comment.