Commit

feat(logs): fix log levels and change size source in OTLP receiver
Luiz Pegoraro committed Sep 27, 2023
1 parent 9470473 commit 8181afa
Showing 6 changed files with 29 additions and 27 deletions.
6 changes: 3 additions & 3 deletions maestro/redis/consumer/sinker.go
@@ -40,7 +40,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
if err != nil && err.Error() != maestroredis.Exists {
return err
}
s.logger.Info("Reading Sinker Events", zap.String("stream", redis2.StreamSinks))
s.logger.Debug("Reading Sinker Events", zap.String("stream", redis2.StreamSinks))
for {
const activityStream = "orb.sink_activity"
const idleStream = "orb.sink_idle"
@@ -58,7 +58,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Info("Reading message from activity stream", zap.String("message_id", message.ID),
s.logger.Debug("Reading message from activity stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
@@ -69,7 +69,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Info("Reading message from idle stream", zap.String("message_id", message.ID),
s.logger.Debug("Reading message from idle stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
8 changes: 4 additions & 4 deletions maestro/redis/consumer/sinks.go
@@ -41,7 +41,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error
if err != nil && err.Error() != redis2.Exists {
return err
}
ls.logger.Info("Reading Sinks Events", zap.String("stream", redis2.StreamSinks))
ls.logger.Debug("Reading Sinks Events", zap.String("stream", redis2.StreamSinks))
for {
streams, err := ls.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: redis2.GroupMaestro,
@@ -106,7 +106,7 @@ func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XM

// handleSinksUpdate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkUpdate(ctx, event)
if err != nil {
return err
@@ -117,7 +117,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae

// handleSinksDelete logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkDelete(ctx, event)
if err != nil {
return err
@@ -127,7 +127,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae

// handleSinksCreate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkCreate(ctx, event)
if err != nil {
return err
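For context, both listener loops above follow the standard Redis Streams consumer-group pattern: create the group (tolerating the "already exists" error the Exists check guards against), then block on XReadGroup and acknowledge processed messages. A minimal, self-contained sketch with go-redis v9; the stream, group, and consumer names here are assumed for illustration, not taken from this repository:

    package main

    import (
    	"context"
    	"fmt"
    	"strings"

    	"github.com/redis/go-redis/v9"
    )

    func main() {
    	ctx := context.Background()
    	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    	// Create the consumer group once; a BUSYGROUP error just means it
    	// already exists (what the err.Error() != Exists guard tolerates).
    	err := rdb.XGroupCreateMkStream(ctx, "orb.sinks", "orb.maestro", "$").Err()
    	if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
    		panic(err)
    	}

    	for {
    		// Block until new messages arrive for this consumer.
    		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
    			Group:    "orb.maestro",
    			Consumer: "maestro-1",
    			Streams:  []string{"orb.sinks", ">"},
    			Count:    10,
    			Block:    0,
    		}).Result()
    		if err != nil {
    			panic(err)
    		}
    		for _, s := range streams {
    			for _, m := range s.Messages {
    				fmt.Println(m.ID, m.Values) // decode into an event struct here
    				rdb.XAck(ctx, "orb.sinks", "orb.maestro", m.ID)
    			}
    		}
    	}
    }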
10 changes: 5 additions & 5 deletions maestro/service/deploy_service.go
@@ -35,7 +35,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr

// HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity
func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Info("handling sink create event", zap.String("sink-id", event.SinkID))
d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID))
// Create Deployment Entry
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
// Use deploymentService, which will create deployment in both postgres and redis
@@ -49,7 +49,7 @@ func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.

func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
now := time.Now()
d.logger.Info("handling sink update event", zap.String("sink-id", event.SinkID))
d.logger.Debug("handling sink update event", zap.String("sink-id", event.SinkID))
// check if exists deployment entry from postgres
entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
@@ -71,7 +71,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.
}

func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Info("handling sink delete event", zap.String("sink-id", event.SinkID))
d.logger.Debug("handling sink delete event", zap.String("sink-id", event.SinkID))
deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID))
@@ -93,7 +93,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi
if event.State != "active" {
return errors.New("trying to deploy sink that is not active")
}
d.logger.Info("handling sink activity event", zap.String("sink-id", event.SinkID))
d.logger.Debug("handling sink activity event", zap.String("sink-id", event.SinkID))
// check if exists deployment entry from postgres
_, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID)
if err != nil {
@@ -121,7 +121,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi

func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
// check if exists deployment entry from postgres
d.logger.Info("handling sink idle event", zap.String("sink-id", event.SinkID))
d.logger.Debug("handling sink idle event", zap.String("sink-id", event.SinkID))
_, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID)
if err != nil {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
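The Info-to-Debug demotions in the three files above only reduce log volume when the service logger is built at Info level or higher. As a reference point, a minimal zap setup under which those Debug calls become no-ops; this is an assumed configuration for illustration, not taken from this repository:

    package main

    import (
    	"go.uber.org/zap"
    	"go.uber.org/zap/zapcore"
    )

    func main() {
    	// zap's production config defaults to InfoLevel, so every call
    	// demoted to Debug in this commit is skipped at runtime.
    	cfg := zap.NewProductionConfig()
    	cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
    	logger, err := cfg.Build()
    	if err != nil {
    		panic(err)
    	}
    	defer logger.Sync()

    	logger.Debug("handling sink create event", zap.String("sink-id", "example")) // suppressed
    	logger.Info("service started")                                               // emitted
    }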
11 changes: 5 additions & 6 deletions sinker/otel/orbreceiver/logs.go
@@ -6,7 +6,7 @@ package orbreceiver

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/mainflux/mainflux/pkg/messaging"
@@ -32,6 +32,7 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error {
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received log message, pushing to kafka exporter")
+ size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
lr, err := r.encoder.unmarshalLogsRequest(decompressedPayload)
if err != nil {
@@ -48,13 +49,13 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error {

scopes := lr.Logs().ResourceLogs().At(0).ScopeLogs()
for i := 0; i < scopes.Len(); i++ {
- r.ProccessLogsContext(scopes.At(i), msg.Channel)
+ r.ProccessLogsContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

- func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) {
+ func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
@@ -118,15 +119,13 @@ func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string)
lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", agentPb.AgentName)
lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.instance.id", polID)
request := plogotlp.NewExportRequestFromLogs(lr)
- sizeable, _ := request.MarshalProto()
_, err = r.exportLogs(attributeCtx, request)
if err != nil {
r.cfg.Logger.Error("error during logs export, skipping sink", zap.Error(err))
_ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, "0")
continue
} else {
- size := fmt.Sprintf("%d", len(sizeable))
- _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
+ _ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
}
}
}
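With this hunk the byte count passed to NotifyActiveSink is captured once from the compressed inbound payload (len(msg.Payload)) instead of re-marshaling the export request per sink; the reported figure is therefore the wire size, not the size of the outgoing protobuf. A self-contained sketch of that measurement order, using github.com/andybalholm/brotli as an assumed stand-in for the receiver's DecompressBrotli helper:

    package main

    import (
    	"bytes"
    	"fmt"
    	"io"

    	"github.com/andybalholm/brotli"
    )

    func main() {
    	// Build a Brotli-compressed sample payload, as the sinker would receive it.
    	var buf bytes.Buffer
    	w := brotli.NewWriter(&buf)
    	_, _ = w.Write(bytes.Repeat([]byte("otlp log record "), 64))
    	_ = w.Close()
    	payload := buf.Bytes()

    	// New approach: record the wire size before decompression ...
    	size := len(payload)

    	// ... then decompress and unmarshal as before.
    	decompressed, _ := io.ReadAll(brotli.NewReader(bytes.NewReader(payload)))

    	fmt.Printf("reported size: %d bytes (wire), decompressed: %d bytes\n",
    		size, len(decompressed))
    }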
11 changes: 6 additions & 5 deletions sinker/otel/orbreceiver/metrics.go
@@ -6,6 +6,7 @@ package orbreceiver

import (
"context"
"strconv"
"strings"
"time"

@@ -32,7 +33,8 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error {
zap.String("protocol", msg.Protocol),
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received metric message, pushing to kafka exporter")
r.cfg.Logger.Debug("received metric message, pushing to kafka exporter", zap.String("publisher", msg.Publisher))
size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
mr, err := r.encoder.unmarshalMetricsRequest(decompressedPayload)
if err != nil {
@@ -49,13 +51,13 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error {

scopes := mr.Metrics().ResourceMetrics().At(0).ScopeMetrics()
for i := 0; i < scopes.Len(); i++ {
- r.ProccessMetricsContext(scopes.At(i), msg.Channel)
+ r.ProccessMetricsContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

- func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string) {
+ func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
@@ -110,10 +112,9 @@ func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel
attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags)
attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs)
attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID)
- size := string(rune(scope.Metrics().Len()))

for sinkId := range sinkIds {
- err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
+ err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
if err != nil {
r.cfg.Logger.Error("error notifying metrics sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err))
}
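Besides switching the size source, this file (and traces.go below, which gets the same fix) corrects a conversion bug: string(rune(n)) interprets n as a Unicode code point rather than formatting it as decimal text, so the old code reported an arbitrary character instead of a count. strconv.Itoa performs the intended conversion, as this minimal demonstration shows:

    package main

    import (
    	"fmt"
    	"strconv"
    )

    func main() {
    	n := 65 // e.g. a metric count or payload size in bytes

    	// Bug: rune conversion yields the character at code point 65.
    	fmt.Println(string(rune(n))) // -> A

    	// Fix: decimal text, matching what NotifyActiveSink is given elsewhere.
    	fmt.Println(strconv.Itoa(n)) // -> 65
    }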
10 changes: 6 additions & 4 deletions sinker/otel/orbreceiver/traces.go
@@ -6,6 +6,7 @@ package orbreceiver

import (
"context"
"strconv"
"strings"

"github.com/mainflux/mainflux/pkg/messaging"
@@ -31,6 +32,7 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error {
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received trace message, pushing to kafka exporter")
+ size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
tr, err := r.encoder.unmarshalTracesRequest(decompressedPayload)
if err != nil {
@@ -47,13 +49,13 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error {

scopes := tr.Traces().ResourceSpans().At(0).ScopeSpans()
for i := 0; i < scopes.Len(); i++ {
- r.ProccessTracesContext(scopes.At(i), msg.Channel)
+ r.ProccessTracesContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

- func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string) {
+ func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
@@ -106,9 +108,9 @@ func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel str
attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags)
attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs)
attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID)
- size := string(rune(scope.Spans().Len()))

for sinkId := range sinkIds {
- err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
+ err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
if err != nil {
r.cfg.Logger.Error("error notifying sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err))
continue
