diff --git a/kind/Chart.lock b/kind/Chart.lock deleted file mode 100644 index 53732b11b..000000000 --- a/kind/Chart.lock +++ /dev/null @@ -1,6 +0,0 @@ -dependencies: -- name: orb - repository: https://orb-community.github.io/orb-helm/ - version: 1.0.44 -digest: sha256:054a0e4810a7d857f4c0b156bb92e909f485096242098f62ab5b558140e48a22 -generated: "2023-02-13T13:18:58.67925487-03:00" diff --git a/kind/Chart.yaml b/kind/Chart.yaml index 267789e22..f86bcf44b 100644 --- a/kind/Chart.yaml +++ b/kind/Chart.yaml @@ -17,5 +17,5 @@ appVersion: "1.0.0" dependencies: - name: orb - version: "1.0.44" + version: "1.0.50" repository: "@orb-community" diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 092b671b4..27fec2608 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -43,11 +43,16 @@ type deploymentService struct { var _ Service = (*deploymentService)(nil) -func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string) Service { +func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service { namedLogger := logger.Named("deployment-service") es := password.NewEncryptionService(logger, encryptionKey) cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es) - return &deploymentService{logger: namedLogger, dbRepository: repository, configBuilder: cb, encryptionService: es} + return &deploymentService{logger: namedLogger, + dbRepository: repository, + configBuilder: cb, + encryptionService: es, + maestroProducer: maestroProducer, + } } func (d *deploymentService) CreateDeployment(ctx context.Context, deployment *Deployment) error { @@ -82,12 +87,12 @@ func (d *deploymentService) getAuthBuilder(authType string) config.AuthBuilderSe func (d *deploymentService) encodeConfig(deployment *Deployment) (types.Metadata, error) { authType := deployment.GetConfig() - if authType != nil { + if authType == nil { return nil, errors.New("deployment do not have authentication information") } value := authType.GetSubMetadata(AuthenticationKey)["type"].(string) authBuilder := d.getAuthBuilder(value) - if authBuilder != nil { + if authBuilder == nil { return nil, errors.New("deployment do not have authentication information") } return authBuilder.EncodeAuth(deployment.GetConfig()) diff --git a/maestro/service.go b/maestro/service.go index 33106559e..4071553f1 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -50,7 +50,8 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink sinksGrpcClient sinkspb.SinkServiceClient, otelCfg config.OtelConfig, db *sqlx.DB, svcCfg config.BaseSvcConfig) Service { kubectr := kubecontrol.NewService(logger) repo := deployment.NewRepositoryService(db, logger) - deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey) + maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient) + deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer) ps := producer.NewMaestroProducer(logger, streamRedisClient) monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr) eventService := service.NewEventService(logger, deploymentService, kubectr) diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go new file mode 100644 index 000000000..e4837ec6e --- /dev/null +++ b/maestro/service/deploy_service_test.go @@ -0,0 +1,57 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis" + "github.com/orb-community/orb/pkg/types" + "go.uber.org/zap" + "testing" + "time" +) + +func Test_eventService_HandleSinkCreate(t *testing.T) { + + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "create event", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user", + "password": "dbpass", + }, + }, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + d := NewEventService(logger, deploymentService, nil) + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkCreate(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/maestro/service/producer_test.go b/maestro/service/producer_test.go new file mode 100644 index 000000000..e108b1bfa --- /dev/null +++ b/maestro/service/producer_test.go @@ -0,0 +1,19 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/redis/producer" + "go.uber.org/zap" +) + +type testProducer struct { + logger *zap.Logger +} + +func NewTestProducer(logger *zap.Logger) producer.Producer { + return &testProducer{logger: logger} +} + +func (t *testProducer) PublishSinkStatus(_ context.Context, _ string, _ string, _ string, _ string) error { + return nil +} diff --git a/maestro/service/repository_test.go b/maestro/service/repository_test.go new file mode 100644 index 000000000..795df9331 --- /dev/null +++ b/maestro/service/repository_test.go @@ -0,0 +1,57 @@ +package service + +import ( + "context" + "errors" + "github.com/orb-community/orb/maestro/deployment" + "go.uber.org/zap" +) + +type fakeRepository struct { + logger *zap.Logger + inMemoryDict map[string]*deployment.Deployment +} + +func NewFakeRepository(logger *zap.Logger) deployment.Repository { + return &fakeRepository{logger: logger, inMemoryDict: make(map[string]*deployment.Deployment)} +} + +func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) { + var allDeployments []deployment.Deployment + for _, deploy := range f.inMemoryDict { + allDeployments = append(allDeployments, *deploy) + } + return allDeployments, nil +} + +func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { + deployment.Id = "fake-id" + f.inMemoryDict[deployment.SinkID] = deployment + return deployment, nil +} + +func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { + f.inMemoryDict[deployment.SinkID] = deployment + return deployment, nil +} + +func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error { + return nil +} + +func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error { + delete(f.inMemoryDict, sinkId) + return nil +} + +func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) { + deploy, ok := f.inMemoryDict[sinkId] + if ok { + return deploy, nil + } + return nil, errors.New("not found") +} + +func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) { + return nil, nil +}