Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal] Improve code coverage of internal_task_pollers.go #1373

Merged
Next Next commit
[internal] Improve code coverage of internal_task_pollers.go
  • Loading branch information
3vilhamster committed Nov 1, 2024
commit 105b0161aacb92d79f5c5822c7b919c032674dd8
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -219,7 +219,6 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
$Q $(BIN_PATH) go generate ./...
$Q touch $@
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved

# ====================================
# other intermediates
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ codecov:
ignore:
- "**/*_generated.go"
- "**/*_mock.go"
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
- "**/mock_*.go"
- "**/testdata/**"
- "**/*_test.go"
- "**/*_testsuite.go"
6 changes: 4 additions & 2 deletions internal/internal_public.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@ import (
s "go.uber.org/cadence/.gen/go/shared"
)

//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved

type (
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)

@@ -71,7 +73,7 @@ type (

// WorkflowTaskHandler represents decision task handlers.
WorkflowTaskHandler interface {
// Processes the workflow task
// ProcessWorkflowTask processes the workflow task
// The response could be:
// - RespondDecisionTaskCompletedRequest
// - RespondDecisionTaskFailedRequest
@@ -84,7 +86,7 @@ type (

// ActivityTaskHandler represents activity task handlers.
ActivityTaskHandler interface {
// Executes the activity task
// Execute executes the activity task
// The response is one of the types:
// - RespondActivityTaskCompletedRequest
// - RespondActivityTaskFailedRequest
205 changes: 111 additions & 94 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@ import (
"go.uber.org/cadence/internal/common/serializer"
)

//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved

const (
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta

@@ -76,7 +78,7 @@ type (
identity string
service workflowserviceclient.Interface
taskHandler WorkflowTaskHandler
ldaTunnel *locallyDispatchedActivityTunnel
ldaTunnel LocalDispatcher
metricsScope *metrics.TaggedScope
logger *zap.Logger

@@ -159,6 +161,11 @@ type (
stopCh <-chan struct{}
metricsScope *metrics.TaggedScope
}

// LocalDispatcher is an interface to dispatch locally dispatched activities.
LocalDispatcher interface {
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
SendTask(task *locallyDispatchedActivityTask) bool
}
)

func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
@@ -214,7 +221,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit
}
}

func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
select {
case ldat.taskCh <- task:
return true
@@ -349,7 +356,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
func(response interface{}, startTime time.Time) (*workflowTask, error) {
wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId()))
wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1)
heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime)
heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime)
if err != nil {
return nil, err
}
@@ -365,10 +372,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
if completedRequest == nil && err == nil {
return nil
}
if _, ok := err.(decisionHeartbeatError); ok {
if errors.As(err, new(*decisionHeartbeatError)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, looks good - it's only returned from one place right now, and no wrapping, so this won't change behavior 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not working before at all.
The error was returned as &decicisionHeartbeatError so type assertion err.(decisionHeartbeatError) was always false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good point. and good fix 👍

return err
}
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime)
if err != nil {
return err
}
@@ -397,8 +404,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
return nil
}

func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
if taskErr != nil {
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
@@ -416,7 +422,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))

responseStartTime := time.Now()
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
return
}
@@ -425,103 +431,114 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
return
}

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
ctx := context.Background()
// Respond task completion.
err = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
defer cancel()
var err1 error
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
})
}
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return response, err
}

func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags)
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
var (
err error
response *s.RespondDecisionTaskCompletedResponse
operation string
)
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
err = wtp.handleDecisionFailedRequest(ctx, task, request)
operation = "RespondDecisionTaskFailed"
case *s.RespondDecisionTaskCompletedRequest:
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request)
operation = "RespondDecisionTaskCompleted"
case *s.RespondQueryTaskCompletedRequest:
err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
operation = "RespondQueryTaskCompleted"
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
}

traceLog(func() {
wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation))
})
3vilhamster marked this conversation as resolved.
Show resolved Hide resolved

return response, err
}

func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error {
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
}
return nil
}

func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) {
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}

if wtp.ldaTunnel != nil {
var activityTasks []*locallyDispatchedActivityTask
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
case *s.RespondDecisionTaskCompletedRequest:
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
if wtp.ldaTunnel.SendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
var activityTasks []*locallyDispatchedActivityTask
if wtp.ldaTunnel != nil {
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
if wtp.ldaTunnel.sendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
}
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err1 == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}
at.readyCh <- started
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}()
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
})
}

case *s.RespondQueryTaskCompletedRequest:
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
})
}
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
at.readyCh <- started
}
}()
}

return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return
return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
}

func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {
Loading