From 1352220e39f14d522fa338581860f0a183c9c32e Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 2 Dec 2024 16:44:10 -0800 Subject: [PATCH] IWF-369: Remove key from map when signal/internal channel is empty (#495) --- ...rstate_test.go => internalchannel_test.go} | 0 service/interpreter/InterStateChannel.go | 59 ------------------- service/interpreter/InternalChannel.go | 57 ++++++++++++++++++ service/interpreter/continueAsNewer.go | 8 +-- service/interpreter/signalReceiver.go | 22 ++++--- service/interpreter/workflowImpl.go | 12 ++-- service/interpreter/workflowUpdater.go | 4 +- 7 files changed, 79 insertions(+), 83 deletions(-) rename integ/{interstate_test.go => internalchannel_test.go} (100%) delete mode 100644 service/interpreter/InterStateChannel.go create mode 100644 service/interpreter/InternalChannel.go diff --git a/integ/interstate_test.go b/integ/internalchannel_test.go similarity index 100% rename from integ/interstate_test.go rename to integ/internalchannel_test.go diff --git a/service/interpreter/InterStateChannel.go b/service/interpreter/InterStateChannel.go deleted file mode 100644 index 8c5a920c..00000000 --- a/service/interpreter/InterStateChannel.go +++ /dev/null @@ -1,59 +0,0 @@ -package interpreter - -import "github.com/indeedeng/iwf/gen/iwfidl" - -type InterStateChannel struct { - // key is channel name - receivedData map[string][]*iwfidl.EncodedObject -} - -func NewInterStateChannel() *InterStateChannel { - return &InterStateChannel{ - receivedData: map[string][]*iwfidl.EncodedObject{}, - } -} - -func RebuildInterStateChannel(refill map[string][]*iwfidl.EncodedObject) *InterStateChannel { - return &InterStateChannel{ - receivedData: refill, - } -} - -func (i *InterStateChannel) ReadReceived(channelNames []string) map[string][]*iwfidl.EncodedObject { - if len(channelNames) == 0 { - return i.receivedData - } - data := make(map[string][]*iwfidl.EncodedObject) - for _, n := range channelNames { - data[n] = i.receivedData[n] - } - return data -} - -func (i *InterStateChannel) HasData(channelName string) bool { - l := i.receivedData[channelName] - return len(l) > 0 -} - -func (i *InterStateChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing) { - for _, pub := range publishes { - i.receive(pub.ChannelName, pub.Value) - } -} - -func (i *InterStateChannel) receive(channelName string, data *iwfidl.EncodedObject) { - l := i.receivedData[channelName] - l = append(l, data) - i.receivedData[channelName] = l -} - -func (i *InterStateChannel) Retrieve(channelName string) *iwfidl.EncodedObject { - l := i.receivedData[channelName] - if len(l) <= 0 { - panic("critical bug, this shouldn't happen") - } - data := l[0] - l = l[1:] - i.receivedData[channelName] = l - return data -} diff --git a/service/interpreter/InternalChannel.go b/service/interpreter/InternalChannel.go new file mode 100644 index 00000000..7737f455 --- /dev/null +++ b/service/interpreter/InternalChannel.go @@ -0,0 +1,57 @@ +package interpreter + +import "github.com/indeedeng/iwf/gen/iwfidl" + +type InternalChannel struct { + // key is channel name + receivedData map[string][]*iwfidl.EncodedObject +} + +func NewInternalChannel() *InternalChannel { + return &InternalChannel{ + receivedData: map[string][]*iwfidl.EncodedObject{}, + } +} + +func RebuildInternalChannel(refill map[string][]*iwfidl.EncodedObject) *InternalChannel { + return &InternalChannel{ + receivedData: refill, + } +} + +func (i *InternalChannel) GetAllReceived() map[string][]*iwfidl.EncodedObject { + return i.receivedData +} + +func (i *InternalChannel) HasData(channelName string) bool { + l := i.receivedData[channelName] + return len(l) > 0 +} + +func (i *InternalChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing) { + for _, pub := range publishes { + i.receive(pub.ChannelName, pub.Value) + } +} + +func (i *InternalChannel) receive(channelName string, data *iwfidl.EncodedObject) { + l := i.receivedData[channelName] + l = append(l, data) + i.receivedData[channelName] = l +} + +func (i *InternalChannel) Retrieve(channelName string) *iwfidl.EncodedObject { + l := i.receivedData[channelName] + if len(l) <= 0 { + panic("critical bug, this shouldn't happen") + } + data := l[0] + l = l[1:] + if len(l) == 0 { + delete(i.receivedData, channelName) + } else { + i.receivedData[channelName] = l + } + + return data +} diff --git a/service/interpreter/continueAsNewer.go b/service/interpreter/continueAsNewer.go index 45a8e590..8abdf20d 100644 --- a/service/interpreter/continueAsNewer.go +++ b/service/interpreter/continueAsNewer.go @@ -16,7 +16,7 @@ type ContinueAsNewer struct { inflightUpdateOperations int stateRequestQueue *StateRequestQueue - interStateChannel *InterStateChannel + interStateChannel *InternalChannel stateExecutionCounter *StateExecutionCounter persistenceManager *PersistenceManager signalReceiver *SignalReceiver @@ -26,7 +26,7 @@ type ContinueAsNewer struct { func NewContinueAsNewer( provider WorkflowProvider, - interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, + interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, timerProcessor *TimerProcessor, ) *ContinueAsNewer { @@ -120,8 +120,8 @@ func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) e localStateExecutionToResumeMap[value.StateExecutionId] = value } return &service.ContinueAsNewDumpResponse{ - InterStateChannelReceived: c.interStateChannel.ReadReceived(nil), - SignalsReceived: c.signalReceiver.DumpReceived(nil), + InterStateChannelReceived: c.interStateChannel.GetAllReceived(), + SignalsReceived: c.signalReceiver.GetAllReceived(), StateExecutionCounterInfo: c.stateExecutionCounter.Dump(), DataObjects: c.persistenceManager.GetAllDataObjects(), SearchAttributes: c.persistenceManager.GetAllSearchAttributes(), diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 466709da..50e00f7c 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -15,13 +15,13 @@ type SignalReceiver struct { provider WorkflowProvider timerProcessor *TimerProcessor workflowConfiger *WorkflowConfiger - interStateChannel *InterStateChannel + interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue persistenceManager *PersistenceManager } func NewSignalReceiver( - ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel, + ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel, stateRequestQueue *StateRequestQueue, persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, workflowConfiger *WorkflowConfiger, @@ -224,19 +224,17 @@ func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject { } sigVal := l[0] l = l[1:] - sr.receivedSignals[channelName] = l + if len(l) == 0 { + delete(sr.receivedSignals, channelName) + } else { + sr.receivedSignals[channelName] = l + } + return sigVal } -func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfidl.EncodedObject { - if len(channelNames) == 0 { - return sr.receivedSignals - } - data := make(map[string][]*iwfidl.EncodedObject) - for _, n := range channelNames { - data[n] = sr.receivedSignals[n] - } - return data +func (sr *SignalReceiver) GetAllReceived() map[string][]*iwfidl.EncodedObject { + return sr.receivedSignals } // DrainAllReceivedButUnprocessedSignals will process all the signals that are received but not processed in the current diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index a9b15a84..3ba89d1a 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -76,7 +76,7 @@ func InterpreterImpl( IwfWorkerUrl: input.IwfWorkerUrl, } - var interStateChannel *InterStateChannel + var interStateChannel *InternalChannel var stateRequestQueue *StateRequestQueue var persistenceManager *PersistenceManager var timerProcessor *TimerProcessor @@ -96,7 +96,7 @@ func InterpreterImpl( // The below initialization order should be the same as for non-continueAsNew - interStateChannel = RebuildInterStateChannel(previous.InterStateChannelReceived) + interStateChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) @@ -109,7 +109,7 @@ func InterpreterImpl( outputCollector = NewOutputCollector(previous.StateOutputs) continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } else { - interStateChannel = NewInterStateChannel() + interStateChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) timerProcessor = NewTimerProcessor(ctx, provider, nil) @@ -382,7 +382,7 @@ func InterpreterImpl( func checkClosingWorkflow( ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, currentStateId, currentStateExeId string, - internalChannel *InterStateChannel, signalReceiver *SignalReceiver, + internalChannel *InternalChannel, signalReceiver *SignalReceiver, ) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) { if decision.HasConditionalClose() { conditionClose := decision.ConditionalClose @@ -511,7 +511,7 @@ func processStateExecution( stateReq StateRequest, stateExeId string, persistenceManager *PersistenceManager, - interStateChannel *InterStateChannel, + interStateChannel *InternalChannel, signalReceiver *SignalReceiver, timerProcessor *TimerProcessor, continueAsNewer *ContinueAsNewer, @@ -818,7 +818,7 @@ func invokeStateExecute( state iwfidl.StateMovement, stateExeId string, persistenceManager *PersistenceManager, - interStateChannel *InterStateChannel, + interStateChannel *InternalChannel, executionContext iwfidl.Context, commandRes *iwfidl.CommandResults, continueAsNewer *ContinueAsNewer, diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index a45c9bc5..448bbc19 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -11,7 +11,7 @@ type WorkflowUpdater struct { provider WorkflowProvider continueAsNewer *ContinueAsNewer continueAsNewCounter *ContinueAsNewCounter - interStateChannel *InterStateChannel + interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue configer *WorkflowConfiger logger UnifiedLogger @@ -23,7 +23,7 @@ func NewWorkflowUpdater( ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, - interStateChannel *InterStateChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, + interStateChannel *InternalChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, ) (*WorkflowUpdater, error) { updater := &WorkflowUpdater{ persistenceManager: persistenceManager,