diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 5ac8a7ccd..56a3195e9 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -5,14 +5,14 @@ import ( "context" "encoding/json" "errors" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis/producer" "io" "strings" "time" - "github.com/orb-community/orb/maestro/kubecontrol" - rediscons1 "github.com/orb-community/orb/maestro/redis/consumer" - maestroconfig "github.com/orb-community/orb/maestro/config" + "github.com/orb-community/orb/maestro/kubecontrol" sinkspb "github.com/orb-community/orb/sinks/pb" "go.uber.org/zap" k8scorev1 "k8s.io/api/core/v1" @@ -27,12 +27,12 @@ const ( namespace = "otelcollectors" ) -func NewMonitorService(logger *zap.Logger, sinksClient *sinkspb.SinkServiceClient, eventStore rediscons1.Subscriber, kubecontrol *kubecontrol.Service) Service { +func NewMonitorService(logger *zap.Logger, sinksClient *sinkspb.SinkServiceClient, mp producer.Producer, kubecontrol *kubecontrol.Service) Service { return &monitorService{ - logger: logger, - sinksClient: *sinksClient, - eventStore: eventStore, - kubecontrol: *kubecontrol, + logger: logger, + sinksClient: *sinksClient, + maestroProducer: mp, + kubecontrol: *kubecontrol, } } @@ -42,10 +42,11 @@ type Service interface { } type monitorService struct { - logger *zap.Logger - sinksClient sinkspb.SinkServiceClient - eventStore rediscons1.Subscriber - kubecontrol kubecontrol.Service + logger *zap.Logger + sinksClient sinkspb.SinkServiceClient + maestroProducer producer.Producer + deploymentSvc deployment.Service + kubecontrol kubecontrol.Service } func (svc *monitorService) Start(ctx context.Context, cancelFunc context.CancelFunc) error { @@ -167,7 +168,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { if sink == nil { svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name)) sinkId := collector.Name[5:41] - deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) + deploymentEntry, err := svc.deploymentSvc.GetDeploymentByCollectorName(ctx, collector.Name) if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) deploymentName := "otel-" + sinkId diff --git a/sinker/otel/bridgeservice/bridge.go b/sinker/otel/bridgeservice/bridge.go index db8053531..b1cfce1dc 100644 --- a/sinker/otel/bridgeservice/bridge.go +++ b/sinker/otel/bridgeservice/bridge.go @@ -2,9 +2,7 @@ package bridgeservice import ( "context" - "encoding/json" "fmt" - "github.com/orb-community/orb/pkg/types" sinkspb "github.com/orb-community/orb/sinks/pb" "sort" "time" @@ -68,66 +66,6 @@ func (bs *SinkerOtelBridgeService) IncrementMessageCounter(publisher, subtopic, } func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwnerId, sinkId, newState, message string) error { - cfgRepo, err := bs.sinkerCache.Get(mfOwnerId, sinkId) - if err != nil { - bs.logger.Error("unable to retrieve the sink config", zap.Error(err)) - sinkData, _ := bs.sinksClient.RetrieveSink(ctx, &sinkspb.SinkByIDReq{ - SinkID: sinkId, - OwnerID: mfOwnerId, - }) - var metadata types.Metadata - _ = json.Unmarshal(sinkData.Config, &metadata) - cfgRepo = config.SinkConfig{ - SinkID: sinkId, - OwnerID: mfOwnerId, - Config: metadata, - State: config.Active, - Msg: "", - } - err = bs.sinkerCache.DeployCollector(ctx, cfgRepo) - if err != nil { - bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err)) - return err - } - } - - // only updates sink state if status Idle or Unknown - if cfgRepo.State == config.Idle || cfgRepo.State == config.Unknown { - cfgRepo.LastRemoteWrite = time.Now() - // only deploy collector if new state is "active" and current state "not active" - if newState == "active" && cfgRepo.State != config.Active { - err = cfgRepo.State.SetFromString(newState) - if err != nil { - bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err)) - return err - } - err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId) - if err != nil { - bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err)) - return err - } - err = bs.sinkerCache.DeployCollector(ctx, cfgRepo) - if err != nil { - bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err)) - return err - } - bs.logger.Info("waking up sink to active", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State)) - } else { - err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId) - if err != nil { - bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err)) - return err - } - bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State)) - } - } else { - err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId) - if err != nil { - bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err)) - return err - } - bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State)) - } return nil } diff --git a/sinker/redis/consumer/sink_key_expire.go b/sinker/redis/consumer/sink_key_expire.go new file mode 100644 index 000000000..846a6bf2a --- /dev/null +++ b/sinker/redis/consumer/sink_key_expire.go @@ -0,0 +1,19 @@ +package consumer + +import ( + "github.com/go-redis/redis/v8" + "github.com/orb-community/orb/sinker/redis/producer" + "go.uber.org/zap" +) + +type SinkerKeyExpirationListener interface { + // Listen to the sinker key expiration + SubscribeToKeyExpiration() error + ReceiveMessage(message interface{}) error +} + +type sinkerKeyExpirationListener struct { + logger *zap.Logger + cacheRedisClient redis.Client + idleProducer producer.SinkIdleProducer +} diff --git a/sinker/redis/producer/sink_ttl.go b/sinker/redis/producer/sink_ttl.go new file mode 100644 index 000000000..bc49e9a66 --- /dev/null +++ b/sinker/redis/producer/sink_ttl.go @@ -0,0 +1,70 @@ +package producer + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "go.uber.org/zap" + "time" +) + +type SinkerKey struct { + OwnerID string + SinkID string + Size string + LastActivity time.Time +} + +func (s *SinkerKey) Encode() map[string]interface{} { + return map[string]interface{}{ + "owner_id": s.OwnerID, + "sink_id": s.SinkID, + "size": s.Size, + "last_activity": s.LastActivity.Format(time.RFC3339), + } +} + +const DefaultExpiration = 5 * time.Minute + +type SinkerKeyService interface { + // AddNewSinkerKey Add New Sinker Key with default Expiration of 5 minutes + AddNewSinkerKey(ctx context.Context, key SinkerKey) error + // RenewSinkerKey Increment Expiration of Sinker Key + RenewSinkerKey(ctx context.Context, key SinkerKey) error +} + +type sinkerKeyService struct { + logger *zap.Logger + cacheRepository redis.Client +} + +func NewSinkerKeyService(logger *zap.Logger, cacheRepository redis.Client) SinkerKeyService { + return &sinkerKeyService{logger: logger, cacheRepository: cacheRepository} +} + +// RenewSinkerKey Increment Expiration of Sinker Key +func (s *sinkerKeyService) RenewSinkerKey(ctx context.Context, key SinkerKey) error { + // If key does not exist, create new entry + cmd := s.cacheRepository.Expire(ctx, "orb.sinker", DefaultExpiration) + if cmd.Err() != nil { + s.logger.Error("error sending event to sinker event store", zap.Error(cmd.Err())) + return cmd.Err() + } + return nil +} + +func (s *sinkerKeyService) AddNewSinkerKey(ctx context.Context, sink SinkerKey) error { + // Create sinker key in redis Hashset with default expiration of 5 minutes + key := fmt.Sprintf("orb.sinker.key-%s:%s", sink.OwnerID, sink.SinkID) + cmd := s.cacheRepository.HSet(ctx, key, sink.Encode()) + if cmd.Err() != nil { + s.logger.Error("error sending event to sinker event store", zap.Error(cmd.Err())) + return cmd.Err() + } + err := s.RenewSinkerKey(ctx, sink) + if err != nil { + s.logger.Error("error setting expiration to sinker event store", zap.Error(cmd.Err())) + return cmd.Err() + } + return nil +} diff --git a/sinker/redis/producer/sinker_activity.go b/sinker/redis/producer/sinker_activity.go new file mode 100644 index 000000000..fd617ab04 --- /dev/null +++ b/sinker/redis/producer/sinker_activity.go @@ -0,0 +1,58 @@ +package producer + +import ( + "context" + "github.com/go-redis/redis/v8" + "go.uber.org/zap" + "time" +) + +type SinkActivityProducer interface { + // PublishSinkActivity to be used to publish the sink activity to the sinker, mainly used by Otel Bridge Service + PublishSinkActivity(ctx context.Context, event SinkActivityEvent) error +} + +type SinkActivityEvent struct { + OwnerID string + SinkID string + State string + Size string + Timestamp time.Time +} + +func (s *SinkActivityEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "owner_id": s.OwnerID, + "sink_id": s.SinkID, + "state": s.State, + "size": s.Size, + "timestamp": s.Timestamp.Format(time.RFC3339), + } +} + +var _ SinkActivityProducer = (*sinkActivityProducer)(nil) + +type sinkActivityProducer struct { + logger *zap.Logger + redisStreamClient redis.Client +} + +func NewSinkActivityProducer(logger *zap.Logger, redisStreamClient redis.Client) SinkActivityProducer { + return &sinkActivityProducer{logger: logger, redisStreamClient: redisStreamClient} +} + +// PublishSinkActivity BridgeService will notify stream of sink activity +func (sp *sinkActivityProducer) PublishSinkActivity(ctx context.Context, event SinkActivityEvent) error { + const maxLen = 1000 + record := &redis.XAddArgs{ + Stream: "orb.sink_activity", + Values: event.Encode(), + MaxLen: maxLen, + Approx: true, + } + err := sp.redisStreamClient.XAdd(ctx, record).Err() + if err != nil { + sp.logger.Error("error sending event to sinker event store", zap.Error(err)) + } + return err +} diff --git a/sinker/redis/producer/sinker_idle.go b/sinker/redis/producer/sinker_idle.go new file mode 100644 index 000000000..10253c179 --- /dev/null +++ b/sinker/redis/producer/sinker_idle.go @@ -0,0 +1,37 @@ +package producer + +import ( + "context" + "github.com/go-redis/redis/v8" + "time" +) + +type SinkIdleEvent struct { + OwnerID string + SinkID string + State string + Size string + Timestamp time.Time +} + +func (s *SinkIdleEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "owner_id": s.OwnerID, + "sink_id": s.SinkID, + "state": s.State, + "size": s.Size, + "timestamp": s.Timestamp.Format(time.RFC3339), + } +} + +type SinkIdleProducer interface { + // PublishSinkIdle to be used to publish the sink activity to the sinker, mainly used by Otel Bridge Service + PublishSinkIdle(ctx context.Context, event SinkIdleEvent) error +} + +var _ SinkIdleProducer = (*sinkIdleProducer)(nil) + +type sinkIdleProducer struct { + logger *zap.Logger + redisStreamClient redis.Client +}