Skip to content

Commit

Permalink
feat(sinker): WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Sep 19, 2023
1 parent 6759d35 commit 1e5ece6
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 75 deletions.
27 changes: 14 additions & 13 deletions maestro/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"context"
"encoding/json"
"errors"
"github.com/orb-community/orb/maestro/deployment"
"github.com/orb-community/orb/maestro/redis/producer"
"io"
"strings"
"time"

"github.com/orb-community/orb/maestro/kubecontrol"
rediscons1 "github.com/orb-community/orb/maestro/redis/consumer"

maestroconfig "github.com/orb-community/orb/maestro/config"
"github.com/orb-community/orb/maestro/kubecontrol"
sinkspb "github.com/orb-community/orb/sinks/pb"
"go.uber.org/zap"
k8scorev1 "k8s.io/api/core/v1"
Expand All @@ -27,12 +27,12 @@ const (
namespace = "otelcollectors"
)

func NewMonitorService(logger *zap.Logger, sinksClient *sinkspb.SinkServiceClient, eventStore rediscons1.Subscriber, kubecontrol *kubecontrol.Service) Service {
func NewMonitorService(logger *zap.Logger, sinksClient *sinkspb.SinkServiceClient, mp producer.Producer, kubecontrol *kubecontrol.Service) Service {
return &monitorService{
logger: logger,
sinksClient: *sinksClient,
eventStore: eventStore,
kubecontrol: *kubecontrol,
logger: logger,
sinksClient: *sinksClient,
maestroProducer: mp,
kubecontrol: *kubecontrol,
}
}

Expand All @@ -42,10 +42,11 @@ type Service interface {
}

type monitorService struct {
logger *zap.Logger
sinksClient sinkspb.SinkServiceClient
eventStore rediscons1.Subscriber
kubecontrol kubecontrol.Service
logger *zap.Logger
sinksClient sinkspb.SinkServiceClient
maestroProducer producer.Producer
deploymentSvc deployment.Service
kubecontrol kubecontrol.Service
}

func (svc *monitorService) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
Expand Down Expand Up @@ -167,7 +168,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
if sink == nil {
svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name))
sinkId := collector.Name[5:41]
deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId)
deploymentEntry, err := svc.deploymentSvc.GetDeploymentByCollectorName(ctx, collector.Name)
if err != nil {
svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId))
deploymentName := "otel-" + sinkId
Expand Down
62 changes: 0 additions & 62 deletions sinker/otel/bridgeservice/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package bridgeservice

import (
"context"
"encoding/json"
"fmt"
"github.com/orb-community/orb/pkg/types"
sinkspb "github.com/orb-community/orb/sinks/pb"
"sort"
"time"
Expand Down Expand Up @@ -68,66 +66,6 @@ func (bs *SinkerOtelBridgeService) IncrementMessageCounter(publisher, subtopic,
}

func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwnerId, sinkId, newState, message string) error {
cfgRepo, err := bs.sinkerCache.Get(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("unable to retrieve the sink config", zap.Error(err))
sinkData, _ := bs.sinksClient.RetrieveSink(ctx, &sinkspb.SinkByIDReq{
SinkID: sinkId,
OwnerID: mfOwnerId,
})
var metadata types.Metadata
_ = json.Unmarshal(sinkData.Config, &metadata)
cfgRepo = config.SinkConfig{
SinkID: sinkId,
OwnerID: mfOwnerId,
Config: metadata,
State: config.Active,
Msg: "",
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
}

// only updates sink state if status Idle or Unknown
if cfgRepo.State == config.Idle || cfgRepo.State == config.Unknown {
cfgRepo.LastRemoteWrite = time.Now()
// only deploy collector if new state is "active" and current state "not active"
if newState == "active" && cfgRepo.State != config.Active {
err = cfgRepo.State.SetFromString(newState)
if err != nil {
bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err))
return err
}
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("waking up sink to active", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else {
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
}
} else {
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
}

return nil
}
Expand Down
19 changes: 19 additions & 0 deletions sinker/redis/consumer/sink_key_expire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package consumer

import (
"github.com/go-redis/redis/v8"
"github.com/orb-community/orb/sinker/redis/producer"
"go.uber.org/zap"
)

type SinkerKeyExpirationListener interface {
// Listen to the sinker key expiration
SubscribeToKeyExpiration() error
ReceiveMessage(message interface{}) error
}

type sinkerKeyExpirationListener struct {
logger *zap.Logger
cacheRedisClient redis.Client
idleProducer producer.SinkIdleProducer
}
70 changes: 70 additions & 0 deletions sinker/redis/producer/sink_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package producer

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"time"
)

type SinkerKey struct {
OwnerID string
SinkID string
Size string
LastActivity time.Time
}

