Skip to content

Commit

Permalink
Update doc and provide taskQueue param (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 8, 2022
1 parent de0d711 commit 65be199
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 30 deletions.
39 changes: 27 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ Contribution is welcome.
* [If you are familar with <a href="https://github.com/uber/cadence">Cadence</a>/<a href="https://github.com/temporalio/temporal">Temporal</a>](#if-you-are-familar-with-cadencetemporal)
* [If you are not](#if-you-are-not)
* [What is iWF](#what-is-iwf)
* [Basic Concepts &amp; Usage](#basic-concepts--usage)
* [Advanced Concepts &amp; Usage](#advanced-concepts--usage)
* [WorkflowStartOption](#workflowstartoption)
* [WorkflowStateOption](#workflowstateoption)
* [Reset Workflow](#reset-workflow)
* [How to run](#how-to-run)
* [Using docker image](#using-docker-image)
* [How to build &amp; run locally](#how-to-build--run-locally)
Expand Down Expand Up @@ -61,17 +59,16 @@ iWF is a application platform that provides you a comprehensive tooling:

# What is iWF

## Basic Concepts & Usage
iWF lets you build long-running applications by implementing the workflow interface, e.g. [Java Workflow interface](https://github.com/indeedeng/iwf-java-sdk/blob/main/src/main/java/io/github/cadenceoss/iwf/core/Workflow.java).

The key elements of a workflow are `WorkflowState`. A workflow can contain any number of WorkflowStates.

A WorkflowState is implemented with two APIs: `start` and `decide`. `start` API is invoked immediately when a WorkflowState is started. It will return some `Commands` to server. When the requested `Commands` are completed, `decide` API will be triggered.

There are several types of commands:
* `SignalCommand`: will be waiting for a signal from external to the workflow signal channel.
* `TimerCommand`: will be waiting for a durable timer to fire.
* `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow)
* `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days.
These are the two basic command types:
* `SignalCommand`: will be waiting for a signal from external to the workflow signal channel. External application can use SignalWorkflow API to signal a workflow.
* `TimerCommand`: will be waiting for a **durable timer** to fire.

Note that `start` API can return multiple commands, and choose different DeciderTriggerType for triggering decide API:
* `AllCommandCompleted`: this will wait for all command completed
Expand All @@ -92,10 +89,15 @@ iWF provides the below primitives when implementing the WorkflowState:
* search for workflows in Cadence/Temporal WebUI in Advanced tab
* search attribute type must be registered in Cadence/Temporal server before using for searching because it is backed up ElasticSearch
* the data types supported are limited as server has to understand the value for indexing
* See [Temporal doc](https://docs.temporal.io/concepts/what-is-a-search-attribute) and [Cadence doc](https://cadenceworkflow.io/docs/concepts/search-workflows/) to understand more about SearchAttribute

## Advanced Concepts & Usage
On top of the above basic concepts, you may want to deeply customize your workflow by using the below features.

### More advanced command types
* `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow)
* WIP `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days.

### WorkflowStartOption
* IdReusePolicy
* CronSchedule
Expand All @@ -105,7 +107,20 @@ On top of the above basic concepts, you may want to deeply customize your workfl
* API timeout & retry
### Reset Workflow

## How to change workflow code
Unlike Cadence/Temporal, there is no [Non-deterministic](https://docs.temporal.io/workflows#deterministic-constraints) errors anymore in iWF.
And there is no [versioning APIs](https://docs.temporal.io/go/versioning). You will never see the worker crashing & replay issues on iWF workers.

However, changing workflow code could still have backward compatibility issues. Here are some tips:
* Changing the behavior of a WorkflowState will always apply to any existing workflow executions.
* Usually that's what you want. If it's not, then you should utilize QueryAttribute, SearchAttribute or StateLocalAttribute to record some data, and use it to decide the new behavior
* Removing an existing WorkflowState could cause existing workflow executions to stuck if there is any workflow executing on it.
* If that happens, the worker will return errors to server, which will fail the server workflow activity and keep on backoff retry until it's fixed
* To safely delete a WorkflowState, you can utilize the `IwfExecutingStateIds` search attribute to check if the stateId is still being executed

## How to write unit test
Writing unit test for iWF workflow code should be super easy compared to Cadence/Temporal. There is no need to use any special testEnv.
The standard unit test library should be sufficient.

# How to run

Expand Down Expand Up @@ -148,8 +163,8 @@ NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporali
3. Register system search attributes required by iWF server
```shell
tctl adm cl asa -n IwfWorkflowType -t Keyword
tctl adm cl asa -n GlobalWorkflowVersion -t Int
tctl adm cl asa -n ExecutingStateIds -t Keyword
tctl adm cl asa -n IwfGlobalWorkflowVersion -t Int
tctl adm cl asa -n IwfExecutingStateIds -t Keyword

```
4 For `attribute_test.go` integTests, you need to register search attributes:
Expand All @@ -166,8 +181,8 @@ docker-compose -f docker-compose-es-v7.yml up
2. Register a new domain if not haven `cadence --do default domain register`
3. Register system search attributes required by iWF server
```
cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2
cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0
cadence adm cl asa --search_attr_key IwfGlobalWorkflowVersion --search_attr_type 2
cadence adm cl asa --search_attr_key IwfExecutingStateIds --search_attr_type 0
cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
```
4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days
Expand Down
5 changes: 3 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package iwf
import (
"fmt"
"github.com/indeedeng/iwf/service"
isvc "github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/api"
cadenceapi "github.com/indeedeng/iwf/service/api/cadence"
temporalapi "github.com/indeedeng/iwf/service/api/temporal"
Expand Down Expand Up @@ -141,7 +142,7 @@ func launchTemporalService(svcName string, config *service.Config, unifiedClient
svc := api.NewService(unifiedClient)
log.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := temporal.NewInterpreterWorker(temporalClient)
interpreter := temporal.NewInterpreterWorker(temporalClient, isvc.TaskQueue)
interpreter.Start()
default:
log.Printf("Invalid service: %v", svcName)
Expand All @@ -160,7 +161,7 @@ func launchCadenceService(
svc := api.NewService(unifiedClient)
log.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := cadence.NewInterpreterWorker(service, domain, closeFunc)
interpreter := cadence.NewInterpreterWorker(service, domain, isvc.TaskQueue, closeFunc)
interpreter.Start()
default:
log.Printf("Invalid service: %v", svcName)
Expand Down
4 changes: 2 additions & 2 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) {
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(temporalClient)
interpreter := temporal.NewInterpreterWorker(temporalClient, service.TaskQueue)
interpreter.Start()
return func() {
iwfServer.Close()
Expand All @@ -81,7 +81,7 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) {
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(serviceClient, iwf.DefaultCadenceDomain, closeFunc)
interpreter := cadence.NewInterpreterWorker(serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc)
interpreter.Start()
return func() {
iwfServer.Close()
Expand Down
3 changes: 2 additions & 1 deletion service/api/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"github.com/indeedeng/iwf/service"
"log"
"net/http"

Expand All @@ -14,7 +15,7 @@ type handler struct {
}

func newHandler(client UnifiedClient) *handler {
svc, err := NewApiService(client)
svc, err := NewApiService(client, service.TaskQueue)
if err != nil {
panic(err)
}
Expand Down
10 changes: 6 additions & 4 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ import (
)

type serviceImpl struct {
client UnifiedClient
client UnifiedClient
taskQueue string
}

func (s *serviceImpl) Close() {
s.client.Close()
}

func NewApiService(client UnifiedClient) (ApiService, error) {
func NewApiService(client UnifiedClient, taskQueue string) (ApiService, error) {
return &serviceImpl{
client: client,
client: client,
taskQueue: taskQueue,
}, nil
}

func (s *serviceImpl) ApiV1WorkflowStartPost(req iwfidl.WorkflowStartRequest) (*iwfidl.WorkflowStartResponse, *ErrorAndStatus) {
workflowOptions := StartWorkflowOptions{
ID: req.GetWorkflowId(),
TaskQueue: service.TaskQueue,
TaskQueue: s.taskQueue,
WorkflowRunTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
}

Expand Down
6 changes: 3 additions & 3 deletions service/const.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package service

const (
TaskQueue = "Interpreter"
TaskQueue = "Interpreter_DEFAULT"
GracefulCompletingWorkflowStateId = "_SYS_GRACEFUL_COMPLETING_WORKFLOW"
ForceCompletingWorkflowStateId = "_SYS_FORCE_COMPLETING_WORKFLOW"
ForceFailingWorkflowStateId = "_SYS_FORCE_FAILING_WORKFLOW"
Expand Down Expand Up @@ -38,8 +38,8 @@ const (
WorkflowStatusCanceled = "CANCELED"
WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW"

SearchAttributeGlobalVersion = "GlobalWorkflowVersion"
SearchAttributeExecutingStateIds = "ExecutingStateIds"
SearchAttributeGlobalVersion = "IwfGlobalWorkflowVersion"
SearchAttributeExecutingStateIds = "IwfExecutingStateIds"
SearchAttributeIwfWorkflowType = "IwfWorkflowType"
)

Expand Down
7 changes: 4 additions & 3 deletions service/interpreter/cadence/worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cadence

import (
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/interpreter"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/worker"
Expand All @@ -13,12 +12,14 @@ type InterpreterWorker struct {
closeFunc func()
domain string
worker worker.Worker
tasklist string
}

func NewInterpreterWorker(service workflowserviceclient.Interface, domain string, closeFunc func()) *InterpreterWorker {
func NewInterpreterWorker(service workflowserviceclient.Interface, domain, tasklist string, closeFunc func()) *InterpreterWorker {
return &InterpreterWorker{
service: service,
domain: domain,
tasklist: tasklist,
closeFunc: closeFunc,
}
}
Expand All @@ -29,7 +30,7 @@ func (iw *InterpreterWorker) Close() {
}

func (iw *InterpreterWorker) Start() {
iw.worker = worker.New(iw.service, iw.domain, service.TaskQueue, worker.Options{})
iw.worker = worker.New(iw.service, iw.domain, iw.tasklist, worker.Options{})

iw.worker.RegisterWorkflow(Interpreter)
iw.worker.RegisterActivity(interpreter.StateStart)
Expand Down
7 changes: 4 additions & 3 deletions service/interpreter/temporal/worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package temporal

import (
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/interpreter"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
Expand All @@ -11,11 +10,13 @@ import (
type InterpreterWorker struct {
temporalClient client.Client
worker worker.Worker
taskQueue string
}

func NewInterpreterWorker(temporalClient client.Client) *InterpreterWorker {
func NewInterpreterWorker(temporalClient client.Client, taskQueue string) *InterpreterWorker {
return &InterpreterWorker{
temporalClient: temporalClient,
taskQueue: taskQueue,
}
}

Expand All @@ -25,7 +26,7 @@ func (iw *InterpreterWorker) Close() {
}

func (iw *InterpreterWorker) Start() {
iw.worker = worker.New(iw.temporalClient, service.TaskQueue, worker.Options{})
iw.worker = worker.New(iw.temporalClient, iw.taskQueue, worker.Options{})

iw.worker.RegisterWorkflow(Interpreter)
iw.worker.RegisterActivity(interpreter.StateStart)
Expand Down

0 comments on commit 65be199

Please sign in to comment.