From 95e19033207d9a1a60e7525ae231d5233eb18d8c Mon Sep 17 00:00:00 2001 From: theskyinflames-macos Date: Mon, 19 Dec 2022 19:47:46 +0100 Subject: [PATCH] Add basic aggregate type --- pkg/ddd/aggregate_test.go | 62 +++++++++++++++++++++++++++++++++++++++ pkg/ddd/aggretate.go | 46 +++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 pkg/ddd/aggregate_test.go create mode 100644 pkg/ddd/aggretate.go diff --git a/pkg/ddd/aggregate_test.go b/pkg/ddd/aggregate_test.go new file mode 100644 index 0000000..c052e10 --- /dev/null +++ b/pkg/ddd/aggregate_test.go @@ -0,0 +1,62 @@ +package ddd_test + +import ( + "sync" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/theskyinflames/cqrs-eda/pkg/ddd" +) + +type TestEvent struct { + name string +} + +func (e TestEvent) Name() string { + return e.name +} + +func TestRecordEventConcurrent(t *testing.T) { + a := ddd.NewAggregateBasic(uuid.New()) + + // Record events concurrently + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + a.RecordEvent(TestEvent{name: "event1"}) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + a.RecordEvent(TestEvent{name: "event2"}) + } + }() + wg.Wait() + + // Check that all events were recorded correctly + events := a.Events() + require.Len(t, events, 2000) + + event1Count := 0 + event2Count := 0 + for _, e := range events { + switch e.Name() { + case "event1": + event1Count++ + case "event2": + event2Count++ + default: + t.Error("Unexpected event name:", e.Name()) + } + } + if event1Count != 1000 { + t.Error("Expected 1000 event1 events, got", event1Count) + } + if event2Count != 1000 { + t.Error("Expected 1000 event2 events, got", event2Count) + } +} diff --git a/pkg/ddd/aggretate.go b/pkg/ddd/aggretate.go new file mode 100644 index 0000000..0fe1121 --- /dev/null +++ b/pkg/ddd/aggretate.go @@ -0,0 +1,46 @@ +package ddd + +import ( + "sync" + + "github.com/google/uuid" +) + +// Event is self-described +type Event interface { + Name() string +} + +// AggregateBasic implements +type AggregateBasic struct { + id uuid.UUID + events []Event + + mux *sync.Mutex +} + +// NewAggregateBasic is a constructor +func NewAggregateBasic(ID uuid.UUID) AggregateBasic { + return AggregateBasic{id: ID, mux: &sync.Mutex{}} +} + +// ID is a getter +func (ab AggregateBasic) ID() uuid.UUID { + return ab.id +} + +// RecordEvent is self-described +func (ab *AggregateBasic) RecordEvent(e Event) { + ab.mux.Lock() + defer ab.mux.Unlock() + ab.events = append(ab.events, e) +} + +// Events is self-described +func (ab *AggregateBasic) Events() []Event { + ab.mux.Lock() + defer ab.mux.Unlock() + e := ab.events + ab.events = []Event{} + return e +}