diff --git a/maestro/config/authentication_builder.go b/maestro/config/authentication_builder.go index acc5c6dc9..6555193a1 100644 --- a/maestro/config/authentication_builder.go +++ b/maestro/config/authentication_builder.go @@ -50,6 +50,7 @@ func (b *BasicAuthBuilder) DecodeAuth(config types.Metadata) (types.Metadata, er return nil, err } authCfg["password"] = decodedPassword + config[AuthenticationKey] = authCfg return config, nil } @@ -61,5 +62,6 @@ func (b *BasicAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, er return nil, err } authcfg["password"] = encodedPassword + config[AuthenticationKey] = authcfg return config, nil } diff --git a/maestro/deployment/model.go b/maestro/deployment/model.go index f40e52140..599034ac9 100644 --- a/maestro/deployment/model.go +++ b/maestro/deployment/model.go @@ -21,13 +21,14 @@ type Deployment struct { LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"` } -func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment { +func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment { now := time.Now() deploymentName := "otel-" + sinkID configAsByte := toByte(config) return Deployment{ OwnerID: ownerID, SinkID: sinkID, + Backend: backend, Config: configAsByte, LastStatus: "pending", LastStatusUpdate: &now, diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 27fec2608..07316ec28 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -43,7 +43,8 @@ type deploymentService struct { var _ Service = (*deploymentService)(nil) -func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service { +func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, + maestroProducer producer.Producer, kubecontrol kubecontrol.Service) Service { namedLogger := logger.Named("deployment-service") es := password.NewEncryptionService(logger, encryptionKey) cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es) @@ -52,6 +53,7 @@ func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl st configBuilder: cb, encryptionService: es, maestroProducer: maestroProducer, + kubecontrol: kubecontrol, } } @@ -104,7 +106,7 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s return nil, "", err } authType := deployment.GetConfig() - if authType != nil { + if authType == nil { return nil, "", errors.New("deployment do not have authentication information") } value := authType.GetSubMetadata(AuthenticationKey)["type"].(string) @@ -176,8 +178,8 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, } } else if operation == "deploy" { // Spin up the collector - if got.LastCollectorDeployTime != nil || got.LastCollectorDeployTime.Before(now) { - if got.LastCollectorStopTime != nil || got.LastCollectorStopTime.Before(now) { + if got.LastCollectorDeployTime == nil || got.LastCollectorDeployTime.Before(now) { + if got.LastCollectorStopTime == nil || got.LastCollectorStopTime.Before(now) { d.logger.Debug("collector is not running deploying") deployReq := &config.DeploymentRequest{ OwnerID: ownerID, diff --git a/maestro/service.go b/maestro/service.go index 4071553f1..55347cd0b 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -51,7 +51,7 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink kubectr := kubecontrol.NewService(logger) repo := deployment.NewRepositoryService(db, logger) maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient) - deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer) + deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer, kubectr) ps := producer.NewMaestroProducer(logger, streamRedisClient) monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr) eventService := service.NewEventService(logger, deploymentService, kubectr) diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index 171de6a50..ee5930b90 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner)) // Create Deployment Entry - entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) // Use deploymentService, which will create deployment in both postgres and redis err := d.deploymentService.CreateDeployment(ctx, &entry) if err != nil { @@ -53,8 +53,18 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. // check if exists deployment entry from postgres entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { - d.logger.Error("error trying to get deployment entry", zap.Error(err)) - return err + if err.Error() != "not found" { + d.logger.Error("error trying to get deployment entry", zap.Error(err)) + return err + } else { + newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) + err := d.deploymentService.CreateDeployment(ctx, &newEntry) + if err != nil { + d.logger.Error("error trying to recreate deployment entry", zap.Error(err)) + return err + } + entry = &newEntry + } } // async update sink status to provisioning go func() { @@ -110,15 +120,14 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi _, err = d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "") if err != nil { d.logger.Error("error trying to notify collector", zap.Error(err)) + err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error()) + if err2 != nil { + d.logger.Warn("error during notifying provisioning error, customer will not be notified of error") + d.logger.Error("error during update provisioning error status", zap.Error(err)) + return err + } return err } - err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error()) - if err2 != nil { - d.logger.Warn("error during notifying provisioning error, customer will not be notified of error") - d.logger.Error("error during update status", zap.Error(err)) - return err - } - return nil } diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go deleted file mode 100644 index e4837ec6e..000000000 --- a/maestro/service/deploy_service_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package service - -import ( - "context" - "github.com/orb-community/orb/maestro/deployment" - "github.com/orb-community/orb/maestro/redis" - "github.com/orb-community/orb/pkg/types" - "go.uber.org/zap" - "testing" - "time" -) - -func Test_eventService_HandleSinkCreate(t *testing.T) { - - type args struct { - event redis.SinksUpdateEvent - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "create event", - args: args{ - event: redis.SinksUpdateEvent{ - SinkID: "sink1", - Owner: "owner1", - Config: types.Metadata{ - "exporter": types.Metadata{ - "remote_host": "https://acme.com/prom/push", - }, - "authentication": types.Metadata{ - "type": "basicauth", - "username": "prom-user", - "password": "dbpass", - }, - }, - Backend: "prometheus", - Timestamp: time.Now(), - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) - d := NewEventService(logger, deploymentService, nil) - ctx := context.WithValue(context.Background(), "test", tt.name) - if err := d.HandleSinkCreate(ctx, tt.args.event); (err != nil) != tt.wantErr { - t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/maestro/service/handle_sinker_test.go b/maestro/service/handle_sinker_test.go new file mode 100644 index 000000000..85a69b3a2 --- /dev/null +++ b/maestro/service/handle_sinker_test.go @@ -0,0 +1,141 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis" + "github.com/orb-community/orb/pkg/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "testing" + "time" +) + +func TestEventService_HandleSinkActivity(t *testing.T) { + type args struct { + event redis.SinkerUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "activity on a sink that does not exist", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner1", + SinkID: "sink1", + State: "active", + Size: "22", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, + { + name: "activity success", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner2", + SinkID: "sink2", + State: "active", + Size: "22", + Timestamp: time.Now(), + }, + }, wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", + "MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger)) + d := NewEventService(logger, deploymentService, nil) + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkActivity(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkActivity() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestEventService_HandleSinkIdle(t *testing.T) { + type args struct { + event redis.SinkerUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "sink idle on a sink that does not exist", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner1", + SinkID: "sink1", + State: "idle", + Size: "22", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, + { + name: "sink idle success", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner2", + SinkID: "sink2", + State: "idle", + Size: "22", + Timestamp: time.Now(), + }, + }, wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) + d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger)) + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkIdle(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkIdle() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/maestro/service/handle_sinks_test.go b/maestro/service/handle_sinks_test.go new file mode 100644 index 000000000..2aaa53122 --- /dev/null +++ b/maestro/service/handle_sinks_test.go @@ -0,0 +1,218 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis" + "github.com/orb-community/orb/pkg/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "testing" + "time" +) + +func Test_eventService_HandleSinkCreate(t *testing.T) { + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "create event", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user", + "password": "dbpass", + }, + }, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + { + name: "create event without config", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: nil, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) + d := NewEventService(logger, deploymentService, nil) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkCreate(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestEventService_HandleSinkUpdate(t *testing.T) { + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "update event when there is none in db", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user", + "password": "dbpass", + }, + }, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + { + name: "update event success", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) + d := NewEventService(logger, deploymentService, nil) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkUpdate(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkUpdate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestEventService_HandleSinkDelete(t *testing.T) { + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "delete event when there is none in db", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }, + }, + wantErr: true, + }, + { + name: "delete event success", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }, + }, + wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) + d := NewEventService(logger, deploymentService, nil) + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkDelete(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkDelete() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/maestro/service/kubectr_test.go b/maestro/service/kubectr_test.go new file mode 100644 index 000000000..fb449a8cc --- /dev/null +++ b/maestro/service/kubectr_test.go @@ -0,0 +1,24 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/kubecontrol" + "go.uber.org/zap" +) + +type testKubeCtr struct { + logger *zap.Logger +} + +func NewTestKubeCtr(logger *zap.Logger) kubecontrol.Service { + return &testKubeCtr{logger: logger} +} + +func (t *testKubeCtr) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error) { + name := "test-collector" + return name, nil +} + +func (t *testKubeCtr) KillOtelCollector(ctx context.Context, deploymentName, sinkID string) error { + return nil +} diff --git a/maestro/service/repository_test.go b/maestro/service/repository_test.go index 795df9331..13e89518b 100644 --- a/maestro/service/repository_test.go +++ b/maestro/service/repository_test.go @@ -19,27 +19,30 @@ func NewFakeRepository(logger *zap.Logger) deployment.Repository { func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) { var allDeployments []deployment.Deployment for _, deploy := range f.inMemoryDict { - allDeployments = append(allDeployments, *deploy) + copy := copyDeploy(deploy) + allDeployments = append(allDeployments, copy) } return allDeployments, nil } -func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { +func (f *fakeRepository) Add(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { deployment.Id = "fake-id" - f.inMemoryDict[deployment.SinkID] = deployment + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { - f.inMemoryDict[deployment.SinkID] = deployment +func (f *fakeRepository) Update(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error { +func (f *fakeRepository) UpdateStatus(_ context.Context, _ string, _ string, _ string, _ string) error { return nil } -func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error { +func (f *fakeRepository) Remove(_ context.Context, _ string, sinkId string) error { delete(f.inMemoryDict, sinkId) return nil } @@ -47,11 +50,30 @@ func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId stri func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) { deploy, ok := f.inMemoryDict[sinkId] if ok { - return deploy, nil + copy := copyDeploy(deploy) + return ©, nil } return nil, errors.New("not found") } -func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) { +func (f *fakeRepository) FindByCollectorName(_ context.Context, _ string) (*deployment.Deployment, error) { return nil, nil } + +func copyDeploy(src *deployment.Deployment) deployment.Deployment { + deploy := deployment.Deployment{ + Id: src.Id, + OwnerID: src.OwnerID, + SinkID: src.SinkID, + Backend: src.Backend, + Config: src.Config, + LastStatus: src.LastStatus, + LastStatusUpdate: src.LastStatusUpdate, + LastErrorMessage: src.LastErrorMessage, + LastErrorTime: src.LastErrorTime, + CollectorName: src.CollectorName, + LastCollectorDeployTime: src.LastCollectorDeployTime, + LastCollectorStopTime: src.LastCollectorStopTime, + } + return deploy +}