Skip to content

Commit

Permalink
fix(maestro): adding test cases and fixes logic for handling events (#…
Browse files Browse the repository at this point in the history
…2667)

* feat(maestro): fix update event and add redundancy to update when there isnt a deployment in db.

* feat(maestro): add unit test for delete

* feat(maestro): add unit test for delete

* feat(maestro): fix delete flow.

* feat(maestro): fixes sink activity.

* feat(maestro): fixes not encoding

* feat(maestro): fixes create flow.

* feat(maestro): fix on handle activity.
  • Loading branch information
lpegoraro authored Sep 29, 2023
1 parent 418bb2b commit 6c6caeb
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 82 deletions.
2 changes: 2 additions & 0 deletions maestro/config/authentication_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (b *BasicAuthBuilder) DecodeAuth(config types.Metadata) (types.Metadata, er
return nil, err
}
authCfg["password"] = decodedPassword
config[AuthenticationKey] = authCfg
return config, nil
}

Expand All @@ -61,5 +62,6 @@ func (b *BasicAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, er
return nil, err
}
authcfg["password"] = encodedPassword
config[AuthenticationKey] = authcfg
return config, nil
}
3 changes: 2 additions & 1 deletion maestro/deployment/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ type Deployment struct {
LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"`
}

func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment {
func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment {
now := time.Now()
deploymentName := "otel-" + sinkID
configAsByte := toByte(config)
return Deployment{
OwnerID: ownerID,
SinkID: sinkID,
Backend: backend,
Config: configAsByte,
LastStatus: "pending",
LastStatusUpdate: &now,
Expand Down
10 changes: 6 additions & 4 deletions maestro/deployment/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type deploymentService struct {

var _ Service = (*deploymentService)(nil)

func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service {
func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string,
maestroProducer producer.Producer, kubecontrol kubecontrol.Service) Service {
namedLogger := logger.Named("deployment-service")
es := password.NewEncryptionService(logger, encryptionKey)
cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es)
Expand All @@ -52,6 +53,7 @@ func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl st
configBuilder: cb,
encryptionService: es,
maestroProducer: maestroProducer,
kubecontrol: kubecontrol,
}
}

Expand Down Expand Up @@ -104,7 +106,7 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s
return nil, "", err
}
authType := deployment.GetConfig()
if authType != nil {
if authType == nil {
return nil, "", errors.New("deployment do not have authentication information")
}
value := authType.GetSubMetadata(AuthenticationKey)["type"].(string)
Expand Down Expand Up @@ -176,8 +178,8 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string,
}
} else if operation == "deploy" {
// Spin up the collector
if got.LastCollectorDeployTime != nil || got.LastCollectorDeployTime.Before(now) {
if got.LastCollectorStopTime != nil || got.LastCollectorStopTime.Before(now) {
if got.LastCollectorDeployTime == nil || got.LastCollectorDeployTime.Before(now) {
if got.LastCollectorStopTime == nil || got.LastCollectorStopTime.Before(now) {
d.logger.Debug("collector is not running deploying")
deployReq := &config.DeploymentRequest{
OwnerID: ownerID,
Expand Down
2 changes: 1 addition & 1 deletion maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink
kubectr := kubecontrol.NewService(logger)
repo := deployment.NewRepositoryService(db, logger)
maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer, kubectr)
ps := producer.NewMaestroProducer(logger, streamRedisClient)
monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr)
eventService := service.NewEventService(logger, deploymentService, kubectr)
Expand Down
29 changes: 19 additions & 10 deletions maestro/service/deploy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr
func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner))
// Create Deployment Entry
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
// Use deploymentService, which will create deployment in both postgres and redis
err := d.deploymentService.CreateDeployment(ctx, &entry)
if err != nil {
Expand All @@ -53,8 +53,18 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.
// check if exists deployment entry from postgres
entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
return err
if err.Error() != "not found" {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
return err
} else {
newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
err := d.deploymentService.CreateDeployment(ctx, &newEntry)
if err != nil {
d.logger.Error("error trying to recreate deployment entry", zap.Error(err))
return err
}
entry = &newEntry
}
}
// async update sink status to provisioning
go func() {
Expand Down Expand Up @@ -110,15 +120,14 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi
_, err = d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "")
if err != nil {
d.logger.Error("error trying to notify collector", zap.Error(err))
err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error())
if err2 != nil {
d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
d.logger.Error("error during update provisioning error status", zap.Error(err))
return err
}
return err
}
err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error())
if err2 != nil {
d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
d.logger.Error("error during update status", zap.Error(err))
return err
}

return nil
}

Expand Down
57 changes: 0 additions & 57 deletions maestro/service/deploy_service_test.go

This file was deleted.

141 changes: 141 additions & 0 deletions maestro/service/handle_sinker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package service

import (
"context"
"github.com/orb-community/orb/maestro/deployment"
"github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"testing"
"time"
)

func TestEventService_HandleSinkActivity(t *testing.T) {
type args struct {
event redis.SinkerUpdateEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "activity on a sink that does not exist",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner1",
SinkID: "sink1",
State: "active",
Size: "22",
Timestamp: time.Now(),
},
},
wantErr: true,
},
{
name: "activity success",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner2",
SinkID: "sink2",
State: "active",
Size: "22",
Timestamp: time.Now(),
},
}, wantErr: false,
},
}
logger := zap.NewNop()
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092",
"MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger))
d := NewEventService(logger, deploymentService, nil)
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
},
"authentication": types.Metadata{
"type": "basicauth",
"username": "prom-user-2",
"password": "dbpass-2",
},
},
})
require.NoError(t, err, "should not error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", tt.name)
if err := d.HandleSinkActivity(ctx, tt.args.event); (err != nil) != tt.wantErr {
t.Errorf("HandleSinkActivity() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestEventService_HandleSinkIdle(t *testing.T) {
type args struct {
event redis.SinkerUpdateEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "sink idle on a sink that does not exist",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner1",
SinkID: "sink1",
State: "idle",
Size: "22",
Timestamp: time.Now(),
},
},
wantErr: true,
},
{
name: "sink idle success",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner2",
SinkID: "sink2",
State: "idle",
Size: "22",
Timestamp: time.Now(),
},
}, wantErr: false,
},
}
logger := zap.NewNop()
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil)
d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger))
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
},
"authentication": types.Metadata{
"type": "basicauth",
"username": "prom-user-2",
"password": "dbpass-2",
},
},
})
require.NoError(t, err, "should not error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", tt.name)
if err := d.HandleSinkIdle(ctx, tt.args.event); (err != nil) != tt.wantErr {
t.Errorf("HandleSinkIdle() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
Loading

0 comments on commit 6c6caeb

Please sign in to comment.