Skip to content

Commit

Permalink
Default to continueAsNew with disable sys search attributes (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jan 2, 2024
1 parent 8e57a72 commit e772018
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 21 deletions.
81 changes: 61 additions & 20 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (s *serviceImpl) Close() {
s.client.Close()
}

func NewApiService(config config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger) (ApiService, error) {
func NewApiService(
config config.Config, client uclient.UnifiedClient, taskQueue string, logger log.Logger,
) (ApiService, error) {
return &serviceImpl{
client: client,
taskQueue: taskQueue,
Expand All @@ -48,7 +50,9 @@ func NewApiService(config config.Config, client uclient.UnifiedClient, taskQueue
}, nil
}

func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.WorkflowStartRequest) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowStartPost(
ctx context.Context, req iwfidl.WorkflowStartRequest,
) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

workflowOptions := uclient.StartWorkflowOptions{
Expand All @@ -60,9 +64,14 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor
},
}

var initCustomSAs []iwfidl.SearchAttribute
workflowConfig := s.config.Interpreter.DefaultWorkflowConfig
var workflowConfig iwfidl.WorkflowConfig
if s.config.Interpreter.DefaultWorkflowConfig == nil {
workflowConfig = *config.DefaultWorkflowConfig
} else {
workflowConfig = *s.config.Interpreter.DefaultWorkflowConfig
}

var initCustomSAs []iwfidl.SearchAttribute
// workerUrl is always needed, for optimizing None as persistence loading type
workflowOptions.Memo = map[string]interface{}{
service.WorkerUrlMemoKey: iwfidl.EncodedObject{
Expand Down Expand Up @@ -120,7 +129,9 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(ctx context.Context, req iwfidl.Wor
}, nil
}

func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, req iwfidl.WorkflowWaitForStateCompletionRequest) (wresp *iwfidl.WorkflowWaitForStateCompletionResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(
ctx context.Context, req iwfidl.WorkflowWaitForStateCompletionRequest,
) (wresp *iwfidl.WorkflowWaitForStateCompletionResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

workflowId := service.IwfSystemConstPrefix + req.WorkflowId + "_" + req.StateExecutionId
Expand Down Expand Up @@ -151,7 +162,9 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, r
}, nil
}

func (s *serviceImpl) ApiV1WorkflowSignalPost(ctx context.Context, req iwfidl.WorkflowSignalRequest) (retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowSignalPost(
ctx context.Context, req iwfidl.WorkflowSignalRequest,
) (retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

err := s.client.SignalWorkflow(ctx,
Expand All @@ -162,7 +175,9 @@ func (s *serviceImpl) ApiV1WorkflowSignalPost(ctx context.Context, req iwfidl.Wo
return nil
}

func (s *serviceImpl) ApiV1WorkflowConfigUpdate(ctx context.Context, req iwfidl.WorkflowConfigUpdateRequest) (retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowConfigUpdate(
ctx context.Context, req iwfidl.WorkflowConfigUpdateRequest,
) (retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

err := s.client.SignalWorkflow(ctx,
Expand All @@ -173,7 +188,9 @@ func (s *serviceImpl) ApiV1WorkflowConfigUpdate(ctx context.Context, req iwfidl.
return nil
}

func (s *serviceImpl) ApiV1WorkflowStopPost(ctx context.Context, req iwfidl.WorkflowStopRequest) (retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowStopPost(
ctx context.Context, req iwfidl.WorkflowStopRequest,
) (retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

wfId := req.GetWorkflowId()
Expand Down Expand Up @@ -206,7 +223,9 @@ func (s *serviceImpl) ApiV1WorkflowStopPost(ctx context.Context, req iwfidl.Work
return nil
}

func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(ctx context.Context, req iwfidl.WorkflowGetDataObjectsRequest) (wresp *iwfidl.WorkflowGetDataObjectsResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(
ctx context.Context, req iwfidl.WorkflowGetDataObjectsRequest,
) (wresp *iwfidl.WorkflowGetDataObjectsResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

var queryResp service.GetDataObjectsQueryResponse
Expand Down Expand Up @@ -272,7 +291,9 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(ctx context.Context, r
}, nil
}

func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(ctx context.Context, req iwfidl.WorkflowGetSearchAttributesRequest) (wresp *iwfidl.WorkflowGetSearchAttributesResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(
ctx context.Context, req iwfidl.WorkflowGetSearchAttributesRequest,
) (wresp *iwfidl.WorkflowGetSearchAttributesResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), req.Keys)
Expand All @@ -293,13 +314,17 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(ctx context.Context,
}, nil
}

func (s *serviceImpl) ApiV1WorkflowGetPost(ctx context.Context, req iwfidl.WorkflowGetRequest) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowGetPost(
ctx context.Context, req iwfidl.WorkflowGetRequest,
) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

return s.doApiV1WorkflowGetPost(ctx, req, false)
}

func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost(ctx context.Context, req iwfidl.WorkflowGetRequest) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost(
ctx context.Context, req iwfidl.WorkflowGetRequest,
) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

return s.doApiV1WorkflowGetPost(ctx, req, true)
Expand All @@ -310,7 +335,9 @@ func (s *serviceImpl) ApiV1WorkflowGetWithWaitPost(ctx context.Context, req iwfi
// because s.client.GetWorkflowResult will wait for the completion if workflow is running --
// when withWait is false, if workflow is not running and needResults is true, it will then call s.client.GetWorkflowResult to get results
// when withWait is true, it will do everything
func (s *serviceImpl) doApiV1WorkflowGetPost(ctx context.Context, req iwfidl.WorkflowGetRequest, withWait bool) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) doApiV1WorkflowGetPost(
ctx context.Context, req iwfidl.WorkflowGetRequest, withWait bool,
) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
descResp, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil)
if err != nil {
return nil, s.handleError(err)
Expand Down Expand Up @@ -414,7 +441,9 @@ func (s *serviceImpl) doApiV1WorkflowGetPost(ctx context.Context, req iwfidl.Wor

}

func (s *serviceImpl) ApiV1WorkflowSearchPost(ctx context.Context, req iwfidl.WorkflowSearchRequest) (wresp *iwfidl.WorkflowSearchResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowSearchPost(
ctx context.Context, req iwfidl.WorkflowSearchRequest,
) (wresp *iwfidl.WorkflowSearchResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

pageSize := int32(1000)
Expand All @@ -435,7 +464,9 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost(ctx context.Context, req iwfidl.Wo
}, nil
}

func (s *serviceImpl) ApiV1WorkflowRpcPost(ctx context.Context, req iwfidl.WorkflowRpcRequest) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowRpcPost(
ctx context.Context, req iwfidl.WorkflowRpcRequest,
) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

if needLocking(req) {
Expand Down Expand Up @@ -496,7 +527,9 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(ctx context.Context, req iwfidl.Workf
return &iwfidl.WorkflowRpcResponse{Output: resp.Output}, nil
}

func (s *serviceImpl) tryPrepareRPCbyDescribe(ctx context.Context, req iwfidl.WorkflowRpcRequest) (rpcPrep *service.PrepareRpcQueryResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) tryPrepareRPCbyDescribe(
ctx context.Context, req iwfidl.WorkflowRpcRequest,
) (rpcPrep *service.PrepareRpcQueryResponse, retError *errors.ErrorAndStatus) {
var searchAttributes []iwfidl.SearchAttribute
var dataAttributes []iwfidl.KeyValue

Expand Down Expand Up @@ -612,7 +645,9 @@ func needLocking(req iwfidl.WorkflowRpcRequest) bool {
return false
}

func (s *serviceImpl) handleRpcBySynchronousUpdate(ctx context.Context, req iwfidl.WorkflowRpcRequest) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) handleRpcBySynchronousUpdate(
ctx context.Context, req iwfidl.WorkflowRpcRequest,
) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req))
var output interpreter.HandlerOutput
err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req)
Expand Down Expand Up @@ -641,7 +676,9 @@ func doNeedLocking(policy *iwfidl.PersistenceLoadingPolicy) bool {
return false
}

func (s *serviceImpl) ApiV1WorkflowResetPost(ctx context.Context, req iwfidl.WorkflowResetRequest) (wresp *iwfidl.WorkflowResetResponse, retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowResetPost(
ctx context.Context, req iwfidl.WorkflowResetRequest,
) (wresp *iwfidl.WorkflowResetResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

runId, err := s.client.ResetWorkflow(ctx, req)
Expand All @@ -653,7 +690,9 @@ func (s *serviceImpl) ApiV1WorkflowResetPost(ctx context.Context, req iwfidl.Wor
}, nil
}

func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(ctx context.Context, request iwfidl.WorkflowSkipTimerRequest) (retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(
ctx context.Context, request iwfidl.WorkflowSkipTimerRequest,
) (retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

if request.GetTimerCommandId() == "" && request.TimerCommandIndex == nil {
Expand Down Expand Up @@ -681,7 +720,9 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(ctx context.Context, request iw
return nil
}

func (s *serviceImpl) ApiV1WorkflowDumpPost(ctx context.Context, request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowDumpPost(
ctx context.Context, request iwfidl.WorkflowDumpRequest,
) (*iwfidl.WorkflowDumpResponse, *errors.ErrorAndStatus) {
var internals service.ContinueAsNewDumpResponse

err := s.client.QueryWorkflow(ctx, &internals, request.GetWorkflowId(), request.GetWorkflowRunId(), service.ContinueAsNewDumpQueryType)
Expand Down
7 changes: 6 additions & 1 deletion service/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (
Temporal *TemporalConfig `yaml:"temporal"`
// Cadence config is the config to connect to Cadence
Cadence *CadenceConfig `yaml:"cadence"`
DefaultWorkflowConfig iwfidl.WorkflowConfig `json:"defaultWorkflowConfig"`
DefaultWorkflowConfig *iwfidl.WorkflowConfig `json:"defaultWorkflowConfig"`
InterpreterActivityConfig InterpreterActivityConfig `yaml:"interpreterActivityConfig"`
VerboseDebug bool
FailAtMemoIncompatibility bool
Expand Down Expand Up @@ -89,6 +89,11 @@ type (
}
)

var DefaultWorkflowConfig = &iwfidl.WorkflowConfig{
ContinueAsNewThreshold: iwfidl.PtrInt32(100),
DisableSystemSearchAttribute: iwfidl.PtrBool(true),
}

// NewConfig returns a new decoded Config struct
func NewConfig(configPath string) (*Config, error) {
log.Printf("Loading configFile=%v\n", configPath)
Expand Down

0 comments on commit e772018

Please sign in to comment.