diff --git a/README.md b/README.md index 7b4a61ae..c7e32fcd 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,11 @@ Contribution is welcome. # Why you would need iWF ## TL;DR -AWS published SWF in 2012 and then moved to Step Functions in 2016 because they found it’s too hard to support SWF. The creators later joined Uber and built Cadence and I was lucky enough to join Uber Cadence team. After leaving Cadence team, I have been supporting community and Indeed teams to use Cadence & Temporal. I realized that AWS is right that the programming of SWF/Cadence/Temporal is hard to adopt because of leaking too many internals. Inspired by Step Function, I created this iWF framework. +AWS published SWF in 2012 and then moved to Step Functions in 2016 because they found it’s too hard to support SWF. +Cadence & Temporal continued the idea of SWF and became much more powerful. +However, AWS is right that the programming of SWF/Cadence/Temporal is hard to adopt because of leaking too many internals. +Inspired by Step Function, iWF is created to provide equivalent power of Cadence/Temporal, but hiding all the internal details +and provide clean and simple API to use. Screen Shot 2022-11-10 at 11 23 24 AM @@ -53,10 +57,10 @@ AWS published SWF in 2012 and then moved to Step Functions in 2016 because they ## If you are not * Check out this [doc](https://docs.google.com/document/d/1zyCKvy4S2l7XBVJzZuS65OIsqV9CRPPYJY3OBbuWrPE) to understand some history -iWF is a application platform that provides you a comprehensive tooling: +iWF is an application platform that provides you a comprehensive tooling: * WorkflowAsCode for highly flexibile/customizable business logic * Parallel execution of multiple threads of business -* Intermidiate states stored as "QueryAttributes" and can be retrived by APIs +* Intermidiate states stored as "QueryAttributes" and can be retrieved by APIs * Receiving data from external system by Signal * Searchable attributes that can be used for flexible searching, even full text searching, backed by ElasticSearch * Durable timer, and cron job scheduling @@ -103,7 +107,7 @@ On top of the above basic concepts, you may want to deeply customize your workfl ### More advanced command types * `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow) -* WIP `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days. +* [Future] `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days. ### WorkflowStartOption * IdReusePolicy diff --git a/integ/any_timer_signal_test.go b/integ/any_timer_signal_test.go new file mode 100644 index 00000000..4e0899dc --- /dev/null +++ b/integ/any_timer_signal_test.go @@ -0,0 +1,91 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/gen/iwfidl" + anytimersignal "github.com/indeedeng/iwf/integ/workflow/any_timer_signal" + "github.com/indeedeng/iwf/service" + "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" +) + +func TestAnyTimerSignalWorkflowTemporal(t *testing.T) { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal) +} + +// TODO this bug in Cadence SDK may cause the test to fail https://github.com/uber-go/cadence-client/issues/1198 +func TestAnyTimerSignalWorkflowCadence(t *testing.T) { + doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence) +} + +func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType) { + // start test workflow server + wfHandler := anytimersignal.NewHandler() + closeFunc1 := startWorkflowWorker(wfHandler) + defer closeFunc1() + + closeFunc2 := startIwfService(backendType) + defer closeFunc2() + + // create client + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + + // start a workflow + wfId := anytimersignal.WorkflowType + strconv.Itoa(int(time.Now().Unix())) + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + _, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: anytimersignal.WorkflowType, + WorkflowTimeoutSeconds: 10, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: anytimersignal.State1, + }).Execute() + panicAtHttpError(err, httpResp) + + // wait for 3 secs and send the signal + time.Sleep(time.Second * 3) + signalValue := iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString("test-data-1"), + } + req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background()) + httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ + WorkflowId: wfId, + SignalChannelName: anytimersignal.SignalName, + SignalValue: &signalValue, + }).Execute() + panicAtHttpError(err, httpResp) + + // wait for the workflow + reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + _, httpResp, err = reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + history, data := wfHandler.GetTestResult() + assertions := assert.New(t) + assertions.Equalf(map[string]int64{ + "S1_start": 2, + "S1_decide": 2, + "S2_start": 1, + "S2_decide": 1, + }, history, "anytimersignal test fail, %v", history) + + assertions.Equal(anytimersignal.SignalName, data["signalChannelName1"]) + assertions.Equal("signal-cmd-id", data["signalCommandId1"]) + assertions.Equal(service.SignalStatusWaiting, data["signalStatus1"]) + + assertions.Equal(anytimersignal.SignalName, data["signalChannelName2"]) + assertions.Equal("signal-cmd-id", data["signalCommandId2"]) + assertions.Equal(service.SignalStatusReceived, data["signalStatus2"]) + assertions.Equal(signalValue, data["signalValue2"]) +} diff --git a/integ/workflow/any_timer_signal/routers.go b/integ/workflow/any_timer_signal/routers.go new file mode 100644 index 00000000..2f1c40aa --- /dev/null +++ b/integ/workflow/any_timer_signal/routers.go @@ -0,0 +1,136 @@ +package anytimersignal + +import ( + "github.com/gin-gonic/gin" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/workflow/common" + "github.com/indeedeng/iwf/service" + "log" + "net/http" + "time" +) + +const ( + WorkflowType = "any_timer_signal" + State1 = "S1" + State2 = "S2" + SignalName = "test-signal-name" +) + +type handler struct { + invokeHistory map[string]int64 + invokeData map[string]interface{} +} + +func NewHandler() common.WorkflowHandler { + return &handler{ + invokeHistory: make(map[string]int64), + invokeData: make(map[string]interface{}), + } +} + +// ApiV1WorkflowStartPost - for a workflow +func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { + var req iwfidl.WorkflowStateStartRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state start request, ", req) + + if req.GetWorkflowType() == WorkflowType { + h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ + if req.GetWorkflowStateId() == State1 { + var timerCommands []iwfidl.TimerCommand + context := req.GetContext() + if context.GetStateExecutionId() == State1+"-"+"1" { + now := time.Now().Unix() + timerCommands = []iwfidl.TimerCommand{ + { + FiringUnixTimestampSeconds: now + 1, // fire after 1s + }, + } + } + + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ + CommandRequest: &iwfidl.CommandRequest{ + SignalCommands: []iwfidl.SignalCommand{ + { + CommandId: "signal-cmd-id", + SignalChannelName: SignalName, + }, + }, + TimerCommands: timerCommands, + DeciderTriggerType: service.DeciderTypeAnyCommandCompleted, + }, + }) + return + } + if req.GetWorkflowStateId() == State2 { + c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ + CommandRequest: &iwfidl.CommandRequest{ + DeciderTriggerType: service.DeciderTypeAllCommandCompleted, + }, + }) + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { + var req iwfidl.WorkflowStateDecideRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state decide request, ", req) + + if req.GetWorkflowType() == WorkflowType { + h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if req.GetWorkflowStateId() == State1 { + signalResults := req.GetCommandResults() + var movements []iwfidl.StateMovement + + context := req.GetContext() + if context.GetStateExecutionId() == State1+"-"+"1" { + h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName() + h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId() + h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus() + movements = []iwfidl.StateMovement{{StateId: State1}} + } else { + h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName() + h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId() + h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus() + h.invokeData["signalValue2"] = signalResults.SignalResults[0].GetSignalValue() + movements = []iwfidl.StateMovement{{StateId: State2}} + } + + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: movements, + }, + }) + return + } else if req.GetWorkflowStateId() == State2 { + // go to complete + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: service.GracefulCompletingWorkflowStateId, + }, + }, + }, + }) + return + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { + return h.invokeHistory, h.invokeData +} diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 87c4dedf..865ef18c 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -28,6 +28,17 @@ func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext return workflow.UpsertSearchAttributes(wfCtx, attributes) } +func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + f := workflow.NewTimer(wfCtx, d) + return &futureImpl{ + future: f, + } +} + func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -102,11 +113,15 @@ func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, o return interpreter.NewUnifiedContext(wfCtx2) } -type temporalFuture struct { +type futureImpl struct { future workflow.Future } -func (t *temporalFuture) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) IsReady() bool { + return t.future.IsReady() +} + +func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -121,7 +136,7 @@ func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activ panic("cannot convert to temporal workflow context") } f := workflow.ExecuteActivity(wfCtx, activity, args...) - return &temporalFuture{ + return &futureImpl{ future: f, } } @@ -156,6 +171,10 @@ type cadenceReceiveChannel struct { channel workflow.Channel } +func (t *cadenceReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { + return t.channel.ReceiveAsync(valuePtr) +} + func (t *cadenceReceiveChannel) Receive(ctx interpreter.UnifiedContext, valuePtr interface{}) (more bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index b96ffa32..df98ea21 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -81,6 +81,7 @@ type WorkflowProvider interface { ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future) Now(ctx UnifiedContext) time.Time Sleep(ctx UnifiedContext, d time.Duration) (err error) + NewTimer(ctx UnifiedContext, d time.Duration) Future GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel) GetContextValue(ctx UnifiedContext, key string) interface{} GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int @@ -89,8 +90,10 @@ type WorkflowProvider interface { type ReceiveChannel interface { Receive(ctx UnifiedContext, valuePtr interface{}) (more bool) + ReceiveAsync(valuePtr interface{}) (ok bool) } type Future interface { Get(ctx UnifiedContext, valuePtr interface{}) error + IsReady() bool } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index 08e92368..aaddbbdd 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -28,6 +28,17 @@ func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext return workflow.UpsertSearchAttributes(wfCtx, attributes) } +func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + f := workflow.NewTimer(wfCtx, d) + return &futureImpl{ + future: f, + } +} + func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -90,11 +101,15 @@ func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, o return interpreter.NewUnifiedContext(wfCtx2) } -type temporalFuture struct { +type futureImpl struct { future workflow.Future } -func (t *temporalFuture) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) IsReady() bool { + return t.future.IsReady() +} + +func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -109,7 +124,7 @@ func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activ panic("cannot convert to temporal workflow context") } f := workflow.ExecuteActivity(wfCtx, activity, args...) - return &temporalFuture{ + return &futureImpl{ future: f, } } @@ -144,6 +159,10 @@ type temporalReceiveChannel struct { channel workflow.ReceiveChannel } +func (t *temporalReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { + return t.channel.ReceiveAsync(valuePtr) +} + func (t *temporalReceiveChannel) Receive(ctx interpreter.UnifiedContext, valuePtr interface{}) (more bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 4362e728..d9ef3a4e 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -10,6 +10,8 @@ import ( func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { globalVersionProvider := newGlobalVersionProvider(provider) if globalVersionProvider.isAfterVersionOfUsingGlobalVersioning(ctx) { + // TODO this bug in Cadence SDK may cause concurrent writes + // https://github.com/uber-go/cadence-client/issues/1198 err := globalVersionProvider.upsertGlobalVersionSearchAttribute(ctx) if err != nil { return nil, err @@ -223,6 +225,7 @@ func executeState( interStateChannel.ProcessPublishing(startResponse.GetPublishToInterStateChannel()) commandReq := startResponse.GetCommandRequest() + commandReqDone := false completedTimerCmds := map[int]bool{} if len(commandReq.GetTimerCommands()) > 0 { @@ -242,8 +245,13 @@ func executeState( now := provider.Now(ctx).Unix() fireAt := cmd.GetFiringUnixTimestampSeconds() duration := time.Duration(fireAt-now) * time.Second - _ = provider.Sleep(ctx, duration) - completedTimerCmds[idx] = true + future := provider.NewTimer(ctx, duration) + _ = provider.Await(ctx, func() bool { + return future.IsReady() || commandReqDone + }) + if future.IsReady() { + completedTimerCmds[idx] = true + } }) } } @@ -264,8 +272,14 @@ func executeState( } ch := provider.GetSignalChannel(ctx, cmd.GetSignalChannelName()) value := iwfidl.EncodedObject{} - ch.Receive(ctx, &value) - completedSignalCmds[idx] = &value + received := false + _ = provider.Await(ctx, func() bool { + received = ch.ReceiveAsync(&value) + return received || commandReqDone + }) + if received { + completedSignalCmds[idx] = &value + } }) } } @@ -285,9 +299,10 @@ func executeState( panic("critical code bug") } + received := false _ = provider.Await(ctx, func() bool { - res := interStateChannel.HasData(cmd.ChannelName) - return res + received = interStateChannel.HasData(cmd.ChannelName) + return received || commandReqDone }) completedInterStateChannelCmds[idx] = interStateChannel.Retrieve(cmd.ChannelName) @@ -315,6 +330,7 @@ func executeState( return nil, provider.NewApplicationError("unsupported decider trigger type", "unsupported", triggerType) } } + commandReqDone = true if err != nil { return nil, err