Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address false positive non-determinism scenarios during replay #1281

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (e *NonDeterministicError) Error() string {
case "mismatch":
// historical text
return "nondeterministic workflow: " +
"mismatching history event and replay decision found. " +
"history event is " + e.HistoryEventText + ", " +
"replay decision is " + e.DecisionText
default:
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat
}

// For everything we return result.
// Code reaches here for 2 cases:
// 1. activity is executed by name (not the func pointer) and it wasn't registered
// 2. activity is executed by func pointer and the signature indicates it doesn't/can't return data.
// for example it only has one return parameter (which can only be be error).
return decodeArg(dataConverter, result, to)
}
Comment on lines 434 to 440
Copy link
Contributor

@Groxx Groxx Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 is incorrect, a string-executed activity that doesn't return data still calls deSerializeFnResultFromFnType and returns at line 430, doing nothing. It doesn't reach this line.


Also huh. On closer look, this whole chain of code is pretty strange:

  • deSerializeFunctionResult is only called if result is non-nil
  • deSerializeFnResultFromFnType is only called by this func, and it does nothing if result is nil
  • none of this needs the registered func type at all:
    • if everything agrees, the type does nothing to contribute to the behavior
    • if there if there is data + an out param + the registered activity does not return data, the earlier code silently does nothing (the data exists but is inaccessible)
    • if it's not registered, it falls back to just decodeArg here, which is what deSerializeFnResultFromFnType does anyway

... I think deSerializeFunctionResult might be more-correctly replaced with just a call to decodeArg instead.

(this doesn't really belong in this PR IMO, it's just an odd discovery)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I need to re-debug this to remember how I ended up finding those 2 cases. Maybe I meant to add this somewhere else. From the code in this function it seems 2 is not possible.


Expand Down
61 changes: 58 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,28 @@ func isDecisionEvent(eventType s.EventType) bool {
}
}

// isDecisionEventForReplay is different from isDecisionEvent because during replays
// we want to intentionally ignore workflow complete/fail/cancel/continueasnew events so that
// decision tree replays matches with the workflow processing respond tasks
func isDecisionEventForReplay(eventType s.EventType) bool {
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
switch eventType {
case
s.EventTypeActivityTaskScheduled,
s.EventTypeActivityTaskCancelRequested,
s.EventTypeTimerStarted,
s.EventTypeTimerCanceled,
s.EventTypeCancelTimerFailed,
s.EventTypeMarkerRecorded,
s.EventTypeStartChildWorkflowExecutionInitiated,
s.EventTypeRequestCancelExternalWorkflowExecutionInitiated,
s.EventTypeSignalExternalWorkflowExecutionInitiated,
s.EventTypeUpsertWorkflowSearchAttributes:
return true
default:
return false
}
}
Comment on lines +268 to +285
Copy link
Contributor

@Groxx Groxx Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To stick a comment in here as a random representative location, from chats a while back, for visibility:
I generally like this approach and I think it stands a pretty good chance of being an improvement...

... but I'm not (yet) confident that it's correct, nor that it doesn't worsen other scenarios. It's a pretty substantial change despite how small it is. From vague memory of my last attempt to verify, I ended up with more concerns than I started with, but no real evidence in either direction :\

Some of ^ this is "I'm not entirely sure what the end result is during replay" and some is "it's extremely hard to verify that this does not impact normal replays / restores original behavior when disabled" (because the code here is extremely convoluted and stateful). E.g. it could be written in a more obviously-safe way than it is, which would make the latter concern go away, but perhaps this is a cleaner end result than that would be. The former is... hard though.

It may be fine even if it does introduce new flaws, it's definitely not perfect now, but we need to have an informed decision. The good news is that we now have a LOT more internal replay-shadow-tests (compared to when this was first made) that we can run to discover what has changed - if it finds nothing but improvements against our fairly wide range of behavior in those tests, that will probably be good enough.
I just definitely do not trust our test suite here to prove correctness, they don't even use like half of our features nor do they use them in complicated ways. Issues they find are plenty useful, but they're very strange in a lot of ways and that makes them very very far from evidence of correctness.


