From 10e26c861ab150d740c1c350a7bc46ba6321f91c Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 10:16:33 -0300 Subject: [PATCH 1/6] fix(maestro): fix maestro --- maestro/deployment/repository.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/maestro/deployment/repository.go b/maestro/deployment/repository.go index 93f9ead14..e55bb7795 100644 --- a/maestro/deployment/repository.go +++ b/maestro/deployment/repository.go @@ -135,8 +135,23 @@ func (r *repositoryService) Remove(ctx context.Context, ownerId string, sinkId s func (r *repositoryService) FindByOwnerAndSink(ctx context.Context, ownerId string, sinkId string) (*Deployment, error) { tx := r.db.MustBeginTx(ctx, nil) var rows []Deployment - err := tx.SelectContext(ctx, &rows, "SELECT * FROM deployments WHERE owner_id = :owner_id AND sink_id = :sink_id", - map[string]interface{}{"owner_id": ownerId, "sink_id": sinkId}) + args := []interface{}{ownerId, sinkId} + query := ` + SELECT id, + owner_id, + sink_id, + backend, + config, + last_status, + last_status_update, + last_error_message, + last_error_time, + collector_name, + last_collector_deploy_time, + last_collector_stop_time + FROM deployments WHERE owner_id = $1 AND sink_id = $2 + ` + err := tx.SelectContext(ctx, &rows, query, args) if err != nil { _ = tx.Rollback() return nil, err From 52fe4f271367163e1923b7767f0b0b054bbc8271 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 10:25:02 -0300 Subject: [PATCH 2/6] fix(maestro): fix SQL. --- maestro/deployment/repository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/deployment/repository.go b/maestro/deployment/repository.go index e55bb7795..c0ffa1ec8 100644 --- a/maestro/deployment/repository.go +++ b/maestro/deployment/repository.go @@ -149,7 +149,7 @@ func (r *repositoryService) FindByOwnerAndSink(ctx context.Context, ownerId stri collector_name, last_collector_deploy_time, last_collector_stop_time - FROM deployments WHERE owner_id = $1 AND sink_id = $2 + FROM deployments WHERE owner_id = ? AND sink_id = ? ` err := tx.SelectContext(ctx, &rows, query, args) if err != nil { From 8fc712733bcb9b8205eb5fd78c3d4b0185b0f1fe Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 10:29:50 -0300 Subject: [PATCH 3/6] fix(maestro): fix SQL. --- maestro/deployment/repository.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/maestro/deployment/repository.go b/maestro/deployment/repository.go index c0ffa1ec8..ff4e5ad6e 100644 --- a/maestro/deployment/repository.go +++ b/maestro/deployment/repository.go @@ -36,6 +36,20 @@ type repositoryService struct { func (r *repositoryService) FetchAll(ctx context.Context) ([]Deployment, error) { tx := r.db.MustBeginTx(ctx, nil) var deployments []Deployment + query := ` + SELECT id, + owner_id, + sink_id, + backend, + config, + last_status, + last_status_update, + last_error_message, + last_error_time, + collector_name, + last_collector_deploy_time, + last_collector_stop_time + FROM deployments` err := tx.SelectContext(ctx, &deployments, "SELECT * FROM deployments", nil) if err != nil { _ = tx.Rollback() From a3ddf16215847138be8a4e31f85710f6f91ed01e Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 10:30:34 -0300 Subject: [PATCH 4/6] fix(maestro): fix SQL. --- maestro/deployment/repository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/deployment/repository.go b/maestro/deployment/repository.go index ff4e5ad6e..9df735aa6 100644 --- a/maestro/deployment/repository.go +++ b/maestro/deployment/repository.go @@ -50,7 +50,7 @@ func (r *repositoryService) FetchAll(ctx context.Context) ([]Deployment, error) last_collector_deploy_time, last_collector_stop_time FROM deployments` - err := tx.SelectContext(ctx, &deployments, "SELECT * FROM deployments", nil) + err := tx.SelectContext(ctx, &deployments, query, nil) if err != nil { _ = tx.Rollback() return nil, err From 68d62212e07446acbd0e905ad5815fd1221ceafd Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 15:17:12 -0300 Subject: [PATCH 5/6] fix(maestro): change how read sinker events. --- maestro/redis/consumer/sinker.go | 120 ++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 19 deletions(-) diff --git a/maestro/redis/consumer/sinker.go b/maestro/redis/consumer/sinker.go index 8975fb256..bfee98c4e 100644 --- a/maestro/redis/consumer/sinker.go +++ b/maestro/redis/consumer/sinker.go @@ -28,35 +28,87 @@ func NewSinkerActivityListener(l *zap.Logger, eventService service.EventService, } } -func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context) error { - //listening sinker events - err := s.redisClient.XGroupCreateMkStream(ctx, maestroredis.SinksActivityStream, maestroredis.GroupMaestro, "$").Err() +func (s *sinkerActivityListenerService) ReadSinksActivity(ctx context.Context) error { + const activityStream = "orb.sink_activity" + err := s.redisClient.XGroupCreateMkStream(ctx, activityStream, maestroredis.GroupMaestro, "$").Err() if err != nil && err.Error() != maestroredis.Exists { return err } + go func() { + for { + streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: maestroredis.GroupMaestro, + Consumer: "orb_maestro-es-consumer", + Streams: []string{activityStream, ">"}, + }).Result() + if err != nil || len(streams) == 0 { + return + } + for _, msg := range streams[0].Messages { + event := maestroredis.SinkerUpdateEvent{} + event.Decode(msg.Values) + s.logger.Debug("Reading message from idle stream", + zap.String("message_id", msg.ID), + zap.String("sink_id", event.SinkID), + zap.String("owner_id", event.OwnerID)) + err := s.eventService.HandleSinkActivity(ctx, event) + if err != nil { + s.logger.Error("error receiving message", zap.Error(err)) + return + } + } + } + }() + return nil +} - err = s.redisClient.XGroupCreateMkStream(ctx, maestroredis.SinksIdleStream, maestroredis.GroupMaestro, "$").Err() +func (s *sinkerActivityListenerService) ReadSinksIdle(ctx context.Context) error { + const idleStream = "orb.sink_idle" + err := s.redisClient.XGroupCreateMkStream(ctx, idleStream, maestroredis.GroupMaestro, "$").Err() if err != nil && err.Error() != maestroredis.Exists { return err } - s.logger.Debug("Reading Sinker Events", zap.String("stream", maestroredis.SinksIdleStream), zap.String("stream", maestroredis.SinksActivityStream)) - for { - streams, err := s.readStreams(ctx) - if err != nil || len(streams) == 0 { - continue - } - for _, str := range streams { - go s.processStream(ctx, str) + go func() { + for { + streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: maestroredis.GroupMaestro, + Consumer: "orb_maestro-es-consumer", + Streams: []string{idleStream, ">"}, + }).Result() + if err != nil { + return + } + for _, msg := range streams[0].Messages { + event := maestroredis.SinkerUpdateEvent{} + event.Decode(msg.Values) + s.logger.Debug("Reading message from idle stream", + zap.String("message_id", msg.ID), + zap.String("sink_id", event.SinkID), + zap.String("owner_id", event.OwnerID)) + err := s.eventService.HandleSinkIdle(ctx, event) + if err != nil { + s.logger.Error("error receiving message", zap.Error(err)) + return + } + } } - } + }() + return nil } -// createStreamIfNotExists - create stream if not exists -func (s *sinkerActivityListenerService) createStreamIfNotExists(ctx context.Context, streamName string) error { - err := s.redisClient.XGroupCreateMkStream(ctx, streamName, maestroredis.GroupMaestro, "$").Err() - if err != nil && err.Error() != maestroredis.Exists { - return err - } +func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context) error { + go func() { + err := s.ReadSinksActivity(ctx) + if err != nil { + s.logger.Error("error reading activity stream", zap.Error(err)) + } + }() + go func() { + err := s.ReadSinksIdle(ctx) + if err != nil { + s.logger.Error("error reading idle stream", zap.Error(err)) + } + }() return nil } @@ -75,6 +127,36 @@ func (s *sinkerActivityListenerService) readStreams(ctx context.Context) ([]redi return streams, nil } +func (s *sinkerActivityListenerService) processActivity(ctx context.Context, stream redis.XStream) { + for _, message := range stream.Messages { + event := maestroredis.SinkerUpdateEvent{} + event.Decode(message.Values) + s.logger.Debug("Reading message from activity stream", + zap.String("message_id", message.ID), + zap.String("sink_id", event.SinkID), + zap.String("owner_id", event.OwnerID)) + err := s.eventService.HandleSinkActivity(ctx, event) + if err != nil { + s.logger.Error("error receiving message", zap.Error(err)) + } + } +} + +func (s *sinkerActivityListenerService) processIdle(ctx context.Context, stream redis.XStream) { + for _, message := range stream.Messages { + event := maestroredis.SinkerUpdateEvent{} + event.Decode(message.Values) + s.logger.Debug("Reading message from activity stream", + zap.String("message_id", message.ID), + zap.String("sink_id", event.SinkID), + zap.String("owner_id", event.OwnerID)) + err := s.eventService.HandleSinkIdle(ctx, event) + if err != nil { + s.logger.Error("error receiving message", zap.Error(err)) + } + } +} + // processStream - process stream func (s *sinkerActivityListenerService) processStream(ctx context.Context, stream redis.XStream) { eventType := "" From 213310d3b4426a18bc95655a75558835e90cb923 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 2 Oct 2023 15:23:09 -0300 Subject: [PATCH 6/6] fix(maestro): fix event reading. --- maestro/redis/events.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/maestro/redis/events.go b/maestro/redis/events.go index 6d7429c35..1134dbb3a 100644 --- a/maestro/redis/events.go +++ b/maestro/redis/events.go @@ -35,7 +35,11 @@ func (sue SinksUpdateEvent) Decode(values map[string]interface{}) { sue.Owner = values["owner"].(string) sue.Config = types.FromMap(values["config"].(map[string]interface{})) sue.Backend = values["backend"].(string) - sue.Timestamp = values["timestamp"].(time.Time) + var err error + sue.Timestamp, err = time.Parse(time.RFC3339, values["timestamp"].(string)) + if err != nil { + sue.Timestamp = time.Now() + } } func (cse SinkerUpdateEvent) Decode(values map[string]interface{}) { @@ -43,7 +47,11 @@ func (cse SinkerUpdateEvent) Decode(values map[string]interface{}) { cse.SinkID = values["sink_id"].(string) cse.State = values["state"].(string) cse.Size = values["size"].(string) - cse.Timestamp = values["timestamp"].(time.Time) + var err error + cse.Timestamp, err = time.Parse(time.RFC3339, values["timestamp"].(string)) + if err != nil { + cse.Timestamp = time.Now() + } } func (cse SinkerUpdateEvent) Encode() map[string]interface{} {