From 275d1289ffcafd71dd3bee12d871f74514406845 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 27 Sep 2023 17:02:42 -0300 Subject: [PATCH 1/4] feat(maestro): add observability and logs to new flow on maestro. --- maestro/service.go | 15 +++++ maestro/service/deploy_service.go | 5 ++ maestro/service/metrics_middleware.go | 85 +++++++++++++++++++++++++++ 3 files changed, 105 insertions(+) create mode 100644 maestro/service/metrics_middleware.go diff --git a/maestro/service.go b/maestro/service.go index 627068065..ca2f89dc4 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -10,6 +10,7 @@ package maestro import ( "context" + kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/go-redis/redis/v8" "github.com/jmoiron/sqlx" "github.com/orb-community/orb/maestro/deployment" @@ -20,6 +21,7 @@ import ( "github.com/orb-community/orb/maestro/service" "github.com/orb-community/orb/pkg/config" sinkspb "github.com/orb-community/orb/sinks/pb" + stdprometheus "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -52,6 +54,19 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink ps := producer.NewMaestroProducer(logger, streamRedisClient) monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr) eventService := service.NewEventService(logger, deploymentService, kubectr) + eventService = service.NewTracingService(logger, eventService, + kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "maestro", + Subsystem: "comms", + Name: "message_count", + Help: "Number of messages received.", + }, []string{"method", "sink_id", "owner_id"}), + kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "maestro", + Subsystem: "comms", + Name: "message_latency_microseconds", + Help: "Total duration of messages processed in microseconds.", + }, []string{"method", "sink_id", "owner_id"})) sinkListenerService := rediscons1.NewSinksListenerController(logger, eventService, sinkerRedisClient, sinksGrpcClient) activityListener := rediscons1.NewSinkerActivityListener(logger, eventService, sinkerRedisClient) return &maestroService{ diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index d0e529b02..be6977f9c 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -35,6 +35,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr // HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { + d.logger.Info("handling sink create event", zap.String("sink-id", event.SinkID)) // Create Deployment Entry entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) // Use deploymentService, which will create deployment in both postgres and redis @@ -48,6 +49,7 @@ func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis. func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { now := time.Now() + d.logger.Info("handling sink update event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { @@ -69,6 +71,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. } func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { + d.logger.Info("handling sink delete event", zap.String("sink-id", event.SinkID)) deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) @@ -90,6 +93,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi if event.State != "active" { return errors.New("trying to deploy sink that is not active") } + d.logger.Info("handling sink activity event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { @@ -117,6 +121,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error { // check if exists deployment entry from postgres + d.logger.Info("handling sink idle event", zap.String("sink-id", event.SinkID)) _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { d.logger.Error("error trying to get deployment entry", zap.Error(err)) diff --git a/maestro/service/metrics_middleware.go b/maestro/service/metrics_middleware.go new file mode 100644 index 000000000..81f8d8df5 --- /dev/null +++ b/maestro/service/metrics_middleware.go @@ -0,0 +1,85 @@ +package service + +import ( + "context" + "github.com/go-kit/kit/metrics" + maestroredis "github.com/orb-community/orb/maestro/redis" + "go.uber.org/zap" + "time" +) + +type tracingService struct { + logger *zap.Logger + counter metrics.Counter + latency metrics.Histogram + nextService EventService +} + +func NewTracingService(logger *zap.Logger, service EventService, counter metrics.Counter, latency metrics.Histogram) EventService { + return &tracingService{logger: logger, nextService: service, counter: counter, latency: latency} +} + +func (t *tracingService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { + defer func(begun time.Time) { + labels := []string{ + "method", "HandleSinkCreate", + "sink_id", event.SinkID, + "owner_id", event.Owner, + } + t.counter.With(labels...).Add(1) + t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds())) + }(time.Now()) + return t.nextService.HandleSinkCreate(ctx, event) +} + +func (t *tracingService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { + defer func(begun time.Time) { + labels := []string{ + "method", "HandleSinkCreate", + "sink_id", event.SinkID, + "owner_id", event.Owner, + } + t.counter.With(labels...).Add(1) + t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds())) + }(time.Now()) + return t.nextService.HandleSinkUpdate(ctx, event) +} + +func (t *tracingService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { + defer func(begun time.Time) { + labels := []string{ + "method", "HandleSinkCreate", + "sink_id", event.SinkID, + "owner_id", event.Owner, + } + t.counter.With(labels...).Add(1) + t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds())) + }(time.Now()) + return t.nextService.HandleSinkDelete(ctx, event) +} + +func (t *tracingService) HandleSinkActivity(ctx context.Context, event maestroredis.SinkerUpdateEvent) error { + defer func(begun time.Time) { + labels := []string{ + "method", "HandleSinkCreate", + "sink_id", event.SinkID, + "owner_id", event.OwnerID, + } + t.counter.With(labels...).Add(1) + t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds())) + }(time.Now()) + return t.nextService.HandleSinkActivity(ctx, event) +} + +func (t *tracingService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error { + defer func(begun time.Time) { + labels := []string{ + "method", "HandleSinkCreate", + "sink_id", event.SinkID, + "owner_id", event.OwnerID, + } + t.counter.With(labels...).Add(1) + t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds())) + }(time.Now()) + return t.nextService.HandleSinkIdle(ctx, event) +} From 402d68998adad0fabc4e8c5a9e0da3600007b898 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 27 Sep 2023 17:13:06 -0300 Subject: [PATCH 2/4] feat(maestro): adding more logs and removing return which could be omitting logs. --- maestro/redis/consumer/sinker.go | 4 ++++ maestro/redis/consumer/sinks.go | 8 ++++---- maestro/service.go | 2 -- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/maestro/redis/consumer/sinker.go b/maestro/redis/consumer/sinker.go index a60bf9ff7..2c19af9b0 100644 --- a/maestro/redis/consumer/sinker.go +++ b/maestro/redis/consumer/sinker.go @@ -57,6 +57,8 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) + s.logger.Info("Reading message from activity stream", zap.String("message_id", message.ID), + zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkActivity(ctx, event) if err != nil { s.logger.Error("error receiving message", zap.Error(err)) @@ -66,6 +68,8 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) + s.logger.Info("Reading message from idle stream", zap.String("message_id", message.ID), + zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkIdle(ctx, event) if err != nil { s.logger.Error("error receiving message", zap.Error(err)) diff --git a/maestro/redis/consumer/sinks.go b/maestro/redis/consumer/sinks.go index 59440d642..e5a16cc43 100644 --- a/maestro/redis/consumer/sinks.go +++ b/maestro/redis/consumer/sinks.go @@ -62,7 +62,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error } func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XMessage) error { - logger := ls.logger.With(zap.String("maestro_sinks_listener_msg", msg.ID)) + logger := ls.logger.Named("sinks_listener:" + msg.ID) event := msg.Values rte, err := redis2.DecodeSinksEvent(event, event["operation"].(string)) if err != nil { @@ -106,7 +106,7 @@ func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XM // handleSinksUpdate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received maestro UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Info("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkUpdate(ctx, event) if err != nil { return err @@ -117,7 +117,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae // handleSinksDelete logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Info("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkDelete(ctx, event) if err != nil { return err @@ -127,7 +127,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae // handleSinksCreate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received event to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Info("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkCreate(ctx, event) if err != nil { return err diff --git a/maestro/service.go b/maestro/service.go index ca2f89dc4..6a6d00e94 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -115,7 +115,6 @@ func (svc *maestroService) Stop() { func (svc *maestroService) subscribeToSinksEvents(ctx context.Context) { if err := svc.sinkListenerService.SubscribeSinksEvents(ctx); err != nil { svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err)) - return } svc.logger.Info("finished reading sinks events") ctx.Done() @@ -124,7 +123,6 @@ func (svc *maestroService) subscribeToSinksEvents(ctx context.Context) { func (svc *maestroService) subscribeToSinkerEvents(ctx context.Context) { if err := svc.activityListener.SubscribeSinksEvents(ctx); err != nil { svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err)) - return } svc.logger.Info("finished reading sinker events") ctx.Done() From 94704734bdc2989385cd40b80945d2aafe5d733a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 27 Sep 2023 17:17:52 -0300 Subject: [PATCH 3/4] feat(maestro): adding more logs and removing return which could be omitting logs. --- maestro/redis/consumer/sinker.go | 3 ++- maestro/redis/consumer/sinks.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/maestro/redis/consumer/sinker.go b/maestro/redis/consumer/sinker.go index 2c19af9b0..2f2050071 100644 --- a/maestro/redis/consumer/sinker.go +++ b/maestro/redis/consumer/sinker.go @@ -5,6 +5,7 @@ import ( "github.com/go-redis/redis/v8" maestroredis "github.com/orb-community/orb/maestro/redis" "github.com/orb-community/orb/maestro/service" + redis2 "github.com/orb-community/orb/sinks/redis" "go.uber.org/zap" ) @@ -39,7 +40,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context if err != nil && err.Error() != maestroredis.Exists { return err } - + s.logger.Info("Reading Sinker Events", zap.String("stream", redis2.StreamSinks)) for { const activityStream = "orb.sink_activity" const idleStream = "orb.sink_idle" diff --git a/maestro/redis/consumer/sinks.go b/maestro/redis/consumer/sinks.go index e5a16cc43..e44ddd451 100644 --- a/maestro/redis/consumer/sinks.go +++ b/maestro/redis/consumer/sinks.go @@ -41,7 +41,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error if err != nil && err.Error() != redis2.Exists { return err } - + ls.logger.Info("Reading Sinks Events", zap.String("stream", redis2.StreamSinks)) for { streams, err := ls.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: redis2.GroupMaestro, From 8181afac85135b1ee60e5f1a5f181a69d893d5d3 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 27 Sep 2023 17:29:44 -0300 Subject: [PATCH 4/4] feat(logs): fixing logs and changing size source in otlp receiver. --- maestro/redis/consumer/sinker.go | 6 +++--- maestro/redis/consumer/sinks.go | 8 ++++---- maestro/service/deploy_service.go | 10 +++++----- sinker/otel/orbreceiver/logs.go | 11 +++++------ sinker/otel/orbreceiver/metrics.go | 11 ++++++----- sinker/otel/orbreceiver/traces.go | 10 ++++++---- 6 files changed, 29 insertions(+), 27 deletions(-) diff --git a/maestro/redis/consumer/sinker.go b/maestro/redis/consumer/sinker.go index 2f2050071..14778978f 100644 --- a/maestro/redis/consumer/sinker.go +++ b/maestro/redis/consumer/sinker.go @@ -40,7 +40,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context if err != nil && err.Error() != maestroredis.Exists { return err } - s.logger.Info("Reading Sinker Events", zap.String("stream", redis2.StreamSinks)) + s.logger.Debug("Reading Sinker Events", zap.String("stream", redis2.StreamSinks)) for { const activityStream = "orb.sink_activity" const idleStream = "orb.sink_idle" @@ -58,7 +58,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) - s.logger.Info("Reading message from activity stream", zap.String("message_id", message.ID), + s.logger.Debug("Reading message from activity stream", zap.String("message_id", message.ID), zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkActivity(ctx, event) if err != nil { @@ -69,7 +69,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) - s.logger.Info("Reading message from idle stream", zap.String("message_id", message.ID), + s.logger.Debug("Reading message from idle stream", zap.String("message_id", message.ID), zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkIdle(ctx, event) if err != nil { diff --git a/maestro/redis/consumer/sinks.go b/maestro/redis/consumer/sinks.go index e44ddd451..5a0e486db 100644 --- a/maestro/redis/consumer/sinks.go +++ b/maestro/redis/consumer/sinks.go @@ -41,7 +41,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error if err != nil && err.Error() != redis2.Exists { return err } - ls.logger.Info("Reading Sinks Events", zap.String("stream", redis2.StreamSinks)) + ls.logger.Debug("Reading Sinks Events", zap.String("stream", redis2.StreamSinks)) for { streams, err := ls.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: redis2.GroupMaestro, @@ -106,7 +106,7 @@ func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XM // handleSinksUpdate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkUpdate(ctx, event) if err != nil { return err @@ -117,7 +117,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae // handleSinksDelete logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkDelete(ctx, event) if err != nil { return err @@ -127,7 +127,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae // handleSinksCreate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkCreate(ctx, event) if err != nil { return err diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index be6977f9c..d42bbef39 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -35,7 +35,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr // HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - d.logger.Info("handling sink create event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID)) // Create Deployment Entry entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) // Use deploymentService, which will create deployment in both postgres and redis @@ -49,7 +49,7 @@ func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis. func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { now := time.Now() - d.logger.Info("handling sink update event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink update event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { @@ -71,7 +71,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. } func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - d.logger.Info("handling sink delete event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink delete event", zap.String("sink-id", event.SinkID)) deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) @@ -93,7 +93,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi if event.State != "active" { return errors.New("trying to deploy sink that is not active") } - d.logger.Info("handling sink activity event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink activity event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { @@ -121,7 +121,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error { // check if exists deployment entry from postgres - d.logger.Info("handling sink idle event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink idle event", zap.String("sink-id", event.SinkID)) _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { d.logger.Error("error trying to get deployment entry", zap.Error(err)) diff --git a/sinker/otel/orbreceiver/logs.go b/sinker/otel/orbreceiver/logs.go index a054f1029..bff9a860b 100644 --- a/sinker/otel/orbreceiver/logs.go +++ b/sinker/otel/orbreceiver/logs.go @@ -6,7 +6,7 @@ package orbreceiver import ( "context" - "fmt" + "strconv" "strings" "github.com/mainflux/mainflux/pkg/messaging" @@ -32,6 +32,7 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error { zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) r.cfg.Logger.Info("received log message, pushing to kafka exporter") + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) lr, err := r.encoder.unmarshalLogsRequest(decompressedPayload) if err != nil { @@ -48,13 +49,13 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error { scopes := lr.Logs().ResourceLogs().At(0).ScopeLogs() for i := 0; i < scopes.Len(); i++ { - r.ProccessLogsContext(scopes.At(i), msg.Channel) + r.ProccessLogsContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) { +func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -118,15 +119,13 @@ func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", agentPb.AgentName) lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.instance.id", polID) request := plogotlp.NewExportRequestFromLogs(lr) - sizeable, _ := request.MarshalProto() _, err = r.exportLogs(attributeCtx, request) if err != nil { r.cfg.Logger.Error("error during logs export, skipping sink", zap.Error(err)) _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, "0") continue } else { - size := fmt.Sprintf("%d", len(sizeable)) - _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) } } } diff --git a/sinker/otel/orbreceiver/metrics.go b/sinker/otel/orbreceiver/metrics.go index 2a9242e99..a847b11dd 100644 --- a/sinker/otel/orbreceiver/metrics.go +++ b/sinker/otel/orbreceiver/metrics.go @@ -6,6 +6,7 @@ package orbreceiver import ( "context" + "strconv" "strings" "time" @@ -32,7 +33,8 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error { zap.String("protocol", msg.Protocol), zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) - r.cfg.Logger.Info("received metric message, pushing to kafka exporter") + r.cfg.Logger.Debug("received metric message, pushing to kafka exporter", zap.String("publisher", msg.Publisher)) + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) mr, err := r.encoder.unmarshalMetricsRequest(decompressedPayload) if err != nil { @@ -49,13 +51,13 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error { scopes := mr.Metrics().ResourceMetrics().At(0).ScopeMetrics() for i := 0; i < scopes.Len(); i++ { - r.ProccessMetricsContext(scopes.At(i), msg.Channel) + r.ProccessMetricsContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string) { +func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -110,10 +112,9 @@ func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags) attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs) attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID) - size := string(rune(scope.Metrics().Len())) for sinkId := range sinkIds { - err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) if err != nil { r.cfg.Logger.Error("error notifying metrics sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err)) } diff --git a/sinker/otel/orbreceiver/traces.go b/sinker/otel/orbreceiver/traces.go index 8c83381f6..af2bdbab3 100644 --- a/sinker/otel/orbreceiver/traces.go +++ b/sinker/otel/orbreceiver/traces.go @@ -6,6 +6,7 @@ package orbreceiver import ( "context" + "strconv" "strings" "github.com/mainflux/mainflux/pkg/messaging" @@ -31,6 +32,7 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error { zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) r.cfg.Logger.Info("received trace message, pushing to kafka exporter") + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) tr, err := r.encoder.unmarshalTracesRequest(decompressedPayload) if err != nil { @@ -47,13 +49,13 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error { scopes := tr.Traces().ResourceSpans().At(0).ScopeSpans() for i := 0; i < scopes.Len(); i++ { - r.ProccessTracesContext(scopes.At(i), msg.Channel) + r.ProccessTracesContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string) { +func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -106,9 +108,9 @@ func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel str attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags) attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs) attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID) - size := string(rune(scope.Spans().Len())) + for sinkId := range sinkIds { - err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) if err != nil { r.cfg.Logger.Error("error notifying sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err)) continue