So tl;dr: it just has to be sufficiently-validated or fully understood/described.
We've done some validation (thanks @agautam478 !) but we need to do a bit more, and none of the tests here seem like particularly strong evidence for what changed so I need to dig back in and try to reason about it more carefully.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All sounds good to me. At the time I looked into these false positives I focused on a subset of the potential scenarios. I'm not super confident this is completely safe change even though it looks like it is. Finding more scenarios and adding them as test cases should be next step as discussed offline. Due to other priorities I'm handing this over to you and @agautam478.


// NextDecisionEvents returns events that there processed as new by the next decision.
// TODO(maxim): Refactor to return a struct instead of multiple parameters
func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.HistoryEvent, binaryChecksum *string, err error) {
Expand Down Expand Up @@ -840,6 +862,19 @@ process_Workflow_Loop:
return response, err
}

// ProcessWorkflowTask processes the given workflow which includes
// - fetching, reordering and replaying historical decision events. (Decision events in this context is an umbrella term for workflow relevant events)
// - state machine is incrementally built with every decision.
// - state machine makes sure that when a workflow restarts for some reason same activities (or timers etc.) are not called again and previous result state is loaded into memory
//
// Note about Replay tests mode:
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
//
// This mode works by replaying the historical decision events responses (as defined in isDecisionEventForReplay())
// and comparing these with the replays gotten from state machine
//
// Compared to isDecisionEvent(), isDecisionEventForReplay() omits the following events even though they are workflow relevant respond events:
// complete/failed/cancel/continueasnew
// The reason is that state machine doesn't have a correspondong decision for these so they cause false positive non-determinism errors in Replay tests.
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
task := workflowTask.task
historyIterator := workflowTask.historyIterator
Expand Down Expand Up @@ -899,8 +934,16 @@ ProcessEvents:
for i, event := range reorderedEvents {
isInReplay := reorderedHistory.IsReplayEvent(event)
isLast := !isInReplay && i == len(reorderedEvents)-1
if !skipReplayCheck && isDecisionEvent(event.GetEventType()) {
respondEvents = append(respondEvents, event)
if !skipReplayCheck {
isDecisionEventFn := isDecisionEvent
// when strict nondeterminism is enabled we use a different function to check for decision events during replay
if !w.wth.disableStrictNonDeterminism && isInReplay {
isDecisionEventFn = isDecisionEventForReplay
}

if isDecisionEventFn(event.GetEventType()) {
respondEvents = append(respondEvents, event)
}
}

if isPreloadMarkerEvent(event) {
Expand All @@ -918,7 +961,16 @@ ProcessEvents:
if err != nil {
return nil, err
}
if w.isWorkflowCompleted {

// Break the event processing loop if either
// - Workflow is completed AND strict nondeterminism checks disabled.
// - Workflow is completed AND strict nondeterminism checks enabled AND NOT in replay mode.
// With strict nondeterminism checks enabled, breaking the loop early causes missing events
// in respondEvents which then causes false positives or false negatives.
stopProcessing := (w.isWorkflowCompleted && w.wth.disableStrictNonDeterminism) ||
(w.isWorkflowCompleted && !w.wth.disableStrictNonDeterminism && !isInReplay)

if stopProcessing {
break ProcessEvents
}
}
Expand All @@ -936,6 +988,9 @@ ProcessEvents:
}
}
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
// incomplete decisions (e.g. start without a complete) at the end of history will still have decisions in decisionsHelper
// but there won't be corresponding respond events. This breaks the non-determinism check therefore we ignore such final partial decisions.
// Example scenario is covered by TestReplayWorkflowHistory_Partial_NoDecisionEvents
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents()
if isReplay && !lastDecisionEventsForReplayTest {
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingI
require.NotNil(t.T(), replayErrorField)
require.Equal(t.T(), zapcore.ErrorType, replayErrorField.Type)
require.ErrorContains(t.T(), replayErrorField.Interface.(error),
"nondeterministic workflow: "+
"nondeterministic workflow: mismatching history event and replay decision found. "+
"history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), "+
"replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)")
"replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList), Input:[], ScheduleToCloseTimeoutSeconds:120, ScheduleToStartTimeoutSeconds:60, StartToCloseTimeoutSeconds:60, HeartbeatTimeoutSeconds:20, Header:(Fields:map[]))")
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() {
Expand Down
17 changes: 17 additions & 0 deletions internal/workflow_replayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ func (s *workflowReplayerSuite) TestReplayWorkflowHistory_Partial_WithDecisionEv
s.NoError(err)
}

// This test case covers partial decision scenario where a decision is started but not closed
// History:
//
// 1: WorkflowExecutionStarted
// 2: DecisionTaskScheduled
// 3: DecisionTaskStarted
// 4: DecisionTaskFailed
// 5: DecisionTaskScheduled
// 6: DecisionTaskStarted
//
// Notes on task handling logic during replay:
//
// reorderedHistory.NextDecisionEvents() ignores events 2, 3, 4 because it failed.
// it only returns 1 and 6 to be replayed.
// 6 changes the state in decisionsHelper (generates a decision) however there's no corresponding
// respond due to missing close event (failed/complete etc.)
// Such partial decisions at the end of the history is ignored during replay tests to avoid non-determinism error
func (s *workflowReplayerSuite) TestReplayWorkflowHistory_Partial_NoDecisionEvents() {
err := s.replayer.ReplayWorkflowHistory(s.logger, getTestReplayWorkflowPartialHistoryNoDecisionEvents(s.T()))
s.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion test/replaytests/branch_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func sampleBranchWorkflow2(ctx workflow.Context) error {
}
ctx = workflow.WithActivityOptions(ctx, ao)

for i := 1; i <= 4; i++ {
for i := 1; i <= 2; i++ {
activityInput := fmt.Sprintf("branch %d of 4", i)
future := workflow.ExecuteActivity(ctx, sampleActivity, activityInput)
futures = append(futures, future)
Expand Down
29 changes: 28 additions & 1 deletion test/replaytests/choice.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,34 @@
},
{
"eventId": 12,
"timestamp": 1679427717321911295,
"timestamp": 1679427717321780254,
"eventType": "ActivityTaskStarted",
"version": 0,
"taskId": 5243011,
"activityTaskStartedEventAttributes": {
"scheduledEventId": 11,
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb",
"requestId": "ae2aad96-6588-4359-807b-a39a16f0896a",
"attempt": 0,
"lastFailureReason": ""
}
},
{
"eventId": 13,
"timestamp": 1679427717321780255,
"eventType": "ActivityTaskCompleted",
Copy link
Contributor

@Groxx Groxx Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add the completed event?

though reading more of this file: I don't think this is a realistic history... "completed" implies "ended normally, i.e. by returning" but we don't create activity-task-scheduled events for activities executed during the final decision (since their result cannot be observed)...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it wasn't a realistic history because it's missing the final decision task scheduled->started->completed events. I'm not quite getting why adding the activity completion looks invalid if that's what you meant

Copy link
Contributor

@Groxx Groxx Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, there are only 2 ways workflows can end:

  • return normally from the code (success or error) as the result of a completed decision task
  • cut off externally due to timeout/termination/etc

and everything that affects ^ this occurs in transactions (no interleaving between these):

  • a decision-complete and all its operations go in as one chunk
  • an external event that the workflow can observe goes in with the event and a decision-task-schedule to process it
    • activity complete, child start or complete, timer fire, etc
  • an external event that cannot be observed just gets appended
    • decision task start, activity start, etc
  • external cutoffs write the terminal event and close the workflow (could be converted into a continue-as-new due to cron or retry)

in this history we originally had:

  • decision task schedule->start->complete
  • activity scheduled
  • workflow complete
    • a normal one, which means it returned, which means this was the result of the most-recent decision task.

^ this is possible because, unlike child workflows, we don't "prune" activity-scheduled-s that are created in the final decision task. (we should consider doing that tbh, or keeping both for easier troubleshooting. I forget exactly how child workflows do this, but I suspect the workflow-end cancels the context -> cancels the state machine -> case decisionStateCreated: d.moveState(decisionStateCompleted, eventCancel) moves it out of a send-able state)

So in this final decision task, we executed one activity, did not wait on it, and returned immediately. Code like this would result in that:

func work(ctx workflow.Context) error {
  // ...
  workflow.ExecuteActivity(ctx, "ignored") // non-blocking
  return nil
}

As of the current PR, we have this sequence:

  • decision task schedule->start->complete
  • activity scheduled
  • activity started
  • activity complete
  • workflow complete (still normal)

which is not possible.

it would mean we blocked after scheduling (this is necessary to submit it to the server, which is a necessary prerequisite to the activity starting)... which is fine because workflow.ExecuteActivity(...).Get(..) would end the batch at "scheduled".

however.

after that, asynchronously, the activity was started (this can be an isolated event because it's not observable, that's fine), and then completed (this requires both complete and decision-task-scheduled because it is observable, so already it's wrong), and then also the workflow function returned on its own without a decision task to learn about that result.

The only possible outcomes that include an activity that was started are things like either:

  • activity started -> activity completed -> decision scheduled -> [started -> completed (both or neither)] -> workflow completed
    • i.e. a decision ran and decided to end the workflow
  • activity started -> workflow cut off externally (timeout, terminate) without a decision task
    • it cannot be a "completed" type (with a possible exception for a continue-as-new but that isn't in this history either)

because "activity completed -> workflow completed" would be splitting the transaction that "activity completed" is involved in, at the very least.


so it's clearly hand-manipulated rather than recorded, because the hand-manipulation produces an impossible sequence. that would be fine for a regression-like "we had a bug that produced these histories, and we want to ensure it behaves like X" tests... but it doesn't help prove much of anything about how non-corrupted workflows behave, and that's pretty much the only purpose of this folder + PR.

Copy link
Contributor

@Groxx Groxx Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh I'm kinda surprised it doesn't panic with an illegal transition or something. But it would be an easy check to miss in the state machine too (particularly because it's more "a collection of many sub-state-machines" rather than enforcing rules in the spaces between them)

"version": 0,
"taskId": 5243000,
"activityTaskCompletedEventAttributes": {
"result": "ImJhbmFuYSIK",
"scheduledEventId": 11,
"startedEventId": 12,
"identity": "82203@agautam-NV709R969P@choiceGroup@41d230ae-253a-4d01-9079-322ef05c09fb"
}
},
{
"eventId": 14,
"timestamp": 1679427717321780256,
"eventType": "WorkflowExecutionCompleted",
"version": 0,
"taskId": 5243011,
Expand Down
90 changes: 90 additions & 0 deletions test/replaytests/continue_as_new.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
[
{
"eventId": 1,
"timestamp": 1699856700704442400,
"eventType": "WorkflowExecutionStarted",
"version": 4,
"taskId": 882931375,
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1",
"initiator": "CronSchedule",
"continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE",
"originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1",
"firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5",
"firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00",
"cronSchedule": "* * * * *",
"firstDecisionTaskBackoffSeconds": 60,
"PartitionConfig": {
"isolation-group": "dca11"
}
}
},
{
"eventId": 2,
"timestamp": 1699856760713586608,
"eventType": "DecisionTaskScheduled",
"version": 4,
"taskId": 882931383,
"decisionTaskScheduledEventAttributes": {
"taskList": {
"name": "fx-worker"
},
"startToCloseTimeoutSeconds": 10
}
},
{
"eventId": 3,
"timestamp": 1699856760741837021,
"eventType": "DecisionTaskStarted",
"version": 4,
"taskId": 882931387,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04"
}
},
{
"eventId": 4,
"timestamp": 1699856760773459755,
"eventType": "DecisionTaskCompleted",
"version": 4,
"taskId": 882931391,
"decisionTaskCompletedEventAttributes": {
"scheduledEventId": 2,
"startedEventId": 3,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a"
}
},
{
"eventId": 5,
"timestamp": 1699857360713649962,
"eventType": "WorkflowExecutionContinuedAsNew",
"version": 4,
"taskId": 882931394,
"workflowExecutionContinuedAsNewEventAttributes": {
"newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90",
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"decisionTaskCompletedEventId": -23,
"backoffStartIntervalInSeconds": 60,
"initiator": "CronSchedule",
"failureReason": "cadenceInternal:Timeout START_TO_CLOSE"
}
}
]
26 changes: 26 additions & 0 deletions test/replaytests/continue_as_new_wf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package replaytests

import (
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)

// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal
func ContinueAsNewWorkflow(ctx workflow.Context) error {
selector := workflow.NewSelector(ctx)
var signalResult string
signalName := "helloWorldSignal"
for {
signalChan := workflow.GetSignalChannel(ctx, signalName)
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
c.Receive(ctx, &signalResult)
workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult))
})
workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName)
// Wait for signal
selector.Select(ctx)
if signalResult == "kill" {
return nil
}
}
}
Comment on lines +9 to +26
Copy link
Contributor

