From c58672e907e11dc873fc639e6f1b38d9175e5400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Cac=CC=A7ador?= Date: Wed, 25 Sep 2024 14:02:47 +0100 Subject: [PATCH] adding integ tests --- integ/headers_test.go | 167 ++++++++++++++++++++++++++++++ integ/workflow/headers/config.go | 4 + integ/workflow/headers/routers.go | 131 +++++++++++++++++++++++ 3 files changed, 302 insertions(+) create mode 100644 integ/headers_test.go create mode 100644 integ/workflow/headers/config.go create mode 100644 integ/workflow/headers/routers.go diff --git a/integ/headers_test.go b/integ/headers_test.go new file mode 100644 index 00000000..837d816d --- /dev/null +++ b/integ/headers_test.go @@ -0,0 +1,167 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/integ/workflow/basic" + "github.com/indeedeng/iwf/integ/workflow/headers" + "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/env" + "log" + "strconv" + "testing" + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "github.com/stretchr/testify/assert" +) + +func TestHeadersWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, nil) + smallWaitForFastTest() + + doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) + smallWaitForFastTest() + + doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{ + DisableSystemSearchAttribute: iwfidl.PtrBool(true), + }) + smallWaitForFastTest() + } +} + +func TestHeadersWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestWorkflowWithHeaders(t, service.BackendTypeCadence, nil) + smallWaitForFastTest() + doTestWorkflowWithHeaders(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) + smallWaitForFastTest() + } +} + +func doTestWorkflowWithHeaders(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { + // start test workflow server + wfHandler := headers.NewHandler() + closeFunc1 := startWorkflowWorker(wfHandler) + defer closeFunc1() + + _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ + BackendType: backendType, + DefaultHeaders: map[string]string{ + headers.TestHeaderKey: headers.TestHeaderValue, + }, + }) + defer closeFunc2() + + // start a workflow + sharedConfig := env.GetSharedConfig() + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + DefaultHeader: sharedConfig.Interpreter.InterpreterActivityConfig.DefaultHeader, + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + wfId := basic.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) + wfInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString("test data"), + } + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + startReq := iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: basic.WorkflowType, + WorkflowTimeoutSeconds: 100, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: ptr.Any(basic.State1), + StateInput: wfInput, + WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ + WorkflowConfigOverride: config, + WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE), + // TODO: need more work to write integ test for cron + // manual testing for now by uncomment the following line + // CronSchedule: iwfidl.PtrString("* * * * *"), + RetryPolicy: &iwfidl.WorkflowRetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(11), + BackoffCoefficient: iwfidl.PtrFloat32(11), + MaximumAttempts: iwfidl.PtrInt32(11), + MaximumIntervalSeconds: iwfidl.PtrInt32(11), + }, + }, + StateOptions: &iwfidl.WorkflowStateOptions{ + StartApiTimeoutSeconds: iwfidl.PtrInt32(12), + DecideApiTimeoutSeconds: iwfidl.PtrInt32(13), + StartApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(12), + BackoffCoefficient: iwfidl.PtrFloat32(12), + MaximumAttempts: iwfidl.PtrInt32(12), + MaximumIntervalSeconds: iwfidl.PtrInt32(12), + }, + DecideApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(13), + BackoffCoefficient: iwfidl.PtrFloat32(13), + MaximumAttempts: iwfidl.PtrInt32(13), + MaximumIntervalSeconds: iwfidl.PtrInt32(13), + }, + }, + } + _, httpResp, err := req.WorkflowStartRequest(startReq).Execute() + panicAtHttpError(err, httpResp) + + // start it again should return already started error + _, _, err = req.WorkflowStartRequest(startReq).Execute() + apiErr, ok := err.(*iwfidl.GenericOpenAPIError) + if !ok { + log.Fatalf("Should fail to invoke start api %v", err) + } + errResp, ok := apiErr.Model().(iwfidl.ErrorResponse) + if !ok { + log.Fatalf("should be error response") + } + assertions := assert.New(t) + assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS) + + req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + // use a wrong workflowId to test the error case + _, _, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: "a wrong workflowId", + }).Execute() + apiErr, ok = err.(*iwfidl.GenericOpenAPIError) + if !ok { + log.Fatalf("Should fail to invoke get api %v", err) + } + errResp, ok = apiErr.Model().(iwfidl.ErrorResponse) + if !ok { + log.Fatalf("should be error response") + } + assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_NOT_EXISTS_SUB_STATUS) + + history, _ := wfHandler.GetTestResult() + assertions.Equalf(map[string]int64{ + "S1_start": 1, + "S1_decide": 1, + "S2_start": 1, + "S2_decide": 1, + }, history, "basic test fail, %v", history) + + assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus()) + assertions.Equal(1, len(resp2.GetResults())) + assertions.Equal(iwfidl.StateCompletionOutput{ + CompletedStateId: "S2", + CompletedStateExecutionId: "S2-1", + CompletedStateOutput: wfInput, + }, resp2.GetResults()[0]) +} diff --git a/integ/workflow/headers/config.go b/integ/workflow/headers/config.go new file mode 100644 index 00000000..0337d491 --- /dev/null +++ b/integ/workflow/headers/config.go @@ -0,0 +1,4 @@ +package headers + +const TestHeaderKey = "integration-test-header" +const TestHeaderValue = "integration-test-value" diff --git a/integ/workflow/headers/routers.go b/integ/workflow/headers/routers.go new file mode 100644 index 00000000..75716189 --- /dev/null +++ b/integ/workflow/headers/routers.go @@ -0,0 +1,131 @@ +package headers + +import ( + "github.com/gin-gonic/gin" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "log" + "net/http" +) + +const ( + WorkflowType = "basic" + State1 = "S1" + State2 = "S2" +) + +type handler struct { + invokeHistory map[string]int64 +} + +func NewHandler() *handler { + return &handler{ + invokeHistory: make(map[string]int64), + } +} + +// ApiV1WorkflowStartPost - for a workflow +func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { + headerValue := c.GetHeader(TestHeaderKey) + if headerValue != TestHeaderValue { + panic("test header not found") + } + + var req iwfidl.WorkflowStateStartRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state start request, ", req) + + context := req.GetContext() + if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { + panic("attempt and firstAttemptTimestamp should be greater than zero") + } + + if req.GetWorkflowType() == WorkflowType { + // basic workflow go straight to decide methods without any commands + if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 { + h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ + CommandRequest: &iwfidl.CommandRequest{ + DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), + }, + }) + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { + headerValue := c.GetHeader(TestHeaderKey) + if headerValue != TestHeaderValue { + panic("test header not found") + } + + var req iwfidl.WorkflowStateDecideRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state decide request, ", req) + context := req.GetContext() + if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { + panic("attempt and firstAttemptTimestamp should be greater than zero") + } + + if req.GetWorkflowType() == WorkflowType { + h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if req.GetWorkflowStateId() == State1 { + // go to S2 + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: State2, + StateInput: req.StateInput, + StateOptions: &iwfidl.WorkflowStateOptions{ + StartApiTimeoutSeconds: iwfidl.PtrInt32(14), + ExecuteApiTimeoutSeconds: iwfidl.PtrInt32(15), + StartApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(14), + BackoffCoefficient: iwfidl.PtrFloat32(14), + MaximumAttempts: iwfidl.PtrInt32(14), + MaximumIntervalSeconds: iwfidl.PtrInt32(14), + }, + ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{ + InitialIntervalSeconds: iwfidl.PtrInt32(15), + BackoffCoefficient: iwfidl.PtrFloat32(15), + MaximumAttempts: iwfidl.PtrInt32(15), + MaximumIntervalSeconds: iwfidl.PtrInt32(15), + }, + }, + }, + }, + }, + }) + return + } else if req.GetWorkflowStateId() == State2 { + // go to complete + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: service.GracefulCompletingWorkflowStateId, + StateInput: req.StateInput, + }, + }, + }, + }) + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { + return h.invokeHistory, nil +}