Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(maestro): adding test cases and fixes logic for handling events #2667

Merged
merged 9 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions maestro/config/authentication_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (b *BasicAuthBuilder) DecodeAuth(config types.Metadata) (types.Metadata, er
return nil, err
}
authCfg["password"] = decodedPassword
config[AuthenticationKey] = authCfg
return config, nil
}

Expand All @@ -61,5 +62,6 @@ func (b *BasicAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, er
return nil, err
}
authcfg["password"] = encodedPassword
config[AuthenticationKey] = authcfg
return config, nil
}
3 changes: 2 additions & 1 deletion maestro/deployment/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ type Deployment struct {
LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"`
}

func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment {
func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment {
now := time.Now()
deploymentName := "otel-" + sinkID
configAsByte := toByte(config)
return Deployment{
OwnerID: ownerID,
SinkID: sinkID,
Backend: backend,
Config: configAsByte,
LastStatus: "pending",
LastStatusUpdate: &now,
Expand Down
10 changes: 6 additions & 4 deletions maestro/deployment/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type deploymentService struct {

var _ Service = (*deploymentService)(nil)

func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service {
func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string,
maestroProducer producer.Producer, kubecontrol kubecontrol.Service) Service {
namedLogger := logger.Named("deployment-service")
es := password.NewEncryptionService(logger, encryptionKey)
cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es)
Expand All @@ -52,6 +53,7 @@ func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl st
configBuilder: cb,
encryptionService: es,
maestroProducer: maestroProducer,
kubecontrol: kubecontrol,
}
}

Expand Down Expand Up @@ -104,7 +106,7 @@ func (d *deploymentService) GetDeployment(ctx context.Context, ownerID string, s
return nil, "", err
}
authType := deployment.GetConfig()
if authType != nil {
if authType == nil {
return nil, "", errors.New("deployment do not have authentication information")
}
value := authType.GetSubMetadata(AuthenticationKey)["type"].(string)
Expand Down Expand Up @@ -176,8 +178,8 @@ func (d *deploymentService) NotifyCollector(ctx context.Context, ownerID string,
}
} else if operation == "deploy" {
// Spin up the collector
if got.LastCollectorDeployTime != nil || got.LastCollectorDeployTime.Before(now) {
if got.LastCollectorStopTime != nil || got.LastCollectorStopTime.Before(now) {
if got.LastCollectorDeployTime == nil || got.LastCollectorDeployTime.Before(now) {
if got.LastCollectorStopTime == nil || got.LastCollectorStopTime.Before(now) {
d.logger.Debug("collector is not running deploying")
deployReq := &config.DeploymentRequest{
OwnerID: ownerID,
Expand Down
2 changes: 1 addition & 1 deletion maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink
kubectr := kubecontrol.NewService(logger)
repo := deployment.NewRepositoryService(db, logger)
maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer, kubectr)
ps := producer.NewMaestroProducer(logger, streamRedisClient)
monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr)
eventService := service.NewEventService(logger, deploymentService, kubectr)
Expand Down
29 changes: 19 additions & 10 deletions maestro/service/deploy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr
func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner))
// Create Deployment Entry
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
// Use deploymentService, which will create deployment in both postgres and redis
err := d.deploymentService.CreateDeployment(ctx, &entry)
if err != nil {
Expand All @@ -53,8 +53,18 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.
// check if exists deployment entry from postgres
entry, _, err := d.deploymentService.GetDeployment(ctx, event.Owner, event.SinkID)
if err != nil {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
return err
if err.Error() != "not found" {
d.logger.Error("error trying to get deployment entry", zap.Error(err))
return err
} else {
newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
err := d.deploymentService.CreateDeployment(ctx, &newEntry)
if err != nil {
d.logger.Error("error trying to recreate deployment entry", zap.Error(err))
return err
}
entry = &newEntry
}
}
// async update sink status to provisioning
go func() {
Expand Down Expand Up @@ -110,15 +120,14 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi
_, err = d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "")
if err != nil {
d.logger.Error("error trying to notify collector", zap.Error(err))
err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error())
if err2 != nil {
d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
d.logger.Error("error during update provisioning error status", zap.Error(err))
return err
}
return err
}
err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error())
if err2 != nil {
d.logger.Warn("error during notifying provisioning error, customer will not be notified of error")
d.logger.Error("error during update status", zap.Error(err))
return err
}

return nil
}

Expand Down
57 changes: 0 additions & 57 deletions maestro/service/deploy_service_test.go

This file was deleted.

141 changes: 141 additions & 0 deletions maestro/service/handle_sinker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package service

import (
"context"
"github.com/orb-community/orb/maestro/deployment"
"github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"testing"
"time"
)

func TestEventService_HandleSinkActivity(t *testing.T) {
type args struct {
event redis.SinkerUpdateEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "activity on a sink that does not exist",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner1",
SinkID: "sink1",
State: "active",
Size: "22",
Timestamp: time.Now(),
},
},
wantErr: true,
},
{
name: "activity success",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner2",
SinkID: "sink2",
State: "active",
Size: "22",
Timestamp: time.Now(),
},
}, wantErr: false,
},
}
logger := zap.NewNop()
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092",
"MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger))
d := NewEventService(logger, deploymentService, nil)
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
},
"authentication": types.Metadata{
"type": "basicauth",
"username": "prom-user-2",
"password": "dbpass-2",
},
},
})
require.NoError(t, err, "should not error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", tt.name)
if err := d.HandleSinkActivity(ctx, tt.args.event); (err != nil) != tt.wantErr {
t.Errorf("HandleSinkActivity() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestEventService_HandleSinkIdle(t *testing.T) {
type args struct {
event redis.SinkerUpdateEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "sink idle on a sink that does not exist",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner1",
SinkID: "sink1",
State: "idle",
Size: "22",
Timestamp: time.Now(),
},
},
wantErr: true,
},
{
name: "sink idle success",
args: args{
event: redis.SinkerUpdateEvent{
OwnerID: "owner2",
SinkID: "sink2",
State: "idle",
Size: "22",
Timestamp: time.Now(),
},
}, wantErr: false,
},
}
logger := zap.NewNop()
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil)
d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger))
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
},
"authentication": types.Metadata{
"type": "basicauth",
"username": "prom-user-2",
"password": "dbpass-2",
},
},
})
require.NoError(t, err, "should not error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.WithValue(context.Background(), "test", tt.name)
if err := d.HandleSinkIdle(ctx, tt.args.event); (err != nil) != tt.wantErr {
t.Errorf("HandleSinkIdle() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
Loading
Loading