From 428f8e4234db19678b27b1d4fa0947302452123d Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 4 Oct 2024 10:34:36 -0700 Subject: [PATCH] Early-return sample (#366) --- README.md | 3 + early-return/README.md | 17 +++++ early-return/starter/main.go | 57 +++++++++++++++++ early-return/worker/main.go | 27 ++++++++ early-return/workflow.go | 116 ++++++++++++++++++++++++++++++++++ early-return/workflow_test.go | 102 ++++++++++++++++++++++++++++++ 6 files changed, 322 insertions(+) create mode 100644 early-return/README.md create mode 100644 early-return/starter/main.go create mode 100644 early-return/worker/main.go create mode 100644 early-return/workflow.go create mode 100644 early-return/workflow_test.go diff --git a/README.md b/README.md index 387e863e..330a0df8 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,9 @@ resource waiting its successful completion - [**Request/Response with Response Updates**](./reqrespupdate): Demonstrates how to accept requests and responsond via updates. +- [**Early-Return**](./early-return): + Demonstrates how to receive a response mid-workflow, while the workflow continues to run to completion. + ### Pending examples Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/ diff --git a/early-return/README.md b/early-return/README.md new file mode 100644 index 00000000..ace156e9 --- /dev/null +++ b/early-return/README.md @@ -0,0 +1,17 @@ +### Early-Return Sample + +This sample demonstrates an early-return from a workflow. + +By utilizing Update-with-Start, a client can start a new workflow and synchronously receive +a response mid-workflow, while the workflow continues to run to completion. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run early-return/worker/main.go +``` +3) Run the following command to start the example +``` +go run early-return/starter/main.go +``` diff --git a/early-return/starter/main.go b/early-return/starter/main.go new file mode 100644 index 00000000..ad54eb5d --- /dev/null +++ b/early-return/starter/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "log" + "time" + + "github.com/pborman/uuid" + "github.com/temporalio/samples-go/early-return" + "go.temporal.io/sdk/client" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + updateOperation := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: earlyreturn.UpdateName, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} + workflowOptions := client.StartWorkflowOptions{ + ID: "early-return-workflow-ID-" + tx.ID, + TaskQueue: earlyreturn.TaskQueueName, + WithStartOperation: updateOperation, + } + we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, tx) + if err != nil { + log.Fatalln("Error executing workflow:", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + updateHandle, err := updateOperation.Get(ctxWithTimeout) + if err != nil { + log.Fatalln("Error obtaining update handle:", err) + } + + err = updateHandle.Get(ctxWithTimeout, nil) + if err != nil { + // The workflow will continue running, cancelling the transaction. + + // NOTE: If the error is retryable, a retry attempt must use a unique workflow ID. + log.Fatalln("Error obtaining update result:", err) + } + + log.Println("Transaction initialized successfully") + // The workflow will continue running, completing the transaction. +} diff --git a/early-return/worker/main.go b/early-return/worker/main.go new file mode 100644 index 00000000..1c8520ef --- /dev/null +++ b/early-return/worker/main.go @@ -0,0 +1,27 @@ +package main + +import ( + "log" + + "github.com/temporalio/samples-go/early-return" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{}) + + w.RegisterWorkflow(earlyreturn.Workflow) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} diff --git a/early-return/workflow.go b/early-return/workflow.go new file mode 100644 index 00000000..2ea47762 --- /dev/null +++ b/early-return/workflow.go @@ -0,0 +1,116 @@ +package earlyreturn + +import ( + "context" + "errors" + "fmt" + "time" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/workflow" +) + +const ( + UpdateName = "early-return" + TaskQueueName = "early-return-tq" +) + +type Transaction struct { + ID string + SourceAccount string + TargetAccount string + Amount int // in cents + + initErr error + initDone bool +} + +// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, +// it proceeds to completion. However, if initialization fails - due to validation errors or transient +// issues (e.g., network connectivity problems) - the transaction is cancelled. +// +// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of +// the initialization in a single round trip, even before the transaction processing completes. The remainder +// of the transaction is then processed asynchronously. +func Workflow(ctx workflow.Context, tx Transaction) error { + return run(ctx, tx) +} + +func run(ctx workflow.Context, tx Transaction) error { + logger := workflow.GetLogger(ctx) + + if err := workflow.SetUpdateHandler( + ctx, + UpdateName, + tx.returnInitResult, + ); err != nil { + return err + } + + // Phase 1: Initialize the transaction synchronously. + // + // By using a local activity, an additional server roundtrip is avoided. + // See https://docs.temporal.io/activities#local-activity for more details. + + activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, // short timeout to avoid another Workflow Task being scheduled + }) + tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil) + tx.initDone = true + + // Phase 2: Complete or cancel the transaction asychronously. + + activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Second, + }) + if tx.initErr != nil { + logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr)) + + // Transaction failed to be initialized or not quickly enough; cancel the transaction. + if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil { + return fmt.Errorf("cancelling the transaction failed: %w", err) + } + + return tx.initErr + } + + logger.Info("completing transaction") + + // Transaction was initialized successfully; complete the transaction. + if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil { + return fmt.Errorf("completing the transaction failed: %w", err) + } + + return nil +} + +func (tx *Transaction) returnInitResult(ctx workflow.Context) error { + if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil { + return fmt.Errorf("transaction init cancelled: %w", err) + } + return tx.initErr +} + +func (tx *Transaction) InitTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + if tx.Amount <= 0 { + return errors.New("invalid Amount") + } + time.Sleep(500 * time.Millisecond) + logger.Info("Transaction initialized") + return nil +} + +func (tx *Transaction) CancelTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) + logger.Info("Transaction cancelled") + return nil +} + +func (tx *Transaction) CompleteTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) + logger.Info("Transaction completed") + return nil +} diff --git a/early-return/workflow_test.go b/early-return/workflow_test.go new file mode 100644 index 00000000..e47ee598 --- /dev/null +++ b/early-return/workflow_test.go @@ -0,0 +1,102 @@ +package earlyreturn + +import ( + "fmt" + "testing" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func Test_CompleteTransaction(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CompleteTransaction) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) // NOTE: zero delay ensures Update is delivered in first workflow task + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.NoError(t, uc.completeErr) +} + +func Test_CompleteTransaction_Fails(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CompleteTransaction) + + env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash")) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, env.GetWorkflowError(), "crash") +} + +func Test_CancelTransaction(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CancelTransaction) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, uc.completeErr, "invalid Amount") + require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount") +} + +func Test_CancelTransaction_Fails(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CancelTransaction) + + env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash")) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, uc.completeErr, "invalid Amount") + require.ErrorContains(t, env.GetWorkflowError(), "crash") +} + +type updateCallback struct { + completeErr error +} + +func (uc *updateCallback) Accept() {} + +func (uc *updateCallback) Reject(err error) {} + +func (uc *updateCallback) Complete(success interface{}, err error) { + uc.completeErr = err +}