From e77201899a9b324444b9508ff75503c0a10a5fee Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 1 Jan 2024 21:33:58 -0800 Subject: [PATCH] Default to continueAsNew with disable sys search attributes (#357) --- service/api/service.go | 81 +++++++++++++++++++++++++-------- service/common/config/config.go | 7 ++- 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/service/api/service.go b/service/api/service.go index 87270d6d..7766541e 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -39,7 +39,9 @@ func (s *serviceImpl) Close() { s.client.Close() } -func NewApiService(config config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger) (ApiService, error) { +func NewApiService( + config config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger, +) (ApiService, error) { return &serviceImpl{ client: client, taskQueue: taskQueue, @@ -48,7 +50,9 @@ func NewApiService(config config.Config, client uclient.UnifiedClient, taskQueue }, nil } -func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.WorkflowStartRequest) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowStartPost( + ctx context.Context, req iwfidl.WorkflowStartRequest, +) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() workflowOptions := uclient.StartWorkflowOptions{ @@ -60,9 +64,14 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor }, } - var initCustomSAs []iwfidl.SearchAttribute - workflowConfig := s.config.Interpreter.DefaultWorkflowConfig + var workflowConfig iwfidl.WorkflowConfig + if s.config.Interpreter.DefaultWorkflowConfig == nil { + workflowConfig = *config.DefaultWorkflowConfig + } else { + workflowConfig = *s.config.Interpreter.DefaultWorkflowConfig + } + var initCustomSAs []iwfidl.SearchAttribute // workerUrl is always needed, for optimizing None as persistence loading type workflowOptions.Memo = map[string]interface{}{ service.WorkerUrlMemoKey: iwfidl.EncodedObject{ @@ -120,7 +129,9 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor }, nil } -func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, req iwfidl.WorkflowWaitForStateCompletionRequest) (wresp *iwfidl.WorkflowWaitForStateCompletionResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( + ctx context.Context, req iwfidl.WorkflowWaitForStateCompletionRequest, +) (wresp *iwfidl.WorkflowWaitForStateCompletionResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() workflowId := service.IwfSystemConstPrefix + req.WorkflowId + "_" + req.StateExecutionId @@ -151,7 +162,9 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, r }, nil } -func (s *serviceImpl) ApiV1WorkflowSignalPost(ctx context.Context, req iwfidl.WorkflowSignalRequest) (retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowSignalPost( + ctx context.Context, req iwfidl.WorkflowSignalRequest, +) (retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() err := s.client.SignalWorkflow(ctx, @@ -162,7 +175,9 @@ func (s *serviceImpl) ApiV1WorkflowSignalPost(ctx context.Context, req iwfidl.Wo return nil } -func (s *serviceImpl) ApiV1WorkflowConfigUpdate(ctx context.Context, req iwfidl.WorkflowConfigUpdateRequest) (retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowConfigUpdate( + ctx context.Context, req iwfidl.WorkflowConfigUpdateRequest, +) (retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() err := s.client.SignalWorkflow(ctx, @@ -173,7 +188,9 @@ func (s *serviceImpl) ApiV1WorkflowConfigUpdate(ctx context.Context, req iwfidl. return nil } -func (s *serviceImpl) ApiV1WorkflowStopPost(ctx context.Context, req iwfidl.WorkflowStopRequest) (retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowStopPost( + ctx context.Context, req iwfidl.WorkflowStopRequest, +) (retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() wfId := req.GetWorkflowId() @@ -206,7 +223,9 @@ func (s *serviceImpl) ApiV1WorkflowStopPost(ctx context.Context, req iwfidl.Work return nil } -func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(ctx context.Context, req iwfidl.WorkflowGetDataObjectsRequest) (wresp *iwfidl.WorkflowGetDataObjectsResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost( + ctx context.Context, req iwfidl.WorkflowGetDataObjectsRequest, +) (wresp *iwfidl.WorkflowGetDataObjectsResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() var queryResp service.GetDataObjectsQueryResponse @@ -272,7 +291,9 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(ctx context.Context, r }, nil } -func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(ctx context.Context, req iwfidl.WorkflowGetSearchAttributesRequest) (wresp *iwfidl.WorkflowGetSearchAttributesResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost( + ctx context.Context, req iwfidl.WorkflowGetSearchAttributesRequest, +) (wresp *iwfidl.WorkflowGetSearchAttributesResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), req.Keys) @@ -293,13 +314,17 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(ctx context.Context, }, nil } -func (s *serviceImpl) ApiV1WorkflowGetPost(ctx context.Context, req iwfidl.WorkflowGetRequest) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowGetPost( + ctx context.Context, req iwfidl.WorkflowGetRequest, +) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() return s.doApiV1WorkflowGetPost(ctx, req, false) } -func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost(ctx context.Context, req iwfidl.WorkflowGetRequest) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost( + ctx context.Context, req iwfidl.WorkflowGetRequest, +) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() return s.doApiV1WorkflowGetPost(ctx, req, true) @@ -310,7 +335,9 @@ func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost(ctx context.Context, req iwfi // because s.client.GetWorkflowResult will wait for the completion if workflow is running -- // when withWait is false, if workflow is not running and needResults is true, it will then call s.client.GetWorkflowResult to get results // when withWait is true, it will do everything -func (s *serviceImpl) doApiV1WorkflowGetPost(ctx context.Context, req iwfidl.WorkflowGetRequest, withWait bool) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) doApiV1WorkflowGetPost( + ctx context.Context, req iwfidl.WorkflowGetRequest, withWait bool, +) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { descResp, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil) if err != nil { return nil, s.handleError(err) @@ -414,7 +441,9 @@ func (s *serviceImpl) doApiV1WorkflowGetPost(ctx context.Context, req iwfidl.Wor } -func (s *serviceImpl) ApiV1WorkflowSearchPost(ctx context.Context, req iwfidl.WorkflowSearchRequest) (wresp *iwfidl.WorkflowSearchResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowSearchPost( + ctx context.Context, req iwfidl.WorkflowSearchRequest, +) (wresp *iwfidl.WorkflowSearchResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() pageSize := int32(1000) @@ -435,7 +464,9 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost(ctx context.Context, req iwfidl.Wo }, nil } -func (s *serviceImpl) ApiV1WorkflowRpcPost(ctx context.Context, req iwfidl.WorkflowRpcRequest) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowRpcPost( + ctx context.Context, req iwfidl.WorkflowRpcRequest, +) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() if needLocking(req) { @@ -496,7 +527,9 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(ctx context.Context, req iwfidl.Workf return &iwfidl.WorkflowRpcResponse{Output: resp.Output}, nil } -func (s *serviceImpl) tryPrepareRPCbyDescribe(ctx context.Context, req iwfidl.WorkflowRpcRequest) (rpcPrep *service.PrepareRpcQueryResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) tryPrepareRPCbyDescribe( + ctx context.Context, req iwfidl.WorkflowRpcRequest, +) (rpcPrep *service.PrepareRpcQueryResponse, retError *errors.ErrorAndStatus) { var searchAttributes []iwfidl.SearchAttribute var dataAttributes []iwfidl.KeyValue @@ -612,7 +645,9 @@ func needLocking(req iwfidl.WorkflowRpcRequest) bool { return false } -func (s *serviceImpl) handleRpcBySynchronousUpdate(ctx context.Context, req iwfidl.WorkflowRpcRequest) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) handleRpcBySynchronousUpdate( + ctx context.Context, req iwfidl.WorkflowRpcRequest, +) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req)) var output interpreter.HandlerOutput err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req) @@ -641,7 +676,9 @@ func doNeedLocking(policy *iwfidl.PersistenceLoadingPolicy) bool { return false } -func (s *serviceImpl) ApiV1WorkflowResetPost(ctx context.Context, req iwfidl.WorkflowResetRequest) (wresp *iwfidl.WorkflowResetResponse, retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowResetPost( + ctx context.Context, req iwfidl.WorkflowResetRequest, +) (wresp *iwfidl.WorkflowResetResponse, retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() runId, err := s.client.ResetWorkflow(ctx, req) @@ -653,7 +690,9 @@ func (s *serviceImpl) ApiV1WorkflowResetPost(ctx context.Context, req iwfidl.Wor }, nil } -func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(ctx context.Context, request iwfidl.WorkflowSkipTimerRequest) (retError *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowSkipTimerPost( + ctx context.Context, request iwfidl.WorkflowSkipTimerRequest, +) (retError *errors.ErrorAndStatus) { defer func() { log.CapturePanic(recover(), s.logger, &retError) }() if request.GetTimerCommandId() == "" && request.TimerCommandIndex == nil { @@ -681,7 +720,9 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(ctx context.Context, request iw return nil } -func (s *serviceImpl) ApiV1WorkflowDumpPost(ctx context.Context, request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus) { +func (s *serviceImpl) ApiV1WorkflowDumpPost( + ctx context.Context, request iwfidl.WorkflowDumpRequest, +) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus) { var internals service.ContinueAsNewDumpResponse err := s.client.QueryWorkflow(ctx, &internals, request.GetWorkflowId(), request.GetWorkflowRunId(), service.ContinueAsNewDumpQueryType) diff --git a/service/common/config/config.go b/service/common/config/config.go index 247f7419..b9962f88 100644 --- a/service/common/config/config.go +++ b/service/common/config/config.go @@ -33,7 +33,7 @@ type ( Temporal *TemporalConfig `yaml:"temporal"` // Cadence config is the config to connect to Cadence Cadence *CadenceConfig `yaml:"cadence"` - DefaultWorkflowConfig iwfidl.WorkflowConfig `json:"defaultWorkflowConfig"` + DefaultWorkflowConfig *iwfidl.WorkflowConfig `json:"defaultWorkflowConfig"` InterpreterActivityConfig InterpreterActivityConfig `yaml:"interpreterActivityConfig"` VerboseDebug bool FailAtMemoIncompatibility bool @@ -89,6 +89,11 @@ type ( } ) +var DefaultWorkflowConfig = &iwfidl.WorkflowConfig{ + ContinueAsNewThreshold: iwfidl.PtrInt32(100), + DisableSystemSearchAttribute: iwfidl.PtrBool(true), +} + // NewConfig returns a new decoded Config struct func NewConfig(configPath string) (*Config, error) { log.Printf("Loading configFile=%v\n", configPath)