From de0d711bb8b4c506865dd4ba0686cd3d0f1506f9 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 21:13:41 -0800 Subject: [PATCH] Implement system search attributes(global version, executingStateIds, IwfWorkflowType) (#68) --- README.md | 28 +++++++-- integ/timer_test.go | 2 +- service/const.go | 4 ++ .../interpreter/cadence/workflowProvider.go | 10 +++ service/interpreter/globalVersionProvider.go | 29 +++++++++ service/interpreter/interfaces.go | 1 + service/interpreter/stateExecutingManager.go | 63 +++++++++++++++++++ .../interpreter/temporal/workflowProvider.go | 10 +++ service/interpreter/workflowImpl.go | 37 ++++++++--- 9 files changed, 171 insertions(+), 13 deletions(-) create mode 100644 service/interpreter/globalVersionProvider.go create mode 100644 service/interpreter/stateExecutingManager.go diff --git a/README.md b/README.md index 7d238a65..cf57db24 100644 --- a/README.md +++ b/README.md @@ -137,20 +137,40 @@ Any contribution is welcome. ### Run with local Temporalite 1. Run a local Temporalite following the [instruction](https://github.com/temporalio/temporalite). If you see error `error setting up schema`, try use command `temporalite start --namespace default -f my_test.db` instead to start. -2. Go to http://localhost:8233/ for Temporal WebUI +2. Register a default namespace +```shell +tctl --ns default n re +``` +3. Go to http://localhost:8233/ for Temporal WebUI NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporalio/docker-compose) to run with docker -3. For `attribute_test.go` integTests, you need to register search attributes: -```bash +3. Register system search attributes required by iWF server +```shell +tctl adm cl asa -n IwfWorkflowType -t Keyword +tctl adm cl asa -n GlobalWorkflowVersion -t Int +tctl adm cl asa -n ExecutingStateIds -t Keyword + +``` +4 For `attribute_test.go` integTests, you need to register search attributes: +```shell tctl adm cl asa -n CustomKeywordField -t Keyword tctl adm cl asa -n CustomIntField -t Int ``` ### Run with local Cadence 1. Run a local Cadence server following the [instructions](https://github.com/uber/cadence/tree/master/docker) +``` +docker-compose -f docker-compose-es-v7.yml up +``` 2. Register a new domain if not haven `cadence --do default domain register` -3. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days +3. Register system search attributes required by iWF server +``` +cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2 +cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0 +cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0 +``` +4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days ## Development Plan ### 1.0 diff --git a/integ/timer_test.go b/integ/timer_test.go index 15202b72..eb56015f 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -27,7 +27,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType) { closeFunc1 := startWorkflowWorker(wfHandler) defer closeFunc1() - closeFunc2 := startIwfService(service.BackendTypeTemporal) + closeFunc2 := startIwfService(backendType) defer closeFunc2() // start a workflow diff --git a/service/const.go b/service/const.go index c13bc37f..5ce77646 100644 --- a/service/const.go +++ b/service/const.go @@ -37,6 +37,10 @@ const ( WorkflowStatusTerminated = "TERMINATED" WorkflowStatusCanceled = "CANCELED" WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW" + + SearchAttributeGlobalVersion = "GlobalWorkflowVersion" + SearchAttributeExecutingStateIds = "ExecutingStateIds" + SearchAttributeIwfWorkflowType = "IwfWorkflowType" ) type BackendType string diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 7c15851f..87c4dedf 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -142,6 +142,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } +func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + + version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported)) + return int(version) +} + type cadenceReceiveChannel struct { channel workflow.Channel } diff --git a/service/interpreter/globalVersionProvider.go b/service/interpreter/globalVersionProvider.go new file mode 100644 index 00000000..5d638ff9 --- /dev/null +++ b/service/interpreter/globalVersionProvider.go @@ -0,0 +1,29 @@ +package interpreter + +import "github.com/indeedeng/iwf/service" + +const globalChangeId = "global" +const startingVersionUsingGlobalVersioning = 1 +const maxOfAllVersions = startingVersionUsingGlobalVersioning + +// see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api +type globalVersioner struct { + workflowProvider WorkflowProvider +} + +func newGlobalVersionProvider(workflowProvider WorkflowProvider) *globalVersioner { + return &globalVersioner{ + workflowProvider: workflowProvider, + } +} + +func (p *globalVersioner) isAfterVersionOfUsingGlobalVersioning(ctx UnifiedContext) bool { + version := p.workflowProvider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions) + return version >= startingVersionUsingGlobalVersioning +} + +func (p *globalVersioner) upsertGlobalVersionSearchAttribute(ctx UnifiedContext) error { + return p.workflowProvider.UpsertSearchAttributes(ctx, map[string]interface{}{ + service.SearchAttributeGlobalVersion: maxOfAllVersions, + }) +} diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index b867ee46..b96ffa32 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -83,6 +83,7 @@ type WorkflowProvider interface { Sleep(ctx UnifiedContext, d time.Duration) (err error) GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel) GetContextValue(ctx UnifiedContext, key string) interface{} + GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int GetBackendType() service.BackendType } diff --git a/service/interpreter/stateExecutingManager.go b/service/interpreter/stateExecutingManager.go new file mode 100644 index 00000000..b11c9e7d --- /dev/null +++ b/service/interpreter/stateExecutingManager.go @@ -0,0 +1,63 @@ +package interpreter + +import ( + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +type stateExecutingManager struct { + ctx UnifiedContext + provider WorkflowProvider + + stateIdCount map[string]int + totalExecutingCount int +} + +func newStateExecutingManager(ctx UnifiedContext, provider WorkflowProvider) *stateExecutingManager { + return &stateExecutingManager{ + ctx: ctx, + provider: provider, + stateIdCount: map[string]int{}, + totalExecutingCount: 0, + } +} + +func (e *stateExecutingManager) startStates(states []iwfidl.StateMovement) error { + needsUpdate := false + for _, s := range states { + e.stateIdCount[s.StateId]++ + if e.stateIdCount[s.StateId] == 1 { + // first time the stateId show up + needsUpdate = true + } + } + e.totalExecutingCount += len(states) + if needsUpdate { + return e.updateSearchAttribute() + } + return nil +} + +func (e *stateExecutingManager) completeStates(state iwfidl.StateMovement) error { + e.stateIdCount[state.StateId]-- + e.totalExecutingCount -= 1 + if e.stateIdCount[state.StateId] == 0 { + delete(e.stateIdCount, state.StateId) + return e.updateSearchAttribute() + } + return nil +} + +func (e *stateExecutingManager) getTotalExecutingStates() int { + return e.totalExecutingCount +} + +func (e *stateExecutingManager) updateSearchAttribute() error { + var executingStateIds []string + for sid := range e.stateIdCount { + executingStateIds = append(executingStateIds, sid) + } + return e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{ + service.SearchAttributeExecutingStateIds: executingStateIds, + }) +} diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index 9d05aeb7..08e92368 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -130,6 +130,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } +func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + + version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported)) + return int(version) +} + type temporalReceiveChannel struct { channel workflow.ReceiveChannel } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 008179df..4362e728 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -8,6 +8,21 @@ import ( ) func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { + globalVersionProvider := newGlobalVersionProvider(provider) + if globalVersionProvider.isAfterVersionOfUsingGlobalVersioning(ctx) { + err := globalVersionProvider.upsertGlobalVersionSearchAttribute(ctx) + if err != nil { + return nil, err + } + } + + err := provider.UpsertSearchAttributes(ctx, map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType, + }) + if err != nil { + return nil, err + } + execution := service.IwfWorkflowExecution{ IwfWorkerUrl: input.IwfWorkerUrl, WorkflowType: input.IwfWorkflowType, @@ -28,7 +43,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic return provider.UpsertSearchAttributes(ctx, attributes) }) - err := provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) { + err = provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) { return attrMgr.GetQueryAttributesByKey(req), nil }) if err != nil { @@ -38,12 +53,16 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic var errToFailWf error // TODO Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve. var outputsToReturnWf []iwfidl.StateCompletionOutput var forceCompleteWf bool - inFlightExecutingStateCount := 0 + stateExecutingMgr := newStateExecutingManager(ctx, provider) + //inFlightExecutingStateCount := 0 for len(currentStates) > 0 { // copy the whole slice(pointer) - inFlightExecutingStateCount += len(currentStates) statesToExecute := currentStates + err := stateExecutingMgr.startStates(currentStates) + if err != nil { + return nil, err + } //reset to empty slice since each iteration will process all current states in the queue currentStates = nil @@ -52,10 +71,6 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic // state must be passed via parameter https://stackoverflow.com/questions/67263092 stateCtx := provider.ExtendContextWithValue(ctx, "state", stateToExecute) provider.GoNamed(stateCtx, stateToExecute.GetStateId(), func(ctx UnifiedContext) { - defer func() { - inFlightExecutingStateCount-- - }() - state, ok := provider.GetContextValue(ctx, "state").(iwfidl.StateMovement) if !ok { errToFailWf = provider.NewApplicationError( @@ -64,6 +79,12 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic ) return } + defer func() { + err := stateExecutingMgr.completeStates(state) + if err != nil { + errToFailWf = err + } + }() stateExeId := stateExeIdMgr.IncAndGetNextExecutionId(state.GetStateId()) decision, err := executeState(ctx, provider, state, execution, stateExeId, attrMgr, interStateChannel) @@ -94,7 +115,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic } awaitError := provider.Await(ctx, func() bool { - return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || inFlightExecutingStateCount == 0 + return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutingMgr.getTotalExecutingStates() == 0 }) if errToFailWf != nil || forceCompleteWf { return &service.InterpreterWorkflowOutput{