From 9000724688495fe5e8ce1b3cddcdae4c06d4a197 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 26 Jul 2024 10:41:25 -0700 Subject: [PATCH] Support WaitForCompletionStateIds and add local activity input debug (#407) --- gen/iwfidl/api/openapi.yaml | 14 ++++++++ gen/iwfidl/docs/TimerCommand.md | 8 ++--- gen/iwfidl/docs/WorkflowStartRequest.md | 26 ++++++++++++++ .../docs/WorkflowStateDecideResponse.md | 26 ++++++++++++++ gen/iwfidl/docs/WorkflowStateStartResponse.md | 26 ++++++++++++++ gen/iwfidl/model_timer_command.go | 12 +++---- gen/iwfidl/model_workflow_start_request.go | 36 +++++++++++++++++++ .../model_workflow_state_decide_response.go | 36 +++++++++++++++++++ .../model_workflow_state_start_response.go | 36 +++++++++++++++++++ integ/wait_for_state_completion_test.go | 23 ++++++++---- integ/workflow/timer/routers.go | 6 ++-- iwf-idl | 2 +- script/http/local/home.http | 2 +- service/api/service.go | 1 + service/interfaces.go | 1 + service/interpreter/activityImpl.go | 32 +++++++++++++---- service/interpreter/workflowImpl.go | 4 ++- 17 files changed, 263 insertions(+), 28 deletions(-) diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index ee6ac115..3c991400 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -783,6 +783,9 @@ components: WorkflowStartRequest: example: startStateId: startStateId + waitForCompletionStateIds: + - waitForCompletionStateIds + - waitForCompletionStateIds workflowTimeoutSeconds: 0 waitForCompletionStateExecutionIds: - waitForCompletionStateExecutionIds @@ -903,6 +906,10 @@ components: type: string startStateId: type: string + waitForCompletionStateIds: + items: + type: string + type: array waitForCompletionStateExecutionIds: items: type: string @@ -1777,6 +1784,7 @@ components: data: data encoding: encoding key: key + localActivityInput: localActivityInput commandRequest: signalCommands: - signalChannelName: signalChannelName @@ -1840,6 +1848,8 @@ components: encoding: encoding key: key properties: + localActivityInput: + type: string upsertSearchAttributes: items: $ref: '#/components/schemas/SearchAttribute' @@ -2147,6 +2157,7 @@ components: encoding: encoding channelName: channelName conditionalCloseType: null + localActivityInput: localActivityInput publishToInterStateChannel: - channelName: channelName value: @@ -2175,6 +2186,8 @@ components: encoding: encoding key: key properties: + localActivityInput: + type: string stateDecision: $ref: '#/components/schemas/StateDecision' upsertSearchAttributes: @@ -2632,6 +2645,7 @@ components: format: int64 type: integer durationSeconds: + format: int64 type: integer required: - commandId diff --git a/gen/iwfidl/docs/TimerCommand.md b/gen/iwfidl/docs/TimerCommand.md index 743a3e91..74f3c61a 100644 --- a/gen/iwfidl/docs/TimerCommand.md +++ b/gen/iwfidl/docs/TimerCommand.md @@ -6,7 +6,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **CommandId** | **string** | | **FiringUnixTimestampSeconds** | Pointer to **int64** | | [optional] -**DurationSeconds** | Pointer to **int32** | | [optional] +**DurationSeconds** | Pointer to **int64** | | [optional] ## Methods @@ -74,20 +74,20 @@ HasFiringUnixTimestampSeconds returns a boolean if a field has been set. ### GetDurationSeconds -`func (o *TimerCommand) GetDurationSeconds() int32` +`func (o *TimerCommand) GetDurationSeconds() int64` GetDurationSeconds returns the DurationSeconds field if non-nil, zero value otherwise. ### GetDurationSecondsOk -`func (o *TimerCommand) GetDurationSecondsOk() (*int32, bool)` +`func (o *TimerCommand) GetDurationSecondsOk() (*int64, bool)` GetDurationSecondsOk returns a tuple with the DurationSeconds field if it's non-nil, zero value otherwise and a boolean to check if the value has been set. ### SetDurationSeconds -`func (o *TimerCommand) SetDurationSeconds(v int32)` +`func (o *TimerCommand) SetDurationSeconds(v int64)` SetDurationSeconds sets DurationSeconds field to given value. diff --git a/gen/iwfidl/docs/WorkflowStartRequest.md b/gen/iwfidl/docs/WorkflowStartRequest.md index f3db6696..357b477d 100644 --- a/gen/iwfidl/docs/WorkflowStartRequest.md +++ b/gen/iwfidl/docs/WorkflowStartRequest.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **WorkflowTimeoutSeconds** | **int32** | | **IwfWorkerUrl** | **string** | | **StartStateId** | Pointer to **string** | | [optional] +**WaitForCompletionStateIds** | Pointer to **[]string** | | [optional] **WaitForCompletionStateExecutionIds** | Pointer to **[]string** | | [optional] **StateInput** | Pointer to [**EncodedObject**](EncodedObject.md) | | [optional] **StateOptions** | Pointer to [**WorkflowStateOptions**](WorkflowStateOptions.md) | | [optional] @@ -138,6 +139,31 @@ SetStartStateId sets StartStateId field to given value. HasStartStateId returns a boolean if a field has been set. +### GetWaitForCompletionStateIds + +`func (o *WorkflowStartRequest) GetWaitForCompletionStateIds() []string` + +GetWaitForCompletionStateIds returns the WaitForCompletionStateIds field if non-nil, zero value otherwise. + +### GetWaitForCompletionStateIdsOk + +`func (o *WorkflowStartRequest) GetWaitForCompletionStateIdsOk() (*[]string, bool)` + +GetWaitForCompletionStateIdsOk returns a tuple with the WaitForCompletionStateIds field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetWaitForCompletionStateIds + +`func (o *WorkflowStartRequest) SetWaitForCompletionStateIds(v []string)` + +SetWaitForCompletionStateIds sets WaitForCompletionStateIds field to given value. + +### HasWaitForCompletionStateIds + +`func (o *WorkflowStartRequest) HasWaitForCompletionStateIds() bool` + +HasWaitForCompletionStateIds returns a boolean if a field has been set. + ### GetWaitForCompletionStateExecutionIds `func (o *WorkflowStartRequest) GetWaitForCompletionStateExecutionIds() []string` diff --git a/gen/iwfidl/docs/WorkflowStateDecideResponse.md b/gen/iwfidl/docs/WorkflowStateDecideResponse.md index 304fd8bc..4619214b 100644 --- a/gen/iwfidl/docs/WorkflowStateDecideResponse.md +++ b/gen/iwfidl/docs/WorkflowStateDecideResponse.md @@ -4,6 +4,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- +**LocalActivityInput** | Pointer to **string** | | [optional] **StateDecision** | Pointer to [**StateDecision**](StateDecision.md) | | [optional] **UpsertSearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional] **UpsertDataObjects** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional] @@ -30,6 +31,31 @@ NewWorkflowStateDecideResponseWithDefaults instantiates a new WorkflowStateDecid This constructor will only assign default values to properties that have it defined, but it doesn't guarantee that properties required by API are set +### GetLocalActivityInput + +`func (o *WorkflowStateDecideResponse) GetLocalActivityInput() string` + +GetLocalActivityInput returns the LocalActivityInput field if non-nil, zero value otherwise. + +### GetLocalActivityInputOk + +`func (o *WorkflowStateDecideResponse) GetLocalActivityInputOk() (*string, bool)` + +GetLocalActivityInputOk returns a tuple with the LocalActivityInput field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetLocalActivityInput + +`func (o *WorkflowStateDecideResponse) SetLocalActivityInput(v string)` + +SetLocalActivityInput sets LocalActivityInput field to given value. + +### HasLocalActivityInput + +`func (o *WorkflowStateDecideResponse) HasLocalActivityInput() bool` + +HasLocalActivityInput returns a boolean if a field has been set. + ### GetStateDecision `func (o *WorkflowStateDecideResponse) GetStateDecision() StateDecision` diff --git a/gen/iwfidl/docs/WorkflowStateStartResponse.md b/gen/iwfidl/docs/WorkflowStateStartResponse.md index 8c8c7232..af4b1582 100644 --- a/gen/iwfidl/docs/WorkflowStateStartResponse.md +++ b/gen/iwfidl/docs/WorkflowStateStartResponse.md @@ -4,6 +4,7 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- +**LocalActivityInput** | Pointer to **string** | | [optional] **UpsertSearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional] **UpsertDataObjects** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional] **CommandRequest** | Pointer to [**CommandRequest**](CommandRequest.md) | | [optional] @@ -30,6 +31,31 @@ NewWorkflowStateStartResponseWithDefaults instantiates a new WorkflowStateStartR This constructor will only assign default values to properties that have it defined, but it doesn't guarantee that properties required by API are set +### GetLocalActivityInput + +`func (o *WorkflowStateStartResponse) GetLocalActivityInput() string` + +GetLocalActivityInput returns the LocalActivityInput field if non-nil, zero value otherwise. + +### GetLocalActivityInputOk + +`func (o *WorkflowStateStartResponse) GetLocalActivityInputOk() (*string, bool)` + +GetLocalActivityInputOk returns a tuple with the LocalActivityInput field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetLocalActivityInput + +`func (o *WorkflowStateStartResponse) SetLocalActivityInput(v string)` + +SetLocalActivityInput sets LocalActivityInput field to given value. + +### HasLocalActivityInput + +`func (o *WorkflowStateStartResponse) HasLocalActivityInput() bool` + +HasLocalActivityInput returns a boolean if a field has been set. + ### GetUpsertSearchAttributes `func (o *WorkflowStateStartResponse) GetUpsertSearchAttributes() []SearchAttribute` diff --git a/gen/iwfidl/model_timer_command.go b/gen/iwfidl/model_timer_command.go index 2b702ae3..f9bd3297 100644 --- a/gen/iwfidl/model_timer_command.go +++ b/gen/iwfidl/model_timer_command.go @@ -21,7 +21,7 @@ var _ MappedNullable = &TimerCommand{} type TimerCommand struct { CommandId string `json:"commandId"` FiringUnixTimestampSeconds *int64 `json:"firingUnixTimestampSeconds,omitempty"` - DurationSeconds *int32 `json:"durationSeconds,omitempty"` + DurationSeconds *int64 `json:"durationSeconds,omitempty"` } // NewTimerCommand instantiates a new TimerCommand object @@ -99,9 +99,9 @@ func (o *TimerCommand) SetFiringUnixTimestampSeconds(v int64) { } // GetDurationSeconds returns the DurationSeconds field value if set, zero value otherwise. -func (o *TimerCommand) GetDurationSeconds() int32 { +func (o *TimerCommand) GetDurationSeconds() int64 { if o == nil || IsNil(o.DurationSeconds) { - var ret int32 + var ret int64 return ret } return *o.DurationSeconds @@ -109,7 +109,7 @@ func (o *TimerCommand) GetDurationSeconds() int32 { // GetDurationSecondsOk returns a tuple with the DurationSeconds field value if set, nil otherwise // and a boolean to check if the value has been set. -func (o *TimerCommand) GetDurationSecondsOk() (*int32, bool) { +func (o *TimerCommand) GetDurationSecondsOk() (*int64, bool) { if o == nil || IsNil(o.DurationSeconds) { return nil, false } @@ -125,8 +125,8 @@ func (o *TimerCommand) HasDurationSeconds() bool { return false } -// SetDurationSeconds gets a reference to the given int32 and assigns it to the DurationSeconds field. -func (o *TimerCommand) SetDurationSeconds(v int32) { +// SetDurationSeconds gets a reference to the given int64 and assigns it to the DurationSeconds field. +func (o *TimerCommand) SetDurationSeconds(v int64) { o.DurationSeconds = &v } diff --git a/gen/iwfidl/model_workflow_start_request.go b/gen/iwfidl/model_workflow_start_request.go index 081f8f74..e4ecf68b 100644 --- a/gen/iwfidl/model_workflow_start_request.go +++ b/gen/iwfidl/model_workflow_start_request.go @@ -24,6 +24,7 @@ type WorkflowStartRequest struct { WorkflowTimeoutSeconds int32 `json:"workflowTimeoutSeconds"` IwfWorkerUrl string `json:"iwfWorkerUrl"` StartStateId *string `json:"startStateId,omitempty"` + WaitForCompletionStateIds []string `json:"waitForCompletionStateIds,omitempty"` WaitForCompletionStateExecutionIds []string `json:"waitForCompletionStateExecutionIds,omitempty"` StateInput *EncodedObject `json:"stateInput,omitempty"` StateOptions *WorkflowStateOptions `json:"stateOptions,omitempty"` @@ -179,6 +180,38 @@ func (o *WorkflowStartRequest) SetStartStateId(v string) { o.StartStateId = &v } +// GetWaitForCompletionStateIds returns the WaitForCompletionStateIds field value if set, zero value otherwise. +func (o *WorkflowStartRequest) GetWaitForCompletionStateIds() []string { + if o == nil || IsNil(o.WaitForCompletionStateIds) { + var ret []string + return ret + } + return o.WaitForCompletionStateIds +} + +// GetWaitForCompletionStateIdsOk returns a tuple with the WaitForCompletionStateIds field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowStartRequest) GetWaitForCompletionStateIdsOk() ([]string, bool) { + if o == nil || IsNil(o.WaitForCompletionStateIds) { + return nil, false + } + return o.WaitForCompletionStateIds, true +} + +// HasWaitForCompletionStateIds returns a boolean if a field has been set. +func (o *WorkflowStartRequest) HasWaitForCompletionStateIds() bool { + if o != nil && !IsNil(o.WaitForCompletionStateIds) { + return true + } + + return false +} + +// SetWaitForCompletionStateIds gets a reference to the given []string and assigns it to the WaitForCompletionStateIds field. +func (o *WorkflowStartRequest) SetWaitForCompletionStateIds(v []string) { + o.WaitForCompletionStateIds = v +} + // GetWaitForCompletionStateExecutionIds returns the WaitForCompletionStateExecutionIds field value if set, zero value otherwise. func (o *WorkflowStartRequest) GetWaitForCompletionStateExecutionIds() []string { if o == nil || IsNil(o.WaitForCompletionStateExecutionIds) { @@ -324,6 +357,9 @@ func (o WorkflowStartRequest) ToMap() (map[string]interface{}, error) { if !IsNil(o.StartStateId) { toSerialize["startStateId"] = o.StartStateId } + if !IsNil(o.WaitForCompletionStateIds) { + toSerialize["waitForCompletionStateIds"] = o.WaitForCompletionStateIds + } if !IsNil(o.WaitForCompletionStateExecutionIds) { toSerialize["waitForCompletionStateExecutionIds"] = o.WaitForCompletionStateExecutionIds } diff --git a/gen/iwfidl/model_workflow_state_decide_response.go b/gen/iwfidl/model_workflow_state_decide_response.go index e3976ef4..e81a1120 100644 --- a/gen/iwfidl/model_workflow_state_decide_response.go +++ b/gen/iwfidl/model_workflow_state_decide_response.go @@ -19,6 +19,7 @@ var _ MappedNullable = &WorkflowStateDecideResponse{} // WorkflowStateDecideResponse struct for WorkflowStateDecideResponse type WorkflowStateDecideResponse struct { + LocalActivityInput *string `json:"localActivityInput,omitempty"` StateDecision *StateDecision `json:"stateDecision,omitempty"` UpsertSearchAttributes []SearchAttribute `json:"upsertSearchAttributes,omitempty"` UpsertDataObjects []KeyValue `json:"upsertDataObjects,omitempty"` @@ -44,6 +45,38 @@ func NewWorkflowStateDecideResponseWithDefaults() *WorkflowStateDecideResponse { return &this } +// GetLocalActivityInput returns the LocalActivityInput field value if set, zero value otherwise. +func (o *WorkflowStateDecideResponse) GetLocalActivityInput() string { + if o == nil || IsNil(o.LocalActivityInput) { + var ret string + return ret + } + return *o.LocalActivityInput +} + +// GetLocalActivityInputOk returns a tuple with the LocalActivityInput field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowStateDecideResponse) GetLocalActivityInputOk() (*string, bool) { + if o == nil || IsNil(o.LocalActivityInput) { + return nil, false + } + return o.LocalActivityInput, true +} + +// HasLocalActivityInput returns a boolean if a field has been set. +func (o *WorkflowStateDecideResponse) HasLocalActivityInput() bool { + if o != nil && !IsNil(o.LocalActivityInput) { + return true + } + + return false +} + +// SetLocalActivityInput gets a reference to the given string and assigns it to the LocalActivityInput field. +func (o *WorkflowStateDecideResponse) SetLocalActivityInput(v string) { + o.LocalActivityInput = &v +} + // GetStateDecision returns the StateDecision field value if set, zero value otherwise. func (o *WorkflowStateDecideResponse) GetStateDecision() StateDecision { if o == nil || IsNil(o.StateDecision) { @@ -246,6 +279,9 @@ func (o WorkflowStateDecideResponse) MarshalJSON() ([]byte, error) { func (o WorkflowStateDecideResponse) ToMap() (map[string]interface{}, error) { toSerialize := map[string]interface{}{} + if !IsNil(o.LocalActivityInput) { + toSerialize["localActivityInput"] = o.LocalActivityInput + } if !IsNil(o.StateDecision) { toSerialize["stateDecision"] = o.StateDecision } diff --git a/gen/iwfidl/model_workflow_state_start_response.go b/gen/iwfidl/model_workflow_state_start_response.go index 6092a697..74706143 100644 --- a/gen/iwfidl/model_workflow_state_start_response.go +++ b/gen/iwfidl/model_workflow_state_start_response.go @@ -19,6 +19,7 @@ var _ MappedNullable = &WorkflowStateStartResponse{} // WorkflowStateStartResponse struct for WorkflowStateStartResponse type WorkflowStateStartResponse struct { + LocalActivityInput *string `json:"localActivityInput,omitempty"` UpsertSearchAttributes []SearchAttribute `json:"upsertSearchAttributes,omitempty"` UpsertDataObjects []KeyValue `json:"upsertDataObjects,omitempty"` CommandRequest *CommandRequest `json:"commandRequest,omitempty"` @@ -44,6 +45,38 @@ func NewWorkflowStateStartResponseWithDefaults() *WorkflowStateStartResponse { return &this } +// GetLocalActivityInput returns the LocalActivityInput field value if set, zero value otherwise. +func (o *WorkflowStateStartResponse) GetLocalActivityInput() string { + if o == nil || IsNil(o.LocalActivityInput) { + var ret string + return ret + } + return *o.LocalActivityInput +} + +// GetLocalActivityInputOk returns a tuple with the LocalActivityInput field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowStateStartResponse) GetLocalActivityInputOk() (*string, bool) { + if o == nil || IsNil(o.LocalActivityInput) { + return nil, false + } + return o.LocalActivityInput, true +} + +// HasLocalActivityInput returns a boolean if a field has been set. +func (o *WorkflowStateStartResponse) HasLocalActivityInput() bool { + if o != nil && !IsNil(o.LocalActivityInput) { + return true + } + + return false +} + +// SetLocalActivityInput gets a reference to the given string and assigns it to the LocalActivityInput field. +func (o *WorkflowStateStartResponse) SetLocalActivityInput(v string) { + o.LocalActivityInput = &v +} + // GetUpsertSearchAttributes returns the UpsertSearchAttributes field value if set, zero value otherwise. func (o *WorkflowStateStartResponse) GetUpsertSearchAttributes() []SearchAttribute { if o == nil || IsNil(o.UpsertSearchAttributes) { @@ -246,6 +279,9 @@ func (o WorkflowStateStartResponse) MarshalJSON() ([]byte, error) { func (o WorkflowStateStartResponse) ToMap() (map[string]interface{}, error) { toSerialize := map[string]interface{}{} + if !IsNil(o.LocalActivityInput) { + toSerialize["localActivityInput"] = o.LocalActivityInput + } if !IsNil(o.UpsertSearchAttributes) { toSerialize["upsertSearchAttributes"] = o.UpsertSearchAttributes } diff --git a/integ/wait_for_state_completion_test.go b/integ/wait_for_state_completion_test.go index b925af4b..f408ca38 100644 --- a/integ/wait_for_state_completion_test.go +++ b/integ/wait_for_state_completion_test.go @@ -20,7 +20,9 @@ func TestWaitForStateCompletionTemporal(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestWaitForStateCompletion(t, service.BackendTypeTemporal, nil) + doTestWaitForStateCompletion(t, service.BackendTypeTemporal, nil, false) + smallWaitForFastTest() + doTestWaitForStateCompletion(t, service.BackendTypeTemporal, nil, true) smallWaitForFastTest() } } @@ -30,12 +32,16 @@ func TestWaitForStateCompletionCadence(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestWaitForStateCompletion(t, service.BackendTypeCadence, nil) + doTestWaitForStateCompletion(t, service.BackendTypeCadence, nil, false) + smallWaitForFastTest() + doTestWaitForStateCompletion(t, service.BackendTypeCadence, nil, true) smallWaitForFastTest() } } -func doTestWaitForStateCompletion(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { +func doTestWaitForStateCompletion( + t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useStateId bool, +) { // start test workflow server wfHandler := wait_for_state_completion.NewHandler() closeFunc1 := startWorkflowWorker(wfHandler) @@ -55,7 +61,7 @@ func doTestWaitForStateCompletion(t *testing.T, backendType service.BackendType, wfId := wait_for_state_completion.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) nowTimestamp := time.Now().Unix() - _, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{ + startReq := iwfidl.WorkflowStartRequest{ WorkflowId: wfId, IwfWorkflowType: wait_for_state_completion.WorkflowType, WorkflowTimeoutSeconds: 30, @@ -67,8 +73,13 @@ func doTestWaitForStateCompletion(t *testing.T, backendType service.BackendType, WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ WorkflowConfigOverride: config, }, - WaitForCompletionStateExecutionIds: []string{"S1-1"}, - }).Execute() + } + if useStateId { + startReq.WaitForCompletionStateIds = []string{"S1"} + } else { + startReq.WaitForCompletionStateExecutionIds = []string{"S1-1"} + } + _, httpResp, err := req.WorkflowStartRequest(startReq).Execute() panicAtHttpError(err, httpResp) req1 := apiClient.DefaultApi.ApiV1WorkflowWaitForStateCompletionPost(context.Background()) diff --git a/integ/workflow/timer/routers.go b/integ/workflow/timer/routers.go index e8f13f10..41c5b6ce 100644 --- a/integ/workflow/timer/routers.go +++ b/integ/workflow/timer/routers.go @@ -53,15 +53,15 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { TimerCommands: []iwfidl.TimerCommand{ { CommandId: "timer-cmd-id", - DurationSeconds: iwfidl.PtrInt32(10), // fire after 10s + DurationSeconds: iwfidl.PtrInt64(10), // fire after 10s }, { CommandId: "timer-cmd-id-2", - DurationSeconds: iwfidl.PtrInt32(86400), // fire after one day + DurationSeconds: iwfidl.PtrInt64(86400), // fire after one day }, { CommandId: "timer-cmd-id-3", - DurationSeconds: iwfidl.PtrInt32(86400 * 365), // fire after one year + DurationSeconds: iwfidl.PtrInt64(86400 * 365), // fire after one year }, }, DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), diff --git a/iwf-idl b/iwf-idl index f3d0dba7..a972a2a9 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit f3d0dba7a380d0a459a1b6549bd2ee8df5f66c0f +Subproject commit a972a2a9ccbaf9ad597218a341b984834b04bee9 diff --git a/script/http/local/home.http b/script/http/local/home.http index 7aa6b8a8..ac21bba9 100644 --- a/script/http/local/home.http +++ b/script/http/local/home.http @@ -2,7 +2,7 @@ GET http://localhost:8801/ ### start API POST -POST http://localhost:8801//api/v1/workflow/start +POST http://localhost:8801/api/v1/workflow/start Content-Type: application/json { diff --git a/service/api/service.go b/service/api/service.go index 576c246b..e0cf55d0 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -130,6 +130,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( Config: workflowConfig, UseMemoForDataAttributes: useMemo, WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(), + WaitForCompletionStateIds: req.GetWaitForCompletionStateIds(), OmitVersionMarker: s.config.Api.OptimizedVersioning, } diff --git a/service/interfaces.go b/service/interfaces.go index 43be29c9..303cebf8 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -13,6 +13,7 @@ type ( StartStateId *string `json:"startStateId,omitempty"` WaitForCompletionStateExecutionIds []string `json:"waitForCompletionStateExecutionIds,omitempty"` + WaitForCompletionStateIds []string `json:"waitForCompletionStateIds,omitempty"` StateInput *iwfidl.EncodedObject `json:"stateInput,omitempty"` diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index 3a5209ba..2fa1855b 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -38,8 +38,9 @@ func StateApiWaitUntil( }, }) - attempt := provider.GetActivityInfo(ctx).Attempt - scheduledTs := provider.GetActivityInfo(ctx).ScheduledTime.Unix() + activityInfo := provider.GetActivityInfo(ctx) + attempt := activityInfo.Attempt + scheduledTs := activityInfo.ScheduledTime.Unix() input.Request.Context.Attempt = &attempt input.Request.Context.FirstAttemptTimestamp = &scheduledTs @@ -48,7 +49,7 @@ func StateApiWaitUntil( printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { return nil, composeHttpError( - provider.GetActivityInfo(ctx).IsLocalActivity, + activityInfo.IsLocalActivity, provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } @@ -56,6 +57,12 @@ func StateApiWaitUntil( return nil, composeStartApiRespError(provider, err, resp) } + // Before returning successful results, check if it's local activity then compose some info for debug purpose + // This is because local activity doesn't record input into the history. + // But there are some small info that are important to record + if activityInfo.IsLocalActivity { + resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) + } return resp, nil } @@ -86,8 +93,9 @@ func StateApiExecute( }, }) - attempt := provider.GetActivityInfo(ctx).Attempt - scheduledTs := provider.GetActivityInfo(ctx).ScheduledTime.Unix() + activityInfo := provider.GetActivityInfo(ctx) + attempt := activityInfo.Attempt + scheduledTs := activityInfo.ScheduledTime.Unix() input.Request.Context.Attempt = &attempt input.Request.Context.FirstAttemptTimestamp = &scheduledTs @@ -96,7 +104,7 @@ func StateApiExecute( printDebugMsg(logger, err, iwfWorkerBaseUrl) if checkHttpError(err, httpResp) { return nil, composeHttpError( - provider.GetActivityInfo(ctx).IsLocalActivity, + activityInfo.IsLocalActivity, provider, err, httpResp, string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE)) } @@ -104,9 +112,21 @@ func StateApiExecute( return nil, composeExecuteApiRespError(provider, err, resp) } + // Before returning successful results, check if it's local activity then compose some info for debug purpose + // This is because local activity doesn't record input into the history. + // But there are some small info that are important to record + if activityInfo.IsLocalActivity { + resp.LocalActivityInput = composeInputForDebug(input.Request.Context.GetStateExecutionId()) + } + return resp, nil } +func composeInputForDebug(stateExeId string) *string { + // NOTE: only use the stateExecutionId for now, but we can add more later if needed + return iwfidl.PtrString(fmt.Sprintf("stateExeId: %s", stateExeId)) +} + func checkStateDecisionFromResponse(resp *iwfidl.WorkflowStateDecideResponse) error { if resp == nil || resp.StateDecision == nil || len(resp.StateDecision.NextStates) == 0 { return fmt.Errorf("empty state decision is no longer supported. If it's from old SDKs then upgrade the SDK to newer versions") diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index cd3b8320..6ca5bd8c 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -178,7 +178,9 @@ func InterpreterImpl( stateExeId = stateExecutionCounter.CreateNextExecutionId(state.GetStateId()) } - shouldSendSignalOnCompletion := slices.Contains(input.WaitForCompletionStateExecutionIds, stateExeId) + shouldSendSignalOnCompletion := + slices.Contains(input.WaitForCompletionStateExecutionIds, stateExeId) || + slices.Contains(input.WaitForCompletionStateIds, state.GetStateId()) decision, stateExecStatus, err := executeState( ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel,