Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/improve waitForStateCompletion error handling #363

Merged
merged 10 commits into from
Feb 21, 2024
2 changes: 1 addition & 1 deletion integ/get_with_wait_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func doTestWorkflowWithWaitTimeout(t *testing.T, backendType service.BackendType
assertions.Nil(err)
err = json.Unmarshal(body, &errResp)
assertions.Equalf(iwfidl.ErrorResponse{
Detail: ptr.Any("workflow is still running, waiting has exceeded timeout limit"),
Detail: ptr.Any("workflow is still running, waiting has exceeded timeout limit, please retry"),
SubStatus: iwfidl.LONG_POLL_TIME_OUT_SUB_STATUS.Ptr(),
}, errResp, "body")

Expand Down
62 changes: 46 additions & 16 deletions service/api/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
return ok
}

func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool {
return realcadence.IsTimeoutError(err)
}

func (t *cadenceClient) IsRequestTimeoutError(err error) bool {
return errors.Is(err, context.DeadlineExceeded)
}
Expand All @@ -63,7 +67,10 @@
return fmt.Errorf("not an application error. Critical code bug")
}

func NewCadenceClient(domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, converter encoded.DataConverter, closeFunc func()) uclient.UnifiedClient {
func NewCadenceClient(
domain string, cClient client.Client, serviceClient workflowserviceclient.Interface,
converter encoded.DataConverter, closeFunc func(),
) uclient.UnifiedClient {
return &cadenceClient{
domain: domain,
cClient: cClient,
Expand All @@ -77,7 +84,9 @@
t.closeFunc()
}

func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options uclient.StartWorkflowOptions, args ...interface{}) (runId string, err error) {
func (t *cadenceClient) StartInterpreterWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions, args ...interface{},
) (runId string, err error) {
_, ok := options.Memo[service.UseMemoForDataAttributesKey]
if ok {
return "", fmt.Errorf("using Memo is not supported with Cadence, see https://github.com/uber/cadence/issues/3729")
Expand Down Expand Up @@ -114,29 +123,34 @@
return run.RunID, nil
}

func (t *cadenceClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options uclient.StartWorkflowOptions) (runId string, err error) {
func (t *cadenceClient) StartWaitForStateCompletionWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions,
) (runId string, err error) {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout, // TODO, make this configurable
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicateFailedOnly, // the workflow could be timeout, so we allow duplicate
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout,
}
run, err := t.cClient.StartWorkflow(ctx, workflowOptions, cadence.WaitforStateCompletionWorkflow)
if err != nil {
if t.IsWorkflowAlreadyStartedError(err) {
// if the workflow is already started, we return the runId

Check warning on line 138 in service/api/cadence/client.go

View check run for this annotation

Codecov / codecov/patch

service/api/cadence/client.go#L138

Added line #L138 was not covered by tests
return *err.(*shared.WorkflowExecutionAlreadyStartedError).RunId, nil
}
return "", err
}
return run.RunID, nil
}

func (t *cadenceClient) SignalWithStartWaitForStateCompletionWorkflow(ctx context.Context, options uclient.StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput) error {
func (t *cadenceClient) SignalWithStartWaitForStateCompletionWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput,
) error {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout, // TODO, make this configurable
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicateFailedOnly, // the workflow could be timeout, so we allow duplicate
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout,
}

_, err := t.cClient.SignalWithStartWorkflow(ctx, options.ID, service.StateCompletionSignalChannelName, stateCompletionOutput, workflowOptions, cadence.WaitforStateCompletionWorkflow)
Expand All @@ -146,7 +160,9 @@
return nil
}

func (t *cadenceClient) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error {
func (t *cadenceClient) SignalWorkflow(
ctx context.Context, workflowID string, runID string, signalName string, arg interface{},
) error {
return t.cClient.SignalWorkflow(ctx, workflowID, runID, signalName, arg)
}

Expand All @@ -165,7 +181,9 @@
return t.cClient.TerminateWorkflow(ctx, workflowID, runID, reasonStr, nil)
}

