diff --git a/maestro/deployment/deploy_service.go b/maestro/deployment/deploy_service.go deleted file mode 100644 index 13e34f4b7..000000000 --- a/maestro/deployment/deploy_service.go +++ /dev/null @@ -1,103 +0,0 @@ -package deployment - -import ( - "context" - "github.com/orb-community/orb/maestro/kubecontrol" - maestroredis "github.com/orb-community/orb/maestro/redis" - "go.uber.org/zap" - "time" -) - -type DeployService interface { - HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error - HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error - HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error - HandleSinkActivity(ctx context.Context, event maestroredis.SinksUpdateEvent) error -} - -type deployService struct { - logger *zap.Logger - deploymentService Service - // Configuration for KafkaURL from Orb Deployment - kafkaUrl string -} - -var _ DeployService = (*deployService)(nil) - -func NewDeployService(logger *zap.Logger, service Service, kubecontrol kubecontrol.Service) DeployService { - namedLogger := logger.Named("deploy-service") - return &deployService{logger: namedLogger, deploymentService: service} -} - -// HandleSinkCreate will create deployment entry in postgres, will create deployment in Redis, to prepare for SinkActivity -func (d *deployService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - now := time.Now() - // Create Deployment Entry - entry := Deployment{ - OwnerID: event.Owner, - SinkID: event.SinkID, - Config: event.Config, - Backend: event.Backend, - LastStatus: "provisioning", - LastStatusUpdate: &now, - LastErrorMessage: "", - LastErrorTime: nil, - CollectorName: "", - LastCollectorDeployTime: nil, - LastCollectorStopTime: nil, - } - // Use deploymentService, which will create deployment in both postgres and redis - err := d.deploymentService.CreateDeployment(ctx, &entry) - if err != nil { - d.logger.Error("error trying to create deployment entry", zap.Error(err)) - return err - } - return nil -} - -func (d *deployService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - now := time.Now() - // check if exists deployment entry from postgres - entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) - if err != nil { - d.logger.Error("error trying to get deployment entry", zap.Error(err)) - return err - } - // async update sink status to provisioning - go func() { - _ = d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "provisioning", "") - }() - // update deployment entry in postgres - entry.Config = event.Config - entry.LastCollectorStopTime = &now - entry.LastStatus = "provisioning" - entry.LastStatusUpdate = &now - err = d.deploymentService.UpdateDeployment(ctx, entry) - - return nil -} - -func (d *deployService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) - if err != nil { - d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) - return err - } - if deploymentEntry.LastCollectorDeployTime != nil || deploymentEntry.LastCollectorDeployTime.Before(time.Now()) { - if deploymentEntry.LastCollectorStopTime != nil || deploymentEntry.LastCollectorStopTime.Before(time.Now()) { - d.logger.Warn("collector is not running, skipping") - } else { - // - } - } - err = d.deploymentService.RemoveDeployment(ctx, 
event.Owner, event.SinkID) - if err != nil { - return err - } - return nil -} - -func (d *deployService) HandleSinkActivity(ctx context.Context, event maestroredis.SinksUpdateEvent) error { - //TODO implement me - panic("implement me") -} diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 16aa90758..f7e731418 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -28,7 +28,7 @@ type Service interface { // GetDeploymentByCollectorName to be used to get the deployment information for creating the collector or monitoring the collector GetDeploymentByCollectorName(ctx context.Context, collectorName string) (*Deployment, error) // NotifyCollector add collector information to deployment - NotifyCollector(ctx context.Context, ownerID string, sinkId string, collectorName string, operation string, status string, errorMessage string) error + NotifyCollector(ctx context.Context, ownerID string, sinkId string, operation string, status string, errorMessage string) (string, error) } type deploymentService struct { @@ -66,7 +66,7 @@ func (d *deploymentService) CreateDeployment(ctx context.Context, deployment *De } d.logger.Info("added deployment", zap.String("id", added.Id), zap.String("ownerID", added.OwnerID), zap.String("sinkID", added.SinkID)) - err = d.maestroProducer.PublishSinkStatus(added.OwnerID, added.SinkID, "unknown", "") + err = d.maestroProducer.PublishSinkStatus(ctx, added.OwnerID, added.SinkID, "unknown", "") if err != nil { return err } @@ -108,15 +108,22 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s // UpdateDeployment will stop the running collector if any, and change the deployment, it will not spin the collector back up, // it will wait for the next sink.activity func (d *deploymentService) UpdateDeployment(ctx context.Context, deployment *Deployment) error { + now := time.Now() got, err := d.dbRepository.FindByOwnerAndSink(ctx, deployment.OwnerID, deployment.SinkID) if err != nil { return errors.New("could not find deployment to update") } + // Spin down the collector if it is running + err = d.kubecontrol.KillOtelCollector(ctx, got.OwnerID, got.SinkID) + if err != nil { + d.logger.Warn("could not stop running collector, will try to update anyway", zap.Error(err)) + } err = deployment.Merge(*got) if err != nil { d.logger.Error("error during merge of deployments", zap.Error(err)) return err } + deployment.LastCollectorStopTime = &now if deployment == nil { return errors.New("deployment is nil") } @@ -129,17 +136,35 @@ func (d *deploymentService) UpdateDeployment(ctx context.Context, deployment *De return nil } -func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, sinkId string, collectorName string, operation string, status string, errorMessage string) error { +func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, sinkId string, operation string, status string, errorMessage string) (string, error) { got, err := d.dbRepository.FindByOwnerAndSink(ctx, ownerID, sinkId) if err != nil { - return errors.New("could not find deployment to update") + return "", errors.New("could not find deployment to update") } now := time.Now() - got.CollectorName = collectorName if operation == "delete" { got.LastCollectorStopTime = &now + err = d.kubecontrol.KillOtelCollector(ctx, got.OwnerID, got.SinkID) + if err != nil { + d.logger.Warn("could not stop running collector, will try to update anyway", zap.Error(err)) + } } else if operation == "deploy" { - 
got.LastCollectorDeployTime = &now
+		// Spin up the collector only when it is not already running:
+		// it was never deployed, or it was stopped after the last deploy.
+		notRunning := got.LastCollectorDeployTime == nil ||
+			(got.LastCollectorStopTime != nil && got.LastCollectorStopTime.After(*got.LastCollectorDeployTime))
+		if notRunning {
+			d.logger.Debug("collector is not running, deploying")
+			manifest, err := d.configBuilder.BuildDeploymentConfig(got)
+			if err != nil {
+				d.logger.Error("error during build deployment config", zap.Error(err))
+				return "", err
+			}
+			got.CollectorName, err = d.kubecontrol.CreateOtelCollector(ctx, got.OwnerID, got.SinkID, manifest)
+			if err != nil {
+				d.logger.Error("error during create otel collector", zap.Error(err))
+				return "", err
+			}
+			got.LastCollectorDeployTime = &now
+		} else {
+			d.logger.Info("collector is already running")
+		}
 	}
 	if status != "" {
 		got.LastStatus = status
@@ -151,13 +176,13 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string,
 	}
 	updated, err := d.dbRepository.Update(ctx, got)
 	if err != nil {
-		return err
+		return "", err
 	}
 	d.logger.Info("updated deployment information for collector and status or error",
 		zap.String("ownerID", updated.OwnerID), zap.String("sinkID", updated.SinkID),
 		zap.String("collectorName", updated.CollectorName), zap.String("status", updated.LastStatus),
 		zap.String("errorMessage", updated.LastErrorMessage))
-	return nil
+	return updated.CollectorName, nil
 }
 
 // UpdateStatus this will change the status in postgres and notify sinks service to show new status to user
diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go
index 28acb9577..feb654914 100644
--- a/maestro/kubecontrol/kubecontrol.go
+++ b/maestro/kubecontrol/kubecontrol.go
@@ -14,7 +14,6 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"time"
 )
 
 const namespace = "otelcollectors"
@@ -58,19 +57,13 @@ func NewService(logger *zap.Logger) Service {
 
 type Service interface {
 	// CreateOtelCollector - create a new collector by id
-	CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
-
-	// DeleteOtelCollector - delete an existing collector by id
-	DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
-
-	// UpdateOtelCollector - update an existing collector by id
-	UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
+	CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error)
 
 	// KillOtelCollector - kill an existing collector by id, terminating by the ownerID, sinkID without the file
 	KillOtelCollector(ctx context.Context, ownerID, sinkID string) error
 }
 
-func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error {
+func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) (string, error) {
 	_, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
 	fileContent := []byte(manifest)
 	tmp := strings.Split(string(fileContent), "\n")
@@ -83,7 +76,7 @@ func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerI
 	err = os.WriteFile("/tmp/otel-collector-"+sinkId+".json", []byte(newContent), 0644)
 	if err != nil {
 		svc.logger.Error("failed to write file content", zap.Error(err))
-		return err
+		return "", err
 	}
 	stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) {
 		for out.Scan() {
@@ -100,8 +93,9 @@
 	if err == nil {
 		svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId))
 	}
-
-	return nil
+	// TODO this will be retrieved once we move to K8s SDK
+	collectorName := fmt.Sprintf("otelcol-%s-%s", ownerID, sinkId)
+	return collectorName, nil
 }
 
 func execCmd(_ context.Context, cmd *exec.Cmd, logger *zap.Logger, stdOutFunc func(stdOut *bufio.Scanner, stdErr *bufio.Scanner)) (*bufio.Scanner, *bufio.Scanner, error) {
@@ -146,35 +140,13 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri
 	return "", "deleted", nil
 }
 
-func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
-	err := svc.collectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry)
+func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error) {
+	col, err := svc.collectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry)
 	if err != nil {
-		return err
+		return "", err
 	}
-	return nil
-}
-
-func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
-	err := svc.DeleteOtelCollector(ctx, ownerID, sinkID, deploymentEntry)
-	if err != nil {
-		return err
-	}
-	// Time to wait until K8s completely removes before re-creating
-	time.Sleep(3 * time.Second)
-	err = svc.CreateOtelCollector(ctx, ownerID, sinkID, deploymentEntry)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
-	err := svc.collectorDeploy(ctx, "delete", ownerID, sinkID, deploymentEntry)
-	if err != nil {
-		return err
-	}
-	return nil
+	return col, nil
 }
 
 func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName string, sinkId string) error {
diff --git a/maestro/redis/consumer/sinks.go b/maestro/redis/consumer/sinks.go
index c71642a0e..7cf078dc9 100644
--- a/maestro/redis/consumer/sinks.go
+++ b/maestro/redis/consumer/sinks.go
@@ -4,8 +4,8 @@ import (
 	"context"
 	"errors"
 	"github.com/go-redis/redis/v8"
-	"github.com/orb-community/orb/maestro/deployment"
 	maestroredis "github.com/orb-community/orb/maestro/redis"
+	"github.com/orb-community/orb/maestro/service"
 	sinkspb "github.com/orb-community/orb/sinks/pb"
 	"go.uber.org/zap"
 )
@@ -17,7 +17,7 @@
 
 type sinksListenerService struct {
 	logger            *zap.Logger
-	deploymentService deployment.DeployService
+	deploymentService service.EventService
 	redisClient       *redis.Client
 	sinksClient       sinkspb.SinkServiceClient
 }
@@ -83,7 +83,7 @@ func (ls *sinksListenerService) SubscribeSinksEvents(ctx context.Context) error
 	}
 }
 
-// handleSinksUpdate logic moved to deployment.DeployService
+// handleSinksUpdate logic moved to service.EventService
 func (ls *sinksListenerService) handleSinksUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
 	ls.logger.Info("Received maestro UPDATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
 	err := ls.deploymentService.HandleSinkUpdate(ctx, event)
@@ -94,7 +94,7 @@
 	return nil
 }
 
-// handleSinksDelete logic moved to deployment.DeployService
+// handleSinksDelete logic moved to service.EventService
 func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
 	ls.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
 	err := ls.deploymentService.HandleSinkDelete(ctx, event)
@@ -104,7 +104,7 @@ func (ls *sinksListenerService) handleSinksDelete(ctx context.Context, event mae
 	return nil
 }
 
-// handleSinksCreate logic moved to deployment.DeployService
+// handleSinksCreate logic moved to service.EventService
 func (ls *sinksListenerService) handleSinksCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
 	ls.logger.Info("Received event to CREATE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
 	err := ls.deploymentService.HandleSinkCreate(ctx, event)
diff --git a/maestro/redis/consumer/streams.go b/maestro/redis/consumer/streams.go
index 7fbee9283..26448a164 100644
--- a/maestro/redis/consumer/streams.go
+++ b/maestro/redis/consumer/streams.go
@@ -34,20 +34,6 @@ const (
 	exists = "BUSYGROUP Consumer Group name already exists"
 )
 
-type Subscriber interface {
-	CreateDeploymentEntry(ctx context.Context, sink config.SinkData) error
-	GetDeploymentEntryFromSinkId(ctx context.Context, sinkId string) (string, error)
-
-	UpdateSinkCache(ctx context.Context, data config.SinkData) (err error)
-	UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error)
-	PublishSinkStateChange(sink *sinkspb.SinkRes, status string, logsErr error, err error)
-
-	GetActivity(sinkID string) (int64, error)
-	RemoveSinkActivity(ctx context.Context, sinkId string) error
-
-	SubscribeSinkerEvents(context context.Context) error
-}
-
 type eventStore struct {
 	kafkaUrl    string
 	kubecontrol kubecontrol.Service
@@ -60,8 +46,8 @@
 }
 
 func NewEventStore(streamRedisClient, sinkerKeyRedisClient *redis.Client, kafkaUrl string, kubecontrol kubecontrol.Service,
-	esconsumer string, sinksClient sinkspb.SinkServiceClient, logger *zap.Logger, service deployment.Service) Subscriber {
-	return eventStore{
+	esconsumer string, sinksClient sinkspb.SinkServiceClient, logger *zap.Logger, service deployment.Service) *eventStore {
+	return &eventStore{
 		kafkaUrl:          kafkaUrl,
 		kubecontrol:       kubecontrol,
 		streamRedisClient: streamRedisClient,
diff --git a/maestro/redis/events.go b/maestro/redis/events.go
index a18d79c17..4ce315704 100644
--- a/maestro/redis/events.go
+++ b/maestro/redis/events.go
@@ -29,7 +29,6 @@ type SinkerUpdateEvent struct {
 	SinkID    string
 	Owner     string
 	State     string
-	Msg       string
 	Timestamp time.Time
 }
 
@@ -38,7 +37,6 @@ func (cse SinkerUpdateEvent) Encode() map[string]interface{} {
 		"sink_id":   cse.SinkID,
 		"owner":     cse.Owner,
 		"state":     cse.State,
-		"msg":       cse.Msg,
 		"timestamp": cse.Timestamp.Unix(),
 		"operation": SinkerUpdate,
 	}
diff --git a/maestro/redis/producer/streams.go b/maestro/redis/producer/streams.go
index 539d39147..71b30ebad 100644
--- a/maestro/redis/producer/streams.go
+++ b/maestro/redis/producer/streams.go
@@ -1,16 +1,67 @@
 package producer
 
 import (
+	"context"
 	"github.com/go-redis/redis/v8"
 	"go.uber.org/zap"
+	"time"
 )
 
+const (
+	streamID  = "orb.maestro"
+	streamLen = 1000
+)
+
+type SinkStatusEvent struct {
+	ownerId      string
+	sinkId       string
+	status       string
+	errorMessage string
+}
+
+func (e SinkStatusEvent) Encode() map[string]interface{} {
+	return map[string]interface{}{
+		"owner_id":      e.ownerId,
+		"sink_id":       e.sinkId,
+		"status":        e.status,
+		"error_message": e.errorMessage,
+		"timestamp":     time.Now().Format(time.RFC3339),
+	}
+}
+
 type Producer interface {
 	// PublishSinkStatus to be used to publish the sink activity to the sinker
-	PublishSinkStatus(ownerId string, sinkId string, status string, errorMessage string) error
+	PublishSinkStatus(ctx context.Context, ownerId string, sinkId string, status string, errorMessage string) error
 }
 
 type maestroProducer struct {
 	logger      *zap.Logger
 	streamRedis *redis.Client
 }
+
+func NewMaestroProducer(logger *zap.Logger, streamRedis *redis.Client) Producer {
+	return &maestroProducer{logger: logger, streamRedis: streamRedis}
+}
+
+// PublishSinkStatus to be used to publish the sink activity to the sinker
+func (p *maestroProducer) PublishSinkStatus(ctx context.Context, ownerId string, sinkId string, status string, errorMessage string) error {
+	event := SinkStatusEvent{
+		ownerId:      ownerId,
+		sinkId:       sinkId,
+		status:       status,
+		errorMessage: errorMessage,
+	}
+	streamEvent := event.Encode()
+	record := &redis.XAddArgs{
+		Stream: streamID,
+		MaxLen: streamLen,
+		Approx: true,
+		Values: streamEvent,
+	}
+	cmd := p.streamRedis.XAdd(ctx, record)
+	if cmd.Err() != nil {
+		p.logger.Error("error sending event to maestro event store", zap.Error(cmd.Err()))
+		return cmd.Err()
+	}
+	return nil
+}
diff --git a/maestro/service.go b/maestro/service.go
index 6a1c50f41..1c3e0f5d5 100644
--- a/maestro/service.go
+++ b/maestro/service.go
@@ -14,6 +14,8 @@ import (
 	"github.com/jmoiron/sqlx"
 	"github.com/orb-community/orb/maestro/deployment"
 	"github.com/orb-community/orb/maestro/monitor"
+	"github.com/orb-community/orb/maestro/redis/producer"
+	"github.com/orb-community/orb/maestro/service"
 	"github.com/orb-community/orb/pkg/types"
 	"strings"
 
@@ -41,8 +43,8 @@ type maestroService struct {
 	streamRedisClient *redis.Client
 	sinkerRedisClient *redis.Client
 	sinksClient       sinkspb.SinkServiceClient
+	eventService      service.EventService
 	esCfg             config.EsConfig
-	eventStore        rediscons1.Subscriber
 	kafkaUrl          string
 }
 
@@ -51,9 +53,9 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink
 	kubectr := kubecontrol.NewService(logger)
 	repo := deployment.NewRepositoryService(db, logger)
 	deploymentService := deployment.NewDeploymentService(logger, repo)
-	eventStore := rediscons1.NewEventStore(streamRedisClient, sinkerKeyRedisClient, otelCfg.KafkaUrl, kubectr,
-		esCfg.Consumer, sinksGrpcClient, logger, deploymentService)
-	monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, eventStore, &kubectr)
+	ps := producer.NewMaestroProducer(logger, streamRedisClient)
+	monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr)
+	eventService := service.NewEventService(logger, deploymentService, kubectr)
 	return &maestroService{
 		logger:            logger,
 		deploymentService: deploymentService,
@@ -62,7 +64,7 @@
 		sinksClient:       sinksGrpcClient,
 		kubecontrol:       kubectr,
 		monitor:           monitorService,
-		eventStore:        eventStore,
+		eventService:      eventService,
 		kafkaUrl:          otelCfg.KafkaUrl,
 	}
 }
diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go
new file mode 100644
index 000000000..6884761ba
--- /dev/null
+++ b/maestro/service/deploy_service.go
@@ -0,0 +1,156 @@
+package service
+
+import (
+	"context"
+	"github.com/orb-community/orb/maestro/deployment"
+	"github.com/orb-community/orb/maestro/kubecontrol"
+	maestroredis "github.com/orb-community/orb/maestro/redis"
+	"github.com/orb-community/orb/pkg/errors"
+	"go.uber.org/zap"
+	"time"
+)
+
+// EventService holds the business logic for handling events from both listeners
+type EventService interface {
+	HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error
+	HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error
+	HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error
+	HandleSinkActivity(ctx context.Context, event maestroredis.SinkerUpdateEvent) error
+	HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error
+}
+
+type eventService struct {
+	logger            *zap.Logger
+	deploymentService deployment.Service
+	// Configuration for KafkaURL from Orb Deployment
+	kafkaUrl string
+}
+
+var _ EventService = (*eventService)(nil)
+
+func NewEventService(logger *zap.Logger, service deployment.Service, kubecontrol kubecontrol.Service) EventService {
+	namedLogger := logger.Named("deploy-service")
+	return &eventService{logger: namedLogger, deploymentService: service}
+}
+
+// HandleSinkCreate creates the deployment entry in postgres and in Redis, to prepare for SinkActivity
+func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
+	now := time.Now()
+	// Create Deployment Entry
+	entry := deployment.Deployment{
+		OwnerID:                 event.Owner,
+		SinkID:                  event.SinkID,
+		Config:                  event.Config,
+		Backend:                 event.Backend,
+		LastStatus:              "provisioning",
+		LastStatusUpdate:        &now,
+		LastErrorMessage:        "",
+		LastErrorTime:           nil,
+		CollectorName:           "",
+		LastCollectorDeployTime: nil,
+		LastCollectorStopTime:   nil,
+	}
+	// Use deploymentService, which will create deployment in both postgres and redis
+	err := d.deploymentService.CreateDeployment(ctx, &entry)
+	if err != nil {
+		d.logger.Error("error trying to create deployment entry", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
+	now := time.Now()
+	// check if deployment entry exists in postgres
+	entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
+	if err != nil {
+		d.logger.Error("error trying to get deployment entry", zap.Error(err))
+		return err
+	}
+	// async update sink status to provisioning
+	go func() {
+		_ = d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "provisioning", "")
+	}()
+	// update deployment entry in postgres
+	entry.Config = event.Config
+	entry.LastCollectorStopTime = &now
+	entry.LastStatus = "provisioning"
+	entry.LastStatusUpdate = &now
+	err = d.deploymentService.UpdateDeployment(ctx, entry)
+	if err != nil {
+		d.logger.Error("error trying to update deployment entry", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (d *eventService) HandleSinkDelete(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
+	deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
+	if err != nil {
+		d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID))
+		return err
+	}
+	// nil-safe running check: not running when never deployed, or stopped after the last deploy
+	notRunning := deploymentEntry.LastCollectorDeployTime == nil ||
+		(deploymentEntry.LastCollectorStopTime != nil && deploymentEntry.LastCollectorStopTime.After(*deploymentEntry.LastCollectorDeployTime))
+	if notRunning {
+		d.logger.Warn("collector is not running, skipping")
+	}
+	err = d.deploymentService.RemoveDeployment(ctx, event.Owner, event.SinkID)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
+	if event.State != "active" {
+		return errors.New("trying to deploy sink that is not active")
+	}
+	// check if deployment entry exists in postgres
+	_, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
+	if err != nil {
+		d.logger.Error("error trying to get deployment entry", zap.Error(err))
+		return err
+	}
+	// async update sink status to provisioning
+	go func() {
+		_ = d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "provisioning", "")
+	}()
+	_, err = d.deploymentService.NotifyCollector(ctx, event.Owner, event.SinkID, "deploy", "", "")
+	if err != nil {
+		d.logger.Error("error trying to notify collector", zap.Error(err))
+		// best-effort: surface the provisioning error to the user via the sinks service
+		err2 := d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "provisioning_error", err.Error())
+		if err2 != nil {
+			d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
+			d.logger.Error("error during update status", zap.Error(err2))
+		}
+		return err
+	}
+	return nil
+}
+
+func (d *eventService) HandleSinkIdle(ctx context.Context, event maestroredis.SinkerUpdateEvent) error {
+	// check if deployment entry exists in postgres
+	_, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
+	if err != nil {
+		d.logger.Error("error trying to get deployment entry", zap.Error(err))
+		return err
+	}
+	// async update sink status to idle
+	go func() {
+		_ = d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "idle", "")
+	}()
+	// the sink has gone idle, spin the collector down
+	_, err = d.deploymentService.NotifyCollector(ctx, event.Owner, event.SinkID, "delete", "", "")
+	if err != nil {
+		d.logger.Error("error trying to notify collector", zap.Error(err))
+		err2 := d.deploymentService.UpdateStatus(ctx, event.Owner, event.SinkID, "provisioning_error", err.Error())
+		if err2 != nil {
+			d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
+			d.logger.Error("error during update status", zap.Error(err2))
+		}
+		return err
+	}
+	return nil
+}
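
Reviewer note on the new `orb.maestro` stream contract: `PublishSinkStatus` XADDs the map produced by `SinkStatusEvent.Encode()` (`owner_id`, `sink_id`, `status`, `error_message`, plus an RFC3339 `timestamp`). A minimal sketch of what a sinker-side consumer could look like, assuming the same go-redis v8 client this PR already uses; the function name and tailing strategy are illustrative, not part of this change:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// readSinkStatuses tails the maestro status stream and prints each event.
// Starting from "$" means only events published after this call are seen.
func readSinkStatuses(ctx context.Context, client *redis.Client) error {
	lastID := "$"
	for {
		streams, err := client.XRead(ctx, &redis.XReadArgs{
			Streams: []string{"orb.maestro", lastID},
			Block:   0, // block until a new entry arrives
		}).Result()
		if err != nil {
			return err
		}
		for _, stream := range streams {
			for _, msg := range stream.Messages {
				lastID = msg.ID
				fmt.Printf("sink %v (owner %v): status=%v error=%v\n",
					msg.Values["sink_id"], msg.Values["owner_id"],
					msg.Values["status"], msg.Values["error_message"])
			}
		}
	}
}
```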
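The running/not-running comparison fixed above now appears in both `NotifyCollector` and `HandleSinkDelete`. If it grows a third call site it could be factored into a nil-safe helper; a sketch under that assumption (the helper name is hypothetical, the PR inlines the comparison):

```go
package deployment

import "time"

// collectorNotRunning reports whether a collector needs to be (re)deployed:
// it was never deployed, or it was stopped after its last deploy.
func collectorNotRunning(deployTime, stopTime *time.Time) bool {
	if deployTime == nil {
		return true // never deployed
	}
	// stopTime == nil means deployed and never stopped, i.e. still running
	return stopTime != nil && stopTime.After(*deployTime)
}
```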