func (s *SinkerKey) Encode() map[string]interface{} {
return map[string]interface{}{
"owner_id": s.OwnerID,
"sink_id": s.SinkID,
"size": s.Size,
"last_activity": s.LastActivity.Format(time.RFC3339),
}
}

const DefaultExpiration = 5 * time.Minute

type SinkerKeyService interface {
// AddNewSinkerKey Add New Sinker Key with default Expiration of 5 minutes
AddNewSinkerKey(ctx context.Context, key SinkerKey) error
// RenewSinkerKey Increment Expiration of Sinker Key
RenewSinkerKey(ctx context.Context, key SinkerKey) error
}

type sinkerKeyService struct {
logger *zap.Logger
cacheRepository redis.Client
}

func NewSinkerKeyService(logger *zap.Logger, cacheRepository redis.Client) SinkerKeyService {
return &sinkerKeyService{logger: logger, cacheRepository: cacheRepository}
}

// RenewSinkerKey Increment Expiration of Sinker Key
func (s *sinkerKeyService) RenewSinkerKey(ctx context.Context, key SinkerKey) error {
// If key does not exist, create new entry
cmd := s.cacheRepository.Expire(ctx, "orb.sinker", DefaultExpiration)
if cmd.Err() != nil {
s.logger.Error("error sending event to sinker event store", zap.Error(cmd.Err()))
return cmd.Err()
}
return nil
}

func (s *sinkerKeyService) AddNewSinkerKey(ctx context.Context, sink SinkerKey) error {
// Create sinker key in redis Hashset with default expiration of 5 minutes
key := fmt.Sprintf("orb.sinker.key-%s:%s", sink.OwnerID, sink.SinkID)
cmd := s.cacheRepository.HSet(ctx, key, sink.Encode())
if cmd.Err() != nil {
s.logger.Error("error sending event to sinker event store", zap.Error(cmd.Err()))
return cmd.Err()
}
err := s.RenewSinkerKey(ctx, sink)
if err != nil {
s.logger.Error("error setting expiration to sinker event store", zap.Error(cmd.Err()))
return cmd.Err()
}
return nil
}
58 changes: 58 additions & 0 deletions sinker/redis/producer/sinker_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package producer

import (
"context"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"time"
)

type SinkActivityProducer interface {
// PublishSinkActivity to be used to publish the sink activity to the sinker, mainly used by Otel Bridge Service
PublishSinkActivity(ctx context.Context, event SinkActivityEvent) error
}

type SinkActivityEvent struct {
OwnerID string
SinkID string
State string
Size string
Timestamp time.Time
}

func (s *SinkActivityEvent) Encode() map[string]interface{} {
return map[string]interface{}{
"owner_id": s.OwnerID,
"sink_id": s.SinkID,
"state": s.State,
"size": s.Size,
"timestamp": s.Timestamp.Format(time.RFC3339),
}
}

var _ SinkActivityProducer = (*sinkActivityProducer)(nil)

type sinkActivityProducer struct {
logger *zap.Logger
redisStreamClient redis.Client
}

func NewSinkActivityProducer(logger *zap.Logger, redisStreamClient redis.Client) SinkActivityProducer {
return &sinkActivityProducer{logger: logger, redisStreamClient: redisStreamClient}
}

// PublishSinkActivity BridgeService will notify stream of sink activity
func (sp *sinkActivityProducer) PublishSinkActivity(ctx context.Context, event SinkActivityEvent) error {
const maxLen = 1000
record := &redis.XAddArgs{
Stream: "orb.sink_activity",
Values: event.Encode(),
MaxLen: maxLen,
Approx: true,
}
err := sp.redisStreamClient.XAdd(ctx, record).Err()
if err != nil {
sp.logger.Error("error sending event to sinker event store", zap.Error(err))
}
return err
}
37 changes: 37 additions & 0 deletions sinker/redis/producer/sinker_idle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package producer

import (
"context"
"github.com/go-redis/redis/v8"
"time"
)

type SinkIdleEvent struct {
OwnerID string
SinkID string
State string
Size string
Timestamp time.Time
}

func (s *SinkIdleEvent) Encode() map[string]interface{} {
return map[string]interface{}{
"owner_id": s.OwnerID,
"sink_id": s.SinkID,
"state": s.State,
"size": s.Size,
"timestamp": s.Timestamp.Format(time.RFC3339),
}
}

type SinkIdleProducer interface {
// PublishSinkIdle to be used to publish the sink activity to the sinker, mainly used by Otel Bridge Service
PublishSinkIdle(ctx context.Context, event SinkIdleEvent) error
}

var _ SinkIdleProducer = (*sinkIdleProducer)(nil)

type sinkIdleProducer struct {
logger *zap.Logger
redisStreamClient redis.Client
}

0 comments on commit 1e5ece6

Please sign in to comment.