Skip to content

Commit

Permalink
Fix conditional complete on messages of internalChannels from state APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Nov 22, 2024
1 parent 9739cc1 commit a26f918
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 29 deletions.
26 changes: 14 additions & 12 deletions service/interpreter/globalVersioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ const StartingVersionContinueAsNewOnNoStates = 4
const StartingVersionTemporal26SDK = 5
const StartingVersionExecutingStateIdMode = 6
const StartingVersionNoIwfGlobalVersionSearchAttribute = 7
const MaxOfAllVersions = StartingVersionNoIwfGlobalVersionSearchAttribute
const StartingVersionYieldOnConditionalComplete = 8
const MaxOfAllVersions = StartingVersionYieldOnConditionalComplete

// GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api
type GlobalVersioner struct {
workflowProvider WorkflowProvider
ctx UnifiedContext
version int
OmitVersionMarker bool // indicate the version marker and upsertSearchAttribute is already set at the start of the workflow
workflowProvider WorkflowProvider
ctx UnifiedContext
version int
}

func NewGlobalVersioner(
Expand Down Expand Up @@ -65,20 +65,22 @@ func (p *GlobalVersioner) IsAfterVersionOfNoIwfGlobalVersionSearchAttribute() bo
return p.version >= StartingVersionNoIwfGlobalVersionSearchAttribute
}

func (p *GlobalVersioner) IsAfterVersionOfYieldOnConditionalComplete() bool {
return p.version >= StartingVersionYieldOnConditionalComplete
}

// methods checking feature/functionality availability

func (p *GlobalVersioner) IsUsingGlobalVersionSearchAttribute() bool {
return p.version >= StartingVersionUsingGlobalVersioning && p.version < StartingVersionNoIwfGlobalVersionSearchAttribute
}

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error {
if p.OmitVersionMarker {
// the search attribute is already set when starting the workflow
return nil
}
// TODO this bug in Cadence SDK may cause concurrent writes
// https://github.com/uber-go/cadence-client/issues/1198
if p.workflowProvider.GetBackendType() != service.BackendTypeCadence {
if p.IsUsingGlobalVersionSearchAttribute() &&
p.workflowProvider.GetBackendType() != service.BackendTypeCadence {
// Note that there was bug in Cadence SDK may cause concurrent writes hence we never upsert for Cadence
// https://github.com/uber-go/cadence-client/issues/1198

return p.workflowProvider.UpsertSearchAttributes(p.ctx, map[string]interface{}{
service.SearchAttributeGlobalVersion: MaxOfAllVersions,
})
Expand Down
5 changes: 3 additions & 2 deletions service/interpreter/signalReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,16 @@ func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfi
return data
}

// DrainAllUnreceivedSignals will process all the signals
// DrainAllReceivedButUnprocessedSignals will process all the signals that are received but not processed in the current
// workflow task.
// There are two cases this is needed:
// 1. ContinueAsNew:
// retrieve signals that after signal handler threads are stopped,
// so that the signals can be carried over to next run by continueAsNew.
// This includes both regular user signals and system signals
// 2. Conditional close/complete workflow on signal/internal channel:
// retrieve all signal/internal channel messages before checking the signal/internal channels
func (sr *SignalReceiver) DrainAllUnreceivedSignals(ctx UnifiedContext) {
func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx UnifiedContext) {
unhandledSigs := sr.provider.GetUnhandledSignalNames(ctx)
if len(unhandledSigs) == 0 {
return
Expand Down
46 changes: 31 additions & 15 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ func InterpreterImpl(
return
}

if globalVersioner.IsUsingGlobalVersionSearchAttribute() {
err = globalVersioner.UpsertGlobalVersionSearchAttribute()
if err != nil {
retErr = err
return
}
err = globalVersioner.UpsertGlobalVersionSearchAttribute()
if err != nil {
retErr = err
return
}

if !input.Config.GetDisableSystemSearchAttribute() {
Expand Down Expand Up @@ -244,7 +242,7 @@ func InterpreterImpl(
// NOTE: decision is only available on this CompletedStateExecutionStatus

canGoNext, gracefulComplete, forceComplete, forceFail, output, err :=
checkClosingWorkflow(ctx, provider, decision, state.GetStateId(), stateExeId, interStateChannel, signalReceiver)
checkClosingWorkflow(ctx, provider, globalVersioner, decision, state.GetStateId(), stateExeId, interStateChannel, signalReceiver)
if err != nil {
errToFailWf = err
// no return so that it can fall through to call MarkStateExecutionCompleted
Expand Down Expand Up @@ -339,7 +337,7 @@ func InterpreterImpl(
}
// NOTE: This must be the last thing before continueAsNew!!!
// Otherwise, there could be signals unhandled
signalReceiver.DrainAllUnreceivedSignals(ctx)
signalReceiver.DrainAllReceivedButUnprocessedSignals(ctx)

// after draining signals, there could be some changes
// last fail workflow signal, return the workflow so that we don't carry over the fail request
Expand Down Expand Up @@ -382,7 +380,7 @@ func InterpreterImpl(
}

func checkClosingWorkflow(
ctx UnifiedContext, provider WorkflowProvider, decision *iwfidl.StateDecision,
ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision,
currentStateId, currentStateExeId string,
internalChannel *InterStateChannel, signalReceiver *SignalReceiver,
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
Expand All @@ -391,12 +389,13 @@ func checkClosingWorkflow(
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY ||
conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY {
// trigger a signal draining so that all the signal/internal channel messages are processed
// TODO https://github.com/indeedeng/iwf/issues/289
// https://github.com/indeedeng/iwf/issues/290
// if a messages from internal channel is published via State execution,
// we don't do any draining here yet, so the conditional completion could still lose the messages
// to workaround, user code will have to use persistence locking
signalReceiver.DrainAllUnreceivedSignals(ctx)
signalReceiver.DrainAllReceivedButUnprocessedSignals(ctx)
// Messages of internal channels could be published via State executions, within the same workflow task.
// If we don't do any draining and process them, the conditional completion could lose the messages
err = DrainReceivedButUnprocessedInternalChannelsFromStateApis(ctx, provider, versioner)
if err != nil {
return
}

conditionMet := false
if conditionClose.GetConditionalCloseType() == iwfidl.FORCE_COMPLETE_ON_INTERNAL_CHANNEL_EMPTY &&
Expand Down Expand Up @@ -487,6 +486,23 @@ func checkClosingWorkflow(
return
}

func DrainReceivedButUnprocessedInternalChannelsFromStateApis(
ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner,
) error {
if versioner.IsAfterVersionOfYieldOnConditionalComplete() {
// Just yield, by waiting on an empty lambda, nothing else.
// It will let other workflow threads/coroutines to run.
// This will drain the messages published from state APIs.
// NOTE that this is extremely tricky in Cadence/Temporal programming model.
// Read more: https://stackoverflow.com/questions/71356668/how-does-multi-threading-works-in-cadence-temporal-workflow
//https://docs.temporal.io/encyclopedia/go-sdk-multithreading
return provider.Await(ctx, func() bool {
return true
})
}
return nil
}

func processStateExecution(
ctx UnifiedContext,
provider WorkflowProvider,
Expand Down

0 comments on commit a26f918

Please sign in to comment.