Skip to content

Commit

Permalink
NOISSUE - Add Event Subscriber Config (absmach#2054)
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo authored Feb 26, 2024
1 parent eb90526 commit adb03b4
Show file tree
Hide file tree
Showing 18 changed files with 448 additions and 399 deletions.
18 changes: 11 additions & 7 deletions cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
httpserver "github.com/absmach/magistrala/internal/server/http"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/auth"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/uuid"
Expand All @@ -45,7 +46,7 @@ const (
defDB = "bootstrap"
defSvcHTTPPort = "9013"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
)

Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
Expand Down Expand Up @@ -197,14 +200,15 @@ func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db
}

func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := consumer.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: consumer.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}
22 changes: 13 additions & 9 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/lora"
"github.com/absmach/magistrala/lora/api"
"github.com/absmach/magistrala/lora/events"
loraevents "github.com/absmach/magistrala/lora/events"
"github.com/absmach/magistrala/lora/mqtt"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/messaging/brokers"
Expand All @@ -44,7 +45,7 @@ const (
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down Expand Up @@ -147,6 +148,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -198,21 +201,22 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du
}

func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: loraevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepository(client *redis.Client, prefix string, logger *slog.Logger) lora.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return loraevents.NewRouteMapRepository(client, prefix)
}

func newService(pub messaging.Publisher, rmConn *redis.Client, thingsRMPrefix, channelsRMPrefix, connsRMPrefix string, logger *slog.Logger) lora.Service {
Expand Down
22 changes: 13 additions & 9 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/absmach/magistrala/opcua"
"github.com/absmach/magistrala/opcua/api"
"github.com/absmach/magistrala/opcua/db"
"github.com/absmach/magistrala/opcua/events"
opcuaevents "github.com/absmach/magistrala/opcua/events"
"github.com/absmach/magistrala/opcua/gopcua"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
Expand All @@ -43,7 +44,7 @@ const (
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -181,21 +184,22 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua.
}

func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: opcuaevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepositoy(client *redis.Client, prefix string, logger *slog.Logger) opcua.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return opcuaevents.NewRouteMapRepository(client, prefix)
}

func newService(sub opcua.Subscriber, browser opcua.Browser, thingRM, chanRM, connRM opcua.RouteMapRepository, opcuaConfig opcua.Config, logger *slog.Logger) opcua.Service {
Expand Down
9 changes: 8 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@ type EventHandler interface {
Handle(ctx context.Context, event Event) error
}

// SubscriberConfig represents event subscriber configuration.
type SubscriberConfig struct {
Consumer string
Stream string
Handler EventHandler
}

// Subscriber specifies event subscription API.
//
//go:generate mockery --name Subscriber --output=./mocks --filename subscriber.go --quiet --note "Copyright (c) Abstract Machines"
type Subscriber interface {
// Subscribe subscribes to the event stream and consumes events.
Subscribe(ctx context.Context, handler EventHandler) error
Subscribe(ctx context.Context, cfg SubscriberConfig) error

// Close gracefully closes event subscriber's connection.
Close() error
Expand Down
10 changes: 5 additions & 5 deletions pkg/events/mocks/subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit adb03b4

Please sign in to comment.