Skip to content

Commit

Permalink
Early-return sample (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos authored Oct 4, 2024
1 parent 8c4e82a commit 428f8e4
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ resource waiting its successful completion
- [**Request/Response with Response Updates**](./reqrespupdate):
Demonstrates how to accept requests and responsond via updates.

- [**Early-Return**](./early-return):
Demonstrates how to receive a response mid-workflow, while the workflow continues to run to completion.

### Pending examples

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/
Expand Down
17 changes: 17 additions & 0 deletions early-return/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Early-Return Sample

This sample demonstrates an early-return from a workflow.

By utilizing Update-with-Start, a client can start a new workflow and synchronously receive
a response mid-workflow, while the workflow continues to run to completion.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run early-return/worker/main.go
```
3) Run the following command to start the example
```
go run early-return/starter/main.go
```
57 changes: 57 additions & 0 deletions early-return/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"log"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

updateOperation := client.NewUpdateWithStartWorkflowOperation(
client.UpdateWorkflowOptions{
UpdateName: earlyreturn.UpdateName,
WaitForStage: client.WorkflowUpdateStageCompleted,
})

tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
workflowOptions := client.StartWorkflowOptions{
ID: "early-return-workflow-ID-" + tx.ID,
TaskQueue: earlyreturn.TaskQueueName,
WithStartOperation: updateOperation,
}
we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, tx)
if err != nil {
log.Fatalln("Error executing workflow:", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

updateHandle, err := updateOperation.Get(ctxWithTimeout)
if err != nil {
log.Fatalln("Error obtaining update handle:", err)
}

err = updateHandle.Get(ctxWithTimeout, nil)
if err != nil {
// The workflow will continue running, cancelling the transaction.

// NOTE: If the error is retryable, a retry attempt must use a unique workflow ID.
log.Fatalln("Error obtaining update result:", err)
}

log.Println("Transaction initialized successfully")
// The workflow will continue running, completing the transaction.
}
27 changes: 27 additions & 0 deletions early-return/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"log"

"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{})

w.RegisterWorkflow(earlyreturn.Workflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
116 changes: 116 additions & 0 deletions early-return/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package earlyreturn

import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

const (
UpdateName = "early-return"
TaskQueueName = "early-return-tq"
)

type Transaction struct {
ID string
SourceAccount string
TargetAccount string
Amount int // in cents

initErr error
initDone bool
}

// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful,
// it proceeds to completion. However, if initialization fails - due to validation errors or transient
// issues (e.g., network connectivity problems) - the transaction is cancelled.
//
// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of
// the initialization in a single round trip, even before the transaction processing completes. The remainder
// of the transaction is then processed asynchronously.
func Workflow(ctx workflow.Context, tx Transaction) error {
return run(ctx, tx)
}

func run(ctx workflow.Context, tx Transaction) error {
logger := workflow.GetLogger(ctx)

if err := workflow.SetUpdateHandler(
ctx,
UpdateName,
tx.returnInitResult,
); err != nil {
return err
}

// Phase 1: Initialize the transaction synchronously.
//
// By using a local activity, an additional server roundtrip is avoided.
// See https://docs.temporal.io/activities#local-activity for more details.

activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: 5 * time.Second, // short timeout to avoid another Workflow Task being scheduled
})
tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil)
tx.initDone = true

// Phase 2: Complete or cancel the transaction asychronously.

activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
})
if tx.initErr != nil {
logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr))

// Transaction failed to be initialized or not quickly enough; cancel the transaction.
if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("cancelling the transaction failed: %w", err)
}

return tx.initErr
}

logger.Info("completing transaction")

// Transaction was initialized successfully; complete the transaction.
if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("completing the transaction failed: %w", err)
}

return nil
}

func (tx *Transaction) returnInitResult(ctx workflow.Context) error {
if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil {
return fmt.Errorf("transaction init cancelled: %w", err)
}
return tx.initErr
}

func (tx *Transaction) InitTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
if tx.Amount <= 0 {
return errors.New("invalid Amount")
}
time.Sleep(500 * time.Millisecond)
logger.Info("Transaction initialized")
return nil
}

func (tx *Transaction) CancelTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction cancelled")
return nil
}

func (tx *Transaction) CompleteTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction completed")
return nil
}
102 changes: 102 additions & 0 deletions early-return/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package earlyreturn

import (
"fmt"
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_CompleteTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0) // NOTE: zero delay ensures Update is delivered in first workflow task
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
require.NoError(t, uc.completeErr)
}

func Test_CompleteTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

func Test_CancelTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount")
}

func Test_CancelTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

type updateCallback struct {
completeErr error
}

func (uc *updateCallback) Accept() {}

func (uc *updateCallback) Reject(err error) {}

func (uc *updateCallback) Complete(success interface{}, err error) {
uc.completeErr = err
}

0 comments on commit 428f8e4

Please sign in to comment.