Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-124: Ignore empty output for collector #497

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
59 changes: 0 additions & 59 deletions service/interpreter/InterStateChannel.go

This file was deleted.

57 changes: 57 additions & 0 deletions service/interpreter/InternalChannel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package interpreter

import "github.com/indeedeng/iwf/gen/iwfidl"

type InternalChannel struct {
// key is channel name
receivedData map[string][]*iwfidl.EncodedObject
}

func NewInterStateChannel() *InternalChannel {
return &InternalChannel{
receivedData: map[string][]*iwfidl.EncodedObject{},
}
}

func RebuildInterStateChannel(refill map[string][]*iwfidl.EncodedObject) *InternalChannel {
return &InternalChannel{
receivedData: refill,
}
}

func (i *InternalChannel) DumpReceived() map[string][]*iwfidl.EncodedObject {
return i.receivedData
}

func (i *InternalChannel) HasData(channelName string) bool {
l := i.receivedData[channelName]
return len(l) > 0
}

func (i *InternalChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing) {
for _, pub := range publishes {
i.receive(pub.ChannelName, pub.Value)
}
}

func (i *InternalChannel) receive(channelName string, data *iwfidl.EncodedObject) {
l := i.receivedData[channelName]
l = append(l, data)
i.receivedData[channelName] = l
}

func (i *InternalChannel) Retrieve(channelName string) *iwfidl.EncodedObject {
l := i.receivedData[channelName]
if len(l) <= 0 {
panic("critical bug, this shouldn't happen")
}
data := l[0]
l = l[1:]
if len(l) == 0 {
delete(i.receivedData, channelName)
} else {
i.receivedData[channelName] = l
}

return data
}
8 changes: 4 additions & 4 deletions service/interpreter/continueAsNewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ContinueAsNewer struct {
inflightUpdateOperations int

stateRequestQueue *StateRequestQueue
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateExecutionCounter *StateExecutionCounter
persistenceManager *PersistenceManager
signalReceiver *SignalReceiver
Expand All @@ -26,7 +26,7 @@ type ContinueAsNewer struct {

func NewContinueAsNewer(
provider WorkflowProvider,
interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector,
timerProcessor *TimerProcessor,
) *ContinueAsNewer {
Expand Down Expand Up @@ -120,8 +120,8 @@ func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) e
localStateExecutionToResumeMap[value.StateExecutionId] = value
}
return &service.ContinueAsNewDumpResponse{
InterStateChannelReceived: c.interStateChannel.ReadReceived(nil),
SignalsReceived: c.signalReceiver.DumpReceived(nil),
InterStateChannelReceived: c.interStateChannel.DumpReceived(),
SignalsReceived: c.signalReceiver.DumpReceived(),
StateExecutionCounterInfo: c.stateExecutionCounter.Dump(),
DataObjects: c.persistenceManager.GetAllDataObjects(),
SearchAttributes: c.persistenceManager.GetAllSearchAttributes(),
Expand Down
4 changes: 3 additions & 1 deletion service/interpreter/outputCollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ func NewOutputCollector(initOutputs []iwfidl.StateCompletionOutput) *OutputColle
}

func (o *OutputCollector) Add(output iwfidl.StateCompletionOutput) {
o.outputs = append(o.outputs, output)
if output.CompletedStateOutput != nil {
o.outputs = append(o.outputs, output)
}
}

func (o *OutputCollector) GetAll() []iwfidl.StateCompletionOutput {
Expand Down
22 changes: 10 additions & 12 deletions service/interpreter/signalReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type SignalReceiver struct {
provider WorkflowProvider
timerProcessor *TimerProcessor
workflowConfiger *WorkflowConfiger
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateRequestQueue *StateRequestQueue
persistenceManager *PersistenceManager
}

func NewSignalReceiver(
ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel,
ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel,
stateRequestQueue *StateRequestQueue,
persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter,
workflowConfiger *WorkflowConfiger,
Expand Down Expand Up @@ -224,19 +224,17 @@ func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject {
}
sigVal := l[0]
l = l[1:]
sr.receivedSignals[channelName] = l
if len(l) == 0 {
delete(sr.receivedSignals, channelName)
} else {
sr.receivedSignals[channelName] = l
}

return sigVal
}

func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfidl.EncodedObject {
if len(channelNames) == 0 {
return sr.receivedSignals
}
data := make(map[string][]*iwfidl.EncodedObject)
for _, n := range channelNames {
data[n] = sr.receivedSignals[n]
}
return data
func (sr *SignalReceiver) DumpReceived() map[string][]*iwfidl.EncodedObject {
return sr.receivedSignals
}

// DrainAllReceivedButUnprocessedSignals will process all the signals that are received but not processed in the current
Expand Down
8 changes: 4 additions & 4 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func InterpreterImpl(
IwfWorkerUrl: input.IwfWorkerUrl,
}

var interStateChannel *InterStateChannel
var interStateChannel *InternalChannel
var stateRequestQueue *StateRequestQueue
var persistenceManager *PersistenceManager
var timerProcessor *TimerProcessor
Expand Down Expand Up @@ -382,7 +382,7 @@ func InterpreterImpl(
func checkClosingWorkflow(
ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision,
currentStateId, currentStateExeId string,
internalChannel *InterStateChannel, signalReceiver *SignalReceiver,
internalChannel *InternalChannel, signalReceiver *SignalReceiver,
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
conditionClose := decision.ConditionalClose
Expand Down Expand Up @@ -511,7 +511,7 @@ func processStateExecution(
stateReq StateRequest,
stateExeId string,
persistenceManager *PersistenceManager,
interStateChannel *InterStateChannel,
interStateChannel *InternalChannel,
signalReceiver *SignalReceiver,
timerProcessor *TimerProcessor,
continueAsNewer *ContinueAsNewer,
Expand Down Expand Up @@ -818,7 +818,7 @@ func invokeStateExecute(
state iwfidl.StateMovement,
stateExeId string,
persistenceManager *PersistenceManager,
interStateChannel *InterStateChannel,
interStateChannel *InternalChannel,
executionContext iwfidl.Context,
commandRes *iwfidl.CommandResults,
continueAsNewer *ContinueAsNewer,
Expand Down
4 changes: 2 additions & 2 deletions service/interpreter/workflowUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type WorkflowUpdater struct {
provider WorkflowProvider
continueAsNewer *ContinueAsNewer
continueAsNewCounter *ContinueAsNewCounter
interStateChannel *InterStateChannel
interStateChannel *InternalChannel
stateRequestQueue *StateRequestQueue
configer *WorkflowConfiger
logger UnifiedLogger
Expand All @@ -23,7 +23,7 @@ func NewWorkflowUpdater(
ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager,
stateRequestQueue *StateRequestQueue,
continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger,
interStateChannel *InterStateChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner,
interStateChannel *InternalChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner,
) (*WorkflowUpdater, error) {
updater := &WorkflowUpdater{
persistenceManager: persistenceManager,
Expand Down
Loading