From adb03b4ed8717d1e55b4c504c5baeed4a6567f44 Mon Sep 17 00:00:00 2001 From: b1ackd0t <28790446+rodneyosodo@users.noreply.github.com> Date: Mon, 26 Feb 2024 18:42:08 +0300 Subject: [PATCH] NOISSUE - Add Event Subscriber Config (#2054) Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> --- cmd/bootstrap/main.go | 18 +- cmd/lora/main.go | 22 +- cmd/opcua/main.go | 22 +- pkg/events/events.go | 9 +- pkg/events/mocks/subscriber.go | 10 +- pkg/events/nats/publisher_test.go | 192 +++++++++------- pkg/events/nats/setup_test.go | 2 +- pkg/events/nats/subscriber.go | 47 ++-- pkg/events/rabbitmq/publisher_test.go | 194 ++++++++-------- pkg/events/rabbitmq/setup_test.go | 2 +- pkg/events/rabbitmq/subscriber.go | 43 ++-- pkg/events/redis/publisher.go | 5 +- pkg/events/redis/publisher_test.go | 211 ++++++++++-------- pkg/events/redis/setup_test.go | 3 - pkg/events/redis/subscriber.go | 55 ++--- .../store/{brokers_nats.go => store_nats.go} | 4 +- ...{brokers_rabbitmq.go => store_rabbitmq.go} | 4 +- .../{brokers_redis.go => store_redis.go} | 4 +- 18 files changed, 448 insertions(+), 399 deletions(-) rename pkg/events/store/{brokers_nats.go => store_nats.go} (76%) rename pkg/events/store/{brokers_rabbitmq.go => store_rabbitmq.go} (76%) rename pkg/events/store/{brokers_redis.go => store_redis.go} (78%) diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 78200f43cc..61f56096fc 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -28,6 +28,7 @@ import ( httpserver "github.com/absmach/magistrala/internal/server/http" mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/pkg/auth" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" mgsdk "github.com/absmach/magistrala/pkg/sdk/go" "github.com/absmach/magistrala/pkg/uuid" @@ -45,7 +46,7 @@ const ( defDB = "bootstrap" defSvcHTTPPort = "9013" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" streamID = "magistrala.bootstrap" ) @@ -142,6 +143,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) @@ -197,14 +200,15 @@ func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db } func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := consumer.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: consumer.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 348029afc9..78105c3e29 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -23,8 +23,9 @@ import ( mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/lora" "github.com/absmach/magistrala/lora/api" - "github.com/absmach/magistrala/lora/events" + loraevents "github.com/absmach/magistrala/lora/events" "github.com/absmach/magistrala/lora/mqtt" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" "github.com/absmach/magistrala/pkg/messaging" "github.com/absmach/magistrala/pkg/messaging/brokers" @@ -44,7 +45,7 @@ const ( thingsRMPrefix = "thing" channelsRMPrefix = "channel" connsRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { @@ -147,6 +148,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger) if cfg.SendTelemetry { @@ -198,21 +201,22 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du } func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := events.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: loraevents.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } func newRouteMapRepository(client *redis.Client, prefix string, logger *slog.Logger) lora.RouteMapRepository { logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix)) - return events.NewRouteMapRepository(client, prefix) + return loraevents.NewRouteMapRepository(client, prefix) } func newService(pub messaging.Publisher, rmConn *redis.Client, thingsRMPrefix, channelsRMPrefix, connsRMPrefix string, logger *slog.Logger) lora.Service { diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 2f4f63af3e..54607d0dd9 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -23,8 +23,9 @@ import ( "github.com/absmach/magistrala/opcua" "github.com/absmach/magistrala/opcua/api" "github.com/absmach/magistrala/opcua/db" - "github.com/absmach/magistrala/opcua/events" + opcuaevents "github.com/absmach/magistrala/opcua/events" "github.com/absmach/magistrala/opcua/gopcua" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" "github.com/absmach/magistrala/pkg/messaging/brokers" brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing" @@ -43,7 +44,7 @@ const ( channelsRMPrefix = "channel" connectionRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { @@ -142,6 +143,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger) if cfg.SendTelemetry { @@ -181,21 +184,22 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua. } func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := events.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: opcuaevents.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } func newRouteMapRepositoy(client *redis.Client, prefix string, logger *slog.Logger) opcua.RouteMapRepository { logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix)) - return events.NewRouteMapRepository(client, prefix) + return opcuaevents.NewRouteMapRepository(client, prefix) } func newService(sub opcua.Subscriber, browser opcua.Browser, thingRM, chanRM, connRM opcua.RouteMapRepository, opcuaConfig opcua.Config, logger *slog.Logger) opcua.Service { diff --git a/pkg/events/events.go b/pkg/events/events.go index f0281cd061..626bef11d3 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -38,12 +38,19 @@ type EventHandler interface { Handle(ctx context.Context, event Event) error } +// SubscriberConfig represents event subscriber configuration. +type SubscriberConfig struct { + Consumer string + Stream string + Handler EventHandler +} + // Subscriber specifies event subscription API. // //go:generate mockery --name Subscriber --output=./mocks --filename subscriber.go --quiet --note "Copyright (c) Abstract Machines" type Subscriber interface { // Subscribe subscribes to the event stream and consumes events. - Subscribe(ctx context.Context, handler EventHandler) error + Subscribe(ctx context.Context, cfg SubscriberConfig) error // Close gracefully closes event subscriber's connection. Close() error diff --git a/pkg/events/mocks/subscriber.go b/pkg/events/mocks/subscriber.go index 38d4002387..4de1ceb50f 100644 --- a/pkg/events/mocks/subscriber.go +++ b/pkg/events/mocks/subscriber.go @@ -34,17 +34,17 @@ func (_m *Subscriber) Close() error { return r0 } -// Subscribe provides a mock function with given fields: ctx, handler -func (_m *Subscriber) Subscribe(ctx context.Context, handler events.EventHandler) error { - ret := _m.Called(ctx, handler) +// Subscribe provides a mock function with given fields: ctx, cfg +func (_m *Subscriber) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + ret := _m.Called(ctx, cfg) if len(ret) == 0 { panic("no return value specified for Subscribe") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, events.EventHandler) error); ok { - r0 = rf(ctx, handler) + if rf, ok := ret.Get(0).(func(context.Context, events.SubscriberConfig) error); ok { + r0 = rf(ctx, cfg) } else { r0 = ret.Error(0) } diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index ef69550644..20086ea552 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,14 +55,21 @@ func TestPublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() - _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -124,107 +130,116 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(context.Background(), event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + default: + assert.ErrorContains(t, err, tc.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - default: - assert.ErrorContains(t, err, tc.err.Error()) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: nats.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: nats.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: nats.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) + if err != nil { + assert.Equal(t, err, tc.err) - continue - } + return + } - switch err := subcriber.Subscribe(context.Background(), pc.handler); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, tc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } @@ -232,10 +247,15 @@ func TestUnavailablePublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go index 143e4863b9..e539aca537 100644 --- a/pkg/events/nats/setup_test.go +++ b/pkg/events/nats/setup_test.go @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { } if err := pool.Retry(func() error { - _, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), natsURL, logger) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 9154678fb5..ca99f83123 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -18,9 +18,7 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -const ( - maxReconnects = -1 -) +const maxReconnects = -1 var _ events.Subscriber = (*subEventStore)(nil) @@ -47,22 +45,12 @@ var ( ) type subEventStore struct { - conn *nats.Conn - pubsub messaging.PubSub - stream string - consumer string - logger *slog.Logger + conn *nats.Conn + pubsub messaging.PubSub + logger *slog.Logger } -func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects)) if err != nil { return nil, err @@ -82,20 +70,25 @@ func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *sl } return &subEventStore{ - conn: conn, - pubsub: pubsub, - stream: stream, - consumer: consumer, - logger: logger, + conn: conn, + pubsub: pubsub, + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Stream == "" { + return ErrEmptyStream + } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + subCfg := messaging.SubscriberConfig{ - ID: es.consumer, - Topic: eventsPrefix + "." + es.stream, + ID: cfg.Consumer, + Topic: cfg.Stream, Handler: &eventHandler{ - handler: handler, + handler: cfg.Handler, ctx: ctx, logger: es.logger, }, @@ -134,7 +127,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err)) } return nil diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index 252784a3c0..f14534654b 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,14 +55,21 @@ func TestPublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() - _, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger) + _, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -124,108 +130,117 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(context.Background(), event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, tc.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: rabbitmq.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, pc.stream, pc.consumer, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) + if err != nil { + assert.Equal(t, err, tc.err) - continue - } + return + } - switch err := subcriber.Subscribe(context.Background(), pc.handler); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, tc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } @@ -233,10 +248,15 @@ func TestUnavailablePublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go index 4db3c7c254..dcbf066afd 100644 --- a/pkg/events/rabbitmq/setup_test.go +++ b/pkg/events/rabbitmq/setup_test.go @@ -51,7 +51,7 @@ func TestMain(m *testing.M) { } if err := pool.Retry(func() error { - _, err = rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + _, err = rabbitmq.NewSubscriber(rabbitmqURL, logger) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index 88d2713f44..bba6b16317 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -30,22 +30,12 @@ var ( ) type subEventStore struct { - conn *amqp.Connection - pubsub messaging.PubSub - stream string - consumer string - logger *slog.Logger + conn *amqp.Connection + pubsub messaging.PubSub + logger *slog.Logger } -func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err @@ -64,20 +54,25 @@ func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Su } return &subEventStore{ - conn: conn, - pubsub: pubsub, - stream: stream, - consumer: consumer, - logger: logger, + conn: conn, + pubsub: pubsub, + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Stream == "" { + return ErrEmptyStream + } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + subCfg := messaging.SubscriberConfig{ - ID: es.consumer, - Topic: eventsPrefix + "." + es.stream, + ID: cfg.Consumer, + Topic: cfg.Stream, Handler: &eventHandler{ - handler: handler, + handler: cfg.Handler, ctx: ctx, logger: es.logger, }, @@ -116,7 +111,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle rabbitmq event: %s", err)) } return nil diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index a6f5c90aca..e6a29626c0 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( @@ -32,7 +29,7 @@ func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Dura es := &pubEventStore{ client: redis.NewClient(opts), unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents), - stream: stream, + stream: eventsPrefix + stream, flushPeriod: flushPeriod, } diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 9bf226dd76..ca1320a4b0 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( @@ -23,13 +20,12 @@ import ( ) var ( - streamName = "magistrala.eventstest" - consumer = "test-consumer" - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + stream = "tests.events" + consumer = "test-consumer" + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -60,19 +56,26 @@ func TestPublish(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - _, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", streamName, events.UnpublishedEventsCheckInterval) + _, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", stream, events.UnpublishedEventsCheckInterval) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) + publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() - _, err = redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) + _, err = redis.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -133,30 +136,32 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(context.Background(), event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) + assert.Nil(t, err) + if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, tc.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } @@ -164,94 +169,104 @@ func TestPubsub(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: "", - errorMessage: redis.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, pc.stream, pc.consumer, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - - continue - } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + subcriber, err := redis.NewSubscriber(redisURL, logger) + if err != nil { + assert.Equal(t, err, tc.err) - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - switch err := subcriber.Subscribe(context.Background(), pc.handler); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, tc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } func TestUnavailablePublish(t *testing.T) { - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, time.Second) + publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, time.Second) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: "events." + stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index 541cb2a3d1..719e0996c3 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index 64d1724d2e..910ecca348 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( @@ -17,9 +14,10 @@ import ( ) const ( - eventCount = 100 - exists = "BUSYGROUP Consumer Group name already exists" - group = "magistrala" + eventsPrefix = "events." + eventCount = 100 + exists = "BUSYGROUP Consumer Group name already exists" + group = "magistrala" ) var _ events.Subscriber = (*subEventStore)(nil) @@ -33,36 +31,31 @@ var ( ) type subEventStore struct { - client *redis.Client - stream string - consumer string - logger *slog.Logger + client *redis.Client + logger *slog.Logger } -func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { opts, err := redis.ParseURL(url) if err != nil { return nil, err } return &subEventStore{ - client: redis.NewClient(opts), - stream: stream, - consumer: consumer, - logger: logger, + client: redis.NewClient(opts), + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { - err := es.client.XGroupCreateMkStream(ctx, es.stream, group, "$").Err() +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Stream == "" { + return ErrEmptyStream + } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + + err := es.client.XGroupCreateMkStream(ctx, cfg.Stream, group, "$").Err() if err != nil && err.Error() != exists { return err } @@ -71,12 +64,12 @@ func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHand for { msgs, err := es.client.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: group, - Consumer: es.consumer, - Streams: []string{es.stream, ">"}, + Consumer: cfg.Consumer, + Streams: []string{cfg.Stream, ">"}, Count: eventCount, }).Result() if err != nil { - es.logger.Warn(fmt.Sprintf("failed to read from Redis stream: %s", err)) + es.logger.Warn(fmt.Sprintf("failed to read from redis stream: %s", err)) continue } @@ -84,7 +77,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHand continue } - es.handle(ctx, msgs[0].Messages, handler) + es.handle(ctx, cfg.Stream, msgs[0].Messages, cfg.Handler) } }() @@ -103,7 +96,7 @@ func (re redisEvent) Encode() (map[string]interface{}, error) { return re.Data, nil } -func (es *subEventStore) handle(ctx context.Context, msgs []redis.XMessage, h events.EventHandler) { +func (es *subEventStore) handle(ctx context.Context, stream string, msgs []redis.XMessage, h events.EventHandler) { for _, msg := range msgs { event := redisEvent{ Data: msg.Values, @@ -115,7 +108,7 @@ func (es *subEventStore) handle(ctx context.Context, msgs []redis.XMessage, h ev return } - if err := es.client.XAck(ctx, es.stream, group, msg.ID).Err(); err != nil { + if err := es.client.XAck(ctx, stream, group, msg.ID).Err(); err != nil { es.logger.Warn(fmt.Sprintf("failed to ack redis event: %s", err)) return diff --git a/pkg/events/store/brokers_nats.go b/pkg/events/store/store_nats.go similarity index 76% rename from pkg/events/store/brokers_nats.go rename to pkg/events/store/store_nats.go index 282fc6ad5e..e344253bb2 100644 --- a/pkg/events/store/brokers_nats.go +++ b/pkg/events/store/store_nats.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := nats.NewSubscriber(ctx, url, stream, consumer, logger) +func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := nats.NewSubscriber(ctx, url, logger) if err != nil { return nil, err } diff --git a/pkg/events/store/brokers_rabbitmq.go b/pkg/events/store/store_rabbitmq.go similarity index 76% rename from pkg/events/store/brokers_rabbitmq.go rename to pkg/events/store/store_rabbitmq.go index bf895502f2..0af15e0d70 100644 --- a/pkg/events/store/brokers_rabbitmq.go +++ b/pkg/events/store/store_rabbitmq.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(_ context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := rabbitmq.NewSubscriber(url, stream, consumer, logger) +func NewSubscriber(_ context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := rabbitmq.NewSubscriber(url, logger) if err != nil { return nil, err } diff --git a/pkg/events/store/brokers_redis.go b/pkg/events/store/store_redis.go similarity index 78% rename from pkg/events/store/brokers_redis.go rename to pkg/events/store/store_redis.go index 711310123e..136d01b794 100644 --- a/pkg/events/store/brokers_redis.go +++ b/pkg/events/store/store_redis.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(_ context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := redis.NewSubscriber(url, stream, consumer, logger) +func NewSubscriber(_ context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := redis.NewSubscriber(url, logger) if err != nil { return nil, err }