Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Oct 4, 2024
1 parent ecd78c8 commit 2097b86
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 66 deletions.
36 changes: 0 additions & 36 deletions early-return/activity.go

This file was deleted.

7 changes: 4 additions & 3 deletions early-return/starter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {
WaitForStage: client.WorkflowUpdateStageCompleted,
})

tx := earlyreturn.Transaction{ID: uuid.New(), FromAccount: "Bob", ToAccount: "Alice", Amount: 100.0}
tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
workflowOptions := client.StartWorkflowOptions{
ID: "early-return-workflow-ID-" + tx.ID,
TaskQueue: earlyreturn.TaskQueueName,
Expand All @@ -46,11 +46,12 @@ func main() {

err = updateHandle.Get(ctxWithTimeout, nil)
if err != nil {
// The workflow will continue running, cancelling the transaction.

// NOTE: If the error is retryable, a retry attempt must use a unique workflow ID.
log.Fatalln("Error obtaining update result:", err)
}

log.Println("Transaction initialized successfully")

// The workflow will continue running, either completing or cancelling the transaction.
// The workflow will continue running, completing the transaction.
}
87 changes: 60 additions & 27 deletions early-return/workflow.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package earlyreturn

import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

const (
UpdateName = "early-return"
TaskQueueName = "early-return-tq"
activityTimeout = 2 * time.Second
earlyReturnTimeout = 5 * time.Second
UpdateName = "early-return"
TaskQueueName = "early-return-tq"
)

type Transaction struct {
ID string
FromAccount string
ToAccount string
Amount float64
ID string
SourceAccount string
TargetAccount string
Amount int // in cents

initErr error
initDone bool
}

// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful,
Expand All @@ -30,22 +33,12 @@ type Transaction struct {
// the initialization in a single round trip, even before the transaction processing completes. The remainder
// of the transaction is then processed asynchronously.
func Workflow(ctx workflow.Context, tx Transaction) error {
var initErr error
var initDone bool
logger := workflow.GetLogger(ctx)

if err := workflow.SetUpdateHandler(
ctx,
UpdateName,
func(ctx workflow.Context) error {
condition := func() bool { return initDone }
if completed, err := workflow.AwaitWithTimeout(ctx, earlyReturnTimeout, condition); err != nil {
return fmt.Errorf("update cancelled: %w", err)
} else if !completed {
return errors.New("update timed out")
}
return initErr
},
tx.ReturnInitResult,
); err != nil {
return err
}
Expand All @@ -56,24 +49,64 @@ func Workflow(ctx workflow.Context, tx Transaction) error {
// See https://docs.temporal.io/activities#local-activity for more details.

activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: activityTimeout,
ScheduleToCloseTimeout: 10 * time.Second,
})
initErr = workflow.ExecuteLocalActivity(activityOptions, InitTransaction, tx).Get(ctx, nil)
initDone = true
tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil)
tx.initDone = true

// Phase 2: Complete or cancel the transaction asychronously.

activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
StartToCloseTimeout: 30 * time.Second,
})
if initErr != nil {
logger.Info("cancelling transaction due to error: %v", initErr)
if tx.initErr != nil {
logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr))

// Transaction failed to be initialized or not quickly enough; cancel the transaction.
return workflow.ExecuteActivity(activityCtx, CancelTransaction, tx).Get(ctx, nil)
if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("cancelling the transaction failed: %w", err)
}

return tx.initErr
}

logger.Info("completing transaction")

// Transaction was initialized successfully; complete the transaction.
return workflow.ExecuteActivity(activityCtx, CompleteTransaction, tx).Get(ctx, nil)
if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("completing the transaction failed: %w", err)
}

return nil
}

func (tx *Transaction) ReturnInitResult(ctx workflow.Context) error {
if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil {
return fmt.Errorf("transaction init cancelled: %w", err)
}
return tx.initErr
}

func (tx *Transaction) InitTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
if tx.Amount <= 0 {
return errors.New("invalid Amount")
}
time.Sleep(500 * time.Millisecond)
logger.Info("Transaction initialized")
return nil
}

func (tx *Transaction) CancelTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction cancelled")
return nil
}

func (tx *Transaction) CompleteTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction completed")
return nil
}
103 changes: 103 additions & 0 deletions early-return/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package earlyreturn

import (
"fmt"
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_CompleteTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
require.NoError(t, uc.completeErr)
}

func Test_CompleteTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

func Test_CancelTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount")
}

func Test_CancelTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

type updateCallback struct {
complete func(any, error)

Check failure on line 93 in early-return/workflow_test.go

View workflow job for this annotation

GitHub Actions / build-and-test

field complete is unused (U1000)
completeErr error
}

func (uc *updateCallback) Accept() {}

func (uc *updateCallback) Reject(err error) {}

func (uc *updateCallback) Complete(success interface{}, err error) {
uc.completeErr = err
}

0 comments on commit 2097b86

Please sign in to comment.