diff --git a/internal/error.go b/internal/error.go index 12dc9f654..12da08834 100644 --- a/internal/error.go +++ b/internal/error.go @@ -508,6 +508,7 @@ func (e *NonDeterministicError) Error() string { case "mismatch": // historical text return "nondeterministic workflow: " + + "mismatching history event and replay decision found. " + "history event is " + e.HistoryEventText + ", " + "replay decision is " + e.DecisionText default: diff --git a/internal/internal_activity.go b/internal/internal_activity.go index ed0e5c835..a75eff882 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -432,6 +432,10 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat } // For everything we return result. + // Code reaches here for 2 cases: + // 1. activity is executed by name (not the func pointer) and it wasn't registered + // 2. activity is executed by func pointer and the signature indicates it doesn't/can't return data. + // for example it only has one return parameter (which can only be be error). return decodeArg(dataConverter, result, to) } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index c872b87e0..7ac0482b8 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -262,6 +262,28 @@ func isDecisionEvent(eventType s.EventType) bool { } } +// isDecisionEventForReplay is different from isDecisionEvent because during replays +// we want to intentionally ignore workflow complete/fail/cancel/continueasnew events so that +// decision tree replays matches with the workflow processing respond tasks +func isDecisionEventForReplay(eventType s.EventType) bool { + switch eventType { + case + s.EventTypeActivityTaskScheduled, + s.EventTypeActivityTaskCancelRequested, + s.EventTypeTimerStarted, + s.EventTypeTimerCanceled, + s.EventTypeCancelTimerFailed, + s.EventTypeMarkerRecorded, + s.EventTypeStartChildWorkflowExecutionInitiated, + s.EventTypeRequestCancelExternalWorkflowExecutionInitiated, + s.EventTypeSignalExternalWorkflowExecutionInitiated, + s.EventTypeUpsertWorkflowSearchAttributes: + return true + default: + return false + } +} + // NextDecisionEvents returns events that there processed as new by the next decision. // TODO(maxim): Refactor to return a struct instead of multiple parameters func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) { @@ -840,6 +862,19 @@ process_Workflow_Loop: return response, err } +// ProcessWorkflowTask processes the given workflow which includes +// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events) +// - state machine is incrementally built with every decision. +// - state machine makes sure that when a workflow restarts for some reason same activities (or timers etc.) are not called again and previous result state is loaded into memory +// +// Note about Replay tests mode: +// +// This mode works by replaying the historical decision events responses (as defined in isDecisionEventForReplay()) +// and comparing these with the replays gotten from state machine +// +// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events: +// complete/failed/cancel/continueasnew +// The reason is that state machine doesn't have a correspondong decision for these so they cause false positive non-determinism errors in Replay tests. func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) { task := workflowTask.task historyIterator := workflowTask.historyIterator @@ -899,8 +934,16 @@ ProcessEvents: for i, event := range reorderedEvents { isInReplay := reorderedHistory.IsReplayEvent(event) isLast := !isInReplay && i == len(reorderedEvents)-1 - if !skipReplayCheck && isDecisionEvent(event.GetEventType()) { - respondEvents = append(respondEvents, event) + if !skipReplayCheck { + isDecisionEventFn := isDecisionEvent + // when strict nondeterminism is enabled we use a different function to check for decision events during replay + if !w.wth.disableStrictNonDeterminism && isInReplay { + isDecisionEventFn = isDecisionEventForReplay + } + + if isDecisionEventFn(event.GetEventType()) { + respondEvents = append(respondEvents, event) + } } if isPreloadMarkerEvent(event) { @@ -918,7 +961,16 @@ ProcessEvents: if err != nil { return nil, err } - if w.isWorkflowCompleted { + + // Break the event processing loop if either + // - Workflow is completed AND strict nondeterminism checks disabled. + // - Workflow is completed AND strict nondeterminism checks enabled AND NOT in replay mode. + // With strict nondeterminism checks enabled, breaking the loop early causes missing events + // in respondEvents which then causes false positives or false negatives. + stopProcessing := (w.isWorkflowCompleted && w.wth.disableStrictNonDeterminism) || + (w.isWorkflowCompleted && !w.wth.disableStrictNonDeterminism && !isInReplay) + + if stopProcessing { break ProcessEvents } } @@ -936,6 +988,9 @@ ProcessEvents: } } isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) + // incomplete decisions (e.g. start without a complete) at the end of history will still have decisions in decisionsHelper + // but there won't be corresponding respond events. This breaks the non-determinism check therefore we ignore such final partial decisions. + // Example scenario is covered by TestReplayWorkflowHistory_Partial_NoDecisionEvents lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents() if isReplay && !lastDecisionEventsForReplayTest { eventDecisions := eventHandler.decisionsHelper.getDecisions(true) diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index cb90afdc5..8708352da 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -880,9 +880,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingI require.NotNil(t.T(), replayErrorField) require.Equal(t.T(), zapcore.ErrorType, replayErrorField.Type) require.ErrorContains(t.T(), replayErrorField.Interface.(error), - "nondeterministic workflow: "+ + "nondeterministic workflow: mismatching history event and replay decision found. "+ "history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), "+ - "replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)") + "replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList), Input:[], ScheduleToCloseTimeoutSeconds:120, ScheduleToStartTimeoutSeconds:60, StartToCloseTimeoutSeconds:60, HeartbeatTimeoutSeconds:20, Header:(Fields:map[]))") } func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() { diff --git a/internal/workflow_replayer_test.go b/internal/workflow_replayer_test.go index 90c7c52e6..ead4cce22 100644 --- a/internal/workflow_replayer_test.go +++ b/internal/workflow_replayer_test.go @@ -99,6 +99,23 @@ func (s *workflowReplayerSuite) TestReplayWorkflowHistory_Partial_WithDecisionEv s.NoError(err) } +// This test case covers partial decision scenario where a decision is started but not closed +// History: +// +// 1: WorkflowExecutionStarted +// 2: DecisionTaskScheduled +// 3: DecisionTaskStarted +// 4: DecisionTaskFailed +// 5: DecisionTaskScheduled +// 6: DecisionTaskStarted +// +// Notes on task handling logic during replay: +// +// reorderedHistory.NextDecisionEvents() ignores events 2, 3, 4 because it failed. +// it only returns 1 and 6 to be replayed. +// 6 changes the state in decisionsHelper (generates a decision) however there's no corresponding +// respond due to missing close event (failed/complete etc.) +// Such partial decisions at the end of the history is ignored during replay tests to avoid non-determinism error func (s *workflowReplayerSuite) TestReplayWorkflowHistory_Partial_NoDecisionEvents() { err := s.replayer.ReplayWorkflowHistory(s.logger, getTestReplayWorkflowPartialHistoryNoDecisionEvents(s.T())) s.NoError(err) diff --git a/test/replaytests/branch_workflow.go b/test/replaytests/branch_workflow.go index 380660fa7..99b189763 100644 --- a/test/replaytests/branch_workflow.go +++ b/test/replaytests/branch_workflow.go @@ -72,7 +72,7 @@ func sampleBranchWorkflow2(ctx workflow.Context) error { } ctx = workflow.WithActivityOptions(ctx, ao) - for i := 1; i <= 4; i++ { + for i := 1; i <= 2; i++ { activityInput := fmt.Sprintf("branch %d of 4", i) future := workflow.ExecuteActivity(ctx, sampleActivity, activityInput) futures = append(futures, future) diff --git a/test/replaytests/choice.json b/test/replaytests/choice.json index 4e4962c17..384c9cdbe 100644 --- a/test/replaytests/choice.json +++ b/test/replaytests/choice.json @@ -176,7 +176,34 @@ }, { "eventId": 12, - "timestamp": 1679427717321911295, + "timestamp": 1679427717321780254, + "eventType": "ActivityTaskStarted", + "version": 0, + "taskId": 5243011, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 11, + "identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb", + "requestId": "ae2aad96-6588-4359-807b-a39a16f0896a", + "attempt": 0, + "lastFailureReason": "" + } + }, + { + "eventId": 13, + "timestamp": 1679427717321780255, + "eventType": "ActivityTaskCompleted", + "version": 0, + "taskId": 5243000, + "activityTaskCompletedEventAttributes": { + "result": "ImJhbmFuYSIK", + "scheduledEventId": 11, + "startedEventId": 12, + "identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb" + } + }, + { + "eventId": 14, + "timestamp": 1679427717321780256, "eventType": "WorkflowExecutionCompleted", "version": 0, "taskId": 5243011, diff --git a/test/replaytests/continue_as_new.json b/test/replaytests/continue_as_new.json new file mode 100644 index 000000000..ef403b847 --- /dev/null +++ b/test/replaytests/continue_as_new.json @@ -0,0 +1,90 @@ +[ + { + "eventId": 1, + "timestamp": 1699856700704442400, + "eventType": "WorkflowExecutionStarted", + "version": 4, + "taskId": 882931375, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "fx.SimpleSignalWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "executionStartToCloseTimeoutSeconds": 600, + "taskStartToCloseTimeoutSeconds": 10, + "continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1", + "initiator": "CronSchedule", + "continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE", + "originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1", + "firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5", + "firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00", + "cronSchedule": "* * * * *", + "firstDecisionTaskBackoffSeconds": 60, + "PartitionConfig": { + "isolation-group": "dca11" + } + } + }, + { + "eventId": 2, + "timestamp": 1699856760713586608, + "eventType": "DecisionTaskScheduled", + "version": 4, + "taskId": 882931383, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "fx-worker" + }, + "startToCloseTimeoutSeconds": 10 + } + }, + { + "eventId": 3, + "timestamp": 1699856760741837021, + "eventType": "DecisionTaskStarted", + "version": 4, + "taskId": 882931387, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", + "requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04" + } + }, + { + "eventId": 4, + "timestamp": 1699856760773459755, + "eventType": "DecisionTaskCompleted", + "version": 4, + "taskId": 882931391, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", + "binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a" + } + }, + { + "eventId": 5, + "timestamp": 1699857360713649962, + "eventType": "WorkflowExecutionContinuedAsNew", + "version": 4, + "taskId": 882931394, + "workflowExecutionContinuedAsNewEventAttributes": { + "newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90", + "workflowType": { + "name": "fx.SimpleSignalWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "executionStartToCloseTimeoutSeconds": 600, + "taskStartToCloseTimeoutSeconds": 10, + "decisionTaskCompletedEventId": -23, + "backoffStartIntervalInSeconds": 60, + "initiator": "CronSchedule", + "failureReason": "cadenceInternal:Timeout START_TO_CLOSE" + } + } +] diff --git a/test/replaytests/continue_as_new_wf.go b/test/replaytests/continue_as_new_wf.go new file mode 100644 index 000000000..a09c83b89 --- /dev/null +++ b/test/replaytests/continue_as_new_wf.go @@ -0,0 +1,26 @@ +package replaytests + +import ( + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal +func ContinueAsNewWorkflow(ctx workflow.Context) error { + selector := workflow.NewSelector(ctx) + var signalResult string + signalName := "helloWorldSignal" + for { + signalChan := workflow.GetSignalChannel(ctx, signalName) + selector.AddReceive(signalChan, func(c workflow.Channel, more bool) { + c.Receive(ctx, &signalResult) + workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult)) + }) + workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName) + // Wait for signal + selector.Select(ctx) + if signalResult == "kill" { + return nil + } + } +} diff --git a/test/replaytests/exclusive_choice_workflow.go b/test/replaytests/exclusive_choice_workflow.go index 595f0a747..e2a2186ad 100644 --- a/test/replaytests/exclusive_choice_workflow.go +++ b/test/replaytests/exclusive_choice_workflow.go @@ -39,7 +39,7 @@ const ( orderChoiceCherry = "cherry" ) -// exclusiveChoiceWorkflow Workflow Decider. This workflow executes Cherry order. +// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns func exclusiveChoiceWorkflow(ctx workflow.Context) error { // Get order. ao := workflow.ActivityOptions{ @@ -50,7 +50,7 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error { ctx = workflow.WithActivityOptions(ctx, ao) var orderChoice string - err := workflow.ExecuteActivity(ctx, getOrderActivity).Get(ctx, &orderChoice) + err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice) if err != nil { return err } @@ -60,9 +60,9 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error { // choose next activity based on order result switch orderChoice { case orderChoiceBanana: - workflow.ExecuteActivity(ctx, orderBananaActivity, orderChoice) + workflow.ExecuteActivity(ctx, "main.orderBananaActivity", orderChoice) case orderChoiceCherry: - workflow.ExecuteActivity(ctx, orderCherryActivity, orderChoice) + workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice) default: logger.Error("Unexpected order", zap.String("Choice", orderChoice)) } @@ -71,8 +71,8 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error { return nil } -// This workflow explicitly executes Apple Activity received from the getorderActivity. -func exclusiveChoiceWorkflow2(ctx workflow.Context) error { +// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns +func exclusiveChoiceWorkflowAlwaysCherry(ctx workflow.Context) error { // Get order. ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -82,40 +82,25 @@ func exclusiveChoiceWorkflow2(ctx workflow.Context) error { ctx = workflow.WithActivityOptions(ctx, ao) var orderChoice string - err := workflow.ExecuteActivity(ctx, getAppleOrderActivity).Get(ctx, &orderChoice) + err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice) if err != nil { return err } logger := workflow.GetLogger(ctx) + logger.Sugar().Infof("Got order for %s but will ignore and order cherry!!", orderChoice) - // choose next activity based on order result. It's apple in this case. - switch orderChoice { - case orderChoiceApple: - workflow.ExecuteActivity(ctx, orderAppleActivity, orderChoice) - default: - logger.Error("Unexpected order", zap.String("Choice", orderChoice)) - } + workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice) logger.Info("Workflow completed.") return nil } -func getOrderActivity() (string, error) { - fmt.Printf("Order is for Cherry") - return "cherry", nil -} - -func getAppleOrderActivity() (string, error) { +func getBananaOrderActivity() (string, error) { fmt.Printf("Order is for Apple") return "apple", nil } -func orderAppleActivity(choice string) error { - fmt.Printf("Order choice: %v\n", choice) - return nil -} - func orderBananaActivity(choice string) error { fmt.Printf("Order choice: %v\n", choice) return nil diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 382f1c4d2..0f7ee2874 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -35,6 +35,7 @@ import ( "go.uber.org/cadence/workflow" ) +// Basic happy paths func TestReplayWorkflowHistoryFromFile(t *testing.T) { for _, testFile := range []string{"basic.json", "basic_new.json", "version.json", "version_new.json"} { t.Run("replay_"+strings.Split(testFile, ".")[0], func(t *testing.T) { @@ -48,6 +49,7 @@ func TestReplayWorkflowHistoryFromFile(t *testing.T) { } } +// Child workflow happy path func TestReplayChildWorkflowBugBackport(t *testing.T) { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(childWorkflow, workflow.RegisterOptions{Name: "child"}) @@ -62,9 +64,10 @@ func TestGreetingsWorkflowforActivity(t *testing.T) { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(greetingsWorkflowActivity, workflow.RegisterOptions{Name: "greetings"}) err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json") - require.Error(t, err) + assert.ErrorContains(t, err, "nondeterministic workflow: mismatching history event and replay decision found") } +// Simple greeting workflow with 3 activities executed sequentially: getGreetingsActivity, getNameActivity, sayGreetingsActivity func TestGreetingsWorkflow(t *testing.T) { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(greetingsWorkflow, workflow.RegisterOptions{Name: "greetings"}) @@ -72,7 +75,7 @@ func TestGreetingsWorkflow(t *testing.T) { require.NoError(t, err) } -// Should have failed but passed. Maybe, because the result recorded in history still matches the return type of the workflow. +// Return types of activity change is not considered non-determinism (at least for now) so this test doesn't find non-determinism error func TestGreetingsWorkflow3(t *testing.T) { replayer := worker.NewWorkflowReplayer() replayer.RegisterActivityWithOptions(getNameActivity3, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true}) @@ -81,92 +84,110 @@ func TestGreetingsWorkflow3(t *testing.T) { require.NoError(t, err) } -// Fails because the expected signature was different from history. -func TestGreetingsWorkflow4(t *testing.T) { +// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity +// This test runs a version of choice workflow which does the exact same thing so no errors expected. +func TestExclusiveChoiceWorkflowSuccess(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterActivityWithOptions(getNameActivity4, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true}) - replayer.RegisterWorkflowWithOptions(greetingsWorkflow4, workflow.RegisterOptions{Name: "greetings"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json") - require.Error(t, err) + replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"}) + replayer.RegisterActivityWithOptions(getBananaOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"}) + replayer.RegisterActivityWithOptions(orderBananaActivity, activity.RegisterOptions{Name: "main.orderBananaActivity"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json") + require.NoError(t, err) } -// Panic with failed to register activity. This passes in cadence_samples because it's registered in Helper. -// To test it on cadence_samples change the https://github.com/uber-common/cadence-samples/blob/master/cmd/samples/recipes/greetings/greetings_workflow.go -// to include the extra return types in getNameActivity. -func TestGreetingsWorkflow2(t *testing.T) { - - t.Skip("Panic with failed to register activity. Here the activity returns incompatible arguments so the test should fail") +// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity +// This test runs a version of choice workflow which does the exact same thing but the activities are not registered. +// It doesn't matter for replayer so no exceptions expected. +// The reason is that activity result decoding logic just passes the result back to the given pointer +func TestExclusiveChoiceWorkflowActivitiyRegistrationMissing(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterActivityWithOptions(getNameActivity2, activity.RegisterOptions{Name: "main.getNameActivity", DisableAlreadyRegisteredCheck: true}) - replayer.RegisterWorkflowWithOptions(greetingsWorkflow2, workflow.RegisterOptions{Name: "greetings"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "greetings.json") - require.Error(t, err) + replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json") + require.NoError(t, err) } -// Ideally replayer doesn't concern itself with the change in the activity content until it matches the expected output type. -// History has recorded the output of banana activity instead. The replayer should have failed because we have not registered any -// activity here in the test. -// The replayer still runs whatever it found in the history and passes. -func TestExclusiveChoiceWorkflowWithUnregisteredActivity(t *testing.T) { +// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity +// This test runs a version of choice workflow which registers a single return parameter function for main.getOrderActivity +// - Original main.getOrderActivity signature: func() (string, error) +// - New main.getOrderActivity signature: func() error +// +// In this case result of main.getOrderActivity from history is not passed back to the given pointer by the workflow. +// Compared to the activity registration missing scenario (above case) this is a little bit weird behavior. +// The workflow code continues with orderChoice="" instead of "banana". Therefore it doesn't invoke 2nd activity main.getOrderActivity. +// This means history has more events then replay decisions which causes non-determinism error +func TestExclusiveChoiceWorkflowWithActivitySignatureChange(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow, workflow.RegisterOptions{Name: "choice"}) + replayer.RegisterActivityWithOptions(func() error { return nil }, activity.RegisterOptions{Name: "main.getOrderActivity"}) + replayer.RegisterActivityWithOptions(orderBananaActivity, activity.RegisterOptions{Name: "main.orderBananaActivity"}) err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json") - require.NoError(t, err) + assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision") } -// This test registers Cherry Activity as the activity but calls Apple activity in the workflow code. Infact, Cherry and Banana -// activities are not even a part of the workflow code in question. -// History has recorded the output of banana activity. Here, The workflow is not waiting for the activity so it doesn't notice -// that registered activity is different from executed activity. -// The replayer relies on whatever is recorded in the History so as long as the main activity name in the options matched partially -// it doesn't raise errors. -func TestExclusiveChoiceWorkflowWithDifferentActvityCombo(t *testing.T) { +// The recorded history has following activities in this order: main.getOrderActivity, main.orderBananaActivity +// This test runs a version of choice workflow which calls main.getOrderActivity and then calls the main.orderCherryActivity. +// The replayer will find non-determinism because of mismatch between replay decision and history (banana vs cherry) +func TestExclusiveChoiceWorkflowWithMismatchingActivity(t *testing.T) { replayer := worker.NewWorkflowReplayer() - - replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflow2, workflow.RegisterOptions{Name: "choice"}) - replayer.RegisterActivityWithOptions(getAppleOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"}) - replayer.RegisterActivityWithOptions(orderAppleActivity, activity.RegisterOptions{Name: "testactivity"}) + replayer.RegisterWorkflowWithOptions(exclusiveChoiceWorkflowAlwaysCherry, workflow.RegisterOptions{Name: "choice"}) + replayer.RegisterActivityWithOptions(getBananaOrderActivity, activity.RegisterOptions{Name: "main.getOrderActivity"}) + replayer.RegisterActivityWithOptions(orderCherryActivity, activity.RegisterOptions{Name: "main.orderCherryActivity"}) err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "choice.json") - require.NoError(t, err) + assert.ErrorContains(t, err, "nondeterministic workflow: mismatching history event and replay decision found") } +// Branch workflow happy case. +// It branches out to 3 open activities and then they complete. func TestBranchWorkflow(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(sampleBranchWorkflow, workflow.RegisterOptions{Name: "branch"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch.json") require.NoError(t, err) } -// Fails with a non deterministic error because there was an additional unexpected branch. Decreasing the number of branches will -// also fail the test because the history expects the same number of branches executing the activity. +// Branch workflow normal history file is replayed against modified workflow code which +// has 2 branches only. This causes nondetereministic error. func TestBranchWorkflowWithExtraBranch(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(sampleBranchWorkflow2, workflow.RegisterOptions{Name: "branch"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch.json") - assert.ErrorContains(t, err, "nondeterministic workflow") + assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision") } -func TestParallel(t *testing.T) { +// TestSequentialStepsWorkflow replays a history with 2 sequential non overlapping activity calls (one completes before the other is scheduled) +// and runs it against new version of the workflow code which only calls 1 activity. +// This is considered as non-determinism error. +func TestSequentialStepsWorkflow(t *testing.T) { replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"}) + replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json") + assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision") +} +// Runs simpleParallelWorkflow which starts two workflow.Go routines that executes 1 and 2 activities respectively. +func TestSimpleParallelWorkflow(t *testing.T) { + replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow, workflow.RegisterOptions{Name: "branch2"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json") require.NoError(t, err) } -// Should have failed since the first go routine has only one branch whereas the history has two branches. -// The replayer totally misses this change. -func TestParallel2(t *testing.T) { +// Runs modified version of simpleParallelWorkflow which starts 1 less activity in the second workflow-gouroutine. +// This is considered as non-determinism error. +func TestSimpleParallelWorkflowWithMissingActivityCall(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(sampleParallelWorkflow2, workflow.RegisterOptions{Name: "branch2"}) - err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json") - require.NoError(t, err) + assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision") +} + +// Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done +// for continue as new case by replayWorkflowHistory(). +// This should not have any error because it's a valid continue as new case. +func TestContinueAsNew(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(ContinueAsNewWorkflow, workflow.RegisterOptions{Name: "fx.SimpleSignalWorkflow"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json") + assert.ErrorContains(t, err, "replay workflow doesn't return the same result as the last event") } diff --git a/test/replaytests/sequential.json b/test/replaytests/sequential.json new file mode 100644 index 000000000..b52eecdde --- /dev/null +++ b/test/replaytests/sequential.json @@ -0,0 +1,230 @@ +[ + { + "eventId": 1, + "timestamp": 1697648630382933224, + "eventType": "WorkflowExecutionStarted", + "taskId": 3145798, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "fx.ReplayerHelloWorldWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQ==", + "executionStartToCloseTimeoutSeconds": 60, + "taskStartToCloseTimeoutSeconds": 60, + "originalExecutionRunId": "dadbe958-4159-4762-88e7-6b352b4cccff", + "identity": "cadence-cli@taylan-trial", + "firstExecutionRunId": "dadbe958-4159-4762-88e7-6b352b4cccff", + "firstDecisionTaskBackoffSeconds": 0, + "PartitionConfig": null + } + }, + { + "eventId": 2, + "timestamp": 1697648630382957815, + "eventType": "DecisionTaskScheduled", + "taskId": 3145799, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "fx-worker" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 3, + "timestamp": 1697648630401121943, + "eventType": "DecisionTaskStarted", + "taskId": 3145804, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "20ca16c3-b13e-4c72-85f9-bd79384d430e" + } + }, + { + "eventId": 4, + "timestamp": 1697648630412706912, + "eventType": "DecisionTaskCompleted", + "taskId": 3145807, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 5, + "timestamp": 1697648630412753262, + "eventType": "ActivityTaskScheduled", + "taskId": 3145808, + "activityTaskScheduledEventAttributes": { + "activityId": "0", + "activityType": { + "name": "replayerhello" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQo=", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 0, + "decisionTaskCompletedEventId": 4, + "header": {} + } + }, + { + "eventId": 6, + "timestamp": 1697648630412771832, + "eventType": "ActivityTaskStarted", + "taskId": 3145809, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 5, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "6adca0b2-bd1e-4877-ac44-c3fdf45784c6", + "lastFailureReason": "" + } + }, + { + "eventId": 7, + "timestamp": 1697648630422944301, + "eventType": "ActivityTaskCompleted", + "taskId": 3145812, + "activityTaskCompletedEventAttributes": { + "result": "IkhlbGxvLCBoZWxsbyByZXBsYXllciEiCg==", + "scheduledEventId": 5, + "startedEventId": 6, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401" + } + }, + { + "eventId": 8, + "timestamp": 1697648630422957221, + "eventType": "DecisionTaskScheduled", + "taskId": 3145814, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "taylan-trial:4037e064-7565-4716-8169-e97eb2449f32" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 9, + "timestamp": 1697648630432583300, + "eventType": "DecisionTaskStarted", + "taskId": 3145818, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 8, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "6eea9a40-e738-4a9b-9de3-30cc30af3597" + } + }, + { + "eventId": 10, + "timestamp": 1697648630442893019, + "eventType": "DecisionTaskCompleted", + "taskId": 3145821, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 8, + "startedEventId": 9, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 11, + "timestamp": 1697648630442935059, + "eventType": "ActivityTaskScheduled", + "taskId": 3145822, + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "replayerhello" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQo=", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 0, + "decisionTaskCompletedEventId": 10, + "header": {} + } + }, + { + "eventId": 12, + "timestamp": 1697648630442953489, + "eventType": "ActivityTaskStarted", + "taskId": 3145823, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 11, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "f44f0ef6-b40a-479f-96e2-3a5093433996", + "lastFailureReason": "" + } + }, + { + "eventId": 13, + "timestamp": 1697648630451389698, + "eventType": "ActivityTaskCompleted", + "taskId": 3145826, + "activityTaskCompletedEventAttributes": { + "result": "IkhlbGxvLCBoZWxsbyByZXBsYXllciEiCg==", + "scheduledEventId": 11, + "startedEventId": 12, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401" + } + }, + { + "eventId": 14, + "timestamp": 1697648630451401018, + "eventType": "DecisionTaskScheduled", + "taskId": 3145828, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "taylan-trial:4037e064-7565-4716-8169-e97eb2449f32" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 15, + "timestamp": 1697648630460950187, + "eventType": "DecisionTaskStarted", + "taskId": 3145832, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 14, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "51123563-8c56-442a-9583-5f6b2ae46643" + } + }, + { + "eventId": 16, + "timestamp": 1697648630471749886, + "eventType": "DecisionTaskCompleted", + "taskId": 3145835, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 14, + "startedEventId": 15, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 17, + "timestamp": 1697648630471777786, + "eventType": "WorkflowExecutionCompleted", + "taskId": 3145836, + "workflowExecutionCompletedEventAttributes": { + "decisionTaskCompletedEventId": 16 + } + } +] diff --git a/test/replaytests/sequential_workflow.go b/test/replaytests/sequential_workflow.go new file mode 100644 index 000000000..dd49b6ed3 --- /dev/null +++ b/test/replaytests/sequential_workflow.go @@ -0,0 +1,56 @@ +package replaytests + +import ( + "context" + "fmt" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +type replayerSampleMessage struct { + Message string +} + +// replayerHelloWorldWorkflow is a sample workflow that runs replayerHelloWorldActivity. +// In the previous version it was running the activity twice, sequentially. +// History of a past execution is in sequential.json +// Corresponding unit test covers the scenario that new workflow's history records is subset of previous version's history. +// +// v1: wf started -> call activity -> call activity -> wf complete +// v2: wf started -> call activity -> wf complete +// +// The v2 clearly has determinism issues and should be considered as non-determism error for replay tests. +func replayerHelloWorldWorkflow(ctx workflow.Context, inputMsg *replayerSampleMessage) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + logger := workflow.GetLogger(ctx) + logger.Info("executing replayerHelloWorldWorkflow") + ctx = workflow.WithActivityOptions(ctx, ao) + + count := 1 + for i := 0; i < count; i++ { + var greeting string + err := workflow.ExecuteActivity(ctx, replayerHelloWorldActivity, inputMsg).Get(ctx, &greeting) + if err != nil { + logger.Error("replayerHelloWorldActivity is broken ", zap.Error(err)) + return err + } + + logger.Sugar().Infof("replayerHelloWorldWorkflow is greeting to you %dth time -> ", i, greeting) + } + + return nil +} + +// replayerHelloWorldActivity takes a sample message input and return a greeting to the caller. +func replayerHelloWorldActivity(ctx context.Context, inputMsg *replayerSampleMessage) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("executing replayerHelloWorldActivity") + + return fmt.Sprintf("Hello, %s!", inputMsg.Message), nil +}