func (t *cadenceClient) ListWorkflow(ctx context.Context, request *uclient.ListWorkflowExecutionsRequest) (*uclient.ListWorkflowExecutionsResponse, error) {
func (t *cadenceClient) ListWorkflow(
ctx context.Context, request *uclient.ListWorkflowExecutionsRequest,
) (*uclient.ListWorkflowExecutionsResponse, error) {
listReq := &shared.ListWorkflowExecutionsRequest{
PageSize: &request.PageSize,
Query: &request.Query,
Expand All @@ -188,15 +206,19 @@
}, nil
}

func (t *cadenceClient) QueryWorkflow(ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}) error {
func (t *cadenceClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
if err != nil {
return err
}
return qres.Get(valuePtr)
}

func queryWorkflowWithStrongConsistency(t *cadenceClient, ctx context.Context, workflowID string, runID string, queryType string, args []interface{}) (encoded.Value, error) {
func queryWorkflowWithStrongConsistency(
t *cadenceClient, ctx context.Context, workflowID string, runID string, queryType string, args []interface{},
) (encoded.Value, error) {
queryWorkflowWithOptionsRequest := &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -211,7 +233,9 @@
return result.QueryResult, nil
}

func (t *cadenceClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType) (*uclient.DescribeWorkflowExecutionResponse, error) {
func (t *cadenceClient) DescribeWorkflowExecution(
ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
) (*uclient.DescribeWorkflowExecutionResponse, error) {
resp, err := t.cClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -295,16 +319,22 @@
}
}

func (t *cadenceClient) GetWorkflowResult(ctx context.Context, valuePtr interface{}, workflowID string, runID string) error {
func (t *cadenceClient) GetWorkflowResult(
ctx context.Context, valuePtr interface{}, workflowID string, runID string,
) error {
run := t.cClient.GetWorkflow(ctx, workflowID, runID)
return run.Get(ctx, valuePtr)
}

func (t *cadenceClient) SynchronousUpdateWorkflow(ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{}) error {
func (t *cadenceClient) SynchronousUpdateWorkflow(
ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{},
) error {
return fmt.Errorf("not supported in Cadence")
}

func (t *cadenceClient) ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (newRunId string, err error) {
func (t *cadenceClient) ResetWorkflow(
ctx context.Context, request iwfidl.WorkflowResetRequest,
) (newRunId string, err error) {

reqRunId := request.GetWorkflowRunId()
if reqRunId == "" {
Expand Down
20 changes: 15 additions & 5 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,31 @@
options := uclient.StartWorkflowOptions{
ID: workflowId,
TaskQueue: s.taskQueue,
}

if req.WaitTimeSeconds != nil {
options.WorkflowExecutionTimeout = time.Duration(*req.WaitTimeSeconds) * time.Second
// TODO: https://github.com/indeedeng/iwf-java-sdk/issues/218
// it doesn't seem to have a way for SDK to know the timeout at this API
// So hardcoded to 1 minute for now. If it timeouts, the IDReusePolicy will restart a new one
WorkflowExecutionTimeout: 60 * time.Second,
}

runId, err := s.client.StartWaitForStateCompletionWorkflow(ctx, options)
if err != nil {
// TODO fix error handling so that this API can be called again to wait for multiple times

Check warning on line 149 in service/api/service.go

View check run for this annotation

Codecov / codecov/patch

service/api/service.go#L149

Added line #L149 was not covered by tests
return nil, s.handleError(err)
}

subCtx, cancFunc := utils.TrimContextByTimeoutWithCappedDDL(ctx, req.WaitTimeSeconds, s.config.Api.MaxWaitSeconds)
defer cancFunc()
var output service.WaitForStateCompletionWorkflowOutput
getErr := s.client.GetWorkflowResult(subCtx, &output, workflowId, runId)

if s.client.IsRequestTimeoutError(getErr) || s.client.IsWorkflowTimeoutError(getErr) {
// the workflow is still running, but the wait has exceeded limit
return nil, errors.NewErrorAndStatus(
service.HttpStatusCodeSpecial4xxError1,
iwfidl.LONG_POLL_TIME_OUT_SUB_STATUS,
"waiting has exceeded timeout limit, please retry")
}

Check warning on line 164 in service/api/service.go

View check run for this annotation

Codecov / codecov/patch

service/api/service.go#L159-L164

Added lines #L159 - L164 were not covered by tests

if getErr != nil {
return nil, s.handleError(getErr)
}
Expand Down Expand Up @@ -375,7 +385,7 @@
return nil, errors.NewErrorAndStatus(
service.HttpStatusCodeSpecial4xxError1,
iwfidl.LONG_POLL_TIME_OUT_SUB_STATUS,
"workflow is still running, waiting has exceeded timeout limit")
"workflow is still running, waiting has exceeded timeout limit, please retry")
}

var outputsToReturnWf []iwfidl.StateCompletionOutput
Expand Down
9 changes: 7 additions & 2 deletions service/api/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
return ok
}

func (t *temporalClient) IsWorkflowTimeoutError(err error) bool {
return realtemporal.IsTimeoutError(err)
}

func (t *temporalClient) GetApplicationErrorTypeIfIsApplicationError(err error) string {
var applicationError *realtemporal.ApplicationError
isAppErr := errors.As(err, &applicationError)
Expand Down Expand Up @@ -150,13 +154,14 @@
) (runId string, err error) {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, // the workflow could be timeout, so we allow duplicate
TaskQueue: options.TaskQueue,
WorkflowExecutionTimeout: options.WorkflowExecutionTimeout,
}

run, err := t.tClient.ExecuteWorkflow(ctx, workflowOptions, temporal.WaitforStateCompletionWorkflow)
if err != nil {
// because of WorkflowExecutionErrorWhenAlreadyStarted: false, we won't get WorkflowAlreadyStartedError as we do in Cadence

Check warning on line 164 in service/api/temporal/client.go

View check run for this annotation

Codecov / codecov/patch

service/api/temporal/client.go#L164

Added line #L164 was not covered by tests
return "", err
}
return run.GetRunID(), nil
Expand All @@ -167,7 +172,7 @@
) error {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, // the workflow could be timeout, so we allow duplicate
TaskQueue: options.TaskQueue,
WorkflowExecutionTimeout: options.WorkflowExecutionTimeout,
}
Expand Down
22 changes: 17 additions & 5 deletions service/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,28 @@ import (
type UnifiedClient interface {
Close()
errorHandler
StartInterpreterWorkflow(ctx context.Context, options StartWorkflowOptions, args ...interface{}) (runId string, err error)
StartInterpreterWorkflow(
ctx context.Context, options StartWorkflowOptions, args ...interface{},
) (runId string, err error)
StartWaitForStateCompletionWorkflow(ctx context.Context, options StartWorkflowOptions) (runId string, err error)
SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error
SignalWithStartWaitForStateCompletionWorkflow(ctx context.Context, options StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput) error
SignalWithStartWaitForStateCompletionWorkflow(
ctx context.Context, options StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput,
) error
CancelWorkflow(ctx context.Context, workflowID string, runID string) error
TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string) error
ListWorkflow(ctx context.Context, request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
QueryWorkflow(ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}) error // TODO it doesn't return error correctly... the error is nil when query handler is not implemented
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType) (*DescribeWorkflowExecutionResponse, error)
QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string,
args ...interface{},
) error // TODO it doesn't return error correctly... the error is nil when query handler is not implemented
DescribeWorkflowExecution(
ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
) (*DescribeWorkflowExecutionResponse, error)
GetWorkflowResult(ctx context.Context, valuePtr interface{}, workflowID string, runID string) error
SynchronousUpdateWorkflow(ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{}) error
SynchronousUpdateWorkflow(
ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{},
) error
ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (runId string, err error)
}

Expand All @@ -30,6 +41,7 @@ type errorHandler interface {
IsWorkflowAlreadyStartedError(error) bool
IsNotFoundError(error) bool
IsRequestTimeoutError(error) bool
IsWorkflowTimeoutError(error) bool
}

type StartWorkflowOptions struct {
Expand Down
25 changes: 15 additions & 10 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"github.com/indeedeng/iwf/service"
)

func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) {
func InterpreterImpl(
ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput,
) (*service.InterpreterWorkflowOutput, error) {
var err error
globalVersioner := NewGlobalVersioner(provider, ctx)
if globalVersioner.IsAfterVersionOfUsingGlobalVersioning() {
Expand Down Expand Up @@ -314,7 +316,8 @@
}

func checkClosingWorkflow(
ctx UnifiedContext, provider WorkflowProvider, decision *iwfidl.StateDecision, currentStateId, currentStateExeId string,
ctx UnifiedContext, provider WorkflowProvider, decision *iwfidl.StateDecision,
currentStateId, currentStateExeId string,
internalChannel *InterStateChannel, signalReceiver *SignalReceiver,
) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) {
if decision.HasConditionalClose() {
Expand Down Expand Up @@ -467,7 +470,7 @@
skipStart := compatibility.GetSkipStartApi(&options)
if skipStart {
return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
nil, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion, info.WorkflowExecutionTimeout)
nil, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion)
}

if isResumeFromContinueAsNew {
Expand Down Expand Up @@ -686,7 +689,7 @@
}

return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
commandRes, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion, info.WorkflowExecutionTimeout)
commandRes, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion)
}
func executeStateDecide(
ctx UnifiedContext,
Expand All @@ -702,7 +705,6 @@
executeApi interface{},
stateExecutionLocal []iwfidl.KeyValue,
shouldSendSignalOnCompletion bool,
workflowTimeout time.Duration,
) (*iwfidl.StateDecision, service.StateExecutionStatus, error) {
var err error
activityOptions := ActivityOptions{
Expand Down Expand Up @@ -733,7 +735,7 @@
DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy),
StateInput: state.StateInput,
},
}, shouldSendSignalOnCompletion, workflowTimeout).Get(ctx, &decideResponse)
}, false, 0).Get(ctx, &decideResponse)
persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy)
if err == nil && shouldSendSignalOnCompletion && !provider.IsReplaying(ctx) {
// NOTE: here uses NOT IsReplaying to signalWithStart, to save an activity for this operation
Expand All @@ -744,13 +746,14 @@
uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + *executionContext.StateExecutionId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: workflowTimeout,
WorkflowExecutionTimeout: 60 * time.Second, // timeout doesn't matter here as it will complete immediate with the signal
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *executionContext.StateExecutionId,
})
if err != nil {
// for any reasons this fail, just panic and the workflow task will retry
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) {
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent
// panic will let the workflow task will retry until the signal is sent

Check warning on line 756 in service/interpreter/workflowImpl.go

View check run for this annotation

Codecov / codecov/patch

service/interpreter/workflowImpl.go#L755-L756

Added lines #L755 - L756 were not covered by tests
panic(fmt.Errorf("failed to signal on completion %w", err))
}
}
Expand Down Expand Up @@ -818,7 +821,9 @@
)
}

func WaitForStateCompletionWorkflowImpl(ctx UnifiedContext, provider WorkflowProvider) (*service.WaitForStateCompletionWorkflowOutput, error) {
func WaitForStateCompletionWorkflowImpl(
ctx UnifiedContext, provider WorkflowProvider,
) (*service.WaitForStateCompletionWorkflowOutput, error) {
signalReceiveChannel := provider.GetSignalChannel(ctx, service.StateCompletionSignalChannelName)
var signalValue iwfidl.StateCompletionOutput
signalReceiveChannel.ReceiveBlocking(ctx, &signalValue)
Expand Down
Loading