Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
theskyinflames-macos authored and theskyinflames-macos committed Dec 9, 2022
0 parents commit bec1019
Show file tree
Hide file tree
Showing 23 changed files with 2,350 additions and 0 deletions.
675 changes: 675 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
default: test

test:
go test -v -race ./...

lint:
revive -config ./revive.toml
go mod tidy -v && git --no-pager diff --quiet go.mod go.sum

tools: tool-moq tool-revive

tool-revive:
go install github.com/mgechev/revive@main

tool-moq:
go install github.com/matryer/moq@main

todo:
find . -name '*.go' \! -name '*_generated.go' -prune | xargs grep -n TODO


63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# CQRS - EDA
This repo contains a set of tools to implement CQRS/EDA services. This tooling is composed of:
* CQRS utils:
* Command, command handler
* Command middleware
* Query, query handler
* Query handler middleware
* EDA:
* Events basic
* Events listener
* Bus:
* Sequential generic bus
* Concurrent generic bus

## CQRS
[CQRS](https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs) is a pattern that allows isolating the operations that modify the domain state, called *Commands*, from those that don't, called *Queries*. As a result of a *Command* execution, one or more domain events will be published.

As commands as queries are handled by specialized handlers called *Command Handler* and *Query Handler* respectively.

In some documentation, CQRS uses a read model as a separate infrastructure that serves queries, but I don't see it this way. You can use CQRS without that, and it only makes sense when you need to split R/W operations on your domain.

### C/Q handler middlewares
CommandHandler and QueryHandler middlewares are used to intercept the flux to and from the handler. They help inject handler dependencies and react to the handler return. There is a simple example of middleware that prints handler-returned errors. Another option is to wrap command handlers in a DB TX middleware in charge of starting a transaction, pass it to the command handler, and rollbacking or committing the tx depending on whether the command handler fails.

It is a pattern that allows the C/Q handler taking care only of what is its responsibility as application services

You will find the CQRS tooling in [pkg/cqrs](pkg/cqrs) directory.

## Events and EDA
[EDA](https://en.wikipedia.org/wiki/Event-driven_architecture) stands for *Event-Driven-Architecture* It's an architectural pattern that allows decoupling the command handler that executes the command, and hence, the one that changes the domain, from those that react to this change. These reacting command handlers can belong to the same service or not. This decoupling is achieved by domain events publishing.

You will find the Events tooling in [pkg/events](pkg/events) directory.

### Events listener
The events tooling includes an events listener implementation. It's in charge of listening to a specific event and dispatching it to an event handler. Usually, this event handler will map the event to a command and call a command handler to react to the domain change notified by the event.

Take into account that the tradeoff of EDA architectures is [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency)

## Bus and Hexagonal Architecture
As CQRS architectures as EDA ones, use a bus:

* CQRS uses a command/query bus to dispatch the commands and queries from the entry point, usually an HTTP or RCP API. It allows the decoupling of the infra layer from the application layer where command and query handlers live.

* For EDA architectures, when event consumers live in the same bounded context (same service), they're usually dispatched to an Even bus, which is in charge of delegating them to the corresponding event handlers. Usually, these event handlers will map the event to a command and dispatch it to the command bus.

You will find the Bus tooling in [pkg/bus](pkg/bus) directory.

### Bus implementations
There are two bus implementations: a sequential and a concurrent. Use the first one if you don't have performance issues related to events dispatching.

## Examples
I've implemented some examples to help you to understand how to use this tooling:

* [command_bus](examples/concurrent_bus) Example of a command dispatched to a command bus using the sequential bus.

* [event_bus](examples/events_bus) Example of an event dispatched to a sequential event bus.

* [concurrent_event_bus](examples/concurrent_event_bus) Example of two events dispatched to a concurrent event bus

## Do you think this is useful? back me up
Thinking and building this tool has taken part of my time and effort. If you find it useful, and you think I deserve it, you can invite me a coffee :-)

<a href="https://www.buymeacoffee.com/jaumearus" target="_blank"><img src="https://cdn.buymeacoffee.com/buttons/default-orange.png" alt="Buy Me A Coffee" height="41" width="174"></a>
66 changes: 66 additions & 0 deletions examples/command_bus/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"errors"
"fmt"
"time"

"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/cqrs"

"github.com/google/uuid"
)

// AddUserCommand is a command
type AddUserCommand struct {
ID uuid.UUID
UserName string
}

const addUserCommandName = "add_user"

// Name implements cqrs.Name interface
func (ac AddUserCommand) Name() string {
return addUserCommandName
}

// AddUserCommandHandler is a command handler
type AddUserCommandHandler struct{}

// Handle implements cqrs.CommandHandler interface
func (ch AddUserCommandHandler) Handle(ctx context.Context, cmd cqrs.Command) ([]cqrs.Event, error) {
addUserCmd, ok := cmd.(AddUserCommand)
if !ok {
return nil, fmt.Errorf("expected command %s, but received %s", addUserCommandName, cmd.Name())
}
fmt.Printf("added user: %s (%s)\n", addUserCmd.UserName, addUserCmd.ID)
return nil, nil
}

