Skip to content

Commit

Permalink
Rename versioning optimization flag (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 11, 2024
1 parent cbce142 commit bf3ea11
Show file tree
Hide file tree
Showing 10 changed files with 17 additions and 34 deletions.
4 changes: 1 addition & 3 deletions integ/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package integ

import (
"context"
config2 "github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/ptr"
"log"
"strconv"
Expand Down Expand Up @@ -52,8 +51,7 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config *
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
OptimizationVersion: ptr.Any(config2.OptimizationVersionNone),
BackendType: backendType,
})
defer closeFunc2()

Expand Down
4 changes: 2 additions & 2 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool, optimizationVersion *int) config.Config {
func createTestConfig(failAtMemoCompatibility bool, optimizedVersioning *bool) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
OptimizationVersion: optimizationVersion,
OptimizedVersioning: optimizedVersioning,
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
3 changes: 1 addition & 2 deletions integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"github.com/indeedeng/iwf/integ/workflow/locking"
config2 "github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/ptr"
"io/ioutil"
"strconv"
Expand Down Expand Up @@ -66,7 +65,7 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
DisableFailAtMemoIncompatibility: true,
OptimizationVersion: ptr.Any(config2.OptimizationVersionV1),
OptimizedVersioning: ptr.Any(true),
})
defer closeFunc2()

Expand Down
3 changes: 1 addition & 2 deletions integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integ
import (
"context"
"fmt"
config2 "github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/common/timeparser"
"log"
Expand Down Expand Up @@ -108,7 +107,7 @@ func doTestPersistenceWorkflow(
uclient, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
MemoEncryption: memoEncryption,
OptimizationVersion: ptr.Any(config2.OptimizationVersionV1),
OptimizedVersioning: ptr.Any(true),
})
defer closeFunc2()

Expand Down
3 changes: 1 addition & 2 deletions integ/start_delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/basic"
"github.com/indeedeng/iwf/service"
config2 "github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/stretchr/testify/assert"
"strconv"
Expand Down Expand Up @@ -35,7 +34,7 @@ func doTestStartDelay(t *testing.T, backendType service.BackendType, config *iwf

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
OptimizationVersion: ptr.Any(config2.OptimizationVersionNone),
OptimizedVersioning: ptr.Any(false),
})
defer closeFunc2()

Expand Down
10 changes: 5 additions & 5 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type IwfServiceTestConfig struct {
BackendType service.BackendType
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
OptimizationVersion *int
OptimizedVersioning *bool
}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -111,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizationVersion), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -123,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizationVersion), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -142,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizationVersion), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -154,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizationVersion), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
4 changes: 2 additions & 2 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

var sysSAs map[string]interface{}
if config.IsVersioningOptimized(s.config.Api.OptimizationVersion) {
if s.config.Api.OptimizedVersioning != nil && *s.config.Api.OptimizedVersioning {
sysSAs = map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
Config: workflowConfig,
UseMemoForDataAttributes: useMemo,
WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(),
OptimizationVersion: s.config.Api.OptimizationVersion,
OmitVersionMarker: s.config.Api.OptimizedVersioning,
}

runId, err := s.client.StartInterpreterWorkflow(ctx, workflowOptions, input)
Expand Down
15 changes: 2 additions & 13 deletions service/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type (
// Port is the port on which the API service will bind to
Port int `yaml:"port"`
MaxWaitSeconds int64 `yaml:"maxWaitSeconds"`
// OptimizationVersion is the flag to use optimization
OptimizationVersion *int `yaml:"optimizationVersion"`
// OptimizedVersioning is the versioning optimization flag
OptimizedVersioning *bool `yaml:"optimizedVersioning"`
}

Interpreter struct {
Expand Down Expand Up @@ -91,17 +91,6 @@ type (
}
)

const (
// OptimizationVersionNone is not doing any optimization. This is the default optimization version
OptimizationVersionNone = 0
// OptimizationVersionV1 is for optimizing the workflow actions(version marker, upsertSearchAttribute)
OptimizationVersionV1 = 1
)

func IsVersioningOptimized(version *int) bool {
return version != nil && *version >= OptimizationVersionV1
}

var DefaultWorkflowConfig = &iwfidl.WorkflowConfig{
ContinueAsNewThreshold: iwfidl.PtrInt32(100),
DisableSystemSearchAttribute: iwfidl.PtrBool(true),
Expand Down
2 changes: 1 addition & 1 deletion service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type (

ContinueAsNewInput *ContinueAsNewInput `json:"continueAsNewInput,omitempty"`

OptimizationVersion *int `json:"optmn,omitempty"`
OmitVersionMarker *bool `json:"omtVers,omitempty"`
}

ContinueAsNewInput struct {
Expand Down
3 changes: 1 addition & 2 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/interpreter/env"
"time"

Expand All @@ -19,7 +18,7 @@ func InterpreterImpl(
ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput,
) (*service.InterpreterWorkflowOutput, error) {
var err error
globalVersioner, err := NewGlobalVersioner(provider, config.IsVersioningOptimized(input.OptimizationVersion), ctx)
globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit bf3ea11

Please sign in to comment.