Skip to content

Commit

Permalink
Fix any command of timer and signal combination (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 15, 2022
1 parent fe375a9 commit a6e1e31
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 16 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ Contribution is welcome.
# Why you would need iWF

## TL;DR
AWS published SWF in 2012 and then moved to Step Functions in 2016 because they found it’s too hard to support SWF. The creators later joined Uber and built Cadence and I was lucky enough to join Uber Cadence team. After leaving Cadence team, I have been supporting community and Indeed teams to use Cadence & Temporal. I realized that AWS is right that the programming of SWF/Cadence/Temporal is hard to adopt because of leaking too many internals. Inspired by Step Function, I created this iWF framework.
AWS published SWF in 2012 and then moved to Step Functions in 2016 because they found it’s too hard to support SWF.
Cadence & Temporal continued the idea of SWF and became much more powerful.
However, AWS is right that the programming of SWF/Cadence/Temporal is hard to adopt because of leaking too many internals.
Inspired by Step Function, iWF is created to provide equivalent power of Cadence/Temporal, but hiding all the internal details
and provide clean and simple API to use.

<img width="916" alt="Screen Shot 2022-11-10 at 11 23 24 AM" src="https://user-images.githubusercontent.com/4523955/201188875-32e1d070-ab53-4ac5-92fd-bb8ed16dd7dc.png">

Expand All @@ -53,10 +57,10 @@ AWS published SWF in 2012 and then moved to Step Functions in 2016 because they
## If you are not
* Check out this [doc](https://docs.google.com/document/d/1zyCKvy4S2l7XBVJzZuS65OIsqV9CRPPYJY3OBbuWrPE) to understand some history

iWF is a application platform that provides you a comprehensive tooling:
iWF is an application platform that provides you a comprehensive tooling:
* WorkflowAsCode for highly flexibile/customizable business logic
* Parallel execution of multiple threads of business
* Intermidiate states stored as "QueryAttributes" and can be retrived by APIs
* Intermidiate states stored as "QueryAttributes" and can be retrieved by APIs
* Receiving data from external system by Signal
* Searchable attributes that can be used for flexible searching, even full text searching, backed by ElasticSearch
* Durable timer, and cron job scheduling
Expand Down Expand Up @@ -103,7 +107,7 @@ On top of the above basic concepts, you may want to deeply customize your workfl

### More advanced command types
* `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow)
* WIP `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days.
* [Future] `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days.

### WorkflowStartOption
* IdReusePolicy
Expand Down
91 changes: 91 additions & 0 deletions integ/any_timer_signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/gen/iwfidl"
anytimersignal "github.com/indeedeng/iwf/integ/workflow/any_timer_signal"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

func TestAnyTimerSignalWorkflowTemporal(t *testing.T) {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal)
}

// TODO this bug in Cadence SDK may cause the test to fail https://github.com/uber-go/cadence-client/issues/1198
func TestAnyTimerSignalWorkflowCadence(t *testing.T) {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence)
}

func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType) {
// start test workflow server
wfHandler := anytimersignal.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
defer closeFunc2()

// create client
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})

// start a workflow
wfId := anytimersignal.WorkflowType + strconv.Itoa(int(time.Now().Unix()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anytimersignal.WorkflowType,
WorkflowTimeoutSeconds: 10,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: anytimersignal.State1,
}).Execute()
panicAtHttpError(err, httpResp)

// wait for 3 secs and send the signal
time.Sleep(time.Second * 3)
signalValue := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test-data-1"),
}
req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anytimersignal.SignalName,
SignalValue: &signalValue,
}).Execute()
panicAtHttpError(err, httpResp)

// wait for the workflow
reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

history, data := wfHandler.GetTestResult()
assertions := assert.New(t)
assertions.Equalf(map[string]int64{
"S1_start": 2,
"S1_decide": 2,
"S2_start": 1,
"S2_decide": 1,
}, history, "anytimersignal test fail, %v", history)

assertions.Equal(anytimersignal.SignalName, data["signalChannelName1"])
assertions.Equal("signal-cmd-id", data["signalCommandId1"])
assertions.Equal(service.SignalStatusWaiting, data["signalStatus1"])

assertions.Equal(anytimersignal.SignalName, data["signalChannelName2"])
assertions.Equal("signal-cmd-id", data["signalCommandId2"])
assertions.Equal(service.SignalStatusReceived, data["signalStatus2"])
assertions.Equal(signalValue, data["signalValue2"])
}
136 changes: 136 additions & 0 deletions integ/workflow/any_timer_signal/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package anytimersignal

import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/common"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
"time"
)

const (
WorkflowType = "any_timer_signal"
State1 = "S1"
State2 = "S2"
SignalName = "test-signal-name"
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state start request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {
var timerCommands []iwfidl.TimerCommand
context := req.GetContext()
if context.GetStateExecutionId() == State1+"-"+"1" {
now := time.Now().Unix()
timerCommands = []iwfidl.TimerCommand{
{
FiringUnixTimestampSeconds: now + 1, // fire after 1s
},
}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
CommandId: "signal-cmd-id",
SignalChannelName: SignalName,
},
},
TimerCommands: timerCommands,
DeciderTriggerType: service.DeciderTypeAnyCommandCompleted,
},
})
return
}
if req.GetWorkflowStateId() == State2 {
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: service.DeciderTypeAllCommandCompleted,
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
var movements []iwfidl.StateMovement

context := req.GetContext()
if context.GetStateExecutionId() == State1+"-"+"1" {
h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus()
movements = []iwfidl.StateMovement{{StateId: State1}}
} else {
h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus()
h.invokeData["signalValue2"] = signalResults.SignalResults[0].GetSignalValue()
movements = []iwfidl.StateMovement{{StateId: State2}}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: movements,
},
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
},
},
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
}
25 changes: 22 additions & 3 deletions service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext
return workflow.UpsertSearchAttributes(wfCtx, attributes)
}

func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}
f := workflow.NewTimer(wfCtx, d)
return &futureImpl{
future: f,
}
}

func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down Expand Up @@ -102,11 +113,15 @@ func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, o
return interpreter.NewUnifiedContext(wfCtx2)
}

type temporalFuture struct {
type futureImpl struct {
future workflow.Future
}

func (t *temporalFuture) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error {
func (t *futureImpl) IsReady() bool {
return t.future.IsReady()
}

func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
Expand All @@ -121,7 +136,7 @@ func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activ
panic("cannot convert to temporal workflow context")
}
f := workflow.ExecuteActivity(wfCtx, activity, args...)
return &temporalFuture{
return &futureImpl{
future: f,
}
}
Expand Down Expand Up @@ -156,6 +171,10 @@ type cadenceReceiveChannel struct {
channel workflow.Channel
}

func (t *cadenceReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) {
return t.channel.ReceiveAsync(valuePtr)
}

func (t *cadenceReceiveChannel) Receive(ctx interpreter.UnifiedContext, valuePtr interface{}) (more bool) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type WorkflowProvider interface {
ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future)
Now(ctx UnifiedContext) time.Time
Sleep(ctx UnifiedContext, d time.Duration) (err error)
NewTimer(ctx UnifiedContext, d time.Duration) Future
GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
GetContextValue(ctx UnifiedContext, key string) interface{}
GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int
Expand All @@ -89,8 +90,10 @@ type WorkflowProvider interface {

type ReceiveChannel interface {
Receive(ctx UnifiedContext, valuePtr interface{}) (more bool)
ReceiveAsync(valuePtr interface{}) (ok bool)
}

type Future interface {
Get(ctx UnifiedContext, valuePtr interface{}) error
IsReady() bool
}
25 changes: 22 additions & 3 deletions service/interpreter/temporal/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ func (w *workflowProvider) UpsertSearchAttributes(ctx interpreter.UnifiedContext
return workflow.UpsertSearchAttributes(wfCtx, attributes)
}

func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}
f := workflow.NewTimer(wfCtx, d)
return &futureImpl{
future: f,
}
}

func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down Expand Up @@ -90,11 +101,15 @@ func (w *workflowProvider) WithActivityOptions(ctx interpreter.UnifiedContext, o
return interpreter.NewUnifiedContext(wfCtx2)
}

type temporalFuture struct {
type futureImpl struct {
future workflow.Future
}

func (t *temporalFuture) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error {
func (t *futureImpl) IsReady() bool {
return t.future.IsReady()
}

func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
Expand All @@ -109,7 +124,7 @@ func (w *workflowProvider) ExecuteActivity(ctx interpreter.UnifiedContext, activ
panic("cannot convert to temporal workflow context")
}
f := workflow.ExecuteActivity(wfCtx, activity, args...)
return &temporalFuture{
return &futureImpl{
future: f,
}
}
Expand Down Expand Up @@ -144,6 +159,10 @@ type temporalReceiveChannel struct {
channel workflow.ReceiveChannel
}

func (t *temporalReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) {
return t.channel.ReceiveAsync(valuePtr)
}

func (t *temporalReceiveChannel) Receive(ctx interpreter.UnifiedContext, valuePtr interface{}) (more bool) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
Loading

0 comments on commit a6e1e31

Please sign in to comment.