diff --git a/maestro/deployment/model.go b/maestro/deployment/model.go index f40e52140..599034ac9 100644 --- a/maestro/deployment/model.go +++ b/maestro/deployment/model.go @@ -21,13 +21,14 @@ type Deployment struct { LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"` } -func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment { +func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment { now := time.Now() deploymentName := "otel-" + sinkID configAsByte := toByte(config) return Deployment{ OwnerID: ownerID, SinkID: sinkID, + Backend: backend, Config: configAsByte, LastStatus: "pending", LastStatusUpdate: &now, diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index 36cbfdf80..bb3687b52 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner)) // Create Deployment Entry - entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) // Use deploymentService, which will create deployment in both postgres and redis err := d.deploymentService.CreateDeployment(ctx, &entry) if err != nil { @@ -57,7 +57,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. d.logger.Error("error trying to get deployment entry", zap.Error(err)) return err } else { - newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) err := d.deploymentService.CreateDeployment(ctx, &newEntry) if err != nil { d.logger.Error("error trying to recreate deployment entry", zap.Error(err)) diff --git a/maestro/service/handle_sinker_test.go b/maestro/service/handle_sinker_test.go index abca13ba1..85a69b3a2 100644 --- a/maestro/service/handle_sinker_test.go +++ b/maestro/service/handle_sinker_test.go @@ -51,8 +51,9 @@ func TestEventService_HandleSinkActivity(t *testing.T) { "MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger)) d := NewEventService(logger, deploymentService, nil) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -114,8 +115,9 @@ func TestEventService_HandleSinkIdle(t *testing.T) { deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger)) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", diff --git a/maestro/service/handle_sinks_test.go b/maestro/service/handle_sinks_test.go index 3fef60cb6..2aaa53122 100644 --- a/maestro/service/handle_sinks_test.go +++ b/maestro/service/handle_sinks_test.go @@ -104,8 +104,9 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { name: "update event success", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink1", - Owner: "owner1", + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -116,7 +117,6 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { "password": "dbpass-2", }, }, - Backend: "prometheus", Timestamp: time.Now(), }, }, @@ -149,8 +149,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { name: "delete event when there is none in db", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink1", - Owner: "owner1", + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -169,8 +170,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { name: "delete event success", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -190,8 +192,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, nil) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", diff --git a/maestro/service/repository_test.go b/maestro/service/repository_test.go index 795df9331..13e89518b 100644 --- a/maestro/service/repository_test.go +++ b/maestro/service/repository_test.go @@ -19,27 +19,30 @@ func NewFakeRepository(logger *zap.Logger) deployment.Repository { func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) { var allDeployments []deployment.Deployment for _, deploy := range f.inMemoryDict { - allDeployments = append(allDeployments, *deploy) + copy := copyDeploy(deploy) + allDeployments = append(allDeployments, copy) } return allDeployments, nil } -func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { +func (f *fakeRepository) Add(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { deployment.Id = "fake-id" - f.inMemoryDict[deployment.SinkID] = deployment + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { - f.inMemoryDict[deployment.SinkID] = deployment +func (f *fakeRepository) Update(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error { +func (f *fakeRepository) UpdateStatus(_ context.Context, _ string, _ string, _ string, _ string) error { return nil } -func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error { +func (f *fakeRepository) Remove(_ context.Context, _ string, sinkId string) error { delete(f.inMemoryDict, sinkId) return nil } @@ -47,11 +50,30 @@ func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId stri func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) { deploy, ok := f.inMemoryDict[sinkId] if ok { - return deploy, nil + copy := copyDeploy(deploy) + return ©, nil } return nil, errors.New("not found") } -func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) { +func (f *fakeRepository) FindByCollectorName(_ context.Context, _ string) (*deployment.Deployment, error) { return nil, nil } + +func copyDeploy(src *deployment.Deployment) deployment.Deployment { + deploy := deployment.Deployment{ + Id: src.Id, + OwnerID: src.OwnerID, + SinkID: src.SinkID, + Backend: src.Backend, + Config: src.Config, + LastStatus: src.LastStatus, + LastStatusUpdate: src.LastStatusUpdate, + LastErrorMessage: src.LastErrorMessage, + LastErrorTime: src.LastErrorTime, + CollectorName: src.CollectorName, + LastCollectorDeployTime: src.LastCollectorDeployTime, + LastCollectorStopTime: src.LastCollectorStopTime, + } + return deploy +}