diff --git a/integ/reset_by_state_id_test.go b/integ/reset_by_state_id_test.go new file mode 100644 index 00000000..9941724a --- /dev/null +++ b/integ/reset_by_state_id_test.go @@ -0,0 +1,191 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/workflow/reset" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" + "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" +) + +func TestResetByStateIdWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, nil) + smallWaitForFastTest() + + //TODO: uncomment below when IWF-403 implementation is done. + //TODO cont.: Reset with state id & state execution id is broken for local activities. + //doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) + //smallWaitForFastTest() + } +} + +func TestResetByStateIdWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, nil) + smallWaitForFastTest() + + //TODO: uncomment below when IWF-403 implementation is done. + //TODO cont.: Reset with state id & state execution id is broken for local activities. + //doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) + //smallWaitForFastTest() + } +} + +func doTestResetByStatIdWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { + // start test workflow server + wfHandler := reset.NewHandler() + closeFunc1 := startWorkflowWorker(wfHandler) + defer closeFunc1() + + _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ + BackendType: backendType, + }) + defer closeFunc2() + + // start a workflow + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + wfId := reset.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) + wfInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString("1"), + } + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + startReq := iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: reset.WorkflowType, + WorkflowTimeoutSeconds: 100, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: ptr.Any(reset.State1), + StateInput: wfInput, + WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ + WorkflowConfigOverride: config, + WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE), + RetryPolicy: &iwfidl.WorkflowRetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(11), + BackoffCoefficient: iwfidl.PtrFloat32(11), + MaximumAttempts: iwfidl.PtrInt32(11), + MaximumIntervalSeconds: iwfidl.PtrInt32(11), + }, + }, + StateOptions: &iwfidl.WorkflowStateOptions{ + StartApiTimeoutSeconds: iwfidl.PtrInt32(12), + DecideApiTimeoutSeconds: iwfidl.PtrInt32(13), + //Skipping wait until for state1 + SkipWaitUntil: iwfidl.PtrBool(true), + StartApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(12), + BackoffCoefficient: iwfidl.PtrFloat32(12), + MaximumAttempts: iwfidl.PtrInt32(12), + MaximumIntervalSeconds: iwfidl.PtrInt32(12), + }, + DecideApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(13), + BackoffCoefficient: iwfidl.PtrFloat32(13), + MaximumAttempts: iwfidl.PtrInt32(13), + MaximumIntervalSeconds: iwfidl.PtrInt32(13), + }, + }, + } + startResp, httpResp, err := req.WorkflowStartRequest(startReq).Execute() + panicAtHttpError(err, httpResp) + + assertions := assert.New(t) + + req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + history, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 5, + }, history, "reset test fail, %v", history) + + assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus()) + assertions.Equal(1, len(resp2.GetResults())) + assertions.Equal("S2", resp2.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp2.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp2.GetResults()[0].CompletedStateOutput.GetData()) + + //reset workflow by state id + resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background()) + _, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{ + WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()), + WorkflowId: wfId, + ResetType: iwfidl.STATE_ID, + StateId: iwfidl.PtrString(reset.State2), + }).Execute() + panicAtHttpError(err, httpResp) + + time.Sleep(time.Second * 20) + + req3 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp3, httpResp, err := req3.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + resetHistory, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 10, + }, resetHistory, "reset test fail, %v", resetHistory) + + assertions.Equal(iwfidl.COMPLETED, resp3.GetWorkflowStatus()) + assertions.Equal(1, len(resp3.GetResults())) + assertions.Equal("S2", resp3.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp3.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp3.GetResults()[0].CompletedStateOutput.GetData()) + + //reset workflow by state execution id + reset2Req := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background()) + _, httpResp, err = reset2Req.WorkflowResetRequest(iwfidl.WorkflowResetRequest{ + WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()), + WorkflowId: wfId, + ResetType: iwfidl.STATE_EXECUTION_ID, + StateExecutionId: iwfidl.PtrString(reset.State2 + "-4"), + }).Execute() + panicAtHttpError(err, httpResp) + + time.Sleep(time.Second * 20) + + req4 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp4, httpResp, err := req4.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + reset2History, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 12, + }, reset2History, "reset test fail, %v", reset2History) + + assertions.Equal(iwfidl.COMPLETED, resp4.GetWorkflowStatus()) + assertions.Equal(1, len(resp4.GetResults())) + assertions.Equal("S2", resp4.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp4.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp4.GetResults()[0].CompletedStateOutput.GetData()) +} diff --git a/integ/workflow/reset/routers.go b/integ/workflow/reset/routers.go new file mode 100644 index 00000000..c08812e6 --- /dev/null +++ b/integ/workflow/reset/routers.go @@ -0,0 +1,168 @@ +package reset + +import ( + "fmt" + "github.com/gin-gonic/gin" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "log" + "net/http" + "strconv" +) + +//A workflow without calling WaitUntil api for any state. It loops through state2 5 times. Used for: +// * Testing reset by state id and state execution id. + +const ( + WorkflowType = "reset" + State1 = "S1" + State2 = "S2" +) + +type handler struct { + invokeHistory map[string]int64 +} + +func NewHandler() *handler { + return &handler{ + invokeHistory: make(map[string]int64), + } +} + +// ApiV1WorkflowStartPost - for a workflow +func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { + var req iwfidl.WorkflowStateStartRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state start request, ", req) + + context := req.GetContext() + if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { + panic("attempt and firstAttemptTimestamp should be greater than zero") + } + + if req.GetWorkflowType() == WorkflowType { + // basic workflow go straight to decide methods without any commands + if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 { + h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + log.Println("beg start response") + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ + CommandRequest: &iwfidl.CommandRequest{ + DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), + }, + }) + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { + log.Println("start of decide") + var req iwfidl.WorkflowStateDecideRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state decide request, ", req) + context := req.GetContext() + if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { + panic("attempt and firstAttemptTimestamp should be greater than zero") + } + + if req.GetWorkflowType() == WorkflowType { + h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if req.GetWorkflowStateId() == State1 { + // go to S2 + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: State2, + StateInput: req.StateInput, + StateOptions: &iwfidl.WorkflowStateOptions{ + StartApiTimeoutSeconds: iwfidl.PtrInt32(14), + ExecuteApiTimeoutSeconds: iwfidl.PtrInt32(15), + //Skipping wait until for 1st execution of state2 + SkipWaitUntil: iwfidl.PtrBool(true), + StartApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(14), + BackoffCoefficient: iwfidl.PtrFloat32(14), + MaximumAttempts: iwfidl.PtrInt32(14), + MaximumIntervalSeconds: iwfidl.PtrInt32(14), + }, + ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(15), + BackoffCoefficient: iwfidl.PtrFloat32(15), + MaximumAttempts: iwfidl.PtrInt32(15), + MaximumIntervalSeconds: iwfidl.PtrInt32(15), + }, + }, + }, + }, + }, + }) + return + } else if req.GetWorkflowStateId() == State2 { + input := req.GetStateInput() + i, _ := strconv.Atoi(input.GetData()) + if i < 5 { + updatedInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString(fmt.Sprintf("%v", i+1)), + } + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: State2, + StateInput: updatedInput, + StateOptions: &iwfidl.WorkflowStateOptions{ + StartApiTimeoutSeconds: iwfidl.PtrInt32(14), + ExecuteApiTimeoutSeconds: iwfidl.PtrInt32(15), + //Skipping wait until for all executions of state2 after the 1st execution. + SkipWaitUntil: iwfidl.PtrBool(true), + StartApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(14), + BackoffCoefficient: iwfidl.PtrFloat32(14), + MaximumAttempts: iwfidl.PtrInt32(14), + MaximumIntervalSeconds: iwfidl.PtrInt32(14), + }, + ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(15), + BackoffCoefficient: iwfidl.PtrFloat32(15), + MaximumAttempts: iwfidl.PtrInt32(15), + MaximumIntervalSeconds: iwfidl.PtrInt32(15), + }, + }, + }, + }, + }, + }) + return + } else { + // go to complete + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: service.GracefulCompletingWorkflowStateId, + StateInput: req.StateInput, + }, + }, + }, + }) + return + } + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { + return h.invokeHistory, nil +} diff --git a/iwf-idl b/iwf-idl index e5bc6236..a6fbd8fe 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit e5bc6236d36b977c2fec85c2bf5e0742f859e04e +Subproject commit a6fbd8feb427fdf77b7ddc0f7c9d7eb154c385e7 diff --git a/service/client/cadence/reset.go b/service/client/cadence/reset.go index 3aafb014..080a646b 100644 --- a/service/client/cadence/reset.go +++ b/service/client/cadence/reset.go @@ -167,6 +167,7 @@ func getDecisionEventIDByStateOrStateExecutionId( if e.GetEventType() == shared.EventTypeDecisionTaskCompleted { decisionFinishID = e.GetEventId() } + //TODO: Add check for local activity. (IWF-403) if e.GetEventType() == shared.EventTypeActivityTaskScheduled { typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName() if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") { diff --git a/service/client/temporal/reset.go b/service/client/temporal/reset.go index d4f943c8..6e6de797 100644 --- a/service/client/temporal/reset.go +++ b/service/client/temporal/reset.go @@ -162,6 +162,7 @@ func getDecisionEventIDByStateOrStateExecutionId( if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { decisionFinishID = e.GetEventId() } + //TODO: Add check for local activity. (IWF-403) if e.GetEventType() == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED { typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName() if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") {