Skip to content

Commit

Permalink
feat(maestro): fixes create flow.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Sep 28, 2023
1 parent abba61b commit 26b3ae7
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 25 deletions.
3 changes: 2 additions & 1 deletion maestro/deployment/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ type Deployment struct {
LastCollectorStopTime *time.Time `db:"last_collector_stop_time" json:"lastCollectorStopTime"`
}

func NewDeployment(ownerID string, sinkID string, config types.Metadata) Deployment {
func NewDeployment(ownerID string, sinkID string, config types.Metadata, backend string) Deployment {
now := time.Now()
deploymentName := "otel-" + sinkID
configAsByte := toByte(config)
return Deployment{
OwnerID: ownerID,
SinkID: sinkID,
Backend: backend,
Config: configAsByte,
LastStatus: "pending",
LastStatusUpdate: &now,
Expand Down
4 changes: 2 additions & 2 deletions maestro/service/deploy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewEventService(logger *zap.Logger, service deployment.Service, _ kubecontr
func (d *eventService) HandleSinkCreate(ctx context.Context, event maestroredis.SinksUpdateEvent) error {
d.logger.Debug("handling sink create event", zap.String("sink-id", event.SinkID), zap.String("owner-id", event.Owner))
// Create Deployment Entry
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
entry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
// Use deploymentService, which will create deployment in both postgres and redis
err := d.deploymentService.CreateDeployment(ctx, &entry)
if err != nil {
Expand All @@ -57,7 +57,7 @@ func (d *eventService) HandleSinkUpdate(ctx context.Context, event maestroredis.
d.logger.Error("error trying to get deployment entry", zap.Error(err))
return err
} else {
newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config)
newEntry := deployment.NewDeployment(event.Owner, event.SinkID, event.Config, event.Backend)
err := d.deploymentService.CreateDeployment(ctx, &newEntry)
if err != nil {
d.logger.Error("error trying to recreate deployment entry", zap.Error(err))
Expand Down
10 changes: 6 additions & 4 deletions maestro/service/handle_sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func TestEventService_HandleSinkActivity(t *testing.T) {
"MY_SECRET", NewTestProducer(logger), NewTestKubeCtr(logger))
d := NewEventService(logger, deploymentService, nil)
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand Down Expand Up @@ -114,8 +115,9 @@ func TestEventService_HandleSinkIdle(t *testing.T) {
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil)
d := NewEventService(logger, deploymentService, NewTestKubeCtr(logger))
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand Down
21 changes: 12 additions & 9 deletions maestro/service/handle_sinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ func TestEventService_HandleSinkUpdate(t *testing.T) {
name: "update event success",
args: args{
event: redis.SinksUpdateEvent{
SinkID: "sink1",
Owner: "owner1",
SinkID: "sink1",
Owner: "owner1",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand All @@ -116,7 +117,6 @@ func TestEventService_HandleSinkUpdate(t *testing.T) {
"password": "dbpass-2",
},
},
Backend: "prometheus",
Timestamp: time.Now(),
},
},
Expand Down Expand Up @@ -149,8 +149,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) {
name: "delete event when there is none in db",
args: args{
event: redis.SinksUpdateEvent{
SinkID: "sink1",
Owner: "owner1",
SinkID: "sink1",
Owner: "owner1",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand All @@ -169,8 +170,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) {
name: "delete event success",
args: args{
event: redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand All @@ -190,8 +192,9 @@ func TestEventService_HandleSinkDelete(t *testing.T) {
deploymentService := deployment.NewDeploymentService(logger, NewFakeRepository(logger), "kafka:9092", "MY_SECRET", NewTestProducer(logger), nil)
d := NewEventService(logger, deploymentService, nil)
err := d.HandleSinkCreate(context.Background(), redis.SinksUpdateEvent{
SinkID: "sink2",
Owner: "owner2",
SinkID: "sink2",
Owner: "owner2",
Backend: "prometheus",
Config: types.Metadata{
"exporter": types.Metadata{
"remote_host": "https://acme.com/prom/push",
Expand Down
40 changes: 31 additions & 9 deletions maestro/service/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,61 @@ func NewFakeRepository(logger *zap.Logger) deployment.Repository {
func (f *fakeRepository) FetchAll(ctx context.Context) ([]deployment.Deployment, error) {
var allDeployments []deployment.Deployment
for _, deploy := range f.inMemoryDict {
allDeployments = append(allDeployments, *deploy)
copy := copyDeploy(deploy)
allDeployments = append(allDeployments, copy)
}
return allDeployments, nil
}

func (f *fakeRepository) Add(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
func (f *fakeRepository) Add(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
deployment.Id = "fake-id"
f.inMemoryDict[deployment.SinkID] = deployment
copy := copyDeploy(deployment)
f.inMemoryDict[deployment.SinkID] = &copy
return deployment, nil
}

func (f *fakeRepository) Update(ctx context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
f.inMemoryDict[deployment.SinkID] = deployment
func (f *fakeRepository) Update(_ context.Context, deployment *deployment.Deployment) (*deployment.Deployment, error) {
copy := copyDeploy(deployment)
f.inMemoryDict[deployment.SinkID] = &copy
return deployment, nil
}

func (f *fakeRepository) UpdateStatus(ctx context.Context, ownerID string, sinkId string, status string, errorMessage string) error {
func (f *fakeRepository) UpdateStatus(_ context.Context, _ string, _ string, _ string, _ string) error {
return nil
}

func (f *fakeRepository) Remove(ctx context.Context, ownerId string, sinkId string) error {
func (f *fakeRepository) Remove(_ context.Context, _ string, sinkId string) error {
delete(f.inMemoryDict, sinkId)
return nil
}

func (f *fakeRepository) FindByOwnerAndSink(ctx context.Context, _ string, sinkId string) (*deployment.Deployment, error) {
deploy, ok := f.inMemoryDict[sinkId]
if ok {
return deploy, nil
copy := copyDeploy(deploy)
return &copy, nil
}
return nil, errors.New("not found")
}

func (f *fakeRepository) FindByCollectorName(ctx context.Context, collectorName string) (*deployment.Deployment, error) {
func (f *fakeRepository) FindByCollectorName(_ context.Context, _ string) (*deployment.Deployment, error) {
return nil, nil
}

func copyDeploy(src *deployment.Deployment) deployment.Deployment {
deploy := deployment.Deployment{
Id: src.Id,
OwnerID: src.OwnerID,
SinkID: src.SinkID,
Backend: src.Backend,
Config: src.Config,
LastStatus: src.LastStatus,
LastStatusUpdate: src.LastStatusUpdate,
LastErrorMessage: src.LastErrorMessage,
LastErrorTime: src.LastErrorTime,
CollectorName: src.CollectorName,
LastCollectorDeployTime: src.LastCollectorDeployTime,
LastCollectorStopTime: src.LastCollectorStopTime,
}
return deploy
}

0 comments on commit 26b3ae7

Please sign in to comment.