Skip to content

Commit

Permalink
Implement system search attributes(global version, executingStateIds,…
Browse files Browse the repository at this point in the history
… IwfWorkflowType) (#68)
  • Loading branch information
longquanzheng authored Nov 8, 2022
1 parent affce4f commit de0d711
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 13 deletions.
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,40 @@ Any contribution is welcome.

### Run with local Temporalite
1. Run a local Temporalite following the [instruction](https://github.com/temporalio/temporalite). If you see error `error setting up schema`, try use command `temporalite start --namespace default -f my_test.db` instead to start.
2. Go to http://localhost:8233/ for Temporal WebUI
2. Register a default namespace
```shell
tctl --ns default n re
```
3. Go to http://localhost:8233/ for Temporal WebUI

NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporalio/docker-compose) to run with docker

3. For `attribute_test.go` integTests, you need to register search attributes:
```bash
3. Register system search attributes required by iWF server
```shell
tctl adm cl asa -n IwfWorkflowType -t Keyword
tctl adm cl asa -n GlobalWorkflowVersion -t Int
tctl adm cl asa -n ExecutingStateIds -t Keyword

```
4 For `attribute_test.go` integTests, you need to register search attributes:
```shell
tctl adm cl asa -n CustomKeywordField -t Keyword
tctl adm cl asa -n CustomIntField -t Int
```

### Run with local Cadence
1. Run a local Cadence server following the [instructions](https://github.com/uber/cadence/tree/master/docker)
```
docker-compose -f docker-compose-es-v7.yml up
```
2. Register a new domain if not haven `cadence --do default domain register`
3. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days
3. Register system search attributes required by iWF server
```
cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2
cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0
cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
```
4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days

## Development Plan
### 1.0
Expand Down
2 changes: 1 addition & 1 deletion integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType) {
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(service.BackendTypeTemporal)
closeFunc2 := startIwfService(backendType)
defer closeFunc2()

// start a workflow
Expand Down
4 changes: 4 additions & 0 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
WorkflowStatusTerminated = "TERMINATED"
WorkflowStatusCanceled = "CANCELED"
WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW"

SearchAttributeGlobalVersion = "GlobalWorkflowVersion"
SearchAttributeExecutingStateIds = "ExecutingStateIds"
SearchAttributeIwfWorkflowType = "IwfWorkflowType"
)

type BackendType string
Expand Down
10 changes: 10 additions & 0 deletions service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}

version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported))
return int(version)
}

type cadenceReceiveChannel struct {
channel workflow.Channel
}
Expand Down
29 changes: 29 additions & 0 deletions service/interpreter/globalVersionProvider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package interpreter

import "github.com/indeedeng/iwf/service"

const globalChangeId = "global"
const startingVersionUsingGlobalVersioning = 1
const maxOfAllVersions = startingVersionUsingGlobalVersioning

// see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api
type globalVersioner struct {
workflowProvider WorkflowProvider
}

func newGlobalVersionProvider(workflowProvider WorkflowProvider) *globalVersioner {
return &globalVersioner{
workflowProvider: workflowProvider,
}
}

func (p *globalVersioner) isAfterVersionOfUsingGlobalVersioning(ctx UnifiedContext) bool {
version := p.workflowProvider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions)
return version >= startingVersionUsingGlobalVersioning
}

func (p *globalVersioner) upsertGlobalVersionSearchAttribute(ctx UnifiedContext) error {
return p.workflowProvider.UpsertSearchAttributes(ctx, map[string]interface{}{
service.SearchAttributeGlobalVersion: maxOfAllVersions,
})
}
1 change: 1 addition & 0 deletions service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type WorkflowProvider interface {
Sleep(ctx UnifiedContext, d time.Duration) (err error)
GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
GetContextValue(ctx UnifiedContext, key string) interface{}
GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int
GetBackendType() service.BackendType
}

Expand Down
63 changes: 63 additions & 0 deletions service/interpreter/stateExecutingManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package interpreter

import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
)

type stateExecutingManager struct {
ctx UnifiedContext
provider WorkflowProvider

stateIdCount map[string]int
totalExecutingCount int
}

func newStateExecutingManager(ctx UnifiedContext, provider WorkflowProvider) *stateExecutingManager {
return &stateExecutingManager{
ctx: ctx,
provider: provider,
stateIdCount: map[string]int{},
totalExecutingCount: 0,
}
}

func (e *stateExecutingManager) startStates(states []iwfidl.StateMovement) error {
needsUpdate := false
for _, s := range states {
e.stateIdCount[s.StateId]++
if e.stateIdCount[s.StateId] == 1 {
// first time the stateId show up
needsUpdate = true
}
}
e.totalExecutingCount += len(states)
if needsUpdate {
return e.updateSearchAttribute()
}
return nil
}

func (e *stateExecutingManager) completeStates(state iwfidl.StateMovement) error {
e.stateIdCount[state.StateId]--
e.totalExecutingCount -= 1
if e.stateIdCount[state.StateId] == 0 {
delete(e.stateIdCount, state.StateId)
return e.updateSearchAttribute()
}
return nil
}

func (e *stateExecutingManager) getTotalExecutingStates() int {
return e.totalExecutingCount
}

func (e *stateExecutingManager) updateSearchAttribute() error {
var executingStateIds []string
for sid := range e.stateIdCount {
executingStateIds = append(executingStateIds, sid)
}
return e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{
service.SearchAttributeExecutingStateIds: executingStateIds,
})
}
10 changes: 10 additions & 0 deletions service/interpreter/temporal/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}

version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported))
return int(version)
}

type temporalReceiveChannel struct {
channel workflow.ReceiveChannel
}
Expand Down
37 changes: 29 additions & 8 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ import (
)

func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) {
globalVersionProvider := newGlobalVersionProvider(provider)
if globalVersionProvider.isAfterVersionOfUsingGlobalVersioning(ctx) {
err := globalVersionProvider.upsertGlobalVersionSearchAttribute(ctx)
if err != nil {
return nil, err
}
}

err := provider.UpsertSearchAttributes(ctx, map[string]interface{}{
service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType,
})
if err != nil {
return nil, err
}

execution := service.IwfWorkflowExecution{
IwfWorkerUrl: input.IwfWorkerUrl,
WorkflowType: input.IwfWorkflowType,
Expand All @@ -28,7 +43,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
return provider.UpsertSearchAttributes(ctx, attributes)
})

err := provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) {
err = provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) {
return attrMgr.GetQueryAttributesByKey(req), nil
})
if err != nil {
Expand All @@ -38,12 +53,16 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
var errToFailWf error // TODO Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve.
var outputsToReturnWf []iwfidl.StateCompletionOutput
var forceCompleteWf bool
inFlightExecutingStateCount := 0
stateExecutingMgr := newStateExecutingManager(ctx, provider)
//inFlightExecutingStateCount := 0

for len(currentStates) > 0 {
// copy the whole slice(pointer)
inFlightExecutingStateCount += len(currentStates)
statesToExecute := currentStates
err := stateExecutingMgr.startStates(currentStates)
if err != nil {
return nil, err
}
//reset to empty slice since each iteration will process all current states in the queue
currentStates = nil

Expand All @@ -52,10 +71,6 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
// state must be passed via parameter https://stackoverflow.com/questions/67263092
stateCtx := provider.ExtendContextWithValue(ctx, "state", stateToExecute)
provider.GoNamed(stateCtx, stateToExecute.GetStateId(), func(ctx UnifiedContext) {
defer func() {
inFlightExecutingStateCount--
}()

state, ok := provider.GetContextValue(ctx, "state").(iwfidl.StateMovement)
if !ok {
errToFailWf = provider.NewApplicationError(
Expand All @@ -64,6 +79,12 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
)
return
}
defer func() {
err := stateExecutingMgr.completeStates(state)
if err != nil {
errToFailWf = err
}
}()

stateExeId := stateExeIdMgr.IncAndGetNextExecutionId(state.GetStateId())
decision, err := executeState(ctx, provider, state, execution, stateExeId, attrMgr, interStateChannel)
Expand Down Expand Up @@ -94,7 +115,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
}

awaitError := provider.Await(ctx, func() bool {
return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || inFlightExecutingStateCount == 0
return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutingMgr.getTotalExecutingStates() == 0
})
if errToFailWf != nil || forceCompleteWf {
return &service.InterpreterWorkflowOutput{
Expand Down

0 comments on commit de0d711

Please sign in to comment.