@Groxx Groxx Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the only history this is running on is the continue_as_new.json, this could be just

func ContinueAsNewWorkflow(ctx workflow.Context) error {
	return workflow.GetSignalChannel(ctx, "unused").Receive(ctx, nil)
}

and it would be exactly equivalent: it decides to do nothing, waiting on an outside event to occur. not sure what the added complexity is buying us, unless it triggers some specific edge case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik, continue ask new cases fail for only those scenarios where a workflow was marked continue as new and then the history has nothing after continue as new decision.
This looks like an attempt to cover the edge case where sometimes the workflow continues as new after receiving a signal. the continue as new case has been pretty inconsistent to begin with.

I would say let's keep the original behavior here to preserve the original scenario where we first observed this bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no signal in the history though, so it can't be a "received one, did not receive a second" kind of test. anything that waits forever without recording an event should be equivalent (signal chan, regular chan, waitgroup, etc all would work)

35 changes: 10 additions & 25 deletions test/replaytests/exclusive_choice_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
orderChoiceCherry = "cherry"
)

// exclusiveChoiceWorkflow Workflow Decider. This workflow executes Cherry order.
// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns
func exclusiveChoiceWorkflow(ctx workflow.Context) error {
// Get order.
ao := workflow.ActivityOptions{
Expand All @@ -50,7 +50,7 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, ao)

var orderChoice string
err := workflow.ExecuteActivity(ctx, getOrderActivity).Get(ctx, &orderChoice)
err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice)
if err != nil {
return err
}
Expand All @@ -60,9 +60,9 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
// choose next activity based on order result
switch orderChoice {
case orderChoiceBanana:
workflow.ExecuteActivity(ctx, orderBananaActivity, orderChoice)
workflow.ExecuteActivity(ctx, "main.orderBananaActivity", orderChoice)
case orderChoiceCherry:
workflow.ExecuteActivity(ctx, orderCherryActivity, orderChoice)
workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice)
default:
logger.Error("Unexpected order", zap.String("Choice", orderChoice))
}
Expand All @@ -71,8 +71,8 @@ func exclusiveChoiceWorkflow(ctx workflow.Context) error {
return nil
}

