diff --git a/maestro/redis/consumer/sinker.go b/maestro/redis/consumer/sinker.go index 2f2050071..14778978f 100644 --- a/maestro/redis/consumer/sinker.go +++ b/maestro/redis/consumer/sinker.go @@ -40,7 +40,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context if err != nil && err.Error() != maestroredis.Exists { return err } - s.logger.Info("Reading Sinker Events", zap.String("stream", redis2.StreamSinks)) + s.logger.Debug("Reading Sinker Events", zap.String("stream", redis2.StreamSinks)) for { const activityStream = "orb.sink_activity" const idleStream = "orb.sink_idle" @@ -58,7 +58,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) - s.logger.Info("Reading message from activity stream", zap.String("message_id", message.ID), + s.logger.Debug("Reading message from activity stream", zap.String("message_id", message.ID), zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkActivity(ctx, event) if err != nil { @@ -69,7 +69,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context for _, message := range stream.Messages { event := maestroredis.SinkerUpdateEvent{} event.Decode(message.Values) - s.logger.Info("Reading message from idle stream", zap.String("message_id", message.ID), + s.logger.Debug("Reading message from idle stream", zap.String("message_id", message.ID), zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID)) err := s.eventService.HandleSinkIdle(ctx, event) if err != nil { diff --git a/maestro/redis/consumer/sinks.go b/maestro/redis/consumer/sinks.go index e44ddd451..5a0e486db 100644 --- a/maestro/redis/consumer/sinks.go +++ b/maestro/redis/consumer/sinks.go @@ -41,7 +41,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error if err != nil && err.Error() != redis2.Exists { return err } - ls.logger.Info("Reading Sinks Events", zap.String("stream", redis2.StreamSinks)) + ls.logger.Debug("Reading Sinks Events", zap.String("stream", redis2.StreamSinks)) for { streams, err := ls.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: redis2.GroupMaestro, @@ -106,7 +106,7 @@ func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XM // handleSinksUpdate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkUpdate(ctx, event) if err != nil { return err @@ -117,7 +117,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae // handleSinksDelete logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkDelete(ctx, event) if err != nil { return err @@ -127,7 +127,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae // handleSinksCreate logic moved to deployment.EventService func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - ls.logger.Info("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) + ls.logger.Debug("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) err := ls.deploymentService.HandleSinkCreate(ctx, event) if err != nil { return err diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index be6977f9c..d42bbef39 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -35,7 +35,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr // HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - d.logger.Info("handling sink create event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID)) // Create Deployment Entry entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) // Use deploymentService, which will create deployment in both postgres and redis @@ -49,7 +49,7 @@ func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis. func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { now := time.Now() - d.logger.Info("handling sink update event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink update event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { @@ -71,7 +71,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. } func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - d.logger.Info("handling sink delete event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink delete event", zap.String("sink-id", event.SinkID)) deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) @@ -93,7 +93,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi if event.State != "active" { return errors.New("trying to deploy sink that is not active") } - d.logger.Info("handling sink activity event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink activity event", zap.String("sink-id", event.SinkID)) // check if exists deployment entry from postgres _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { @@ -121,7 +121,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error { // check if exists deployment entry from postgres - d.logger.Info("handling sink idle event", zap.String("sink-id", event.SinkID)) + d.logger.Debug("handling sink idle event", zap.String("sink-id", event.SinkID)) _, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) if err != nil { d.logger.Error("error trying to get deployment entry", zap.Error(err)) diff --git a/sinker/otel/orbreceiver/logs.go b/sinker/otel/orbreceiver/logs.go index a054f1029..bff9a860b 100644 --- a/sinker/otel/orbreceiver/logs.go +++ b/sinker/otel/orbreceiver/logs.go @@ -6,7 +6,7 @@ package orbreceiver import ( "context" - "fmt" + "strconv" "strings" "github.com/mainflux/mainflux/pkg/messaging" @@ -32,6 +32,7 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error { zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) r.cfg.Logger.Info("received log message, pushing to kafka exporter") + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) lr, err := r.encoder.unmarshalLogsRequest(decompressedPayload) if err != nil { @@ -48,13 +49,13 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error { scopes := lr.Logs().ResourceLogs().At(0).ScopeLogs() for i := 0; i < scopes.Len(); i++ { - r.ProccessLogsContext(scopes.At(i), msg.Channel) + r.ProccessLogsContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) { +func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -118,15 +119,13 @@ func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", agentPb.AgentName) lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.instance.id", polID) request := plogotlp.NewExportRequestFromLogs(lr) - sizeable, _ := request.MarshalProto() _, err = r.exportLogs(attributeCtx, request) if err != nil { r.cfg.Logger.Error("error during logs export, skipping sink", zap.Error(err)) _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, "0") continue } else { - size := fmt.Sprintf("%d", len(sizeable)) - _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) } } } diff --git a/sinker/otel/orbreceiver/metrics.go b/sinker/otel/orbreceiver/metrics.go index 2a9242e99..a847b11dd 100644 --- a/sinker/otel/orbreceiver/metrics.go +++ b/sinker/otel/orbreceiver/metrics.go @@ -6,6 +6,7 @@ package orbreceiver import ( "context" + "strconv" "strings" "time" @@ -32,7 +33,8 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error { zap.String("protocol", msg.Protocol), zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) - r.cfg.Logger.Info("received metric message, pushing to kafka exporter") + r.cfg.Logger.Debug("received metric message, pushing to kafka exporter", zap.String("publisher", msg.Publisher)) + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) mr, err := r.encoder.unmarshalMetricsRequest(decompressedPayload) if err != nil { @@ -49,13 +51,13 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error { scopes := mr.Metrics().ResourceMetrics().At(0).ScopeMetrics() for i := 0; i < scopes.Len(); i++ { - r.ProccessMetricsContext(scopes.At(i), msg.Channel) + r.ProccessMetricsContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string) { +func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -110,10 +112,9 @@ func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags) attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs) attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID) - size := string(rune(scope.Metrics().Len())) for sinkId := range sinkIds { - err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) if err != nil { r.cfg.Logger.Error("error notifying metrics sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err)) } diff --git a/sinker/otel/orbreceiver/traces.go b/sinker/otel/orbreceiver/traces.go index 8c83381f6..af2bdbab3 100644 --- a/sinker/otel/orbreceiver/traces.go +++ b/sinker/otel/orbreceiver/traces.go @@ -6,6 +6,7 @@ package orbreceiver import ( "context" + "strconv" "strings" "github.com/mainflux/mainflux/pkg/messaging" @@ -31,6 +32,7 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error { zap.Int64("created", msg.Created), zap.String("publisher", msg.Publisher)) r.cfg.Logger.Info("received trace message, pushing to kafka exporter") + size := len(msg.Payload) decompressedPayload := r.DecompressBrotli(msg.Payload) tr, err := r.encoder.unmarshalTracesRequest(decompressedPayload) if err != nil { @@ -47,13 +49,13 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error { scopes := tr.Traces().ResourceSpans().At(0).ScopeSpans() for i := 0; i < scopes.Len(); i++ { - r.ProccessTracesContext(scopes.At(i), msg.Channel) + r.ProccessTracesContext(scopes.At(i), msg.Channel, size) } }() return nil } -func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string) { +func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string, size int) { // Extract Datasets attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids") if !ok { @@ -106,9 +108,9 @@ func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel str attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags) attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs) attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID) - size := string(rune(scope.Spans().Len())) + for sinkId := range sinkIds { - err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size) + err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size)) if err != nil { r.cfg.Logger.Error("error notifying sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err)) continue