diff --git a/internal/error_test.go b/internal/error_test.go index a3cbd0ffa..751060ea9 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -460,7 +460,7 @@ func Test_SignalExternalWorkflowExecutionFailedError(t *testing.T) { InitiatedEventId: common.Int64Ptr(initiatedEventID), Cause: shared.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr(), }) - require.NoError(t, weh.handleSignalExternalWorkflowExecutionFailed(event)) + weh.handleSignalExternalWorkflowExecutionFailed(event) _, ok := actualErr.(*UnknownExternalWorkflowExecutionError) require.True(t, ok) } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 7c8ef2346..909aab5cd 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -623,7 +623,10 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max // Also upsert search attributes to enable ability to search by changeVersion. version = maxSupported wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter()) - wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions)) + err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions)) + if err != nil { + wc.logger.Warn("Failed to upsert search attributes for change version", zap.Error(err)) + } } validateVersion(changeID, version, minSupported, maxSupported) @@ -806,131 +809,89 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( }) switch event.GetEventType() { + // Noops + case m.EventTypeWorkflowExecutionCompleted, + m.EventTypeWorkflowExecutionTimedOut, + m.EventTypeWorkflowExecutionFailed, + m.EventTypeDecisionTaskScheduled, + m.EventTypeDecisionTaskTimedOut, + m.EventTypeDecisionTaskFailed, + m.EventTypeActivityTaskStarted, + m.EventTypeDecisionTaskCompleted, + m.EventTypeWorkflowExecutionCanceled, + m.EventTypeWorkflowExecutionContinuedAsNew: + // No Operation case m.EventTypeWorkflowExecutionStarted: err = weh.handleWorkflowExecutionStarted(event.WorkflowExecutionStartedEventAttributes) - - case m.EventTypeWorkflowExecutionCompleted: - // No Operation - case m.EventTypeWorkflowExecutionFailed: - // No Operation - case m.EventTypeWorkflowExecutionTimedOut: - // No Operation - case m.EventTypeDecisionTaskScheduled: - // No Operation case m.EventTypeDecisionTaskStarted: // Set replay clock. weh.SetCurrentReplayTime(time.Unix(0, event.GetTimestamp())) weh.workflowDefinition.OnDecisionTaskStarted() // Set replay decisionStarted eventID weh.workflowInfo.DecisionStartedEventID = event.GetEventId() - - case m.EventTypeDecisionTaskTimedOut: - // No Operation - case m.EventTypeDecisionTaskFailed: - // No Operation - case m.EventTypeDecisionTaskCompleted: - // No Operation case m.EventTypeActivityTaskScheduled: weh.decisionsHelper.handleActivityTaskScheduled( event.GetEventId(), event.ActivityTaskScheduledEventAttributes.GetActivityId()) - - case m.EventTypeActivityTaskStarted: - // No Operation - case m.EventTypeActivityTaskCompleted: - err = weh.handleActivityTaskCompleted(event) - + weh.handleActivityTaskCompleted(event) case m.EventTypeActivityTaskFailed: - err = weh.handleActivityTaskFailed(event) - + weh.handleActivityTaskFailed(event) case m.EventTypeActivityTaskTimedOut: - err = weh.handleActivityTaskTimedOut(event) - + weh.handleActivityTaskTimedOut(event) case m.EventTypeActivityTaskCancelRequested: weh.decisionsHelper.handleActivityTaskCancelRequested( event.ActivityTaskCancelRequestedEventAttributes.GetActivityId()) - case m.EventTypeRequestCancelActivityTaskFailed: weh.decisionsHelper.handleRequestCancelActivityTaskFailed( event.RequestCancelActivityTaskFailedEventAttributes.GetActivityId()) - case m.EventTypeActivityTaskCanceled: - err = weh.handleActivityTaskCanceled(event) - + weh.handleActivityTaskCanceled(event) case m.EventTypeTimerStarted: weh.decisionsHelper.handleTimerStarted(event.TimerStartedEventAttributes.GetTimerId()) - case m.EventTypeTimerFired: weh.handleTimerFired(event) - case m.EventTypeTimerCanceled: weh.decisionsHelper.handleTimerCanceled(event.TimerCanceledEventAttributes.GetTimerId()) - case m.EventTypeCancelTimerFailed: weh.decisionsHelper.handleCancelTimerFailed(event.CancelTimerFailedEventAttributes.GetTimerId()) - case m.EventTypeWorkflowExecutionCancelRequested: weh.handleWorkflowExecutionCancelRequested() - - case m.EventTypeWorkflowExecutionCanceled: - // No Operation. - case m.EventTypeRequestCancelExternalWorkflowExecutionInitiated: weh.handleRequestCancelExternalWorkflowExecutionInitiated(event) - case m.EventTypeRequestCancelExternalWorkflowExecutionFailed: weh.handleRequestCancelExternalWorkflowExecutionFailed(event) - case m.EventTypeExternalWorkflowExecutionCancelRequested: weh.handleExternalWorkflowExecutionCancelRequested(event) - - case m.EventTypeWorkflowExecutionContinuedAsNew: - // No Operation. - case m.EventTypeWorkflowExecutionSignaled: weh.handleWorkflowExecutionSignaled(event.WorkflowExecutionSignaledEventAttributes) - case m.EventTypeSignalExternalWorkflowExecutionInitiated: signalID := string(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control) weh.decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event.GetEventId(), signalID) - case m.EventTypeSignalExternalWorkflowExecutionFailed: weh.handleSignalExternalWorkflowExecutionFailed(event) - case m.EventTypeExternalWorkflowExecutionSignaled: weh.handleSignalExternalWorkflowExecutionCompleted(event) - case m.EventTypeMarkerRecorded: err = weh.handleMarkerRecorded(event.GetEventId(), event.MarkerRecordedEventAttributes) - case m.EventTypeStartChildWorkflowExecutionInitiated: weh.decisionsHelper.handleStartChildWorkflowExecutionInitiated( event.StartChildWorkflowExecutionInitiatedEventAttributes.GetWorkflowId()) - case m.EventTypeStartChildWorkflowExecutionFailed: - err = weh.handleStartChildWorkflowExecutionFailed(event) - + weh.handleStartChildWorkflowExecutionFailed(event) case m.EventTypeChildWorkflowExecutionStarted: - err = weh.handleChildWorkflowExecutionStarted(event) - + weh.handleChildWorkflowExecutionStarted(event) case m.EventTypeChildWorkflowExecutionCompleted: - err = weh.handleChildWorkflowExecutionCompleted(event) - + weh.handleChildWorkflowExecutionCompleted(event) case m.EventTypeChildWorkflowExecutionFailed: - err = weh.handleChildWorkflowExecutionFailed(event) - + weh.handleChildWorkflowExecutionFailed(event) case m.EventTypeChildWorkflowExecutionCanceled: - err = weh.handleChildWorkflowExecutionCanceled(event) - + weh.handleChildWorkflowExecutionCanceled(event) case m.EventTypeChildWorkflowExecutionTimedOut: - err = weh.handleChildWorkflowExecutionTimedOut(event) - + weh.handleChildWorkflowExecutionTimedOut(event) case m.EventTypeChildWorkflowExecutionTerminated: - err = weh.handleChildWorkflowExecutionTerminated(event) - + weh.handleChildWorkflowExecutionTerminated(event) case m.EventTypeUpsertWorkflowSearchAttributes: weh.handleUpsertWorkflowSearchAttributes(event) - default: weh.logger.Error("unknown event type", zap.Int64(tagEventID, event.GetEventId()), @@ -1010,38 +971,38 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted( return nil } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCompleted(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } activity.handle(event.ActivityTaskCompletedEventAttributes.Result, nil) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } attributes := event.ActivityTaskFailedEventAttributes err := constructError(*attributes.Reason, attributes.Details, weh.GetDataConverter()) activity.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } var err error @@ -1056,15 +1017,15 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event * err = NewTimeoutError(attributes.GetTimeoutType(), details) } activity.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskCanceled(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } if decision.isDone() || !activity.waitForCancelRequest { @@ -1074,7 +1035,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event * activity.handle(nil, err) } - return nil + return } func (weh *workflowExecutionEventHandlerImpl) handleTimerFired(event *m.HistoryEvent) { @@ -1101,13 +1062,19 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( case sideEffectMarkerName: var sideEffectID int32 var result []byte - encodedValues.Get(&sideEffectID, &result) + err := encodedValues.Get(&sideEffectID, &result) + if err != nil { + return fmt.Errorf("extract side effect: %w", err) + } weh.sideEffectResult[sideEffectID] = result return nil case versionMarkerName: var changeID string var version Version - encodedValues.Get(&changeID, &version) + err := encodedValues.Get(&changeID, &version) + if err != nil { + return fmt.Errorf("extract change id: %w", err) + } weh.changeVersions[changeID] = version return nil case localActivityMarkerName: @@ -1115,7 +1082,10 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( case mutableSideEffectMarkerName: var fixedID string var result string - encodedValues.Get(&fixedID, &result) + err := encodedValues.Get(&fixedID, &result) + if err != nil { + return fmt.Errorf("extract fixed id: %w", err) + } weh.mutableSideEffect[fixedID] = []byte(result) return nil default: @@ -1200,13 +1170,13 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled( weh.signalHandler(attributes.GetSignalName(), attributes.Input) } -func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.StartChildWorkflowExecutionFailedEventAttributes childWorkflowID := attributes.GetWorkflowId() decision := weh.decisionsHelper.handleStartChildWorkflowExecutionFailed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := &m.WorkflowExecutionAlreadyStartedError{ @@ -1215,17 +1185,17 @@ func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionF childWorkflow.startedCallback(WorkflowExecution{}, err) childWorkflow.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionStartedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() childRunID := attributes.WorkflowExecution.GetRunId() decision := weh.decisionsHelper.handleChildWorkflowExecutionStarted(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } childWorkflowExecution := WorkflowExecution{ @@ -1234,94 +1204,94 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarte } childWorkflow.startedCallback(childWorkflowExecution, nil) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionCompletedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } childWorkflow.handle(attributes.Result, nil) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionFailedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := constructError(attributes.GetReason(), attributes.Details, weh.GetDataConverter()) childWorkflow.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionCanceledEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionCanceled(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } details := newEncodedValues(attributes.Details, weh.GetDataConverter()) err := NewCanceledError(details) childWorkflow.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionTimedOutEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := NewTimeoutError(attributes.GetTimeoutType()) childWorkflow.handle(nil, err) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionTerminatedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := newTerminatedError() childWorkflow.handle(nil, err) - return nil + return } func (weh *workflowExecutionEventHandlerImpl) handleUpsertWorkflowSearchAttributes(event *m.HistoryEvent) { weh.updateWorkflowInfoWithSearchAttributes(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes) } -func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionInitiated(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionInitiated(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attribute := event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes workflowID := attribute.WorkflowExecution.GetWorkflowId() cancellationID := string(attribute.Control) weh.decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event.GetEventId(), workflowID, cancellationID) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCancelRequested(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCancelRequested(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attributes := event.ExternalWorkflowExecutionCancelRequestedEventAttributes @@ -1331,15 +1301,15 @@ func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCan // for cancel external workflow, we need to set the future cancellation := decision.getData().(*scheduledCancellation) if cancellation.handled { - return nil + return } cancellation.handle(nil, nil) } - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionFailed(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attributes := event.RequestCancelExternalWorkflowExecutionFailedEventAttributes @@ -1349,33 +1319,33 @@ func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflo // for cancel external workflow, we need to set the future cancellation := decision.getData().(*scheduledCancellation) if cancellation.handled { - return nil + return } err := fmt.Errorf("cancel external workflow failed, %v", attributes.GetCause()) cancellation.handle(nil, err) } - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *m.HistoryEvent) { attributes := event.ExternalWorkflowExecutionSignaledEventAttributes decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionCompleted(attributes.GetInitiatedEventId()) signal := decision.getData().(*scheduledSignal) if signal.handled { - return nil + return } signal.handle(nil, nil) - return nil + return } -func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.SignalExternalWorkflowExecutionFailedEventAttributes decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionFailed(attributes.GetInitiatedEventId()) signal := decision.getData().(*scheduledSignal) if signal.handled { - return nil + return } var err error @@ -1388,5 +1358,5 @@ func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecut signal.handle(nil, err) - return nil + return } diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index e7118858e..55afc0b29 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -24,6 +24,10 @@ import ( "encoding/json" "testing" + "github.com/opentracing/opentracing-go" + "github.com/uber-go/tally" + "go.uber.org/zap/zaptest" + "go.uber.org/cadence/internal/common" "github.com/stretchr/testify/assert" @@ -402,3 +406,322 @@ func TestProcessQuery_KnownQueryTypes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "[\"__open_sessions\",\"__query_types\",\"__stack_trace\",\"a\"]\n", string(result)) } + +func TestWorkflowExecutionEventHandler_ProcessEvent_WorkflowExecutionStarted(t *testing.T) { + t.Run("success", func(t *testing.T) { + testRegistry := newRegistry() + testRegistry.RegisterWorkflowWithOptions(func(ctx Context) error { return nil }, RegisterWorkflowOptions{Name: "test"}) + + weh := testWorkflowExecutionEventHandler(t, testRegistry) + + testHeaderStruct := &s.Header{ + Fields: map[string][]byte{ + "test": []byte("test"), + }, + } + testInput := []byte("testInput") + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionStarted), + WorkflowExecutionStartedEventAttributes: &s.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &s.WorkflowType{ + Name: common.StringPtr("test"), + }, + Input: testInput, + Header: testHeaderStruct, + }, + } + + err := weh.ProcessEvent(event, false, false) + assert.NoError(t, err) + }) + t.Run("error", func(t *testing.T) { + testRegistry := newRegistry() + + weh := testWorkflowExecutionEventHandler(t, testRegistry) + + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionStarted), + WorkflowExecutionStartedEventAttributes: &s.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &s.WorkflowType{ + Name: common.StringPtr("test"), + }, + }, + } + + err := weh.ProcessEvent(event, false, false) + assert.ErrorContains(t, err, errMsgUnknownWorkflowType) + }) +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_Noops(t *testing.T) { + for _, tc := range []s.EventType{ + s.EventTypeWorkflowExecutionCompleted, + s.EventTypeWorkflowExecutionTimedOut, + s.EventTypeWorkflowExecutionFailed, + s.EventTypeDecisionTaskScheduled, + s.EventTypeDecisionTaskTimedOut, + s.EventTypeDecisionTaskFailed, + s.EventTypeActivityTaskStarted, + s.EventTypeDecisionTaskCompleted, + s.EventTypeWorkflowExecutionCanceled, + s.EventTypeWorkflowExecutionContinuedAsNew, + } { + t.Run(tc.String(), func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(tc), + } + + err := weh.ProcessEvent(event, false, false) + assert.NoError(t, err) + }) + } +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_nil(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + + err := weh.ProcessEvent(nil, false, false) + assert.ErrorContains(t, err, "nil event provided") +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_no_error_events(t *testing.T) { + for _, tc := range []struct { + event *s.HistoryEvent + prepareHandler func(*testing.T, *workflowExecutionEventHandlerImpl) + assertHandler func(*testing.T, *workflowExecutionEventHandlerImpl) + }{ + { + event: &s.HistoryEvent{ + EventType: s.EventType(-1).Ptr(), + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskCompleted.Ptr(), + EventId: common.Int64Ptr(2), + ActivityTaskCompletedEventAttributes: &s.ActivityTaskCompletedEventAttributes{ + Result: []byte("test"), + ScheduledEventId: common.Int64Ptr(1), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: true, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(1, "test-activity") + impl.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskFailed.Ptr(), + EventId: common.Int64Ptr(3), + ActivityTaskFailedEventAttributes: &s.ActivityTaskFailedEventAttributes{ + Reason: common.StringPtr("test-reason"), + ScheduledEventId: common.Int64Ptr(2), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: false, + callback: func(result []byte, err error) { + var customErr *CustomError + assert.ErrorAs(t, err, &customErr) + assert.Equal(t, customErr.reason, "test-reason") + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(2, "test-activity") + impl.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskCanceled.Ptr(), + EventId: common.Int64Ptr(4), + ActivityTaskCanceledEventAttributes: &s.ActivityTaskCanceledEventAttributes{ + Details: []byte("test-details"), + ScheduledEventId: common.Int64Ptr(3), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: false, + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(3, "test-activity") + decision.cancel() + decision.handleDecisionSent() + impl.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeTimerCanceled.Ptr(), + EventId: common.Int64Ptr(5), + TimerCanceledEventAttributes: &s.TimerCanceledEventAttributes{ + TimerId: common.StringPtr("test-timer"), + StartedEventId: common.Int64Ptr(4), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startTimer(&s.StartTimerDecisionAttributes{ + TimerId: common.StringPtr("test-timer"), + }) + decision.setData(&scheduledTimer{ + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleTimerStarted("test-timer") + decision.cancel() + decision.handleDecisionSent() + impl.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeCancelTimerFailed.Ptr(), + EventId: common.Int64Ptr(6), + CancelTimerFailedEventAttributes: &s.CancelTimerFailedEventAttributes{ + TimerId: common.StringPtr("test-timer"), + Cause: common.StringPtr("test-cause"), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startTimer(&s.StartTimerDecisionAttributes{ + TimerId: common.StringPtr("test-timer"), + }) + decision.setData(&scheduledTimer{ + callback: func(result []byte, err error) { + var customErr *CustomError + assert.ErrorAs(t, err, &customErr) + assert.Equal(t, customErr.reason, "test-cause") + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleTimerStarted("test-timer") + decision.cancel() + decision.handleDecisionSent() + impl.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeWorkflowExecutionCancelRequested.Ptr(), + EventId: common.Int64Ptr(7), + WorkflowExecutionCancelRequestedEventAttributes: &s.WorkflowExecutionCancelRequestedEventAttributes{ + Cause: common.StringPtr("test-cause"), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + cancelCalled := false + t.Cleanup(func() { + if !cancelCalled { + t.Error("cancelWorkflow not called") + t.FailNow() + } + }) + impl.cancelHandler = func() { + cancelCalled = true + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeRequestCancelExternalWorkflowExecutionInitiated.Ptr(), + EventId: common.Int64Ptr(8), + RequestCancelExternalWorkflowExecutionInitiatedEventAttributes: &s.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{ + DecisionTaskCompletedEventId: common.Int64Ptr(7), + Domain: common.StringPtr(testDomain), + WorkflowExecution: &s.WorkflowExecution{ + WorkflowId: common.StringPtr("wid"), + }, + ChildWorkflowOnly: common.BoolPtr(true), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startChildWorkflowExecution(&s.StartChildWorkflowExecutionDecisionAttributes{ + Domain: common.StringPtr(testDomain), + WorkflowId: common.StringPtr("wid"), + }) + decision.setData(&scheduledChildWorkflow{ + handled: true, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleStartChildWorkflowExecutionInitiated("wid") + impl.decisionsHelper.handleChildWorkflowExecutionStarted("wid") + decision = impl.decisionsHelper.requestCancelExternalWorkflowExecution(testDomain, "wid", "", "", true) + decision.setData(&scheduledCancellation{ + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(7, "wid", "") + decision.handleDecisionSent() + }, + }, + } { + t.Run(tc.event.EventType.String(), func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + if tc.prepareHandler != nil { + tc.prepareHandler(t, weh) + } + + err := weh.ProcessEvent(tc.event, false, false) + assert.NoError(t, err) + }) + } +} + +func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl { + return newWorkflowExecutionEventHandler( + testWorkflowInfo, + func(result []byte, err error) {}, + zaptest.NewLogger(t), + true, + tally.NewTestScope("test", nil), + registry, + nil, + nil, + opentracing.NoopTracer{}, + nil, + ).(*workflowExecutionEventHandlerImpl) +} + +var testWorkflowInfo = &WorkflowInfo{ + WorkflowType: WorkflowType{ + Name: "test", + Path: "", + }, +} diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b98982b7c..1dbe1429a 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -106,7 +106,7 @@ type ( // WorkflowDefinition wraps the code that can execute a workflow. workflowDefinition interface { Execute(env workflowEnvironment, header *shared.Header, input []byte) - // Called for each non timed out startDecision event. + // OnDecisionTaskStarted is called for each non timed out startDecision event. // Executed after all history events since the previous decision are applied to workflowDefinition OnDecisionTaskStarted() StackTrace() string // Stack trace of all coroutines owned by the Dispatcher instance