From 5debdce14a8bd195557e3fb728904eac7a85bc85 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 23 Feb 2024 12:10:24 -0800 Subject: [PATCH] Expose missing config for setVersionAtStart --- integ/basic_test.go | 5 ++++- integ/config.go | 7 ++++--- integ/persistence_test.go | 4 +++- integ/util.go | 9 +++++---- service/api/service.go | 17 +++++++++++++---- service/common/config/config.go | 3 +++ service/interpreter/globalVersioner.go | 3 +++ 7 files changed, 35 insertions(+), 13 deletions(-) diff --git a/integ/basic_test.go b/integ/basic_test.go index b1b680e2..e01eb110 100644 --- a/integ/basic_test.go +++ b/integ/basic_test.go @@ -50,7 +50,10 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config * closeFunc1 := startWorkflowWorker(wfHandler) defer closeFunc1() - closeFunc2 := startIwfService(backendType) + _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ + BackendType: backendType, + SetVersionAtStart: true, + }) defer closeFunc2() // start a workflow diff --git a/integ/config.go b/integ/config.go index 3d495e51..51a84975 100644 --- a/integ/config.go +++ b/integ/config.go @@ -7,11 +7,12 @@ import ( const testWorkflowServerPort = "9714" const testIwfServerPort = "9715" -func createTestConfig(failAtMemoCompatibility bool) config.Config { +func createTestConfig(failAtMemoCompatibility bool, setVersionAtStart bool) config.Config { return config.Config{ Api: config.ApiConfig{ - Port: 9715, - MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test + Port: 9715, + MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test + SetVersionAtStart: setVersionAtStart, }, Interpreter: config.Interpreter{ VerboseDebug: false, diff --git a/integ/persistence_test.go b/integ/persistence_test.go index a6a56f6a..8557a3c9 100644 --- a/integ/persistence_test.go +++ b/integ/persistence_test.go @@ -96,7 +96,9 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) { } } -func doTestPersistenceWorkflow(t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig) { +func doTestPersistenceWorkflow( + t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig, +) { assertions := assert.New(t) wfHandler := persistence.NewHandler() closeFunc1 := startWorkflowWorker(wfHandler) diff --git a/integ/util.go b/integ/util.go index f7e590ff..6f22f7ee 100644 --- a/integ/util.go +++ b/integ/util.go @@ -66,6 +66,7 @@ type IwfServiceTestConfig struct { BackendType service.BackendType MemoEncryption bool DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test + SetVersionAtStart bool } func startIwfService(backendType service.BackendType) (closeFunc func()) { @@ -110,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U panic(err) } uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption) - iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger) + iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -122,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) + interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient) interpreter.Start() return uclient, func() { iwfServer.Close() @@ -141,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U panic(err) } uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) - iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger) + iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -153,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U }() // start iwf interpreter worker - interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) + interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient) interpreter.Start() return uclient, func() { iwfServer.Close() diff --git a/service/api/service.go b/service/api/service.go index d59b8e8c..ea661c80 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -56,14 +56,23 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( ) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() + var sysSAs map[string]interface{} + if s.config.Api.SetVersionAtStart { + sysSAs = map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, + service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, + } + } else { + sysSAs = map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, + } + } + workflowOptions := uclient.StartWorkflowOptions{ ID: req.GetWorkflowId(), TaskQueue: s.taskQueue, WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second, - SearchAttributes: map[string]interface{}{ - service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType, - service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions, - }, + SearchAttributes: sysSAs, } var workflowConfig iwfidl.WorkflowConfig diff --git a/service/common/config/config.go b/service/common/config/config.go index b9962f88..7952f725 100644 --- a/service/common/config/config.go +++ b/service/common/config/config.go @@ -26,6 +26,9 @@ type ( // Port is the port on which the API service will bind to Port int `yaml:"port"` MaxWaitSeconds int64 `yaml:"maxWaitSeconds"` + // SetVersionAtStart is the flag to set version at startAPI + // This is for optimizing the workflow actions(version marker, upsertSearchAttribute) + SetVersionAtStart bool `yaml:"setVersionAtStart"` } Interpreter struct { diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index f4801a4d..68ed023f 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -36,6 +36,9 @@ func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) ( panic("search attribute global version is not found") } version = int(attribute.GetIntegerValue()) + if versions.MaxOfAllVersions < version { + panic("requesting for a version that is not supported, panic to retry in next workflow task") + } isFromStart = true }