From 65be1997d25b74bcf2008d157c1e11fc8da43efc Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 23:02:19 -0800 Subject: [PATCH] Update doc and provide taskQueue param (#70) --- README.md | 39 ++++++++++++++++++-------- cmd/server/iwf/iwf.go | 5 ++-- integ/util.go | 4 +-- service/api/handler.go | 3 +- service/api/service.go | 10 ++++--- service/const.go | 6 ++-- service/interpreter/cadence/worker.go | 7 +++-- service/interpreter/temporal/worker.go | 7 +++-- 8 files changed, 51 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index cf57db24..fe5091f6 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,8 @@ Contribution is welcome. * [If you are familar with Cadence/Temporal](#if-you-are-familar-with-cadencetemporal) * [If you are not](#if-you-are-not) * [What is iWF](#what-is-iwf) + * [Basic Concepts & Usage](#basic-concepts--usage) * [Advanced Concepts & Usage](#advanced-concepts--usage) - * [WorkflowStartOption](#workflowstartoption) - * [WorkflowStateOption](#workflowstateoption) - * [Reset Workflow](#reset-workflow) * [How to run](#how-to-run) * [Using docker image](#using-docker-image) * [How to build & run locally](#how-to-build--run-locally) @@ -61,17 +59,16 @@ iWF is a application platform that provides you a comprehensive tooling: # What is iWF +## Basic Concepts & Usage iWF lets you build long-running applications by implementing the workflow interface, e.g. [Java Workflow interface](https://github.com/indeedeng/iwf-java-sdk/blob/main/src/main/java/io/github/cadenceoss/iwf/core/Workflow.java). The key elements of a workflow are `WorkflowState`. A workflow can contain any number of WorkflowStates. A WorkflowState is implemented with two APIs: `start` and `decide`. `start` API is invoked immediately when a WorkflowState is started. It will return some `Commands` to server. When the requested `Commands` are completed, `decide` API will be triggered. -There are several types of commands: -* `SignalCommand`: will be waiting for a signal from external to the workflow signal channel. -* `TimerCommand`: will be waiting for a durable timer to fire. -* `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow) -* `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days. +These are the two basic command types: +* `SignalCommand`: will be waiting for a signal from external to the workflow signal channel. External application can use SignalWorkflow API to signal a workflow. +* `TimerCommand`: will be waiting for a **durable timer** to fire. Note that `start` API can return multiple commands, and choose different DeciderTriggerType for triggering decide API: * `AllCommandCompleted`: this will wait for all command completed @@ -92,10 +89,15 @@ iWF provides the below primitives when implementing the WorkflowState: * search for workflows in Cadence/Temporal WebUI in Advanced tab * search attribute type must be registered in Cadence/Temporal server before using for searching because it is backed up ElasticSearch * the data types supported are limited as server has to understand the value for indexing + * See [Temporal doc](https://docs.temporal.io/concepts/what-is-a-search-attribute) and [Cadence doc](https://cadenceworkflow.io/docs/concepts/search-workflows/) to understand more about SearchAttribute ## Advanced Concepts & Usage On top of the above basic concepts, you may want to deeply customize your workflow by using the below features. +### More advanced command types +* `InterStateChannelCommand`: will be waiting for a value being published from another state(internally in the same workflow) +* WIP `LongRunninngActivityCommand`: will schedule a Cadence/Temporal activity. This is only necessary for long-running activity like hours/days. + ### WorkflowStartOption * IdReusePolicy * CronSchedule @@ -105,7 +107,20 @@ On top of the above basic concepts, you may want to deeply customize your workfl * API timeout & retry ### Reset Workflow +## How to change workflow code +Unlike Cadence/Temporal, there is no [Non-deterministic](https://docs.temporal.io/workflows#deterministic-constraints) errors anymore in iWF. +And there is no [versioning APIs](https://docs.temporal.io/go/versioning). You will never see the worker crashing & replay issues on iWF workers. + +However, changing workflow code could still have backward compatibility issues. Here are some tips: +* Changing the behavior of a WorkflowState will always apply to any existing workflow executions. + * Usually that's what you want. If it's not, then you should utilize QueryAttribute, SearchAttribute or StateLocalAttribute to record some data, and use it to decide the new behavior +* Removing an existing WorkflowState could cause existing workflow executions to stuck if there is any workflow executing on it. + * If that happens, the worker will return errors to server, which will fail the server workflow activity and keep on backoff retry until it's fixed + * To safely delete a WorkflowState, you can utilize the `IwfExecutingStateIds` search attribute to check if the stateId is still being executed +## How to write unit test +Writing unit test for iWF workflow code should be super easy compared to Cadence/Temporal. There is no need to use any special testEnv. +The standard unit test library should be sufficient. # How to run @@ -148,8 +163,8 @@ NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporali 3. Register system search attributes required by iWF server ```shell tctl adm cl asa -n IwfWorkflowType -t Keyword -tctl adm cl asa -n GlobalWorkflowVersion -t Int -tctl adm cl asa -n ExecutingStateIds -t Keyword +tctl adm cl asa -n IwfGlobalWorkflowVersion -t Int +tctl adm cl asa -n IwfExecutingStateIds -t Keyword ``` 4 For `attribute_test.go` integTests, you need to register search attributes: @@ -166,8 +181,8 @@ docker-compose -f docker-compose-es-v7.yml up 2. Register a new domain if not haven `cadence --do default domain register` 3. Register system search attributes required by iWF server ``` -cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2 -cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0 +cadence adm cl asa --search_attr_key IwfGlobalWorkflowVersion --search_attr_type 2 +cadence adm cl asa --search_attr_key IwfExecutingStateIds --search_attr_type 0 cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0 ``` 4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index d368365a..809f2d1b 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -23,6 +23,7 @@ package iwf import ( "fmt" "github.com/indeedeng/iwf/service" + isvc "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/api" cadenceapi "github.com/indeedeng/iwf/service/api/cadence" temporalapi "github.com/indeedeng/iwf/service/api/temporal" @@ -141,7 +142,7 @@ func launchTemporalService(svcName string, config *service.Config, unifiedClient svc := api.NewService(unifiedClient) log.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port))) case serviceInterpreter: - interpreter := temporal.NewInterpreterWorker(temporalClient) + interpreter := temporal.NewInterpreterWorker(temporalClient, isvc.TaskQueue) interpreter.Start() default: log.Printf("Invalid service: %v", svcName) @@ -160,7 +161,7 @@ func launchCadenceService( svc := api.NewService(unifiedClient) log.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port))) case serviceInterpreter: - interpreter := cadence.NewInterpreterWorker(service, domain, closeFunc) + interpreter := cadence.NewInterpreterWorker(service, domain, isvc.TaskQueue, closeFunc) interpreter.Start() default: log.Printf("Invalid service: %v", svcName) diff --git a/integ/util.go b/integ/util.go index f8d0a01a..160c409a 100644 --- a/integ/util.go +++ b/integ/util.go @@ -55,7 +55,7 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) { }() // start iwf interpreter worker - interpreter := temporal.NewInterpreterWorker(temporalClient) + interpreter := temporal.NewInterpreterWorker(temporalClient, service.TaskQueue) interpreter.Start() return func() { iwfServer.Close() @@ -81,7 +81,7 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) { }() // start iwf interpreter worker - interpreter := cadence.NewInterpreterWorker(serviceClient, iwf.DefaultCadenceDomain, closeFunc) + interpreter := cadence.NewInterpreterWorker(serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc) interpreter.Start() return func() { iwfServer.Close() diff --git a/service/api/handler.go b/service/api/handler.go index 9213b2a2..c5c08316 100644 --- a/service/api/handler.go +++ b/service/api/handler.go @@ -1,6 +1,7 @@ package api import ( + "github.com/indeedeng/iwf/service" "log" "net/http" @@ -14,7 +15,7 @@ type handler struct { } func newHandler(client UnifiedClient) *handler { - svc, err := NewApiService(client) + svc, err := NewApiService(client, service.TaskQueue) if err != nil { panic(err) } diff --git a/service/api/service.go b/service/api/service.go index d0ac0561..ddeb8589 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -11,23 +11,25 @@ import ( ) type serviceImpl struct { - client UnifiedClient + client UnifiedClient + taskQueue string } func (s *serviceImpl) Close() { s.client.Close() } -func NewApiService(client UnifiedClient) (ApiService, error) { +func NewApiService(client UnifiedClient, taskQueue string) (ApiService, error) { return &serviceImpl{ - client: client, + client: client, + taskQueue: taskQueue, }, nil } func (s *serviceImpl) ApiV1WorkflowStartPost(req iwfidl.WorkflowStartRequest) (*iwfidl.WorkflowStartResponse, *ErrorAndStatus) { workflowOptions := StartWorkflowOptions{ ID: req.GetWorkflowId(), - TaskQueue: service.TaskQueue, + TaskQueue: s.taskQueue, WorkflowRunTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second, } diff --git a/service/const.go b/service/const.go index 5ce77646..a92c1ba9 100644 --- a/service/const.go +++ b/service/const.go @@ -1,7 +1,7 @@ package service const ( - TaskQueue = "Interpreter" + TaskQueue = "Interpreter_DEFAULT" GracefulCompletingWorkflowStateId = "_SYS_GRACEFUL_COMPLETING_WORKFLOW" ForceCompletingWorkflowStateId = "_SYS_FORCE_COMPLETING_WORKFLOW" ForceFailingWorkflowStateId = "_SYS_FORCE_FAILING_WORKFLOW" @@ -38,8 +38,8 @@ const ( WorkflowStatusCanceled = "CANCELED" WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW" - SearchAttributeGlobalVersion = "GlobalWorkflowVersion" - SearchAttributeExecutingStateIds = "ExecutingStateIds" + SearchAttributeGlobalVersion = "IwfGlobalWorkflowVersion" + SearchAttributeExecutingStateIds = "IwfExecutingStateIds" SearchAttributeIwfWorkflowType = "IwfWorkflowType" ) diff --git a/service/interpreter/cadence/worker.go b/service/interpreter/cadence/worker.go index 9ada467d..7fff4b4d 100644 --- a/service/interpreter/cadence/worker.go +++ b/service/interpreter/cadence/worker.go @@ -1,7 +1,6 @@ package cadence import ( - "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/cadence/worker" @@ -13,12 +12,14 @@ type InterpreterWorker struct { closeFunc func() domain string worker worker.Worker + tasklist string } -func NewInterpreterWorker(service workflowserviceclient.Interface, domain string, closeFunc func()) *InterpreterWorker { +func NewInterpreterWorker(service workflowserviceclient.Interface, domain, tasklist string, closeFunc func()) *InterpreterWorker { return &InterpreterWorker{ service: service, domain: domain, + tasklist: tasklist, closeFunc: closeFunc, } } @@ -29,7 +30,7 @@ func (iw *InterpreterWorker) Close() { } func (iw *InterpreterWorker) Start() { - iw.worker = worker.New(iw.service, iw.domain, service.TaskQueue, worker.Options{}) + iw.worker = worker.New(iw.service, iw.domain, iw.tasklist, worker.Options{}) iw.worker.RegisterWorkflow(Interpreter) iw.worker.RegisterActivity(interpreter.StateStart) diff --git a/service/interpreter/temporal/worker.go b/service/interpreter/temporal/worker.go index bf84c2d9..208bee0c 100644 --- a/service/interpreter/temporal/worker.go +++ b/service/interpreter/temporal/worker.go @@ -1,7 +1,6 @@ package temporal import ( - "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" @@ -11,11 +10,13 @@ import ( type InterpreterWorker struct { temporalClient client.Client worker worker.Worker + taskQueue string } -func NewInterpreterWorker(temporalClient client.Client) *InterpreterWorker { +func NewInterpreterWorker(temporalClient client.Client, taskQueue string) *InterpreterWorker { return &InterpreterWorker{ temporalClient: temporalClient, + taskQueue: taskQueue, } } @@ -25,7 +26,7 @@ func (iw *InterpreterWorker) Close() { } func (iw *InterpreterWorker) Start() { - iw.worker = worker.New(iw.temporalClient, service.TaskQueue, worker.Options{}) + iw.worker = worker.New(iw.temporalClient, iw.taskQueue, worker.Options{}) iw.worker.RegisterWorkflow(Interpreter) iw.worker.RegisterActivity(interpreter.StateStart)