From 9696b66d25a7354c35ccf0d97dab3b84d3cc9d01 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 16:41:07 -0300 Subject: [PATCH 1/8] feat(maestro): fix update event and add redundancy to update when there isnt a deployment in db. --- maestro/service/deploy_service.go | 14 ++++- maestro/service/deploy_service_test.go | 81 +++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index 171de6a50..36cbfdf80 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -53,8 +53,18 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. // check if exists deployment entry from postgres entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID) if err != nil { - d.logger.Error("error trying to get deployment entry", zap.Error(err)) - return err + if err.Error() != "not found" { + d.logger.Error("error trying to get deployment entry", zap.Error(err)) + return err + } else { + newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + err := d.deploymentService.CreateDeployment(ctx, &newEntry) + if err != nil { + d.logger.Error("error trying to recreate deployment entry", zap.Error(err)) + return err + } + entry = &newEntry + } } // async update sink status to provisioning go func() { diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go index e4837ec6e..55463c6d3 100644 --- a/maestro/service/deploy_service_test.go +++ b/maestro/service/deploy_service_test.go @@ -11,7 +11,6 @@ import ( ) func Test_eventService_HandleSinkCreate(t *testing.T) { - type args struct { event redis.SinksUpdateEvent } @@ -42,6 +41,19 @@ func Test_eventService_HandleSinkCreate(t *testing.T) { }, wantErr: false, }, + { + name: "create event without config", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: nil, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -55,3 +67,70 @@ func Test_eventService_HandleSinkCreate(t *testing.T) { }) } } + +func TestEventService_HandleSinkUpdate(t *testing.T) { + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "update event when there is none in db", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user", + "password": "dbpass", + }, + }, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + { + name: "update event success", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + Backend: "prometheus", + Timestamp: time.Now(), + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + d := NewEventService(logger, deploymentService, nil) + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkUpdate(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} From d4b59590da599e7303fd0183b71b2ef00f70f658 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 16:46:12 -0300 Subject: [PATCH 2/8] feat(maestro): add unit test for delete --- maestro/service/deploy_service_test.go | 54 +++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go index 55463c6d3..eb303458d 100644 --- a/maestro/service/deploy_service_test.go +++ b/maestro/service/deploy_service_test.go @@ -55,11 +55,11 @@ func Test_eventService_HandleSinkCreate(t *testing.T) { wantErr: true, }, } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + d := NewEventService(logger, deploymentService, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) - d := NewEventService(logger, deploymentService, nil) ctx := context.WithValue(context.Background(), "test", tt.name) if err := d.HandleSinkCreate(ctx, tt.args.event); (err != nil) != tt.wantErr { t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) @@ -122,11 +122,11 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { wantErr: false, }, } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + d := NewEventService(logger, deploymentService, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) - d := NewEventService(logger, deploymentService, nil) ctx := context.WithValue(context.Background(), "test", tt.name) if err := d.HandleSinkUpdate(ctx, tt.args.event); (err != nil) != tt.wantErr { t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) @@ -134,3 +134,45 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { }) } } + +func TestEventService_HandleSinkDelete(t *testing.T) { + type args struct { + event redis.SinksUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "delete event when there is none in db", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink1", + Owner: "owner1", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }, + }, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + d := NewEventService(logger, deploymentService, nil) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkDelete(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} From a36a4eba6f3f34dbe830a107e83ec8cece894d63 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 16:49:25 -0300 Subject: [PATCH 3/8] feat(maestro): add unit test for delete --- maestro/service/deploy_service_test.go | 37 ++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go index eb303458d..8c06607c5 100644 --- a/maestro/service/deploy_service_test.go +++ b/maestro/service/deploy_service_test.go @@ -5,6 +5,7 @@ import ( "github.com/orb-community/orb/maestro/deployment" "github.com/orb-community/orb/maestro/redis" "github.com/orb-community/orb/pkg/types" + "github.com/stretchr/testify/require" "go.uber.org/zap" "testing" "time" @@ -162,11 +163,47 @@ func TestEventService_HandleSinkDelete(t *testing.T) { }, }, }, + wantErr: true, + }, + { + name: "delete event success", + args: args{ + event: redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }, + }, + wantErr: true, }, } logger := zap.NewNop() deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) d := NewEventService(logger, deploymentService, nil) + err := d.HandleSinkDelete(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.WithValue(context.Background(), "test", tt.name) From 56dbc5c7522331f798c82d11c1bac8dbc42fa9a1 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 17:04:17 -0300 Subject: [PATCH 4/8] feat(maestro): fix delete flow. --- maestro/deployment/service.go | 2 +- maestro/service/deploy_service_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 27fec2608..3bdddf9bf 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -104,7 +104,7 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s return nil, "", err } authType := deployment.GetConfig() - if authType != nil { + if authType == nil { return nil, "", errors.New("deployment do not have authentication information") } value := authType.GetSubMetadata(AuthenticationKey)["type"].(string) diff --git a/maestro/service/deploy_service_test.go b/maestro/service/deploy_service_test.go index 8c06607c5..95d8cc637 100644 --- a/maestro/service/deploy_service_test.go +++ b/maestro/service/deploy_service_test.go @@ -130,7 +130,7 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx := context.WithValue(context.Background(), "test", tt.name) if err := d.HandleSinkUpdate(ctx, tt.args.event); (err != nil) != tt.wantErr { - t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("HandleSinkUpdate() error = %v, wantErr %v", err, tt.wantErr) } }) } @@ -183,13 +183,13 @@ func TestEventService_HandleSinkDelete(t *testing.T) { }, }, }, - wantErr: true, + wantErr: false, }, } logger := zap.NewNop() deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) d := NewEventService(logger, deploymentService, nil) - err := d.HandleSinkDelete(context.Background(), redis.SinksUpdateEvent{ + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ SinkID: "sink2", Owner: "owner2", Config: types.Metadata{ @@ -208,7 +208,7 @@ func TestEventService_HandleSinkDelete(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx := context.WithValue(context.Background(), "test", tt.name) if err := d.HandleSinkDelete(ctx, tt.args.event); (err != nil) != tt.wantErr { - t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("HandleSinkDelete() error = %v, wantErr %v", err, tt.wantErr) } }) } From d4475258837f0e3a1ff6fbf084af865629b2ffa0 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 17:43:44 -0300 Subject: [PATCH 5/8] feat(maestro): fixes sink activity. --- maestro/deployment/service.go | 4 +++- maestro/service.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 3bdddf9bf..e618150fc 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -43,7 +43,8 @@ type deploymentService struct { var _ Service = (*deploymentService)(nil) -func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service { +func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, + maestroProducer producer.Producer, kubecontrol kubecontrol.Service) Service { namedLogger := logger.Named("deployment-service") es := password.NewEncryptionService(logger, encryptionKey) cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es) @@ -52,6 +53,7 @@ func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl st configBuilder: cb, encryptionService: es, maestroProducer: maestroProducer, + kubecontrol: kubecontrol, } } diff --git a/maestro/service.go b/maestro/service.go index 4071553f1..55347cd0b 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -51,7 +51,7 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink kubectr := kubecontrol.NewService(logger) repo := deployment.NewRepositoryService(db, logger) maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient) - deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer) + deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer, kubectr) ps := producer.NewMaestroProducer(logger, streamRedisClient) monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr) eventService := service.NewEventService(logger, deploymentService, kubectr) From abba61bc8dbc3abe9023b26b66230c23361a053b Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 18:42:06 -0300 Subject: [PATCH 6/8] feat(maestro): fixes not encoding --- maestro/config/authentication_builder.go | 2 + maestro/service/handle_sinker_test.go | 139 ++++++++++++++++++ ...y_service_test.go => handle_sinks_test.go} | 6 +- maestro/service/kubectr_test.go | 24 +++ 4 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 maestro/service/handle_sinker_test.go rename maestro/service/{deploy_service_test.go => handle_sinks_test.go} (99%) create mode 100644 maestro/service/kubectr_test.go diff --git a/maestro/config/authentication_builder.go b/maestro/config/authentication_builder.go index acc5c6dc9..6555193a1 100644 --- a/maestro/config/authentication_builder.go +++ b/maestro/config/authentication_builder.go @@ -50,6 +50,7 @@ func (b *BasicAuthBuilder) DecodeAuth(config types.Metadata) (types.Metadata, er return nil, err } authCfg["password"] = decodedPassword + config[AuthenticationKey] = authCfg return config, nil } @@ -61,5 +62,6 @@ func (b *BasicAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, er return nil, err } authcfg["password"] = encodedPassword + config[AuthenticationKey] = authcfg return config, nil } diff --git a/maestro/service/handle_sinker_test.go b/maestro/service/handle_sinker_test.go new file mode 100644 index 000000000..abca13ba1 --- /dev/null +++ b/maestro/service/handle_sinker_test.go @@ -0,0 +1,139 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis" + "github.com/orb-community/orb/pkg/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "testing" + "time" +) + +func TestEventService_HandleSinkActivity(t *testing.T) { + type args struct { + event redis.SinkerUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "activity on a sink that does not exist", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner1", + SinkID: "sink1", + State: "active", + Size: "22", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, + { + name: "activity success", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner2", + SinkID: "sink2", + State: "active", + Size: "22", + Timestamp: time.Now(), + }, + }, wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", + "MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger)) + d := NewEventService(logger, deploymentService, nil) + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkActivity(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkActivity() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestEventService_HandleSinkIdle(t *testing.T) { + type args struct { + event redis.SinkerUpdateEvent + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "sink idle on a sink that does not exist", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner1", + SinkID: "sink1", + State: "idle", + Size: "22", + Timestamp: time.Now(), + }, + }, + wantErr: true, + }, + { + name: "sink idle success", + args: args{ + event: redis.SinkerUpdateEvent{ + OwnerID: "owner2", + SinkID: "sink2", + State: "idle", + Size: "22", + Timestamp: time.Now(), + }, + }, wantErr: false, + }, + } + logger := zap.NewNop() + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) + d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger)) + err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ + SinkID: "sink2", + Owner: "owner2", + Config: types.Metadata{ + "exporter": types.Metadata{ + "remote_host": "https://acme.com/prom/push", + }, + "authentication": types.Metadata{ + "type": "basicauth", + "username": "prom-user-2", + "password": "dbpass-2", + }, + }, + }) + require.NoError(t, err, "should not error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", tt.name) + if err := d.HandleSinkIdle(ctx, tt.args.event); (err != nil) != tt.wantErr { + t.Errorf("HandleSinkIdle() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/maestro/service/deploy_service_test.go b/maestro/service/handle_sinks_test.go similarity index 99% rename from maestro/service/deploy_service_test.go rename to maestro/service/handle_sinks_test.go index 95d8cc637..3fef60cb6 100644 --- a/maestro/service/deploy_service_test.go +++ b/maestro/service/handle_sinks_test.go @@ -57,7 +57,7 @@ func Test_eventService_HandleSinkCreate(t *testing.T) { }, } logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -124,7 +124,7 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { }, } logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, nil) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -187,7 +187,7 @@ func TestEventService_HandleSinkDelete(t *testing.T) { }, } logger := zap.NewNop() - deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger)) + deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, nil) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ SinkID: "sink2", diff --git a/maestro/service/kubectr_test.go b/maestro/service/kubectr_test.go new file mode 100644 index 000000000..fb449a8cc --- /dev/null +++ b/maestro/service/kubectr_test.go @@ -0,0 +1,24 @@ +package service + +import ( + "context" + "github.com/orb-community/orb/maestro/kubecontrol" + "go.uber.org/zap" +) + +type testKubeCtr struct { + logger *zap.Logger +} + +func NewTestKubeCtr(logger *zap.Logger) kubecontrol.Service { + return &testKubeCtr{logger: logger} +} + +func (t *testKubeCtr) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) (string, error) { + name := "test-collector" + return name, nil +} + +func (t *testKubeCtr) KillOtelCollector(ctx context.Context, deploymentName, sinkID string) error { + return nil +} From 26b3ae7a03ce7bbe2cd7ce70205c73f8e1eac16f Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 28 Sep 2023 19:30:22 -0300 Subject: [PATCH 7/8] feat(maestro): fixes create flow. --- maestro/deployment/model.go | 3 +- maestro/service/deploy_service.go | 4 +-- maestro/service/handle_sinker_test.go | 10 ++++--- maestro/service/handle_sinks_test.go | 21 ++++++++------ maestro/service/repository_test.go | 40 +++++++++++++++++++++------ 5 files changed, 53 insertions(+), 25 deletions(-) diff --git a/maestro/deployment/model.go b/maestro/deployment/model.go index f40e52140..599034ac9 100644 --- a/maestro/deployment/model.go +++ b/maestro/deployment/model.go @@ -21,13 +21,14 @@ type Deployment struct { LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"` } -func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment { +func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment { now := time.Now() deploymentName := "otel-" + sinkID configAsByte := toByte(config) return Deployment{ OwnerID: ownerID, SinkID: sinkID, + Backend: backend, Config: configAsByte, LastStatus: "pending", LastStatusUpdate: &now, diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index 36cbfdf80..bb3687b52 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error { d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner)) // Create Deployment Entry - entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) // Use deploymentService, which will create deployment in both postgres and redis err := d.deploymentService.CreateDeployment(ctx, &entry) if err != nil { @@ -57,7 +57,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis. d.logger.Error("error trying to get deployment entry", zap.Error(err)) return err } else { - newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config) + newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend) err := d.deploymentService.CreateDeployment(ctx, &newEntry) if err != nil { d.logger.Error("error trying to recreate deployment entry", zap.Error(err)) diff --git a/maestro/service/handle_sinker_test.go b/maestro/service/handle_sinker_test.go index abca13ba1..85a69b3a2 100644 --- a/maestro/service/handle_sinker_test.go +++ b/maestro/service/handle_sinker_test.go @@ -51,8 +51,9 @@ func TestEventService_HandleSinkActivity(t *testing.T) { "MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger)) d := NewEventService(logger, deploymentService, nil) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -114,8 +115,9 @@ func TestEventService_HandleSinkIdle(t *testing.T) { deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger)) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", diff --git a/maestro/service/handle_sinks_test.go b/maestro/service/handle_sinks_test.go index 3fef60cb6..2aaa53122 100644 --- a/maestro/service/handle_sinks_test.go +++ b/maestro/service/handle_sinks_test.go @@ -104,8 +104,9 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { name: "update event success", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink1", - Owner: "owner1", + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -116,7 +117,6 @@ func TestEventService_HandleSinkUpdate(t *testing.T) { "password": "dbpass-2", }, }, - Backend: "prometheus", Timestamp: time.Now(), }, }, @@ -149,8 +149,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { name: "delete event when there is none in db", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink1", - Owner: "owner1", + SinkID: "sink1", + Owner: "owner1", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -169,8 +170,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { name: "delete event success", args: args{ event: redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", @@ -190,8 +192,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) { deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil) d := NewEventService(logger, deploymentService, nil) err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{ - SinkID: "sink2", - Owner: "owner2", + SinkID: "sink2", + Owner: "owner2", + Backend: "prometheus", Config: types.Metadata{ "exporter": types.Metadata{ "remote_host": "https://acme.com/prom/push", diff --git a/maestro/service/repository_test.go b/maestro/service/repository_test.go index 795df9331..13e89518b 100644 --- a/maestro/service/repository_test.go +++ b/maestro/service/repository_test.go @@ -19,27 +19,30 @@ func NewFakeRepository(logger *zap.Logger) deployment.Repository { func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) { var allDeployments []deployment.Deployment for _, deploy := range f.inMemoryDict { - allDeployments = append(allDeployments, *deploy) + copy := copyDeploy(deploy) + allDeployments = append(allDeployments, copy) } return allDeployments, nil } -func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { +func (f *fakeRepository) Add(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { deployment.Id = "fake-id" - f.inMemoryDict[deployment.SinkID] = deployment + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { - f.inMemoryDict[deployment.SinkID] = deployment +func (f *fakeRepository) Update(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) { + copy := copyDeploy(deployment) + f.inMemoryDict[deployment.SinkID] = © return deployment, nil } -func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error { +func (f *fakeRepository) UpdateStatus(_ context.Context, _ string, _ string, _ string, _ string) error { return nil } -func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error { +func (f *fakeRepository) Remove(_ context.Context, _ string, sinkId string) error { delete(f.inMemoryDict, sinkId) return nil } @@ -47,11 +50,30 @@ func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId stri func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) { deploy, ok := f.inMemoryDict[sinkId] if ok { - return deploy, nil + copy := copyDeploy(deploy) + return ©, nil } return nil, errors.New("not found") } -func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) { +func (f *fakeRepository) FindByCollectorName(_ context.Context, _ string) (*deployment.Deployment, error) { return nil, nil } + +func copyDeploy(src *deployment.Deployment) deployment.Deployment { + deploy := deployment.Deployment{ + Id: src.Id, + OwnerID: src.OwnerID, + SinkID: src.SinkID, + Backend: src.Backend, + Config: src.Config, + LastStatus: src.LastStatus, + LastStatusUpdate: src.LastStatusUpdate, + LastErrorMessage: src.LastErrorMessage, + LastErrorTime: src.LastErrorTime, + CollectorName: src.CollectorName, + LastCollectorDeployTime: src.LastCollectorDeployTime, + LastCollectorStopTime: src.LastCollectorStopTime, + } + return deploy +} From 9b6a1deb600cf36488a1a788ad6601c6e24a56b2 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 29 Sep 2023 14:48:26 -0300 Subject: [PATCH 8/8] feat(maestro): fix on handle activity. --- maestro/deployment/service.go | 4 ++-- maestro/service/deploy_service.go | 13 ++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index e618150fc..07316ec28 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -178,8 +178,8 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string, } } else if operation == "deploy" { // Spin up the collector - if got.LastCollectorDeployTime != nil || got.LastCollectorDeployTime.Before(now) { - if got.LastCollectorStopTime != nil || got.LastCollectorStopTime.Before(now) { + if got.LastCollectorDeployTime == nil || got.LastCollectorDeployTime.Before(now) { + if got.LastCollectorStopTime == nil || got.LastCollectorStopTime.Before(now) { d.logger.Debug("collector is not running deploying") deployReq := &config.DeploymentRequest{ OwnerID: ownerID, diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index bb3687b52..ee5930b90 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -120,15 +120,14 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi _, err = d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "") if err != nil { d.logger.Error("error trying to notify collector", zap.Error(err)) + err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error()) + if err2 != nil { + d.logger.Warn("error during notifying provisioning error, customer will not be notified of error") + d.logger.Error("error during update provisioning error status", zap.Error(err)) + return err + } return err } - err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error()) - if err2 != nil { - d.logger.Warn("error during notifying provisioning error, customer will not be notified of error") - d.logger.Error("error during update status", zap.Error(err)) - return err - } - return nil }