diff --git a/config/config.go b/config/config.go index 0ae048a3..6bdfacea 100644 --- a/config/config.go +++ b/config/config.go @@ -26,8 +26,6 @@ type ( // Port is the port on which the API service will bind to Port int `yaml:"port"` MaxWaitSeconds int64 `yaml:"maxWaitSeconds"` - // OptimizedVersioning is the versioning optimization flag - OptimizedVersioning *bool `yaml:"optimizedVersioning"` // omitRpcInputOutputInHistory is the flag to omit rpc input/output in history // the input/output is only for debugging purpose but could be too expensive to store OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"` diff --git a/integ/config.go b/integ/config.go index e3c2bdf3..91258333 100644 --- a/integ/config.go +++ b/integ/config.go @@ -10,9 +10,8 @@ const testIwfServerPort = "9715" func createTestConfig(testCfg IwfServiceTestConfig) config.Config { return config.Config{ Api: config.ApiConfig{ - Port: 9715, - MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test - OptimizedVersioning: testCfg.OptimizedVersioning, + Port: 9715, + MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test WaitForStateCompletionMigration: config.WaitForStateCompletionMigration{ SignalWithStartOn: "old", WaitForOn: "old", diff --git a/integ/locking_test.go b/integ/locking_test.go index e9865641..3f0ee9a6 100644 --- a/integ/locking_test.go +++ b/integ/locking_test.go @@ -66,7 +66,6 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ BackendType: backendType, DisableFailAtMemoIncompatibility: true, - OptimizedVersioning: ptr.Any(true), }) defer closeFunc2() diff --git a/integ/persistence_test.go b/integ/persistence_test.go index e09e5b33..1cb3a580 100644 --- a/integ/persistence_test.go +++ b/integ/persistence_test.go @@ -106,9 +106,8 @@ func doTestPersistenceWorkflow( defer closeFunc1() uclient, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ - BackendType: backendType, - MemoEncryption: memoEncryption, - OptimizedVersioning: ptr.Any(true), + BackendType: backendType, + MemoEncryption: memoEncryption, }) defer closeFunc2() @@ -402,7 +401,10 @@ func doTestPersistenceWorkflow( } } -func getDataAttributes(initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, useMemo bool) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) { +func getDataAttributes( + initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, + useMemo bool, +) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) { return initReqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{ WorkflowId: wfId, Keys: []string{ diff --git a/integ/start_delay_test.go b/integ/start_delay_test.go index 60c14b65..ec07442b 100644 --- a/integ/start_delay_test.go +++ b/integ/start_delay_test.go @@ -33,8 +33,7 @@ func doTestStartDelay(t *testing.T, backendType service.BackendType, config *iwf defer closeFunc1() _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ - BackendType: backendType, - OptimizedVersioning: ptr.Any(false), + BackendType: backendType, }) defer closeFunc2() diff --git a/integ/util.go b/integ/util.go index 99fffe41..8affc720 100644 --- a/integ/util.go +++ b/integ/util.go @@ -66,7 +66,6 @@ type IwfServiceTestConfig struct { BackendType service.BackendType MemoEncryption bool DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test - OptimizedVersioning *bool DefaultHeaders map[string]string } diff --git a/integ/wait_until_search_attributes_optimization_test.go b/integ/wait_until_search_attributes_optimization_test.go index ce9e5bf4..fcaade3f 100644 --- a/integ/wait_until_search_attributes_optimization_test.go +++ b/integ/wait_until_search_attributes_optimization_test.go @@ -52,8 +52,7 @@ func doTestWaitUntilHistoryCompleted( defer closeFunc1() uclient, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ - BackendType: backendType, - OptimizedVersioning: ptr.Any(true), + BackendType: backendType, }) defer closeFunc2() diff --git a/integ/wait_until_search_attributes_test.go b/integ/wait_until_search_attributes_test.go index 85b73552..a65ae6e7 100644 --- a/integ/wait_until_search_attributes_test.go +++ b/integ/wait_until_search_attributes_test.go @@ -47,8 +47,7 @@ func doTestWaitUntilSearchAttributes( defer closeFunc1() _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ - BackendType: backendType, - OptimizedVersioning: ptr.Any(true), + BackendType: backendType, }) defer closeFunc2() diff --git a/service/api/service.go b/service/api/service.go index 7bb14dc2..17d8833c 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/service/interpreter/env" - "github.com/indeedeng/iwf/service/interpreter/versions" "math" "net/http" "os" @@ -57,16 +56,8 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( ) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() - var sysSAs map[string]interface{} - if s.config.Api.OptimizedVersioning != nil && *s.config.Api.OptimizedVersioning { - sysSAs = map[string]interface{}{ - service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, - service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, - } - } else { - sysSAs = map[string]interface{}{ - service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, - } + sysSAs := map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, } workflowOptions := uclient.StartWorkflowOptions{ @@ -156,7 +147,6 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( UseMemoForDataAttributes: useMemoForDAs, WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(), WaitForCompletionStateIds: req.GetWaitForCompletionStateIds(), - OmitVersionMarker: s.config.Api.OptimizedVersioning, } runId, err := s.client.StartInterpreterWorkflow(ctx, workflowOptions, input) @@ -422,7 +412,8 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost( } func (s *serviceImpl) ApiV1WorkflowSetQueryAttributesPost( - ctx context.Context, req iwfidl.WorkflowSetDataObjectsRequest) (retError *errors.ErrorAndStatus) { + ctx context.Context, req iwfidl.WorkflowSetDataObjectsRequest, +) (retError *errors.ErrorAndStatus) { sigVal := service.ExecuteRpcSignalRequest{ UpsertDataObjects: req.Objects, } @@ -457,7 +448,9 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost( }, nil } -func (s *serviceImpl) ApiV1WorkflowSetSearchAttributesPost(ctx context.Context, req iwfidl.WorkflowSetSearchAttributesRequest) (retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowSetSearchAttributesPost( + ctx context.Context, req iwfidl.WorkflowSetSearchAttributesRequest, +) (retError *errors.ErrorAndStatus) { sigVal := service.ExecuteRpcSignalRequest{ UpsertSearchAttributes: req.SearchAttributes, } diff --git a/service/interfaces.go b/service/interfaces.go index 2053ebe6..03b4afdc 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -32,8 +32,6 @@ type ( IsResumeFromContinueAsNew bool `json:"isResumeFromContinueAsNew,omitempty"` ContinueAsNewInput *ContinueAsNewInput `json:"continueAsNewInput,omitempty"` - - OmitVersionMarker *bool `json:"omtVers,omitempty"` } ContinueAsNewInput struct { diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 96d3aad1..bca18a92 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -1,14 +1,19 @@ package interpreter import ( - "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/common/ptr" - "github.com/indeedeng/iwf/service/interpreter/versions" ) const globalChangeId = "global" +const StartingVersionUsingGlobalVersioning = 1 +const StartingVersionOptimizedUpsertSearchAttribute = 2 +const StartingVersionRenamedStateApi = 3 +const StartingVersionContinueAsNewOnNoStates = 4 +const StartingVersionTemporal26SDK = 5 +const StartingVersionExecutingStateIdMode = 6 +const MaxOfAllVersions = StartingVersionExecutingStateIdMode + // GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type GlobalVersioner struct { workflowProvider WorkflowProvider @@ -18,60 +23,39 @@ type GlobalVersioner struct { } func NewGlobalVersioner( - workflowProvider WorkflowProvider, omitVersionMarker bool, ctx UnifiedContext, + workflowProvider WorkflowProvider, ctx UnifiedContext, ) (*GlobalVersioner, error) { - sas, err := workflowProvider.GetSearchAttributes(ctx, []iwfidl.SearchAttributeKeyAndType{ - {Key: ptr.Any(service.SearchAttributeGlobalVersion), - ValueType: ptr.Any(iwfidl.INT)}, - }) - if err != nil { - return nil, err - } - version := 0 - if omitVersionMarker { - // TODO: future improvement https://github.com/indeedeng/iwf/issues/369 - attribute, ok := sas[service.SearchAttributeGlobalVersion] - if !ok { - panic("search attribute global version is not found") - } - version = int(attribute.GetIntegerValue()) - if versions.MaxOfAllVersions < version { - panic("requesting for a version that is not supported, panic to retry in next workflow task") - } - } else { - version = workflowProvider.GetVersion(ctx, globalChangeId, 0, versions.MaxOfAllVersions) - } + version := workflowProvider.GetVersion(ctx, globalChangeId, 0, MaxOfAllVersions) return &GlobalVersioner{ - workflowProvider: workflowProvider, - ctx: ctx, - version: version, - OmitVersionMarker: omitVersionMarker, + workflowProvider: workflowProvider, + ctx: ctx, + version: version, }, nil } func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool { - return p.version >= versions.StartingVersionContinueAsNewOnNoStates + return p.version >= StartingVersionContinueAsNewOnNoStates } func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool { - return p.version >= versions.StartingVersionUsingGlobalVersioning + return p.version >= StartingVersionUsingGlobalVersioning } func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool { - return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute + return p.version >= StartingVersionOptimizedUpsertSearchAttribute } func (p *GlobalVersioner) IsAfterVersionOfExecutingStateIdMode() bool { - return p.version >= versions.StartingVersionExecutingStateIdMode + return p.version >= StartingVersionExecutingStateIdMode } func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool { - return p.version >= versions.StartingVersionRenamedStateApi + return p.version >= StartingVersionRenamedStateApi } func (p *GlobalVersioner) IsAfterVersionOfTemporal26SDK() bool { - return p.version >= versions.StartingVersionTemporal26SDK + return p.version >= StartingVersionTemporal26SDK } func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error { @@ -83,7 +67,7 @@ func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error { // https://github.com/uber-go/cadence-client/issues/1198 if p.workflowProvider.GetBackendType() != service.BackendTypeCadence { return p.workflowProvider.UpsertSearchAttributes(p.ctx, map[string]interface{}{ - service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, + service.SearchAttributeGlobalVersion: MaxOfAllVersions, }) } return nil diff --git a/service/interpreter/versions/versions.go b/service/interpreter/versions/versions.go deleted file mode 100644 index 9203ca29..00000000 --- a/service/interpreter/versions/versions.go +++ /dev/null @@ -1,9 +0,0 @@ -package versions - -const StartingVersionUsingGlobalVersioning = 1 -const StartingVersionOptimizedUpsertSearchAttribute = 2 -const StartingVersionRenamedStateApi = 3 -const StartingVersionContinueAsNewOnNoStates = 4 -const StartingVersionTemporal26SDK = 5 -const StartingVersionExecutingStateIdMode = 6 -const MaxOfAllVersions = StartingVersionExecutingStateIdMode diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 189c57ec..9caa5d5c 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -45,7 +45,7 @@ func InterpreterImpl( var err error - globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx) + globalVersioner, err := NewGlobalVersioner(provider, ctx) if err != nil { retErr = err return