Skip to content

Commit

Permalink
IWF-369: Remove key from map when signal/internal channel is empty (#495
Browse files Browse the repository at this point in the history
)
  • Loading branch information
longquanzheng authored Dec 3, 2024
1 parent 1d24724 commit 1352220
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 83 deletions.
File renamed without changes.
59 changes: 0 additions & 59 deletions service/interpreter/InterStateChannel.go

This file was deleted.

57 changes: 57 additions & 0 deletions service/interpreter/InternalChannel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package interpreter

import "github.com/indeedeng/iwf/gen/iwfidl"

type InternalChannel struct {
// key is channel name
receivedData map[string][]*iwfidl.EncodedObject
}

func NewInternalChannel() *InternalChannel {
return &InternalChannel{
receivedData: map[string][]*iwfidl.EncodedObject{},
}
}

func RebuildInternalChannel(refill map[string][]*iwfidl.EncodedObject) *InternalChannel {
return &InternalChannel{
receivedData: refill,
}
}

func (i *InternalChannel) GetAllReceived() map[string][]*iwfidl.EncodedObject {
return i.receivedData
}

func (i *InternalChannel) HasData(channelName string) bool {
l := i.receivedData[channelName]
return len(l) > 0
}

func (i *InternalChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing) {
for _, pub := range publishes {
i.receive(pub.ChannelName, pub.Value)
}
}

func (i *InternalChannel) receive(channelName string, data *iwfidl.EncodedObject) {
l := i.receivedData[channelName]
l = append(l, data)
i.receivedData[channelName] = l
}

func (i *InternalChannel) Retrieve(channelName string) *iwfidl.EncodedObject {
l := i.receivedData[channelName]
if len(l) <= 0 {
panic("critical bug, this shouldn't happen")
}
data := l[0]
l = l[1:]
if len(l) == 0 {
delete(i.receivedData, channelName)
} else {
i.receivedData[channelName] = l
}

return data
}
8 changes: 4 additions & 4 deletions service/interpreter/continueAsNewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ContinueAsNewer struct {
inflightUpdateOperations int

stateRequestQueue *StateRequestQueue
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateExecutionCounter *StateExecutionCounter
persistenceManager *PersistenceManager
signalReceiver *SignalReceiver
Expand All @@ -26,7 +26,7 @@ type ContinueAsNewer struct {

func NewContinueAsNewer(
provider WorkflowProvider,
interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector,
timerProcessor *TimerProcessor,
) *ContinueAsNewer {
Expand Down Expand Up @@ -120,8 +120,8 @@ func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) e
localStateExecutionToResumeMap[value.StateExecutionId] = value
}
return &service.ContinueAsNewDumpResponse{
InterStateChannelReceived: c.interStateChannel.ReadReceived(nil),
SignalsReceived: c.signalReceiver.DumpReceived(nil),
InterStateChannelReceived: c.interStateChannel.GetAllReceived(),
SignalsReceived: c.signalReceiver.GetAllReceived(),
StateExecutionCounterInfo: c.stateExecutionCounter.Dump(),
DataObjects: c.persistenceManager.GetAllDataObjects(),
SearchAttributes: c.persistenceManager.GetAllSearchAttributes(),
Expand Down
22 changes: 10 additions & 12 deletions service/interpreter/signalReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type SignalReceiver struct {
provider WorkflowProvider
timerProcessor *TimerProcessor
workflowConfiger *WorkflowConfiger
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateRequestQueue *StateRequestQueue
persistenceManager *PersistenceManager
}

func NewSignalReceiver(
ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel,
ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel,
stateRequestQueue *StateRequestQueue,
persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter,
workflowConfiger *WorkflowConfiger,
Expand Down Expand Up @@ -224,19 +224,17 @@ func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject {
}
sigVal := l[0]
l = l[1:]
sr.receivedSignals[channelName] = l
if len(l) == 0 {
delete(sr.receivedSignals, channelName)
} else {
sr.receivedSignals[channelName] = l
}

return sigVal
}

func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfidl.EncodedObject {
if len(channelNames) == 0 {
return sr.receivedSignals
}
data := make(map[string][]*iwfidl.EncodedObject)
for _, n := range channelNames {
data[n] = sr.receivedSignals[n]
}
return data
func (sr *SignalReceiver) GetAllReceived() map[string][]*iwfidl.EncodedObject {
return sr.receivedSignals
}

// DrainAllReceivedButUnprocessedSignals will process all the signals that are received but not processed in the current
Expand Down
12 changes: 6 additions & 6 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func InterpreterImpl(
IwfWorkerUrl: input.IwfWorkerUrl,
}

var interStateChannel *InterStateChannel
var interStateChannel *InternalChannel
var stateRequestQueue *StateRequestQueue
var persistenceManager *PersistenceManager
var timerProcessor *TimerProcessor
Expand All @@ -96,7 +96,7 @@ func InterpreterImpl(

// The below initialization order should be the same as for non-continueAsNew

interStateChannel = RebuildInterStateChannel(previous.InterStateChannelReceived)
interStateChannel = RebuildInternalChannel(previous.InterStateChannelReceived)
stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume)
persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes)
timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals)
Expand All @@ -109,7 +109,7 @@ func InterpreterImpl(
outputCollector = NewOutputCollector(previous.StateOutputs)
continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor)
} else {
interStateChannel = NewInterStateChannel()
interStateChannel = NewInternalChannel()
stateRequestQueue = NewStateRequestQueue()
persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes)
timerProcessor = NewTimerProcessor(ctx, provider, nil)
Expand Down Expand Up @@ -382,7 +382,7 @@ func InterpreterImpl(
func checkClosingWorkflow(
ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision,
currentStateId, currentStateExeId string,
internalChannel *InterStateChannel, signalReceiver *SignalReceiver,
internalChannel *InternalChannel, signalReceiver *SignalReceiver,
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
conditionClose := decision.ConditionalClose
Expand Down Expand Up @@ -511,7 +511,7 @@ func processStateExecution(
stateReq StateRequest,
stateExeId string,
persistenceManager *PersistenceManager,
interStateChannel *InterStateChannel,
interStateChannel *InternalChannel,
signalReceiver *SignalReceiver,
timerProcessor *TimerProcessor,
continueAsNewer *ContinueAsNewer,
Expand Down Expand Up @@ -818,7 +818,7 @@ func invokeStateExecute(
state iwfidl.StateMovement,
stateExeId string,
persistenceManager *PersistenceManager,
interStateChannel *InterStateChannel,
interStateChannel *InternalChannel,
executionContext iwfidl.Context,
commandRes *iwfidl.CommandResults,
continueAsNewer *ContinueAsNewer,
Expand Down
4 changes: 2 additions & 2 deletions service/interpreter/workflowUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type WorkflowUpdater struct {
provider WorkflowProvider
continueAsNewer *ContinueAsNewer
continueAsNewCounter *ContinueAsNewCounter
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateRequestQueue *StateRequestQueue
configer *WorkflowConfiger
logger UnifiedLogger
Expand All @@ -23,7 +23,7 @@ func NewWorkflowUpdater(
ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager,
stateRequestQueue *StateRequestQueue,
continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger,
interStateChannel *InterStateChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner,
interStateChannel *InternalChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner,
) (*WorkflowUpdater, error) {
updater := &WorkflowUpdater{
persistenceManager: persistenceManager,
Expand Down

0 comments on commit 1352220

Please sign in to comment.