func main() {
bus := bus.New()
bus.Register(addUserCommandName, busHandler(AddUserCommandHandler{}))

cmd := AddUserCommand{
ID: uuid.New(),
UserName: "Bond, James Bond",
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus.Dispatch(ctx, cmd)

// Give time to output traces
time.Sleep(time.Second)
}

func busHandler(ch cqrs.CommandHandler) bus.Handler {
return func(ctx context.Context, d bus.Dispatchable) (any, error) {
cmd, ok := d.(cqrs.Command)
if !ok {
return nil, errors.New("unexpected dispatchable")
}
return ch.Handle(ctx, cmd)
}
}
119 changes: 119 additions & 0 deletions examples/concurrent_event_bus/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package main

import (
"context"
"errors"
"fmt"
"time"

"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/events"

"github.com/google/uuid"
)

/*
This is an example of concurrent events bus use case.
*/
func main() {
listeners, inChs := listeners(2)
busHandlers := busHandlers(inChs)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start the events listeners
errChan := make(chan error)
go func() {
for err := range errChan {
fmt.Println(err.Error())
}
}()
for _, l := range listeners {
go l.Listen(ctx, errChan)
}

// Create the events bus
const (
concurrencyLimit = 2
dispatchingTimeout = time.Second
)
bus := bus.NewConcurrentBus(dispatchingTimeout, concurrencyLimit)

// Register the handlers for each event to be dispatched to their listeners
for i, bh := range busHandlers {
bus.Register(eventName(i), bh)
}
go bus.Run(ctx)

// Dispatch an event 1
rsChan := bus.Dispatch(ctx, events.NewEventBasic(uuid.New(), eventName(0), nil))

// Dispatch an event 2
rsChan2 := bus.Dispatch(ctx, events.NewEventBasic(uuid.New(), eventName(1), nil))

go func() {
i := 0
for {
select {
case <-ctx.Done():
return
case eventHndResponse := <-rsChan:
fmt.Printf("RS 1: %#v\n", eventHndResponse)
i++
case eventHndResponse := <-rsChan2:
fmt.Printf("RS 2: %#v\n", eventHndResponse)
i++
}
if i == len(inChs) {
return
}
}
}()

// Giving time to output traces
time.Sleep(time.Second)
}

func listeners(n int) ([]events.Listener, []chan events.Event) {
var (
listeners []events.Listener
inChan []chan events.Event
)
for i := 0; i < n; i++ {
i := i
eventsChan := make(chan events.Event)
eventHandlers := []events.Handler{
func(e events.Event) {
fmt.Printf("eh %d.1, received %s event, with id %s\n", i, e.Name(), e.AggregateID().String())
},
func(e events.Event) {
fmt.Printf("eh %d.2, received %s event, with id %s\n", i, e.Name(), e.AggregateID().String())
},
}
listeners = append(listeners, events.NewListener(eventsChan, eventName(i), eventHandlers...))
inChan = append(inChan, eventsChan)
}

return listeners, inChan
}

func eventName(i int) string {
return fmt.Sprintf("event%d", i)
}

func busHandlers(inChs []chan events.Event) []bus.Handler {
var busHnds []bus.Handler
for i := range inChs {
i := i
busHnds = append(busHnds, func(_ context.Context, d bus.Dispatchable) (any, error) {
e, ok := d.(events.Event)
if !ok {
return nil, errors.New("unexpected dispatchable")
}
inChs[i] <- e
return fmt.Sprintf("dispatchable processed by hnd %d\n", i), nil
})
}
return busHnds
}
62 changes: 62 additions & 0 deletions examples/events_bus/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"errors"
"fmt"
"time"

"github.com/theskyinflames/cqrs-eda/pkg/bus"
"github.com/theskyinflames/cqrs-eda/pkg/events"

"github.com/google/uuid"
)

const eventName = "anEvent"

/*
This is an example of events bus use case.
*/
func main() {
var (
eventsChan = make(chan events.Event)
eventHandlers = []events.Handler{
func(e events.Event) {
fmt.Printf("eh1, received %s event, with id %s\n", e.Name(), e.AggregateID().String())
},
func(e events.Event) {
fmt.Printf("eh2, received %s event, with id %s\n", e.Name(), e.AggregateID().String())
},
}
eventsListener = events.NewListener(eventsChan, eventName, eventHandlers...)
busHandler bus.Handler = func(_ context.Context, d bus.Dispatchable) (any, error) {
e, ok := d.(events.Event)
if !ok {
return nil, errors.New("unexpected dispatchable")
}
eventsChan <- e
return "dispatchable processed", nil
}
)

// Start the events listener
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errChan := make(chan error)
go func() {
for err := range errChan {
fmt.Printf("events listener: %s", err.Error())
}
}()
go eventsListener.Listen(ctx, errChan)

// Start the events bus
bus := bus.New()
bus.Register(eventName, busHandler)

// Dispatch an event
bus.Dispatch(ctx, events.NewEventBasic(uuid.New(), eventName, nil))

// Give time to output the logs
time.Sleep(time.Second)
}
14 changes: 14 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/theskyinflames/cqrs-eda

go 1.19

require (
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit bec1019

Please sign in to comment.