// This workflow explicitly executes Apple Activity received from the getorderActivity.
func exclusiveChoiceWorkflow2(ctx workflow.Context) error {
// exclusiveChoiceWorkflow executes main.getOrderActivity and executes either cherry or banana activity depends on what main.getOrderActivity returns
func exclusiveChoiceWorkflowAlwaysCherry(ctx workflow.Context) error {
// Get order.
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
Expand All @@ -82,40 +82,25 @@ func exclusiveChoiceWorkflow2(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, ao)

var orderChoice string
err := workflow.ExecuteActivity(ctx, getAppleOrderActivity).Get(ctx, &orderChoice)
err := workflow.ExecuteActivity(ctx, "main.getOrderActivity").Get(ctx, &orderChoice)
if err != nil {
return err
}

logger := workflow.GetLogger(ctx)
logger.Sugar().Infof("Got order for %s but will ignore and order cherry!!", orderChoice)

// choose next activity based on order result. It's apple in this case.
switch orderChoice {
case orderChoiceApple:
workflow.ExecuteActivity(ctx, orderAppleActivity, orderChoice)
default:
logger.Error("Unexpected order", zap.String("Choice", orderChoice))
}
workflow.ExecuteActivity(ctx, "main.orderCherryActivity", orderChoice)

logger.Info("Workflow completed.")
return nil
}

func getOrderActivity() (string, error) {
fmt.Printf("Order is for Cherry")
return "cherry", nil
}

func getAppleOrderActivity() (string, error) {
func getBananaOrderActivity() (string, error) {
fmt.Printf("Order is for Apple")
return "apple", nil
}

func orderAppleActivity(choice string) error {
fmt.Printf("Order choice: %v\n", choice)
return nil
}

func orderBananaActivity(choice string) error {
fmt.Printf("Order choice: %v\n", choice)
return nil
Expand Down
Loading