Skip to content

Commit

Permalink
Merge pull request #2662 from lpegoraro/eng-1118-maestro-deployment-c…
Browse files Browse the repository at this point in the history
…ache-postgresql

feat(maestro): add observability and logs to new flow on maestro.
  • Loading branch information
lpegoraro authored Sep 27, 2023
2 parents 4571cf6 + 8181afa commit 47eeaf7
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 23 deletions.
7 changes: 6 additions & 1 deletion maestro/redis/consumer/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/go-redis/redis/v8"
maestroredis "github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/maestro/service"
redis2 "github.com/orb-community/orb/sinks/redis"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -39,7 +40,7 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
if err != nil && err.Error() != maestroredis.Exists {
return err
}

s.logger.Debug("Reading Sinker Events", zap.String("stream", redis2.StreamSinks))
for {
const activityStream = "orb.sink_activity"
const idleStream = "orb.sink_idle"
Expand All @@ -57,6 +58,8 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from activity stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkActivity(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
Expand All @@ -66,6 +69,8 @@ func (s *sinkerActivityListenerService) SubscribeSinksEvents(ctx context.Context
for _, message := range stream.Messages {
event := maestroredis.SinkerUpdateEvent{}
event.Decode(message.Values)
s.logger.Debug("Reading message from idle stream", zap.String("message_id", message.ID),
zap.String("sink_id", event.SinkID), zap.String("owner_id", event.OwnerID))
err := s.eventService.HandleSinkIdle(ctx, event)
if err != nil {
s.logger.Error("error receiving message", zap.Error(err))
Expand Down
10 changes: 5 additions & 5 deletions maestro/redis/consumer/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error
if err != nil && err.Error() != redis2.Exists {
return err
}

ls.logger.Debug("Reading Sinks Events", zap.String("stream", redis2.StreamSinks))
for {
streams, err := ls.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: redis2.GroupMaestro,
Expand All @@ -62,7 +62,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error
}

func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XMessage) error {
logger := ls.logger.With(zap.String("maestro_sinks_listener_msg", msg.ID))
logger := ls.logger.Named("sinks_listener:" + msg.ID)
event := msg.Values
rte, err := redis2.DecodeSinksEvent(event, event["operation"].(string))
if err != nil {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (ls *sinksListenerService) ReceiveMessage(ctx context.Context, msg redis.XM

// handleSinksUpdate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received maestro UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkUpdate(ctx, event)
if err != nil {
return err
Expand All @@ -117,7 +117,7 @@ func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event mae

// handleSinksDelete logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkDelete(ctx, event)
if err != nil {
return err
Expand All @@ -127,7 +127,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae

// handleSinksCreate logic moved to deployment.EventService
func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
ls.logger.Info("Received event to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
ls.logger.Debug("Received sinks to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := ls.deploymentService.HandleSinkCreate(ctx, event)
if err != nil {
return err
Expand Down
17 changes: 15 additions & 2 deletions maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package maestro

import (
"context"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-redis/redis/v8"
"github.com/jmoiron/sqlx"
"github.com/orb-community/orb/maestro/deployment"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/orb-community/orb/maestro/service"
"github.com/orb-community/orb/pkg/config"
sinkspb "github.com/orb-community/orb/sinks/pb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -52,6 +54,19 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink
ps := producer.NewMaestroProducer(logger, streamRedisClient)
monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr)
eventService := service.NewEventService(logger, deploymentService, kubectr)
eventService = service.NewTracingService(logger, eventService,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "maestro",
Subsystem: "comms",
Name: "message_count",
Help: "Number of messages received.",
}, []string{"method", "sink_id", "owner_id"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "maestro",
Subsystem: "comms",
Name: "message_latency_microseconds",
Help: "Total duration of messages processed in microseconds.",
}, []string{"method", "sink_id", "owner_id"}))
sinkListenerService := rediscons1.NewSinksListenerController(logger, eventService, sinkerRedisClient, sinksGrpcClient)
activityListener := rediscons1.NewSinkerActivityListener(logger, eventService, sinkerRedisClient)
return &maestroService{
Expand Down Expand Up @@ -100,7 +115,6 @@ func (svc *maestroService) Stop() {
func (svc *maestroService) subscribeToSinksEvents(ctx context.Context) {
if err := svc.sinkListenerService.SubscribeSinksEvents(ctx); err != nil {
svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
return
}
svc.logger.Info("finished reading sinks events")
ctx.Done()
Expand All @@ -109,7 +123,6 @@ func (svc *maestroService) subscribeToSinksEvents(ctx context.Context) {
func (svc *maestroService) subscribeToSinkerEvents(ctx context.Context) {
if err := svc.activityListener.SubscribeSinksEvents(ctx); err != nil {
svc.logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
return
}
svc.logger.Info("finished reading sinker events")
ctx.Done()
Expand Down
5 changes: 5 additions & 0 deletions maestro/service/deploy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr

// HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity
func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID))
// Create Deployment Entry
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
// Use deploymentService, which will create deployment in both postgres and redis
Expand All @@ -48,6 +49,7 @@ func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.

func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
now := time.Now()
d.logger.Debug("handling sink update event", zap.String("sink-id", event.SinkID))
// check if exists deployment entry from postgres
entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
Expand All @@ -69,6 +71,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.
}

func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Debug("handling sink delete event", zap.String("sink-id", event.SinkID))
deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID))
Expand All @@ -90,6 +93,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi
if event.State != "active" {
return errors.New("trying to deploy sink that is not active")
}
d.logger.Debug("handling sink activity event", zap.String("sink-id", event.SinkID))
// check if exists deployment entry from postgres
_, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID)
if err != nil {
Expand Down Expand Up @@ -117,6 +121,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi

func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
// check if exists deployment entry from postgres
d.logger.Debug("handling sink idle event", zap.String("sink-id", event.SinkID))
_, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID)
if err != nil {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
Expand Down
85 changes: 85 additions & 0 deletions maestro/service/metrics_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package service

import (
"context"
"github.com/go-kit/kit/metrics"
maestroredis "github.com/orb-community/orb/maestro/redis"
"go.uber.org/zap"
"time"
)

// tracingService is a metrics-instrumentation middleware for EventService.
// It records a per-call counter and latency observation, then delegates the
// actual work to nextService.
type tracingService struct {
	// logger for the middleware (currently unused by the handlers themselves).
	logger *zap.Logger
	// counter counts handled messages, labeled by method, sink_id and owner_id.
	counter metrics.Counter
	// latency records per-message processing duration in microseconds,
	// using the same label set as counter.
	latency metrics.Histogram
	// nextService is the wrapped EventService that performs the real handling.
	nextService EventService
}

// NewTracingService wraps the given EventService with metrics middleware that
// reports a message counter and a latency histogram for every handled event.
func NewTracingService(logger *zap.Logger, service EventService, counter metrics.Counter, latency metrics.Histogram) EventService {
	return &tracingService{
		nextService: service,
		logger:      logger,
		counter:     counter,
		latency:     latency,
	}
}

// HandleSinkCreate delegates to the wrapped service and records count and
// latency metrics for the call, labeled with the method, sink and owner.
func (t *tracingService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
	start := time.Now()
	defer func() {
		lvs := []string{
			"method", "HandleSinkCreate",
			"sink_id", event.SinkID,
			"owner_id", event.Owner,
		}
		t.counter.With(lvs...).Add(1)
		t.latency.With(lvs...).Observe(float64(time.Since(start).Microseconds()))
	}()
	return t.nextService.HandleSinkCreate(ctx, event)
}

// HandleSinkUpdate delegates to the wrapped service and records count and
// latency metrics for the call, labeled with the method, sink and owner.
func (t *tracingService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
	defer func(begun time.Time) {
		labels := []string{
			// Fixed copy-paste bug: label previously said "HandleSinkCreate",
			// which merged update metrics into the create series.
			"method", "HandleSinkUpdate",
			"sink_id", event.SinkID,
			"owner_id", event.Owner,
		}
		t.counter.With(labels...).Add(1)
		t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds()))
	}(time.Now())
	return t.nextService.HandleSinkUpdate(ctx, event)
}

// HandleSinkDelete delegates to the wrapped service and records count and
// latency metrics for the call, labeled with the method, sink and owner.
func (t *tracingService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
	defer func(begun time.Time) {
		labels := []string{
			// Fixed copy-paste bug: label previously said "HandleSinkCreate",
			// which merged delete metrics into the create series.
			"method", "HandleSinkDelete",
			"sink_id", event.SinkID,
			"owner_id", event.Owner,
		}
		t.counter.With(labels...).Add(1)
		t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds()))
	}(time.Now())
	return t.nextService.HandleSinkDelete(ctx, event)
}

// HandleSinkActivity delegates to the wrapped service and records count and
// latency metrics for the call, labeled with the method, sink and owner.
func (t *tracingService) HandleSinkActivity(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
	defer func(begun time.Time) {
		labels := []string{
			// Fixed copy-paste bug: label previously said "HandleSinkCreate",
			// which merged activity metrics into the create series.
			"method", "HandleSinkActivity",
			"sink_id", event.SinkID,
			"owner_id", event.OwnerID,
		}
		t.counter.With(labels...).Add(1)
		t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds()))
	}(time.Now())
	return t.nextService.HandleSinkActivity(ctx, event)
}

// HandleSinkIdle delegates to the wrapped service and records count and
// latency metrics for the call, labeled with the method, sink and owner.
func (t *tracingService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
	defer func(begun time.Time) {
		labels := []string{
			// Fixed copy-paste bug: label previously said "HandleSinkCreate",
			// which merged idle metrics into the create series.
			"method", "HandleSinkIdle",
			"sink_id", event.SinkID,
			"owner_id", event.OwnerID,
		}
		t.counter.With(labels...).Add(1)
		t.latency.With(labels...).Observe(float64(time.Since(begun).Microseconds()))
	}(time.Now())
	return t.nextService.HandleSinkIdle(ctx, event)
}
11 changes: 5 additions & 6 deletions sinker/otel/orbreceiver/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package orbreceiver

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/mainflux/mainflux/pkg/messaging"
Expand All @@ -32,6 +32,7 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error {
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received log message, pushing to kafka exporter")
size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
lr, err := r.encoder.unmarshalLogsRequest(decompressedPayload)
if err != nil {
Expand All @@ -48,13 +49,13 @@ func (r *OrbReceiver) MessageLogsInbound(msg messaging.Message) error {

scopes := lr.Logs().ResourceLogs().At(0).ScopeLogs()
for i := 0; i < scopes.Len(); i++ {
r.ProccessLogsContext(scopes.At(i), msg.Channel)
r.ProccessLogsContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string) {
func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
Expand Down Expand Up @@ -118,15 +119,13 @@ func (r *OrbReceiver) ProccessLogsContext(scope plog.ScopeLogs, channel string)
lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.name", agentPb.AgentName)
lr.ResourceLogs().At(0).Resource().Attributes().PutStr("service.instance.id", polID)
request := plogotlp.NewExportRequestFromLogs(lr)
sizeable, _ := request.MarshalProto()
_, err = r.exportLogs(attributeCtx, request)
if err != nil {
r.cfg.Logger.Error("error during logs export, skipping sink", zap.Error(err))
_ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, "0")
continue
} else {
size := fmt.Sprintf("%d", len(sizeable))
_ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
_ = r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions sinker/otel/orbreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package orbreceiver

import (
"context"
"strconv"
"strings"
"time"

Expand All @@ -32,7 +33,8 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error {
zap.String("protocol", msg.Protocol),
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received metric message, pushing to kafka exporter")
r.cfg.Logger.Debug("received metric message, pushing to kafka exporter", zap.String("publisher", msg.Publisher))
size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
mr, err := r.encoder.unmarshalMetricsRequest(decompressedPayload)
if err != nil {
Expand All @@ -49,13 +51,13 @@ func (r *OrbReceiver) MessageMetricsInbound(msg messaging.Message) error {

scopes := mr.Metrics().ResourceMetrics().At(0).ScopeMetrics()
for i := 0; i < scopes.Len(); i++ {
r.ProccessMetricsContext(scopes.At(i), msg.Channel)
r.ProccessMetricsContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string) {
func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
Expand Down Expand Up @@ -110,10 +112,9 @@ func (r *OrbReceiver) ProccessMetricsContext(scope pmetric.ScopeMetrics, channel
attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags)
attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs)
attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID)
size := string(rune(scope.Metrics().Len()))

for sinkId := range sinkIds {
err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
if err != nil {
r.cfg.Logger.Error("error notifying metrics sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err))
}
Expand Down
10 changes: 6 additions & 4 deletions sinker/otel/orbreceiver/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package orbreceiver

import (
"context"
"strconv"
"strings"

"github.com/mainflux/mainflux/pkg/messaging"
Expand All @@ -31,6 +32,7 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error {
zap.Int64("created", msg.Created),
zap.String("publisher", msg.Publisher))
r.cfg.Logger.Info("received trace message, pushing to kafka exporter")
size := len(msg.Payload)
decompressedPayload := r.DecompressBrotli(msg.Payload)
tr, err := r.encoder.unmarshalTracesRequest(decompressedPayload)
if err != nil {
Expand All @@ -47,13 +49,13 @@ func (r *OrbReceiver) MessageTracesInbound(msg messaging.Message) error {

scopes := tr.Traces().ResourceSpans().At(0).ScopeSpans()
for i := 0; i < scopes.Len(); i++ {
r.ProccessTracesContext(scopes.At(i), msg.Channel)
r.ProccessTracesContext(scopes.At(i), msg.Channel, size)
}
}()
return nil
}

func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string) {
func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel string, size int) {
// Extract Datasets
attrDataset, ok := scope.Scope().Attributes().Get("dataset_ids")
if !ok {
Expand Down Expand Up @@ -106,9 +108,9 @@ func (r *OrbReceiver) ProccessTracesContext(scope ptrace.ScopeSpans, channel str
attributeCtx = context.WithValue(attributeCtx, "orb_tags", agentPb.OrbTags)
attributeCtx = context.WithValue(attributeCtx, "agent_groups", agentPb.AgentGroupIDs)
attributeCtx = context.WithValue(attributeCtx, "agent_ownerID", agentPb.OwnerID)
size := string(rune(scope.Spans().Len()))

for sinkId := range sinkIds {
err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, size)
err := r.cfg.SinkerService.NotifyActiveSink(r.ctx, agentPb.OwnerID, sinkId, strconv.Itoa(size))
if err != nil {
r.cfg.Logger.Error("error notifying sink active, changing state, skipping sink", zap.String("sink-id", sinkId), zap.Error(err))
continue
Expand Down

0 comments on commit 47eeaf7

Please sign in to comment.