Skip to content

Commit

Permalink
feat(maestro): enhanced readability of sinker activity listener code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Sep 28, 2023
1 parent 738ce30 commit a5354fe
Showing 1 changed file with 60 additions and 35 deletions.
95 changes: 60 additions & 35 deletions maestro/redis/consumer/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/go-redis/redis/v8"
maestroredis "github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/maestro/service"
redis2 "github.com/orb-community/orb/sinks/redis"
"go.uber.org/zap"
)

Expand All @@ -29,7 +28,6 @@ func NewSinkerActivityListener(l *zap.Logger, eventService service.EventService,
}
}

// SubscribeSinksEvents will listen to both sink_activity and sink_idle stream and handle each message separately
func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context) error {
//listening sinker events
err := s.redisClient.XGroupCreateMkStream(ctx, maestroredis.SinksActivityStream, maestroredis.GroupMaestro, "$").Err()
Expand All @@ -41,45 +39,72 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
if err != nil && err.Error() != maestroredis.Exists {
return err
}
s.logger.Debug("Reading Sinker Events", zap.String("stream", redis2.StreamSinks))
s.logger.Debug("Reading Sinker Events", zap.String("stream", maestroredis.SinksIdleStream), zap.String("stream", maestroredis.SinksActivityStream))
for {
const activityStream = "orb.sink_activity"
const idleStream = "orb.sink_idle"
streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: maestroredis.GroupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{activityStream, idleStream, ">"},
}).Result()
streams, err := s.readStreams(ctx)
if err != nil || len(streams) == 0 {
continue
}
for _, str := range streams {
go func(stream redis.XStream) {
if stream.Stream == activityStream {
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from activity stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
}
} else if stream.Stream == idleStream {
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from idle stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
}
}
}(str)
go s.processStream(ctx, str)
}
}
}

// createStreamIfNotExists - create stream if not exists
func (s *sinkerActivityListenerService) createStreamIfNotExists(ctx context.Context, streamName string) error {
err := s.redisClient.XGroupCreateMkStream(ctx, streamName, maestroredis.GroupMaestro, "$").Err()
if err != nil && err.Error() != maestroredis.Exists {
return err
}
return nil
}

// readStreams - read streams
func (s *sinkerActivityListenerService) readStreams(ctx context.Context) ([]redis.XStream, error) {
const activityStream = "orb.sink_activity"
const idleStream = "orb.sink_idle"
streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: maestroredis.GroupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{activityStream, idleStream, ">"},
}).Result()
if err != nil {
return nil, err
}
return streams, nil
}

// processStream - process stream
func (s *sinkerActivityListenerService) processStream(ctx context.Context, stream redis.XStream) {
eventType := ""
if stream.Stream == "orb.sink_activity" {
eventType = "activity"
} else if stream.Stream == "orb.sink_idle" {
eventType = "idle"
}
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
switch eventType {
case "activity":
s.logger.Debug("Reading message from activity stream",
zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
case "idle":
s.logger.Debug("Reading message from idle stream",
zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
}
}
}

0 comments on commit a5354fe

Please sign in to comment.