Skip to content

Commit

Permalink
Add events ch middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
theskyinflames-macos authored and theskyinflames-macos committed Dec 21, 2022
1 parent 4c34cdd commit 5dd3559
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 69 deletions.
3 changes: 2 additions & 1 deletion examples/command_bus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/cqrs"
"github.com/theskyinflames/cqrs-eda/pkg/events"
"github.com/theskyinflames/cqrs-eda/pkg/helpers"

"github.com/google/uuid"
Expand All @@ -29,7 +30,7 @@ func (ac AddUserCommand) Name() string {
type AddUserCommandHandler struct{}

// Handle implements cqrs.CommandHandler interface
func (ch AddUserCommandHandler) Handle(ctx context.Context, cmd cqrs.Command) ([]cqrs.Event, error) {
func (ch AddUserCommandHandler) Handle(ctx context.Context, cmd cqrs.Command) ([]events.Event, error) {
addUserCmd, ok := cmd.(AddUserCommand)
if !ok {
return nil, fmt.Errorf("expected command %s, but received %s", addUserCommandName, cmd.Name())
Expand Down
41 changes: 28 additions & 13 deletions pkg/cqrs/cqrs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package cqrs

//go:generate moq -stub -out mock_cqrs_test.go -pkg cqrs_test . Command Query CommandHandler QueryHandler Event
//go:generate moq -stub -out mock_cqrs_test.go -pkg cqrs_test . Command Query CommandHandler QueryHandler Bus
//go:generate moq -stub -out mock_logger_test.go -pkg cqrs_test . Logger

import (
"context"
"encoding/json"

"github.com/google/uuid"
"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/events"
)

//go:generate moq -stub -out zmock_cqrs_event_test.go -pkg cqrs_test . Event
Expand All @@ -17,27 +18,21 @@ type Logger interface {
Printf(format string, v ...interface{})
}

// Event is an event
type Event interface {
Name() string
AggregateID() uuid.UUID
}

// Command is a CQRS command
type Command interface {
Name() string
}

// CommandHandler handles a command
type CommandHandler interface {
Handle(ctx context.Context, cmd Command) ([]Event, error)
Handle(ctx context.Context, cmd Command) ([]events.Event, error)
}

// CommandHandlerFunc is a function that implements CommandHandler interface
type CommandHandlerFunc func(ctx context.Context, cmd Command) ([]Event, error)
type CommandHandlerFunc func(ctx context.Context, cmd Command) ([]events.Event, error)

// Handle implements the CommandHandler interface
func (chf CommandHandlerFunc) Handle(ctx context.Context, cmd Command) ([]Event, error) {
func (chf CommandHandlerFunc) Handle(ctx context.Context, cmd Command) ([]events.Event, error) {
return chf(ctx, cmd)
}

Expand All @@ -47,7 +42,7 @@ type CommandHandlerMiddleware func(CommandHandler) CommandHandler
// CommandHandlerMultiMiddleware is self-described
func CommandHandlerMultiMiddleware(mws ...CommandHandlerMiddleware) CommandHandlerMiddleware {
return func(ch CommandHandler) CommandHandler {
return CommandHandlerFunc(func(ctx context.Context, cmd Command) ([]Event, error) {
return CommandHandlerFunc(func(ctx context.Context, cmd Command) ([]events.Event, error) {
mw := mws[0](ch)
for _, outerMw := range mws[1:] {
mw = outerMw(mw)
Expand All @@ -60,7 +55,7 @@ func CommandHandlerMultiMiddleware(mws ...CommandHandlerMiddleware) CommandHandl
// ChErrMw is a command handler middleware
func ChErrMw(l Logger) CommandHandlerMiddleware {
return func(ch CommandHandler) CommandHandler {
return CommandHandlerFunc(func(ctx context.Context, cmd Command) ([]Event, error) {
return CommandHandlerFunc(func(ctx context.Context, cmd Command) ([]events.Event, error) {
evs, err := ch.Handle(ctx, cmd)
if err != nil {
b, _ := json.Marshal(cmd)
Expand All @@ -71,6 +66,26 @@ func ChErrMw(l Logger) CommandHandlerMiddleware {
}
}

// Bus is an Events bus
type Bus interface {
Dispatch(context.Context, bus.Dispatchable) (interface{}, error)
}

// ChEventMw is a domain events handler middleware
func ChEventMw(eventsBus Bus) CommandHandlerMiddleware {
return func(ch CommandHandler) CommandHandler {
return CommandHandlerFunc(func(ctx context.Context, cmd Command) ([]events.Event, error) {
evs, err := ch.Handle(ctx, cmd)
if err == nil {
for _, e := range evs {
eventsBus.Dispatch(ctx, e)
}
}
return evs, err
})
}
}

// Query is a CQRS query
type Query interface {
Name() string
Expand Down
74 changes: 70 additions & 4 deletions pkg/cqrs/cqrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"errors"
"testing"

"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/cqrs"
"github.com/theskyinflames/cqrs-eda/pkg/events"

"github.com/stretchr/testify/require"
)
Expand All @@ -16,7 +18,7 @@ func TestChErrMw(t *testing.T) {
logger = &LoggerMock{}
randomErr = errors.New("")
ch = &CommandHandlerMock{
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]cqrs.Event, error) {
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]events.Event, error) {
return nil, randomErr
},
}
Expand Down Expand Up @@ -60,8 +62,8 @@ func TestCommandHandlerMultiMiddleware(t *testing.T) {
ev = &EventMock{}

ch = &CommandHandlerMock{
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]cqrs.Event, error) {
return []cqrs.Event{ev}, nil
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]events.Event, error) {
return []events.Event{ev}, nil
},
}
)
Expand All @@ -80,7 +82,7 @@ func TestCommandHandlerMultiMiddleware(t *testing.T) {

func chTestMw(name string, calls *[]string) cqrs.CommandHandlerMiddleware {
return func(ch cqrs.CommandHandler) cqrs.CommandHandler {
return cqrs.CommandHandlerFunc(func(ctx context.Context, cmd cqrs.Command) ([]cqrs.Event, error) {
return cqrs.CommandHandlerFunc(func(ctx context.Context, cmd cqrs.Command) ([]events.Event, error) {
*calls = append(*calls, name)
return ch.Handle(ctx, cmd)
})
Expand Down Expand Up @@ -125,3 +127,67 @@ func qhTestMw(name string, calls *[]string) cqrs.QueryHandlerMiddleware {
})
}
}

func TestChEventMw(t *testing.T) {
t.Run(`Given a events ch middleware with a events bus,
when it catches an error from the ch,
then no events are dispatched to the events bus`, func(t *testing.T) {
var (
eventName = "entity.changed"
ev = &EventMock{
NameFunc: func() string {
return eventName
},
}
evBus = &BusMock{
DispatchFunc: func(_ context.Context, _ bus.Dispatchable) (interface{}, error) {
return nil, nil
},
}
err = errors.New("")
ch = &CommandHandlerMock{
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]events.Event, error) {
return []events.Event{ev}, err
},
}
)

evs, gotErr := cqrs.ChEventMw(evBus)(ch).Handle(context.Background(), &CommandMock{})
require.ErrorIs(t, err, gotErr)
require.Len(t, ch.HandleCalls(), 1)
require.Len(t, evBus.DispatchCalls(), 0)
require.Len(t, evs, 1)
require.Equal(t, ev, evs[0])
})

t.Run(`Given a events ch middleware with a events bus,
when it catches events from the ch,
then they are dispatched to the events bus`, func(t *testing.T) {
var (
eventName = "entity.changed"
ev = &EventMock{
NameFunc: func() string {
return eventName
},
}
evBus = &BusMock{
DispatchFunc: func(_ context.Context, _ bus.Dispatchable) (interface{}, error) {
return nil, nil
},
}
ch = &CommandHandlerMock{
HandleFunc: func(_ context.Context, _ cqrs.Command) ([]events.Event, error) {
return []events.Event{ev}, nil
},
}
)

evs, err := cqrs.ChEventMw(evBus)(ch).Handle(context.Background(), &CommandMock{})
require.NoError(t, err)
require.Len(t, ch.HandleCalls(), 1)
require.Len(t, evBus.DispatchCalls(), 1)
require.Equal(t, ev, evBus.DispatchCalls()[0].Dispatchable)
require.Len(t, evs, 1)
require.Equal(t, ev, evs[0])
})
}
Loading

0 comments on commit 5dd3559

Please sign in to comment.