From 28d9b03d75782017e683b6fac881da03c4ceed2d Mon Sep 17 00:00:00 2001 From: jbowers Date: Thu, 19 Dec 2024 12:34:52 -0600 Subject: [PATCH 1/4] IWF-274: move packages for multiple timer impl --- service/api/service.go | 9 +-- service/interpreter/activityImpl.go | 21 +++--- service/interpreter/activityImpl_test.go | 5 +- .../interpreter/cadence/activityProvider.go | 12 +-- service/interpreter/cadence/workflow.go | 5 +- .../interpreter/cadence/workflowProvider.go | 68 ++++++++--------- service/interpreter/continueAsNewCounter.go | 8 +- service/interpreter/continueAsNewer.go | 19 ++--- service/interpreter/globalVersioner.go | 7 +- .../{ => interfaces}/interfaces.go | 14 +++- .../{ => interfaces}/interfaces_mock.go | 2 +- service/interpreter/persistence.go | 17 +++-- service/interpreter/queryHandler.go | 3 +- service/interpreter/signalReceiver.go | 25 ++++--- service/interpreter/stateExecutionCounter.go | 9 ++- .../interpreter/temporal/activityProvider.go | 12 +-- service/interpreter/temporal/workflow.go | 5 +- .../interpreter/temporal/workflowProvider.go | 74 +++++++++---------- .../simpleTimerProcessor.go} | 33 +++++---- service/interpreter/workflowImpl.go | 44 +++++------ service/interpreter/workflowUpdater.go | 19 ++--- 21 files changed, 218 insertions(+), 193 deletions(-) rename service/interpreter/{ => interfaces}/interfaces.go (88%) rename service/interpreter/{ => interfaces}/interfaces_mock.go (99%) rename service/interpreter/{timerProcessor.go => timers/simpleTimerProcessor.go} (84%) diff --git a/service/api/service.go b/service/api/service.go index fdf53e7a..2be54a4f 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "net/http" "os" "strings" @@ -13,15 +14,13 @@ import ( uclient "github.com/indeedeng/iwf/service/client" "github.com/indeedeng/iwf/service/common/compatibility" - "github.com/indeedeng/iwf/service/common/rpc" - "github.com/indeedeng/iwf/service/common/utils" - "github.com/indeedeng/iwf/service/interpreter" - "github.com/indeedeng/iwf/service/common/errors" "github.com/indeedeng/iwf/service/common/log" "github.com/indeedeng/iwf/service/common/log/tag" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/common/rpc" + "github.com/indeedeng/iwf/service/common/utils" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" @@ -801,7 +800,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate( ctx context.Context, req iwfidl.WorkflowRpcRequest, ) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req)) - var output interpreter.HandlerOutput + var output interfaces.HandlerOutput err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req) if err != nil { errType := s.client.GetApplicationErrorTypeIfIsApplicationError(err) diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index da950002..3106b8bc 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -12,6 +12,7 @@ import ( "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "io" "net/http" "os" @@ -30,7 +31,7 @@ func StateApiWaitUntil( ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, ) (*iwfidl.WorkflowStateStartResponse, error) { stateApiWaitUntilStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateWaitUntilActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) @@ -115,7 +116,7 @@ func StateApiExecute( input service.StateDecideActivityInput, ) (*iwfidl.WorkflowStateDecideResponse, error) { stateApiExecuteStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateExecuteActivity", "input", log.ToJsonAndTruncateForLogging(input)) @@ -197,20 +198,20 @@ func checkStateDecisionFromResponse(resp *iwfidl.WorkflowStateDecideResponse) er return nil } -func printDebugMsg(logger UnifiedLogger, err error, url string) { +func printDebugMsg(logger interfaces.UnifiedLogger, err error, url string) { debugMode := os.Getenv(service.EnvNameDebugMode) if debugMode != "" { logger.Info("check error at http request", err, url) } } -func composeStartApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { +func composeStartApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) } -func composeExecuteApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { +func composeExecuteApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) @@ -224,7 +225,7 @@ func checkHttpError(err error, httpResp *http.Response) bool { } func composeHttpError( - isLocalActivity bool, provider ActivityProvider, err error, httpResp *http.Response, errType string, + isLocalActivity bool, provider interfaces.ActivityProvider, err error, httpResp *http.Response, errType string, ) error { responseBody := "None" var statusCode int @@ -329,7 +330,7 @@ func listTimerSignalInternalChannelCommandIds(commandReq *iwfidl.CommandRequest) func DumpWorkflowInternal( ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest, ) (*iwfidl.WorkflowDumpResponse, error) { - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("DumpWorkflowInternalActivity", "input", log.ToJsonAndTruncateForLogging(req)) @@ -357,15 +358,15 @@ func DumpWorkflowInternal( func InvokeWorkerRpc( ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, -) (*InvokeRpcActivityOutput, error) { - provider := getActivityProviderByType(backendType) +) (*interfaces.InvokeRpcActivityOutput, error) { + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req)) apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds) - return &InvokeRpcActivityOutput{ + return &interfaces.InvokeRpcActivityOutput{ RpcOutput: resp, StatusError: statusErr, }, nil diff --git a/service/interpreter/activityImpl_test.go b/service/interpreter/activityImpl_test.go index 4a4e2885..428085ba 100644 --- a/service/interpreter/activityImpl_test.go +++ b/service/interpreter/activityImpl_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "github.com/stretchr/testify/assert" "io" "net/http" @@ -195,9 +196,9 @@ func TestComposeHttpError_RegularActivity_NilResponse(t *testing.T) { assert.Equal(t, returnedError, err) } -func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*MockActivityProvider, *http.Response, error) { +func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*interfaces.MockActivityProvider, *http.Response, error) { ctrl := gomock.NewController(t) - mockActivityProvider := NewMockActivityProvider(ctrl) + mockActivityProvider := interfaces.NewMockActivityProvider(ctrl) var httpResp *http.Response = nil if httpError != "" { diff --git a/service/interpreter/cadence/activityProvider.go b/service/interpreter/cadence/activityProvider.go index b8a4496c..6fb7fb3b 100644 --- a/service/interpreter/cadence/activityProvider.go +++ b/service/interpreter/cadence/activityProvider.go @@ -3,7 +3,7 @@ package cadence import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence" "go.uber.org/cadence/activity" ) @@ -11,27 +11,27 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) } func (a *activityProvider) NewApplicationError(errType string, details interface{}) error { return cadence.NewCustomError(errType, details) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { zLogger := activity.GetLogger(ctx) return &loggerImpl{ zlogger: zLogger, } } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTimestamp, Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal IsLocalActivity: false, // TODO cadence doesn't support this yet - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/cadence/workflow.go b/service/interpreter/cadence/workflow.go index 107d76e0..9c9512e1 100644 --- a/service/interpreter/cadence/workflow.go +++ b/service/interpreter/cadence/workflow.go @@ -3,13 +3,14 @@ package cadence import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence/workflow" ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) } diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 996f4e89..925b4f8d 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -3,12 +3,12 @@ package cadence import ( "fmt" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "go.uber.org/cadence" "go.uber.org/cadence/workflow" ) @@ -18,7 +18,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newCadenceWorkflowProvider() interpreter.WorkflowProvider { +func newCadenceWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -38,7 +38,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -48,7 +48,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -57,11 +57,11 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { return fmt.Errorf("upsert memo is not supported in Cadence") } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -72,14 +72,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -91,7 +91,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -103,7 +103,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -113,32 +113,32 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { // NOTE: this feature is not available in Cadence return nil } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -159,7 +159,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -168,8 +168,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -190,7 +190,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertCadenceActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -201,7 +201,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -212,7 +212,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -232,7 +232,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -240,7 +240,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -248,7 +248,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { return workflow.IsReplaying(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -257,7 +257,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -276,7 +276,7 @@ func (t *cadenceReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -286,8 +286,8 @@ func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -298,7 +298,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -306,7 +306,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -318,7 +318,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter } } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") diff --git a/service/interpreter/continueAsNewCounter.go b/service/interpreter/continueAsNewCounter.go index 279c7515..e40e8095 100644 --- a/service/interpreter/continueAsNewCounter.go +++ b/service/interpreter/continueAsNewCounter.go @@ -1,5 +1,7 @@ package interpreter +import "github.com/indeedeng/iwf/service/interpreter/interfaces" + type ContinueAsNewCounter struct { executedStateApis int32 signalsReceived int32 @@ -7,12 +9,12 @@ type ContinueAsNewCounter struct { triggeredByAPI bool configer *WorkflowConfiger - rootCtx UnifiedContext - provider WorkflowProvider + rootCtx interfaces.UnifiedContext + provider interfaces.WorkflowProvider } func NewContinueAsCounter( - configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider, + configer *WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) *ContinueAsNewCounter { return &ContinueAsNewCounter{ configer: configer, diff --git a/service/interpreter/continueAsNewer.go b/service/interpreter/continueAsNewer.go index 4ec787cb..3264af6e 100644 --- a/service/interpreter/continueAsNewer.go +++ b/service/interpreter/continueAsNewer.go @@ -7,13 +7,14 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "math" "strings" "time" ) type ContinueAsNewer struct { - provider WorkflowProvider + provider interfaces.WorkflowProvider StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo inflightUpdateOperations int @@ -24,14 +25,14 @@ type ContinueAsNewer struct { persistenceManager *PersistenceManager signalReceiver *SignalReceiver outputCollector *OutputCollector - timerProcessor *TimerProcessor + timerProcessor interfaces.TimerProcessor } func NewContinueAsNewer( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, ) *ContinueAsNewer { return &ContinueAsNewer{ provider: provider, @@ -49,9 +50,9 @@ func NewContinueAsNewer( } func LoadInternalsFromPreviousRun( - ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, ) (*service.ContinueAsNewDumpResponse, error) { - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumIntervalSeconds: iwfidl.PtrInt32(5), @@ -134,7 +135,7 @@ func (c *ContinueAsNewer) GetSnapshot() service.ContinueAsNewDumpResponse { } } -func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error { +func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx interfaces.UnifiedContext) error { return c.provider.SetQueryHandler(ctx, service.ContinueAsNewDumpByPageQueryType, // return the current page of the whole snapshot func(request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error) { @@ -192,7 +193,7 @@ func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string) delete(c.StateExecutionToResumeMap, stateExecutionId) } -func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error { +func (c *ContinueAsNewer) DrainThreads(ctx interfaces.UnifiedContext) error { // TODO: add metric for before and after Await to monitor stuck // NOTE: consider using AwaitWithTimeout to get an alert when workflow stuck due to a bug in the draining logic for continueAsNew @@ -222,7 +223,7 @@ var inMemoryContinueAsNewMonitor = make(map[string]time.Time) const warnThreshold = time.Second * 5 const errThreshold = time.Second * 15 -func (c *ContinueAsNewer) allThreadsDrained(ctx UnifiedContext) bool { +func (c *ContinueAsNewer) allThreadsDrained(ctx interfaces.UnifiedContext) bool { runId := c.provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID remainingThreadCount := c.provider.GetThreadCount() diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 68d623e2..765b1da9 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -2,6 +2,7 @@ package interpreter import ( "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) const globalChangeId = "global" @@ -18,13 +19,13 @@ const MaxOfAllVersions = StartingVersionYieldOnConditionalComplete // GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type GlobalVersioner struct { - workflowProvider WorkflowProvider - ctx UnifiedContext + workflowProvider interfaces.WorkflowProvider + ctx interfaces.UnifiedContext version int } func NewGlobalVersioner( - workflowProvider WorkflowProvider, ctx UnifiedContext, + workflowProvider interfaces.WorkflowProvider, ctx interfaces.UnifiedContext, ) (*GlobalVersioner, error) { version := workflowProvider.GetVersion(ctx, globalChangeId, 0, MaxOfAllVersions) diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces/interfaces.go similarity index 88% rename from service/interpreter/interfaces.go rename to service/interpreter/interfaces/interfaces.go index 7a6f733f..b74975ce 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces/interfaces.go @@ -1,4 +1,4 @@ -package interpreter +package interfaces import ( "context" @@ -31,7 +31,7 @@ func RegisterActivityProvider(backendType service.BackendType, provider Activity activityProviderRegistry[backendType] = provider } -func getActivityProviderByType(backendType service.BackendType) ActivityProvider { +func GetActivityProviderByType(backendType service.BackendType) ActivityProvider { provider := activityProviderRegistry[backendType] if provider == nil { panic("not supported yet: " + backendType) @@ -84,6 +84,16 @@ func NewUnifiedContext(ctx interface{}) UnifiedContext { } } +type TimerProcessor interface { + Dump() []service.StaleSkipTimerSignal + GetCurrentTimerInfos() map[string][]*service.TimerInfo + SkipTimer(stateExeId string, timerId string, timerIdx int) bool + RetryStaleSkipTimer() bool + WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus + RemovePendingTimersOfState(stateExeId string) + AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus) +} + type WorkflowProvider interface { NewApplicationError(errType string, details interface{}) error IsApplicationError(err error) bool diff --git a/service/interpreter/interfaces_mock.go b/service/interpreter/interfaces/interfaces_mock.go similarity index 99% rename from service/interpreter/interfaces_mock.go rename to service/interpreter/interfaces/interfaces_mock.go index 3b2799d0..55058d08 100644 --- a/service/interpreter/interfaces_mock.go +++ b/service/interpreter/interfaces/interfaces_mock.go @@ -2,7 +2,7 @@ // Source: /Users/lwolczynski/indeedeng/iwf-server/service/interpreter/interfaces.go // Package interpreter is a generated GoMock package. -package interpreter +package interfaces import ( context "context" diff --git a/service/interpreter/persistence.go b/service/interpreter/persistence.go index c9c34a30..45e1d284 100644 --- a/service/interpreter/persistence.go +++ b/service/interpreter/persistence.go @@ -5,12 +5,13 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) type PersistenceManager struct { dataObjects map[string]iwfidl.KeyValue searchAttributes map[string]iwfidl.SearchAttribute - provider WorkflowProvider + provider interfaces.WorkflowProvider lockedDataObjectKeys map[string]bool lockedSearchAttributeKeys map[string]bool @@ -19,7 +20,7 @@ type PersistenceManager struct { } func NewPersistenceManager( - provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, + provider interfaces.WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { searchAttributes := make(map[string]iwfidl.SearchAttribute) @@ -43,7 +44,7 @@ func NewPersistenceManager( } func RebuildPersistenceManager( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { @@ -89,7 +90,7 @@ func (am *PersistenceManager) GetDataObjectsByKey(request service.GetDataAttribu } func (am *PersistenceManager) LoadSearchAttributes( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.SearchAttribute { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -127,7 +128,7 @@ func (am *PersistenceManager) LoadSearchAttributes( } func (am *PersistenceManager) LoadDataObjects( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.KeyValue { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -181,7 +182,7 @@ func (am *PersistenceManager) GetAllDataObjects() []iwfidl.KeyValue { } func (am *PersistenceManager) ProcessUpsertSearchAttribute( - ctx UnifiedContext, attributes []iwfidl.SearchAttribute, + ctx interfaces.UnifiedContext, attributes []iwfidl.SearchAttribute, ) error { if len(attributes) == 0 { return nil @@ -197,7 +198,7 @@ func (am *PersistenceManager) ProcessUpsertSearchAttribute( return am.provider.UpsertSearchAttributes(ctx, attrsToUpsert) } -func (am *PersistenceManager) ProcessUpsertDataObject(ctx UnifiedContext, attributes []iwfidl.KeyValue) error { +func (am *PersistenceManager) ProcessUpsertDataObject(ctx interfaces.UnifiedContext, attributes []iwfidl.KeyValue) error { if len(attributes) == 0 { return nil } @@ -228,7 +229,7 @@ func (am *PersistenceManager) checkKeysAreUnlocked(lockedKeys map[string]bool, k return true } -func (am *PersistenceManager) awaitAndLockForKeys(ctx UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { +func (am *PersistenceManager) awaitAndLockForKeys(ctx interfaces.UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { // wait until all keys are not locked err := am.provider.Await(ctx, func() bool { for _, k := range keysToLock { diff --git a/service/interpreter/queryHandler.go b/service/interpreter/queryHandler.go index 7b2d7660..c70067d1 100644 --- a/service/interpreter/queryHandler.go +++ b/service/interpreter/queryHandler.go @@ -3,10 +3,11 @@ package interpreter import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) func SetQueryHandlers( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, internalChannel *InternalChannel, signalReceiver *SignalReceiver, continueAsNewer *ContinueAsNewer, workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo, diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 06aa17de..f8dff2ea 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -2,6 +2,7 @@ package interpreter import ( "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "strings" "github.com/indeedeng/iwf/gen/iwfidl" @@ -13,8 +14,8 @@ type SignalReceiver struct { receivedSignals map[string][]*iwfidl.EncodedObject failWorkflowByClient bool reasonFailWorkflowByClient *string - provider WorkflowProvider - timerProcessor *TimerProcessor + provider interfaces.WorkflowProvider + timerProcessor interfaces.TimerProcessor workflowConfiger *WorkflowConfiger interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue @@ -22,9 +23,9 @@ type SignalReceiver struct { } func NewSignalReceiver( - ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, stateRequestQueue *StateRequestQueue, - persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, + persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, workflowConfiger *WorkflowConfiger, initReceivedSignals map[string][]*iwfidl.EncodedObject, ) *SignalReceiver { @@ -42,7 +43,7 @@ func NewSignalReceiver( persistenceManager: persistenceManager, } - provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.FailWorkflowSignalChannelName) @@ -67,7 +68,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.SkipTimerSignalChannelName) val := service.SkipTimerSignalRequest{} @@ -91,7 +92,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.UpdateConfigSignalChannelName) val := iwfidl.WorkflowConfigUpdateRequest{} @@ -115,7 +116,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx interfaces.UnifiedContext) { // NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew // because if there is a continueAsNew, this signal is not needed anymore ch := provider.GetSignalChannel(ctx, service.TriggerContinueAsNewSignalChannelName) @@ -135,7 +136,7 @@ func NewSignalReceiver( return }) - provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName) var val service.ExecuteRpcSignalRequest @@ -164,7 +165,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx interfaces.UnifiedContext) { for { var toProcess []string err := provider.Await(ctx, func() bool { @@ -200,7 +201,7 @@ func NewSignalReceiver( return sr } -func (sr *SignalReceiver) receiveSignal(ctx UnifiedContext, sigName string) { +func (sr *SignalReceiver) receiveSignal(ctx interfaces.UnifiedContext, sigName string) { ch := sr.provider.GetSignalChannel(ctx, sigName) for { var sigVal iwfidl.EncodedObject @@ -257,7 +258,7 @@ func (sr *SignalReceiver) GetInfos() map[string]iwfidl.ChannelInfo { // This includes both regular user signals and system signals // 2. Conditional close/complete workflow on signal/internal channel: // retrieve all signal/internal channel messages before checking the signal/internal channels -func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx UnifiedContext) { +func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx interfaces.UnifiedContext) { unhandledSigs := sr.provider.GetUnhandledSignalNames(ctx) if len(unhandledSigs) == 0 { return diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 1b21ea11..96346eee 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -6,13 +6,14 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "reflect" "slices" ) type StateExecutionCounter struct { - ctx UnifiedContext - provider WorkflowProvider + ctx interfaces.UnifiedContext + provider interfaces.WorkflowProvider configer *WorkflowConfiger globalVersioner *GlobalVersioner continueAsNewCounter *ContinueAsNewCounter @@ -24,7 +25,7 @@ type StateExecutionCounter struct { } func NewStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ @@ -40,7 +41,7 @@ func NewStateExecutionCounter( } func RebuildStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, diff --git a/service/interpreter/temporal/activityProvider.go b/service/interpreter/temporal/activityProvider.go index d8169791..5941427f 100644 --- a/service/interpreter/temporal/activityProvider.go +++ b/service/interpreter/temporal/activityProvider.go @@ -3,7 +3,7 @@ package temporal import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/temporal" ) @@ -11,10 +11,10 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { return activity.GetLogger(ctx) } @@ -22,13 +22,13 @@ func (a *activityProvider) NewApplicationError(errType string, details interface return temporal.NewApplicationError("", errType, details) } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTime, Attempt: info.Attempt, IsLocalActivity: info.IsLocalActivity, - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/temporal/workflow.go b/service/interpreter/temporal/workflow.go index aa1e792c..59bec2ec 100644 --- a/service/interpreter/temporal/workflow.go +++ b/service/interpreter/temporal/workflow.go @@ -3,6 +3,7 @@ package temporal import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/workflow" // TODO(cretz): Remove when tagged @@ -10,9 +11,9 @@ import ( ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index a4aeb57a..0a730606 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -3,12 +3,12 @@ package temporal import ( "errors" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "github.com/indeedeng/iwf/service/interpreter/env" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -19,7 +19,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newTemporalWorkflowProvider() interpreter.WorkflowProvider { +func newTemporalWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -39,7 +39,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -49,7 +49,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -58,7 +58,7 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -83,7 +83,7 @@ func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo ma return workflow.UpsertMemo(wfCtx, memo) } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -94,14 +94,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -113,7 +113,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -125,7 +125,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -135,19 +135,19 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } v2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) error { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) return validator(ctx2, input) } - h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interpreter.HandlerOutput, error) { - ctx2 := interpreter.NewUnifiedContext(ctx) + h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interfaces.HandlerOutput, error) { + ctx2 := interfaces.NewUnifiedContext(ctx) return handler(ctx2, input) } return workflow.SetUpdateHandlerWithOptions( @@ -159,24 +159,24 @@ func (w *workflowProvider) SetRpcUpdateHandler( } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -197,7 +197,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -206,8 +206,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -231,7 +231,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertTemporalActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -242,7 +242,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -253,7 +253,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -272,7 +272,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -280,7 +280,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -288,7 +288,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -297,7 +297,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -316,7 +316,7 @@ func (t *temporalReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *temporalReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -326,8 +326,8 @@ func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -338,7 +338,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -346,7 +346,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -354,7 +354,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter return workflow.GetLogger(wfCtx) } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") diff --git a/service/interpreter/timerProcessor.go b/service/interpreter/timers/simpleTimerProcessor.go similarity index 84% rename from service/interpreter/timerProcessor.go rename to service/interpreter/timers/simpleTimerProcessor.go index ebd7fcf3..c9af9e8f 100644 --- a/service/interpreter/timerProcessor.go +++ b/service/interpreter/timers/simpleTimerProcessor.go @@ -1,23 +1,24 @@ -package interpreter +package timers import ( + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" ) -type TimerProcessor struct { +type SimpleTimerProcessor struct { stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo staleSkipTimerSignals []service.StaleSkipTimerSignal - provider WorkflowProvider - logger UnifiedLogger + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger } -func NewTimerProcessor( - ctx UnifiedContext, provider WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, -) *TimerProcessor { - tp := &TimerProcessor{ +func NewSimpleTimerProcessor( + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *SimpleTimerProcessor { + tp := &SimpleTimerProcessor{ provider: provider, stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, logger: provider.GetLogger(ctx), @@ -35,16 +36,16 @@ func NewTimerProcessor( return tp } -func (t *TimerProcessor) Dump() []service.StaleSkipTimerSignal { +func (t *SimpleTimerProcessor) Dump() []service.StaleSkipTimerSignal { return t.staleSkipTimerSignals } -func (t *TimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { +func (t *SimpleTimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { return t.stateExecutionCurrentTimerInfos } // SkipTimer will attempt to skip a timer, return false if no valid timer found -func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { +func (t *SimpleTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) if !valid { // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition @@ -61,7 +62,7 @@ func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) boo return true } -func (t *TimerProcessor) RetryStaleSkipTimer() bool { +func (t *SimpleTimerProcessor) RetryStaleSkipTimer() bool { for i, staleSkip := range t.staleSkipTimerSignals { found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) if found { @@ -81,8 +82,8 @@ func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipT // WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), // return true when the timer is fired or skipped // return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) -func (t *TimerProcessor) WaitForTimerFiredOrSkipped( - ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +func (t *SimpleTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, ) service.InternalTimerStatus { timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] if len(timerInfos) == 0 { @@ -122,11 +123,11 @@ func (t *TimerProcessor) WaitForTimerFiredOrSkipped( } // RemovePendingTimersOfState is for when a state is completed, remove all its pending timers -func (t *TimerProcessor) RemovePendingTimersOfState(stateExeId string) { +func (t *SimpleTimerProcessor) RemovePendingTimersOfState(stateExeId string) { delete(t.stateExecutionCurrentTimerInfos, stateExeId) } -func (t *TimerProcessor) AddTimers( +func (t *SimpleTimerProcessor) AddTimers( stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, ) { timers := make([]*service.TimerInfo, len(commands)) diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index b747f862..f0cbf3f8 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -8,6 +8,8 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/utils" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "github.com/indeedeng/iwf/service/interpreter/timers" "time" "github.com/indeedeng/iwf/service/common/compatibility" @@ -18,7 +20,7 @@ import ( ) func InterpreterImpl( - ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, input service.InterpreterWorkflowInput, ) (output *service.InterpreterWorkflowOutput, retErr error) { defer func() { if !provider.IsReplaying(ctx) { @@ -79,7 +81,7 @@ func InterpreterImpl( var internalChannel *InternalChannel var stateRequestQueue *StateRequestQueue var persistenceManager *PersistenceManager - var timerProcessor *TimerProcessor + var timerProcessor interfaces.TimerProcessor var continueAsNewCounter *ContinueAsNewCounter var signalReceiver *SignalReceiver var stateExecutionCounter *StateExecutionCounter @@ -99,7 +101,7 @@ func InterpreterImpl( internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo @@ -112,7 +114,7 @@ func InterpreterImpl( internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, nil) + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) @@ -203,7 +205,7 @@ func InterpreterImpl( // execute in another thread for parallelism // state must be passed via parameter https://stackoverflow.com/questions/67263092 stateCtx := provider.ExtendContextWithValue(ctx, "stateReq", stateReqForLoopingOnly) - provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx UnifiedContext) { + provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx interfaces.UnifiedContext) { stateReq, ok := provider.GetContextValue(ctx, "stateReq").(StateRequest) if !ok { errToFailWf = provider.NewApplicationError( @@ -380,7 +382,7 @@ func InterpreterImpl( } func checkClosingWorkflow( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, currentStateId, currentStateExeId string, internalChannel *InternalChannel, signalReceiver *SignalReceiver, ) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) { @@ -487,7 +489,7 @@ func checkClosingWorkflow( } func DrainReceivedButUnprocessedInternalChannelsFromStateApis( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, ) error { if versioner.IsAfterVersionOfYieldOnConditionalComplete() { // Just yield, by waiting on an empty lambda, nothing else. @@ -504,8 +506,8 @@ func DrainReceivedButUnprocessedInternalChannelsFromStateApis( } func processStateExecution( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, basicInfo service.BasicInfo, stateReq StateRequest, @@ -513,7 +515,7 @@ func processStateExecution( persistenceManager *PersistenceManager, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, @@ -533,7 +535,7 @@ func processStateExecution( WorkflowStartedTimestamp: info.WorkflowStartTime.Unix(), StateExecutionId: &stateExeId, } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } @@ -638,7 +640,7 @@ func processStateExecution( } interStateChannel.ProcessPublishing(startResponse.GetPublishToInterStateChannel()) - commandReq = FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) + commandReq = timers.FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) stateExecutionLocal = startResponse.GetUpsertStateLocals() } @@ -650,7 +652,7 @@ func processStateExecution( continue } cmdCtx := provider.ExtendContextWithValue(ctx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { idx, ok := provider.GetContextValue(ctx, "idx").(int) if !ok { panic("critical code bug") @@ -675,7 +677,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.SignalCommand) if !ok { panic("critical code bug") @@ -707,7 +709,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.InterStateChannelCommand) if !ok { panic("critical code bug") @@ -812,8 +814,8 @@ func processStateExecution( } func invokeStateExecute( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, basicInfo service.BasicInfo, state iwfidl.StateMovement, stateExeId string, @@ -828,7 +830,7 @@ func invokeStateExecute( shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { var err error - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } if state.StateOptions != nil { @@ -983,7 +985,7 @@ func shouldProceedOnExecuteApiError(state iwfidl.StateMovement) bool { options.GetExecuteApiFailurePolicy() == iwfidl.PROCEED_TO_CONFIGURED_STATE } -func convertStateApiActivityError(provider WorkflowProvider, err error) error { +func convertStateApiActivityError(provider interfaces.WorkflowProvider, err error) error { if provider.IsApplicationError(err) { return err } @@ -994,7 +996,7 @@ func getCommandThreadName(prefix string, stateExecId, cmdId string, idx int) str return fmt.Sprintf("%v-%v-%v-%v", prefix, stateExecId, cmdId, idx) } -func createUserWorkflowError(provider WorkflowProvider, message string) error { +func createUserWorkflowError(provider interfaces.WorkflowProvider, message string) error { return provider.NewApplicationError( string(iwfidl.INVALID_USER_WORKFLOW_CODE_ERROR_TYPE), message, @@ -1002,7 +1004,7 @@ func createUserWorkflowError(provider WorkflowProvider, message string) error { } func WaitForStateCompletionWorkflowImpl( - ctx UnifiedContext, provider WorkflowProvider, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) (*service.WaitForStateCompletionWorkflowOutput, error) { signalReceiveChannel := provider.GetSignalChannel(ctx, service.StateCompletionSignalChannelName) var signalValue iwfidl.StateCompletionOutput diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 6846ee65..1391e709 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -4,25 +4,26 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" ) type WorkflowUpdater struct { persistenceManager *PersistenceManager - provider WorkflowProvider + provider interfaces.WorkflowProvider continueAsNewer *ContinueAsNewer continueAsNewCounter *ContinueAsNewCounter internalChannel *InternalChannel signalReceiver *SignalReceiver stateRequestQueue *StateRequestQueue configer *WorkflowConfiger - logger UnifiedLogger + logger interfaces.UnifiedLogger basicInfo service.BasicInfo globalVersioner *GlobalVersioner } func NewWorkflowUpdater( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo, @@ -51,8 +52,8 @@ func NewWorkflowUpdater( } func (u *WorkflowUpdater) handler( - ctx UnifiedContext, input iwfidl.WorkflowRpcRequest, -) (output *HandlerOutput, err error) { + ctx interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest, +) (output *interfaces.HandlerOutput, err error) { u.continueAsNewer.IncreaseInflightOperation() defer u.continueAsNewer.DecreaseInflightOperation() @@ -80,7 +81,7 @@ func (u *WorkflowUpdater) handler( InternalChannelInfo: u.internalChannel.GetInfos(), } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumAttemptsDurationSeconds: input.TimeoutSeconds, @@ -88,7 +89,7 @@ func (u *WorkflowUpdater) handler( }, } ctx = u.provider.WithActivityOptions(ctx, activityOptions) - var activityOutput InvokeRpcActivityOutput + var activityOutput interfaces.InvokeRpcActivityOutput err = u.provider.ExecuteActivity(&activityOutput, u.configer.ShouldOptimizeActivity(), ctx, InvokeWorkerRpc, u.provider.GetBackendType(), rpcPrep, input) u.persistenceManager.UnlockPersistence(input.SearchAttributesLoadingPolicy, input.DataAttributesLoadingPolicy) @@ -97,7 +98,7 @@ func (u *WorkflowUpdater) handler( return nil, u.provider.NewApplicationError(string(iwfidl.SERVER_INTERNAL_ERROR_TYPE), "activity invocation failure:"+err.Error()) } - handlerOutput := &HandlerOutput{ + handlerOutput := &interfaces.HandlerOutput{ StatusError: activityOutput.StatusError, } @@ -118,7 +119,7 @@ func (u *WorkflowUpdater) handler( return handlerOutput, nil } -func (u *WorkflowUpdater) validator(_ UnifiedContext, input iwfidl.WorkflowRpcRequest) error { +func (u *WorkflowUpdater) validator(_ interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest) error { var daKeys, saKeys []string if input.HasDataAttributesLoadingPolicy() { daKeys = input.DataAttributesLoadingPolicy.LockingKeys From 3a6b2d5cc69acce2751c137a8bf2791c073c54b7 Mon Sep 17 00:00:00 2001 From: jbowers Date: Sat, 21 Dec 2024 00:35:09 -0600 Subject: [PATCH 2/4] IWF-274: create greedy timer creation goroutine --- gen/iwfidl/api/openapi.yaml | 6 + gen/iwfidl/docs/WorkflowConfig.md | 26 ++ gen/iwfidl/model_workflow_config.go | 36 +++ integ/any_timer_signal_test.go | 3 + integ/timer_test.go | 3 + iwf-idl | 2 +- .../timers/greedyTimerProcessor.go | 239 ++++++++++++++++++ .../timers/simpleTimerProcessor.go | 23 -- service/interpreter/timers/utils.go | 31 +++ service/interpreter/workflowImpl.go | 12 +- 10 files changed, 355 insertions(+), 26 deletions(-) create mode 100644 service/interpreter/timers/greedyTimerProcessor.go create mode 100644 service/interpreter/timers/utils.go diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index c18e8b48..27845c37 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -738,6 +738,7 @@ components: workflowConfigOverride: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -1011,6 +1012,7 @@ components: workflowConfigOverride: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -1601,6 +1603,7 @@ components: workflowConfig: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -3284,6 +3287,7 @@ components: example: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -3298,6 +3302,8 @@ components: type: integer optimizeActivity: type: boolean + optimizeTimer: + type: boolean type: object Context: example: diff --git a/gen/iwfidl/docs/WorkflowConfig.md b/gen/iwfidl/docs/WorkflowConfig.md index b7c2d6ff..ac490cc1 100644 --- a/gen/iwfidl/docs/WorkflowConfig.md +++ b/gen/iwfidl/docs/WorkflowConfig.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **ContinueAsNewThreshold** | Pointer to **int32** | | [optional] **ContinueAsNewPageSizeInBytes** | Pointer to **int32** | | [optional] **OptimizeActivity** | Pointer to **bool** | | [optional] +**OptimizeTimer** | Pointer to **bool** | | [optional] ## Methods @@ -154,6 +155,31 @@ SetOptimizeActivity sets OptimizeActivity field to given value. HasOptimizeActivity returns a boolean if a field has been set. +### GetOptimizeTimer + +`func (o *WorkflowConfig) GetOptimizeTimer() bool` + +GetOptimizeTimer returns the OptimizeTimer field if non-nil, zero value otherwise. + +### GetOptimizeTimerOk + +`func (o *WorkflowConfig) GetOptimizeTimerOk() (*bool, bool)` + +GetOptimizeTimerOk returns a tuple with the OptimizeTimer field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetOptimizeTimer + +`func (o *WorkflowConfig) SetOptimizeTimer(v bool)` + +SetOptimizeTimer sets OptimizeTimer field to given value. + +### HasOptimizeTimer + +`func (o *WorkflowConfig) HasOptimizeTimer() bool` + +HasOptimizeTimer returns a boolean if a field has been set. + [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/gen/iwfidl/model_workflow_config.go b/gen/iwfidl/model_workflow_config.go index 72374728..f55069cf 100644 --- a/gen/iwfidl/model_workflow_config.go +++ b/gen/iwfidl/model_workflow_config.go @@ -24,6 +24,7 @@ type WorkflowConfig struct { ContinueAsNewThreshold *int32 `json:"continueAsNewThreshold,omitempty"` ContinueAsNewPageSizeInBytes *int32 `json:"continueAsNewPageSizeInBytes,omitempty"` OptimizeActivity *bool `json:"optimizeActivity,omitempty"` + OptimizeTimer *bool `json:"optimizeTimer,omitempty"` } // NewWorkflowConfig instantiates a new WorkflowConfig object @@ -203,6 +204,38 @@ func (o *WorkflowConfig) SetOptimizeActivity(v bool) { o.OptimizeActivity = &v } +// GetOptimizeTimer returns the OptimizeTimer field value if set, zero value otherwise. +func (o *WorkflowConfig) GetOptimizeTimer() bool { + if o == nil || IsNil(o.OptimizeTimer) { + var ret bool + return ret + } + return *o.OptimizeTimer +} + +// GetOptimizeTimerOk returns a tuple with the OptimizeTimer field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowConfig) GetOptimizeTimerOk() (*bool, bool) { + if o == nil || IsNil(o.OptimizeTimer) { + return nil, false + } + return o.OptimizeTimer, true +} + +// HasOptimizeTimer returns a boolean if a field has been set. +func (o *WorkflowConfig) HasOptimizeTimer() bool { + if o != nil && !IsNil(o.OptimizeTimer) { + return true + } + + return false +} + +// SetOptimizeTimer gets a reference to the given bool and assigns it to the OptimizeTimer field. +func (o *WorkflowConfig) SetOptimizeTimer(v bool) { + o.OptimizeTimer = &v +} + func (o WorkflowConfig) MarshalJSON() ([]byte, error) { toSerialize, err := o.ToMap() if err != nil { @@ -228,6 +261,9 @@ func (o WorkflowConfig) ToMap() (map[string]interface{}, error) { if !IsNil(o.OptimizeActivity) { toSerialize["optimizeActivity"] = o.OptimizeActivity } + if !IsNil(o.OptimizeTimer) { + toSerialize["optimizeTimer"] = o.OptimizeTimer + } return toSerialize, nil } diff --git a/integ/any_timer_signal_test.go b/integ/any_timer_signal_test.go index ee2c2810..24c2fce9 100644 --- a/integ/any_timer_signal_test.go +++ b/integ/any_timer_signal_test.go @@ -12,6 +12,7 @@ import ( "time" ) +// TODO: crate greedy tests for cancelling timer early func TestAnyTimerSignalWorkflowTemporal(t *testing.T) { if !*temporalIntegTest { t.Skip() @@ -52,6 +53,8 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { } } +// TODO: crate greedy tests for cancelling timer early + func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := anytimersignal.NewHandler() diff --git a/integ/timer_test.go b/integ/timer_test.go index 35681e1e..bb1521c4 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" ) +// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true func TestTimerWorkflowTemporal(t *testing.T) { if !*temporalIntegTest { t.Skip() @@ -55,6 +56,8 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { } } +// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true + func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := timer.NewHandler() diff --git a/iwf-idl b/iwf-idl index a7fb5559..b29e553d 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit a7fb55597de5591fffa406b6b5f07d85fda4dfee +Subproject commit b29e553da2efc5d01a1e496398c2894920644bad diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go new file mode 100644 index 00000000..5e104ad7 --- /dev/null +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -0,0 +1,239 @@ +package timers + +import ( + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +type sortedTimers struct { + status service.InternalTimerStatus + // Ordered slice of all timers being awaited on + timers []*service.TimerInfo +} + +type GreedyTimerProcessor struct { + pendingTimers sortedTimers + stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo + staleSkipTimerSignals []service.StaleSkipTimerSignal + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger +} + +func NewGreedyTimerProcessor( + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *GreedyTimerProcessor { + + tp := &GreedyTimerProcessor{ + provider: provider, + pendingTimers: sortedTimers{status: service.TimerPending}, + stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, + logger: provider.GetLogger(ctx), + staleSkipTimerSignals: staleSkipTimerSignals, + } + + // start some single thread that manages timers + tp.createGreedyTimerScheduler(ctx) + + err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { + return service.GetCurrentTimerInfosQueryResponse{ + StateExecutionCurrentTimerInfos: tp.stateExecutionCurrentTimerInfos, + }, nil + }) + if err != nil { + panic("cannot set query handler") + } + return tp +} + +func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { + + if toAdd == nil || toAdd.Status != t.status { + panic("invalid timer added") + } + + insertIndex := 0 + for i, timer := range t.timers { + if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { + insertIndex = i + break + } + insertIndex = i + 1 + } + t.timers = append( + t.timers[:insertIndex], + append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...) +} + +func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { + + if len(t.timers) == 0 { + return nil + } + + index := len(t.timers) + + for i := len(t.timers) - 1; i >= 0; i-- { + timer := t.timers[i] + if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status { + break + } + index = i + } + t.timers = t.timers[:index] + return t.timers[index-1] +} + +func (t *GreedyTimerProcessor) createGreedyTimerScheduler(ctx interfaces.UnifiedContext) { + + t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { + // NOTE: next timer to fire is at the end of the slice + var createdTimers []int64 + for { + t.provider.Await(ctx, func() bool { + // remove fired timers + now := t.provider.Now(ctx).Unix() + for i := len(createdTimers) - 1; i >= 0; i-- { + if createdTimers[i] > now { + createdTimers = createdTimers[:i+1] + break + } + } + next := t.pendingTimers.pruneToNextTimer(now) + return next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) + }) + + now := t.provider.Now(ctx).Unix() + next := t.pendingTimers.pruneToNextTimer(now) + //next := t.pendingTimers.getEarliestTimer() + // only create a new timer when a pending timer exists before the next existing timer fires + if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) { + fireAt := next.FiringUnixTimestampSeconds + duration := time.Duration(fireAt-now) * time.Second + t.provider.NewTimer(ctx, duration) + createdTimers = append(createdTimers, fireAt) + } + } + }) +} + +func (t *GreedyTimerProcessor) Dump() []service.StaleSkipTimerSignal { + return t.staleSkipTimerSignals +} + +func (t *GreedyTimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { + return t.stateExecutionCurrentTimerInfos +} + +// SkipTimer will attempt to skip a timer, return false if no valid timer found +func (t *GreedyTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { + timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) + if !valid { + // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition + t.logger.Warn("cannot process timer skip request, maybe state is already closed...putting into a stale skip timer queue", stateExeId, timerId, timerIdx) + + t.staleSkipTimerSignals = append(t.staleSkipTimerSignals, service.StaleSkipTimerSignal{ + StateExecutionId: stateExeId, + TimerCommandId: timerId, + TimerCommandIndex: timerIdx, + }) + return false + } + timer.Status = service.TimerSkipped + return true +} + +func (t *GreedyTimerProcessor) RetryStaleSkipTimer() bool { + for i, staleSkip := range t.staleSkipTimerSignals { + found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) + if found { + newList := removeElement(t.staleSkipTimerSignals, i) + t.staleSkipTimerSignals = newList + return true + } + } + return false +} + +// WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), +// return true when the timer is fired or skipped +// return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) +func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +) service.InternalTimerStatus { + timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] + if len(timerInfos) == 0 { + if *cancelWaiting { + // The waiting thread is later than the timer execState thread + // The execState thread got completed early and call RemovePendingTimersOfState to remove the timerInfos + // returning pending here + return service.TimerPending + } else { + panic("bug: this shouldn't happen") + } + } + timer := timerInfos[timerIdx] + if timer.Status == service.TimerFired || timer.Status == service.TimerSkipped { + return timer.Status + } + skippedByStaleSkip := t.RetryStaleSkipTimer() + if skippedByStaleSkip { + t.logger.Warn("timer skipped by stale skip signal", stateExeId, timerIdx) + timer.Status = service.TimerSkipped + return service.TimerSkipped + } + + _ = t.provider.Await(ctx, func() bool { + return timer.Status == service.TimerFired || timer.Status == service.TimerSkipped || timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() || *cancelWaiting + }) + + if timer.Status == service.TimerSkipped { + return service.TimerSkipped + } + + if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() { + return service.TimerFired + } + + // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) + return service.TimerPending +} + +// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers +func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { + + timers := t.stateExecutionCurrentTimerInfos[stateExeId] + + for _, timer := range timers { + timer.Status = service.TimerSkipped + } + + delete(t.stateExecutionCurrentTimerInfos, stateExeId) +} + +func (t *GreedyTimerProcessor) AddTimers( + stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, +) { + timers := make([]*service.TimerInfo, len(commands)) + for idx, cmd := range commands { + var timer service.TimerInfo + if status, ok := completedTimerCmds[idx]; ok { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: status, + } + } else { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: service.TimerPending, + } + t.pendingTimers.addTimer(&timer) + } + timers[idx] = &timer + } + t.stateExecutionCurrentTimerInfos[stateExeId] = timers +} diff --git a/service/interpreter/timers/simpleTimerProcessor.go b/service/interpreter/timers/simpleTimerProcessor.go index c9af9e8f..91c6d9c9 100644 --- a/service/interpreter/timers/simpleTimerProcessor.go +++ b/service/interpreter/timers/simpleTimerProcessor.go @@ -74,11 +74,6 @@ func (t *SimpleTimerProcessor) RetryStaleSkipTimer() bool { return false } -func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { - s[i] = s[len(s)-1] - return s[:len(s)-1] -} - // WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), // return true when the timer is fired or skipped // return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) @@ -151,21 +146,3 @@ func (t *SimpleTimerProcessor) AddTimers( } t.stateExecutionCurrentTimerInfos[stateExeId] = timers } - -// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds -// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew -func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { - var timerCommands []iwfidl.TimerCommand - for _, cmd := range request.GetTimerCommands() { - if cmd.HasDurationSeconds() { - timerCommands = append(timerCommands, iwfidl.TimerCommand{ - CommandId: cmd.CommandId, - FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), - }) - } else { - timerCommands = append(timerCommands, cmd) - } - } - request.TimerCommands = timerCommands - return request -} diff --git a/service/interpreter/timers/utils.go b/service/interpreter/timers/utils.go new file mode 100644 index 00000000..9e39a9b9 --- /dev/null +++ b/service/interpreter/timers/utils.go @@ -0,0 +1,31 @@ +package timers + +import ( + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { + s[i] = s[len(s)-1] + return s[:len(s)-1] +} + +// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds +// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew +func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { + var timerCommands []iwfidl.TimerCommand + for _, cmd := range request.GetTimerCommands() { + if cmd.HasDurationSeconds() { + timerCommands = append(timerCommands, iwfidl.TimerCommand{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), + }) + } else { + timerCommands = append(timerCommands, cmd) + } + } + request.TimerCommands = timerCommands + return request +} diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index f0cbf3f8..736defbb 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -101,7 +101,11 @@ func InterpreterImpl( internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + } continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo @@ -114,7 +118,11 @@ func InterpreterImpl( internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, nil) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) + } continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) From a6399dfc3814f36c933e7091d1d7d40e78c5457b Mon Sep 17 00:00:00 2001 From: jbowers Date: Sat, 21 Dec 2024 01:30:24 -0600 Subject: [PATCH 3/4] IWF-274: continue as new should complete timer spawning goroutine --- .../{ => config}/workflowConfiger.go | 2 +- .../{ => cont}/continueAsNewCounter.go | 11 +++--- service/interpreter/queryHandler.go | 3 +- service/interpreter/signalReceiver.go | 8 +++-- service/interpreter/stateExecutionCounter.go | 10 +++--- .../timers/greedyTimerProcessor.go | 35 ++++++++++++++++--- service/interpreter/workflowImpl.go | 20 ++++++----- service/interpreter/workflowUpdater.go | 8 +++-- 8 files changed, 67 insertions(+), 30 deletions(-) rename service/interpreter/{ => config}/workflowConfiger.go (98%) rename service/interpreter/{ => cont}/continueAsNewCounter.go (80%) diff --git a/service/interpreter/workflowConfiger.go b/service/interpreter/config/workflowConfiger.go similarity index 98% rename from service/interpreter/workflowConfiger.go rename to service/interpreter/config/workflowConfiger.go index 81fedc02..dfa53506 100644 --- a/service/interpreter/workflowConfiger.go +++ b/service/interpreter/config/workflowConfiger.go @@ -1,4 +1,4 @@ -package interpreter +package config import ( "github.com/indeedeng/iwf/gen/iwfidl" diff --git a/service/interpreter/continueAsNewCounter.go b/service/interpreter/cont/continueAsNewCounter.go similarity index 80% rename from service/interpreter/continueAsNewCounter.go rename to service/interpreter/cont/continueAsNewCounter.go index e40e8095..c98bb20d 100644 --- a/service/interpreter/continueAsNewCounter.go +++ b/service/interpreter/cont/continueAsNewCounter.go @@ -1,6 +1,9 @@ -package interpreter +package cont -import "github.com/indeedeng/iwf/service/interpreter/interfaces" +import ( + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/interfaces" +) type ContinueAsNewCounter struct { executedStateApis int32 @@ -8,13 +11,13 @@ type ContinueAsNewCounter struct { syncUpdateReceived int32 triggeredByAPI bool - configer *WorkflowConfiger + configer *config.WorkflowConfiger rootCtx interfaces.UnifiedContext provider interfaces.WorkflowProvider } func NewContinueAsCounter( - configer *WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, + configer *config.WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) *ContinueAsNewCounter { return &ContinueAsNewCounter{ configer: configer, diff --git a/service/interpreter/queryHandler.go b/service/interpreter/queryHandler.go index c70067d1..b67ae942 100644 --- a/service/interpreter/queryHandler.go +++ b/service/interpreter/queryHandler.go @@ -3,6 +3,7 @@ package interpreter import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/config" "github.com/indeedeng/iwf/service/interpreter/interfaces" ) @@ -10,7 +11,7 @@ func SetQueryHandlers( ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, internalChannel *InternalChannel, signalReceiver *SignalReceiver, continueAsNewer *ContinueAsNewer, - workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo, + workflowConfiger *config.WorkflowConfiger, basicInfo service.BasicInfo, ) error { err := provider.SetQueryHandler(ctx, service.GetDataAttributesWorkflowQueryType, func(req service.GetDataAttributesQueryRequest) (service.GetDataAttributesQueryResponse, error) { dos := persistenceManager.GetDataObjectsByKey(req) diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index f8dff2ea..f73356a5 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -2,6 +2,8 @@ package interpreter import ( "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/interfaces" "strings" @@ -16,7 +18,7 @@ type SignalReceiver struct { reasonFailWorkflowByClient *string provider interfaces.WorkflowProvider timerProcessor interfaces.TimerProcessor - workflowConfiger *WorkflowConfiger + workflowConfiger *config.WorkflowConfiger interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue persistenceManager *PersistenceManager @@ -25,8 +27,8 @@ type SignalReceiver struct { func NewSignalReceiver( ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, stateRequestQueue *StateRequestQueue, - persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, - workflowConfiger *WorkflowConfiger, + persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *cont.ContinueAsNewCounter, + workflowConfiger *config.WorkflowConfiger, initReceivedSignals map[string][]*iwfidl.EncodedObject, ) *SignalReceiver { if initReceivedSignals == nil { diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 96346eee..58d3d888 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -6,6 +6,8 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/interfaces" "reflect" "slices" @@ -14,9 +16,9 @@ import ( type StateExecutionCounter struct { ctx interfaces.UnifiedContext provider interfaces.WorkflowProvider - configer *WorkflowConfiger + configer *config.WorkflowConfiger globalVersioner *GlobalVersioner - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter stateIdCompletedCounts map[string]int stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed @@ -26,7 +28,7 @@ type StateExecutionCounter struct { func NewStateExecutionCounter( ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, @@ -44,7 +46,7 @@ func RebuildStateExecutionCounter( ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go index 5e104ad7..6883dd51 100644 --- a/service/interpreter/timers/greedyTimerProcessor.go +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -1,6 +1,7 @@ package timers import ( + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" @@ -23,7 +24,10 @@ type GreedyTimerProcessor struct { } func NewGreedyTimerProcessor( - ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, + continueAsNewCounter *cont.ContinueAsNewCounter, + staleSkipTimerSignals []service.StaleSkipTimerSignal, ) *GreedyTimerProcessor { tp := &GreedyTimerProcessor{ @@ -35,7 +39,7 @@ func NewGreedyTimerProcessor( } // start some single thread that manages timers - tp.createGreedyTimerScheduler(ctx) + tp.createGreedyTimerScheduler(ctx, continueAsNewCounter) err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { return service.GetCurrentTimerInfosQueryResponse{ @@ -57,6 +61,10 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { insertIndex := 0 for i, timer := range t.timers { if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { + // don't want dupes. Makes remove simpler + if toAdd == timer { + return + } insertIndex = i break } @@ -67,6 +75,15 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...) } +func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) { + for i, timer := range t.timers { + if toRemove == timer { + t.timers = append(t.timers[:i], t.timers[i+1:]...) + return + } + } +} + func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { if len(t.timers) == 0 { @@ -86,7 +103,9 @@ func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { return t.timers[index-1] } -func (t *GreedyTimerProcessor) createGreedyTimerScheduler(ctx interfaces.UnifiedContext) { +func (t *GreedyTimerProcessor) createGreedyTimerScheduler( + ctx interfaces.UnifiedContext, + continueAsNewCounter *cont.ContinueAsNewCounter) { t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { // NOTE: next timer to fire is at the end of the slice @@ -102,9 +121,13 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler(ctx interfaces.Unified } } next := t.pendingTimers.pruneToNextTimer(now) - return next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) + return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet() }) + if continueAsNewCounter.IsThresholdMet() { + break + } + now := t.provider.Now(ctx).Unix() next := t.pendingTimers.pruneToNextTimer(now) //next := t.pendingTimers.getEarliestTimer() @@ -194,10 +217,12 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( } if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() { + timer.Status = service.TimerFired return service.TimerFired } // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) + t.pendingTimers.removeTimer(timer) return service.TimerPending } @@ -207,7 +232,7 @@ func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { timers := t.stateExecutionCurrentTimerInfos[stateExeId] for _, timer := range timers { - timer.Status = service.TimerSkipped + t.pendingTimers.removeTimer(timer) } delete(t.stateExecutionCurrentTimerInfos, stateExeId) diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 736defbb..439dff13 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -7,6 +7,8 @@ import ( "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/env" "github.com/indeedeng/iwf/service/interpreter/interfaces" "github.com/indeedeng/iwf/service/interpreter/timers" @@ -72,7 +74,7 @@ func InterpreterImpl( } } - workflowConfiger := NewWorkflowConfiger(input.Config) + workflowConfiger := config.NewWorkflowConfiger(input.Config) basicInfo := service.BasicInfo{ IwfWorkflowType: input.IwfWorkflowType, IwfWorkerUrl: input.IwfWorkerUrl, @@ -82,7 +84,7 @@ func InterpreterImpl( var stateRequestQueue *StateRequestQueue var persistenceManager *PersistenceManager var timerProcessor interfaces.TimerProcessor - var continueAsNewCounter *ContinueAsNewCounter + var continueAsNewCounter *cont.ContinueAsNewCounter var signalReceiver *SignalReceiver var stateExecutionCounter *StateExecutionCounter var outputCollector *OutputCollector @@ -101,12 +103,12 @@ func InterpreterImpl( internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) if input.Config.GetOptimizeTimer() { - timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, previous.StaleSkipTimerSignals) } else { timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) } - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, @@ -118,12 +120,12 @@ func InterpreterImpl( internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) if input.Config.GetOptimizeTimer() { - timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, nil) + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, nil) } else { timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) } - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) @@ -525,8 +527,8 @@ func processStateExecution( signalReceiver *SignalReceiver, timerProcessor interfaces.TimerProcessor, continueAsNewer *ContinueAsNewer, - continueAsNewCounter *ContinueAsNewCounter, - configer *WorkflowConfiger, + continueAsNewCounter *cont.ContinueAsNewCounter, + configer *config.WorkflowConfiger, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { waitUntilApi := StateStart @@ -832,7 +834,7 @@ func invokeStateExecute( executionContext iwfidl.Context, commandRes *iwfidl.CommandResults, continueAsNewer *ContinueAsNewer, - configer *WorkflowConfiger, + configer *config.WorkflowConfiger, executeApi interface{}, stateExecutionLocal []iwfidl.KeyValue, shouldSendSignalOnCompletion bool, diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 1391e709..eb2d20db 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -4,6 +4,8 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" ) @@ -12,11 +14,11 @@ type WorkflowUpdater struct { persistenceManager *PersistenceManager provider interfaces.WorkflowProvider continueAsNewer *ContinueAsNewer - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter internalChannel *InternalChannel signalReceiver *SignalReceiver stateRequestQueue *StateRequestQueue - configer *WorkflowConfiger + configer *config.WorkflowConfiger logger interfaces.UnifiedLogger basicInfo service.BasicInfo globalVersioner *GlobalVersioner @@ -25,7 +27,7 @@ type WorkflowUpdater struct { func NewWorkflowUpdater( ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, - continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, + continueAsNewer *ContinueAsNewer, continueAsNewCounter *cont.ContinueAsNewCounter, configer *config.WorkflowConfiger, internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, ) (*WorkflowUpdater, error) { From b1fef0a3b268aca1524e369a2db484cb0837fd7f Mon Sep 17 00:00:00 2001 From: jbowers Date: Mon, 13 Jan 2025 22:56:51 -0600 Subject: [PATCH 4/4] IWF-274: fix prune split logic and tests to pass timer config through to interpreter --- integ/timer_test.go | 39 ++++++++ integ/util.go | 13 +++ service/api/service.go | 3 + .../timers/greedyTimerProcessor.go | 93 ++++++++++--------- service/interpreter/timers/timerScheduler.go | 1 + 5 files changed, 107 insertions(+), 42 deletions(-) create mode 100644 service/interpreter/timers/timerScheduler.go diff --git a/integ/timer_test.go b/integ/timer_test.go index bb1521c4..198cba7e 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -57,6 +57,45 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { } // TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true +func TestGreedyTimerWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig(true, false)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig(true, false)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) + smallWaitForFastTest() + } +} func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server diff --git a/integ/util.go b/integ/util.go index 6fdef79e..f6d410e5 100644 --- a/integ/util.go +++ b/integ/util.go @@ -220,6 +220,19 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig { } } +func minimumGreedyTimerConfig(optimizeTimer bool, continueAsNew bool) *iwfidl.WorkflowConfig { + if continueAsNew { + return &iwfidl.WorkflowConfig{ + ContinueAsNewThreshold: iwfidl.PtrInt32(1), + OptimizeTimer: iwfidl.PtrBool(optimizeTimer), + } + } + + return &iwfidl.WorkflowConfig{ + OptimizeTimer: iwfidl.PtrBool(optimizeTimer), + } +} + func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig { return minimumContinueAsNewConfig(false) } diff --git a/service/api/service.go b/service/api/service.go index 2be54a4f..c48739d5 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -196,6 +196,9 @@ func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig if configOverride.OptimizeActivity != nil { workflowConfig.OptimizeActivity = configOverride.OptimizeActivity } + if configOverride.OptimizeTimer != nil { + workflowConfig.OptimizeTimer = configOverride.OptimizeTimer + } } func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go index 6883dd51..6cd8d8da 100644 --- a/service/interpreter/timers/greedyTimerProcessor.go +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -9,14 +9,15 @@ import ( "github.com/indeedeng/iwf/service" ) -type sortedTimers struct { - status service.InternalTimerStatus - // Ordered slice of all timers being awaited on - timers []*service.TimerInfo +type TimerManager struct { + // Does not map to the timers actually created by the workflow provider + PendingScheduling []*service.TimerInfo + // timers created through the workflow provider that are going to fire + ScheduledTimerTimes []int64 } type GreedyTimerProcessor struct { - pendingTimers sortedTimers + timerManger TimerManager stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo staleSkipTimerSignals []service.StaleSkipTimerSignal provider interfaces.WorkflowProvider @@ -32,13 +33,13 @@ func NewGreedyTimerProcessor( tp := &GreedyTimerProcessor{ provider: provider, - pendingTimers: sortedTimers{status: service.TimerPending}, + timerManger: TimerManager{}, stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, logger: provider.GetLogger(ctx), staleSkipTimerSignals: staleSkipTimerSignals, } - // start some single thread that manages timers + // start some single thread that manages PendingScheduling tp.createGreedyTimerScheduler(ctx, continueAsNewCounter) err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { @@ -52,14 +53,14 @@ func NewGreedyTimerProcessor( return tp } -func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { +func (t *TimerManager) addTimer(toAdd *service.TimerInfo) { - if toAdd == nil || toAdd.Status != t.status { + if toAdd == nil || toAdd.Status != service.TimerPending { panic("invalid timer added") } insertIndex := 0 - for i, timer := range t.timers { + for i, timer := range t.PendingScheduling { if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { // don't want dupes. Makes remove simpler if toAdd == timer { @@ -70,37 +71,45 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { } insertIndex = i + 1 } - t.timers = append( - t.timers[:insertIndex], - append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...) + t.PendingScheduling = append( + t.PendingScheduling[:insertIndex], + append([]*service.TimerInfo{toAdd}, t.PendingScheduling[insertIndex:]...)...) } -func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) { - for i, timer := range t.timers { +func (t *TimerManager) removeTimer(toRemove *service.TimerInfo) { + for i, timer := range t.PendingScheduling { if toRemove == timer { - t.timers = append(t.timers[:i], t.timers[i+1:]...) + t.PendingScheduling = append(t.PendingScheduling[:i], t.PendingScheduling[i+1:]...) return } } } -func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { +func (t *TimerManager) pruneToNextTimer(pruneTo int64) *service.TimerInfo { - if len(t.timers) == 0 { + if len(t.PendingScheduling) == 0 { return nil } - index := len(t.timers) + index := len(t.PendingScheduling) - for i := len(t.timers) - 1; i >= 0; i-- { - timer := t.timers[i] - if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status { + for i := len(t.PendingScheduling) - 1; i >= 0; i-- { + timer := t.PendingScheduling[i] + if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == service.TimerPending { break } index = i } - t.timers = t.timers[:index] - return t.timers[index-1] + + // If index is 0, it means all timers are pruned + if index == 0 { + t.PendingScheduling = nil + return nil + } + + prunedTimer := t.PendingScheduling[index-1] + t.PendingScheduling = t.PendingScheduling[:index] + return prunedTimer } func (t *GreedyTimerProcessor) createGreedyTimerScheduler( @@ -108,20 +117,18 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler( continueAsNewCounter *cont.ContinueAsNewCounter) { t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { - // NOTE: next timer to fire is at the end of the slice - var createdTimers []int64 for { - t.provider.Await(ctx, func() bool { - // remove fired timers + _ = t.provider.Await(ctx, func() bool { + // remove fired PendingScheduling now := t.provider.Now(ctx).Unix() - for i := len(createdTimers) - 1; i >= 0; i-- { - if createdTimers[i] > now { - createdTimers = createdTimers[:i+1] + for i := len(t.timerManger.ScheduledTimerTimes) - 1; i >= 0; i-- { + if t.timerManger.ScheduledTimerTimes[i] > now { + t.timerManger.ScheduledTimerTimes = t.timerManger.ScheduledTimerTimes[:i+1] break } } - next := t.pendingTimers.pruneToNextTimer(now) - return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet() + next := t.timerManger.pruneToNextTimer(now) + return (next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1])) || continueAsNewCounter.IsThresholdMet() }) if continueAsNewCounter.IsThresholdMet() { @@ -129,14 +136,14 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler( } now := t.provider.Now(ctx).Unix() - next := t.pendingTimers.pruneToNextTimer(now) - //next := t.pendingTimers.getEarliestTimer() + next := t.timerManger.pruneToNextTimer(now) + //next := t.timerManger.getEarliestTimer() // only create a new timer when a pending timer exists before the next existing timer fires - if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) { + if next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1]) { fireAt := next.FiringUnixTimestampSeconds duration := time.Duration(fireAt-now) * time.Second t.provider.NewTimer(ctx, duration) - createdTimers = append(createdTimers, fireAt) + t.timerManger.ScheduledTimerTimes = append(t.timerManger.ScheduledTimerTimes, fireAt) } } }) @@ -216,23 +223,23 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( return service.TimerSkipped } - if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() { + if timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() { timer.Status = service.TimerFired return service.TimerFired } // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) - t.pendingTimers.removeTimer(timer) + t.timerManger.removeTimer(timer) return service.TimerPending } -// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers +// RemovePendingTimersOfState is for when a state is completed, remove all its pending PendingScheduling func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { timers := t.stateExecutionCurrentTimerInfos[stateExeId] for _, timer := range timers { - t.pendingTimers.removeTimer(timer) + t.timerManger.removeTimer(timer) } delete(t.stateExecutionCurrentTimerInfos, stateExeId) @@ -256,7 +263,9 @@ func (t *GreedyTimerProcessor) AddTimers( FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), Status: service.TimerPending, } - t.pendingTimers.addTimer(&timer) + } + if timer.Status == service.TimerPending { + t.timerManger.addTimer(&timer) } timers[idx] = &timer } diff --git a/service/interpreter/timers/timerScheduler.go b/service/interpreter/timers/timerScheduler.go new file mode 100644 index 00000000..4eba3151 --- /dev/null +++ b/service/interpreter/timers/timerScheduler.go @@ -0,0 +1 @@ +package timers