Skip to content

Commit

Permalink
Merge pull request #2666 from lpegoraro/fix-eng-1118-create-event
Browse files Browse the repository at this point in the history
fix(maestro): fixes create event flow.
  • Loading branch information
lpegoraro authored Sep 28, 2023
2 parents 0837afd + e491554 commit b14e3b2
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 12 deletions.
6 changes: 0 additions & 6 deletions kind/Chart.lock

This file was deleted.

2 changes: 1 addition & 1 deletion kind/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ appVersion: "1.0.0"

dependencies:
- name: orb
version: "1.0.44"
version: "1.0.50"
repository: "@orb-community"
13 changes: 9 additions & 4 deletions maestro/deployment/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ type deploymentService struct {

var _ Service = (*deploymentService)(nil)

func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string) Service {
func NewDeploymentService(logger *zap.Logger, repository Repository, kafkaUrl string, encryptionKey string, maestroProducer producer.Producer) Service {
namedLogger := logger.Named("deployment-service")
es := password.NewEncryptionService(logger, encryptionKey)
cb := config.NewConfigBuilder(namedLogger, kafkaUrl, es)
return &deploymentService{logger: namedLogger, dbRepository: repository, configBuilder: cb, encryptionService: es}
return &deploymentService{logger: namedLogger,
dbRepository: repository,
configBuilder: cb,
encryptionService: es,
maestroProducer: maestroProducer,
}
}

func (d *deploymentService) CreateDeployment(ctx context.Context, deployment *Deployment) error {
Expand Down Expand Up @@ -82,12 +87,12 @@ func (d *deploymentService) getAuthBuilder(authType string) config.AuthBuilderSe

func (d *deploymentService) encodeConfig(deployment *Deployment) (types.Metadata, error) {
authType := deployment.GetConfig()
if authType != nil {
if authType == nil {
return nil, errors.New("deployment do not have authentication information")
}
value := authType.GetSubMetadata(AuthenticationKey)["type"].(string)
authBuilder := d.getAuthBuilder(value)
if authBuilder != nil {
if authBuilder == nil {
return nil, errors.New("deployment do not have authentication information")
}
return authBuilder.EncodeAuth(deployment.GetConfig())
Expand Down
3 changes: 2 additions & 1 deletion maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func NewMaestroService(logger *zap.Logger, streamRedisClient *redis.Client, sink
sinksGrpcClient sinkspb.SinkServiceClient, otelCfg config.OtelConfig, db *sqlx.DB, svcCfg config.BaseSvcConfig) Service {
kubectr := kubecontrol.NewService(logger)
repo := deployment.NewRepositoryService(db, logger)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey)
maestroProducer := producer.NewMaestroProducer(logger, streamRedisClient)
deploymentService := deployment.NewDeploymentService(logger, repo, otelCfg.KafkaUrl, svcCfg.EncryptionKey, maestroProducer)
ps := producer.NewMaestroProducer(logger, streamRedisClient)
monitorService := monitor.NewMonitorService(logger, &sinksGrpcClient, ps, &kubectr)
eventService := service.NewEventService(logger, deploymentService, kubectr)
Expand Down
57 changes: 57 additions & 0 deletions maestro/service/deploy_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package service

import (
"context"
"github.com/orb-community/orb/maestro/deployment"
"github.com/orb-community/orb/maestro/redis"
"github.com/orb-community/orb/pkg/types"
"go.uber.org/zap"
"testing"
"time"
)

func Test_eventService_HandleSinkCreate(t *testing.T) {

type args struct {
event redis.SinksUpdateEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "create event",
args: args{
event: redis.SinksUpdateEvent{
SinkID: "sink1",
Owner: "owner1",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
},
"authentication": types.Metadata{
"type": "basicauth",
"username": "prom-user",
"password": "dbpass",
},
},
Backend: "prometheus",
Timestamp: time.Now(),
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := zap.NewNop()
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger))
d := NewEventService(logger, deploymentService, nil)
ctx := context.WithValue(context.Background(), "test", tt.name)
if err := d.HandleSinkCreate(ctx, tt.args.event); (err != nil) != tt.wantErr {
t.Errorf("HandleSinkCreate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
19 changes: 19 additions & 0 deletions maestro/service/producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package service

import (
"context"
"github.com/orb-community/orb/maestro/redis/producer"
"go.uber.org/zap"
)

type testProducer struct {
logger *zap.Logger
}

func NewTestProducer(logger *zap.Logger) producer.Producer {
return &testProducer{logger: logger}
}

func (t *testProducer) PublishSinkStatus(_ context.Context, _ string, _ string, _ string, _ string) error {
return nil
}
57 changes: 57 additions & 0 deletions maestro/service/repository_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package service

import (
"context"
"errors"
"github.com/orb-community/orb/maestro/deployment"
"go.uber.org/zap"
)

type fakeRepository struct {
logger *zap.Logger
inMemoryDict map[string]*deployment.Deployment
}

func NewFakeRepository(logger *zap.Logger) deployment.Repository {
return &fakeRepository{logger: logger, inMemoryDict: make(map[string]*deployment.Deployment)}
}

func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) {
var allDeployments []deployment.Deployment
for _, deploy := range f.inMemoryDict {
allDeployments = append(allDeployments, *deploy)
}
return allDeployments, nil
}

func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
deployment.Id = "fake-id"
f.inMemoryDict[deployment.SinkID] = deployment
return deployment, nil
}

func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
f.inMemoryDict[deployment.SinkID] = deployment
return deployment, nil
}

func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error {
return nil
}

func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error {
delete(f.inMemoryDict, sinkId)
return nil
}

func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) {
deploy, ok := f.inMemoryDict[sinkId]
if ok {
return deploy, nil
}
return nil, errors.New("not found")
}

func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) {
return nil, nil
}

0 comments on commit b14e3b2

Please sign in to comment.