diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..d7dcea5 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,32 @@ +package event + +type Event[T any] struct { + cap int + listeners []chan T +} + +func NewEvent[T any](cap int) *Event[T] { + return &Event[T]{cap: cap} +} + +func (e *Event[T]) Close() { + for _, c := range e.listeners { + close(c) + } +} + +func (e *Event[T]) Listen() <-chan T { + c := make(chan T, e.cap) + e.listeners = append(e.listeners, c) + return c +} + +func (e *Event[T]) Send(t T) { + go func() { + defer func() { _ = recover() }() + + for _, c := range e.listeners { + c <- t + } + }() +} diff --git a/user/adapter/prom_probe.go b/user/adapter/prom_probe.go index 3278434..f431983 100644 --- a/user/adapter/prom_probe.go +++ b/user/adapter/prom_probe.go @@ -1,6 +1,7 @@ package adapter import ( + "app/internal/event" "app/user" "context" @@ -11,13 +12,15 @@ import ( type PromProbe struct { logger *zap.Logger + userCreatedEvent *event.Event[user.UserCreated] usersCreated prometheus.Counter usersCreateFailed prometheus.Counter } -func NewPromProbe(logger *zap.Logger) *PromProbe { +func NewPromProbe(logger *zap.Logger, userCreatedEvent *event.Event[user.UserCreated]) *PromProbe { return &PromProbe{ logger: logger, + userCreatedEvent: userCreatedEvent, usersCreated: promauto.NewCounter(prometheus.CounterOpts{Name: "users_created"}), usersCreateFailed: promauto.NewCounter(prometheus.CounterOpts{Name: "users_create_fail"}), } @@ -45,6 +48,6 @@ func (p *PromProbe) FailedToEnqueue(err error) { } func (p *PromProbe) UserCreated(_ context.Context, u user.User) { - p.logger.Info("user created", zap.String("name", u.Name)) p.usersCreated.Inc() + p.userCreatedEvent.Send(user.NewUserCreated(u)) } diff --git a/user/event.go b/user/event.go new file mode 100644 index 0000000..cbda04d --- /dev/null +++ b/user/event.go @@ -0,0 +1,9 @@ +package user + +type UserCreated struct { + User User `json:"user"` +} + +func NewUserCreated(u User) UserCreated { + return UserCreated{User: u} +} diff --git a/user/module/module.go b/user/module/module.go index 48beee7..11ec17a 100644 --- a/user/module/module.go +++ b/user/module/module.go @@ -1,10 +1,12 @@ package user import ( + "app/internal/event" "app/user" "app/user/adapter" "app/user/http" "app/user/queue" + "context" "go.uber.org/fx" ) @@ -25,4 +27,23 @@ var Module = fx.Module( fx.Provide(func(probe *adapter.PromProbe) user.Probe { return probe }), fx.Provide(http.NewController), + + fx.Provide(func(lifecycle fx.Lifecycle) *event.Event[user.UserCreated] { + e := event.NewEvent[user.UserCreated](10) + lifecycle.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + e.Close() + return nil + }, + }) + return e + }), + + fx.Invoke(func(event *event.Event[user.UserCreated], greeter *queue.Greeter) { + go func() { + for userCreatedEvent := range event.Listen() { + _ = greeter.Enqueue(userCreatedEvent.User.Name) + } + }() + }), ) diff --git a/user/service.go b/user/service.go index bdc09b7..7cf4166 100644 --- a/user/service.go +++ b/user/service.go @@ -1,19 +1,16 @@ package user import ( - "app/user/queue" - "context" ) type Service struct { - repo Repository - greeter *queue.Greeter - probe Probe + repo Repository + probe Probe } -func NewUserService(repo Repository, greeter *queue.Greeter, probe Probe) *Service { - return &Service{repo: repo, greeter: greeter, probe: probe} +func NewUserService(repo Repository, probe Probe) *Service { + return &Service{repo: repo, probe: probe} } func (service *Service) CreateUser(ctx context.Context, name string) (User, error) { @@ -25,10 +22,6 @@ func (service *Service) CreateUser(ctx context.Context, name string) (User, erro } service.probe.UserCreated(ctx, user) - if err := service.greeter.Enqueue(name); err != nil { - service.probe.FailedToEnqueue(err) - } - return user, err }