Skip to content

Commit

Permalink
Merge pull request #2677 from lpegoraro/fix-eng-1118-fix-sql
Browse files Browse the repository at this point in the history
fix(maestro): change reading method
  • Loading branch information
lpegoraro authored Oct 2, 2023
2 parents dae1111 + 213310d commit 1f2a18b
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 21 deletions.
120 changes: 101 additions & 19 deletions maestro/redis/consumer/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,87 @@ func NewSinkerActivityListener(l *zap.Logger, eventService service.EventService,
}
}

func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context) error {
//listening sinker events
err := s.redisClient.XGroupCreateMkStream(ctx, maestroredis.SinksActivityStream, maestroredis.GroupMaestro, "$").Err()
func (s *sinkerActivityListenerService) ReadSinksActivity(ctx context.Context) error {
const activityStream = "orb.sink_activity"
err := s.redisClient.XGroupCreateMkStream(ctx, activityStream, maestroredis.GroupMaestro, "$").Err()
if err != nil && err.Error() != maestroredis.Exists {
return err
}
go func() {
for {
streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: maestroredis.GroupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{activityStream, ">"},
}).Result()
if err != nil || len(streams) == 0 {
return
}
for _, msg := range streams[0].Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(msg.Values)
s.logger.Debug("Reading message from idle stream",
zap.String("message_id", msg.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
return
}
}
}
}()
return nil
}

err = s.redisClient.XGroupCreateMkStream(ctx, maestroredis.SinksIdleStream, maestroredis.GroupMaestro, "$").Err()
func (s *sinkerActivityListenerService) ReadSinksIdle(ctx context.Context) error {
const idleStream = "orb.sink_idle"
err := s.redisClient.XGroupCreateMkStream(ctx, idleStream, maestroredis.GroupMaestro, "$").Err()
if err != nil && err.Error() != maestroredis.Exists {
return err
}
s.logger.Debug("Reading Sinker Events", zap.String("stream", maestroredis.SinksIdleStream), zap.String("stream", maestroredis.SinksActivityStream))
for {
streams, err := s.readStreams(ctx)
if err != nil || len(streams) == 0 {
continue
}
for _, str := range streams {
go s.processStream(ctx, str)
go func() {
for {
streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: maestroredis.GroupMaestro,
Consumer: "orb_maestro-es-consumer",
Streams: []string{idleStream, ">"},
}).Result()
if err != nil {
return
}
for _, msg := range streams[0].Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(msg.Values)
s.logger.Debug("Reading message from idle stream",
zap.String("message_id", msg.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
return
}
}
}
}
}()
return nil
}

// createStreamIfNotExists - create stream if not exists
func (s *sinkerActivityListenerService) createStreamIfNotExists(ctx context.Context, streamName string) error {
err := s.redisClient.XGroupCreateMkStream(ctx, streamName, maestroredis.GroupMaestro, "$").Err()
if err != nil && err.Error() != maestroredis.Exists {
return err
}
func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context) error {
go func() {
err := s.ReadSinksActivity(ctx)
if err != nil {
s.logger.Error("error reading activity stream", zap.Error(err))
}
}()
go func() {
err := s.ReadSinksIdle(ctx)
if err != nil {
s.logger.Error("error reading idle stream", zap.Error(err))
}
}()
return nil
}

Expand All @@ -75,6 +127,36 @@ func (s *sinkerActivityListenerService) readStreams(ctx context.Context) ([]redi
return streams, nil
}

func (s *sinkerActivityListenerService) processActivity(ctx context.Context, stream redis.XStream) {
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from activity stream",
zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
}
}

func (s *sinkerActivityListenerService) processIdle(ctx context.Context, stream redis.XStream) {
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from activity stream",
zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID),
zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
}
}
}

// processStream - process stream
func (s *sinkerActivityListenerService) processStream(ctx context.Context, stream redis.XStream) {
eventType := ""
Expand Down
12 changes: 10 additions & 2 deletions maestro/redis/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,23 @@ func (sue SinksUpdateEvent) Decode(values map[string]interface{}) {
sue.Owner = values["owner"].(string)
sue.Config = types.FromMap(values["config"].(map[string]interface{}))
sue.Backend = values["backend"].(string)
sue.Timestamp = values["timestamp"].(time.Time)
var err error
sue.Timestamp, err = time.Parse(time.RFC3339, values["timestamp"].(string))
if err != nil {
sue.Timestamp = time.Now()
}
}

func (cse SinkerUpdateEvent) Decode(values map[string]interface{}) {
cse.OwnerID = values["owner_id"].(string)
cse.SinkID = values["sink_id"].(string)
cse.State = values["state"].(string)
cse.Size = values["size"].(string)
cse.Timestamp = values["timestamp"].(time.Time)
var err error
cse.Timestamp, err = time.Parse(time.RFC3339, values["timestamp"].(string))
if err != nil {
cse.Timestamp = time.Now()
}
}

func (cse SinkerUpdateEvent) Encode() map[string]interface{} {
Expand Down

0 comments on commit 1f2a18b

Please sign in to comment.