diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 53b2abc0b..cab9de5b6 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -165,6 +165,10 @@ func (d *deploymentService) UpdateDeployment(ctx context.Context, deployment *De if err != nil { return err } + err = d.maestroProducer.PublishSinkStatus(ctx, deployment.OwnerID, deployment.SinkID, "unknown", "") + if err != nil { + return err + } d.logger.Info("updated deployment", zap.String("ownerID", updated.OwnerID), zap.String("sinkID", updated.SinkID)) return nil @@ -254,7 +258,7 @@ func (d *deploymentService) UpdateStatus(ctx context.Context, ownerID string, si d.logger.Info("updated deployment status", zap.String("ownerID", updated.OwnerID), zap.String("sinkID", updated.SinkID), zap.String("status", updated.LastStatus), zap.String("errorMessage", updated.LastErrorMessage)) - err = d.maestroProducer.PublishSinkStatus(ctx, updated.OwnerID, updated.SinkID, updated.LastStatus, "") + err = d.maestroProducer.PublishSinkStatus(ctx, updated.OwnerID, updated.SinkID, updated.LastStatus, errorMessage) if err != nil { return err } diff --git a/sinks/postgres/sinks_test.go b/sinks/postgres/sinks_test.go index 974d55e42..bdb4788d6 100644 --- a/sinks/postgres/sinks_test.go +++ b/sinks/postgres/sinks_test.go @@ -580,8 +580,16 @@ func TestUpdateSinkState(t *testing.T) { for desc, tc := range cases { t.Run(desc, func(t *testing.T) { + ctx := context.WithValue(context.Background(), "test", desc) err := sinkRepo.UpdateSinkState(context.Background(), tc.sinkID, tc.msg, tc.ownerID, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err)) + // only validate success scenarios + if tc.err == nil { + got, err := sinkRepo.RetrieveById(ctx, sinkID) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", desc, tc.err, err)) + assert.Equal(t, tc.state, got.State, fmt.Sprintf("%s: expected state %d got %d", desc, tc.state, got.State)) + assert.Equal(t, tc.msg, got.Error, fmt.Sprintf("%s: expected msg %s got %s", desc, tc.msg, got.Error)) + } }) } diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index faf942bc1..dee2c669d 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -310,8 +310,6 @@ func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) defaultMetadata := make(types.Metadata, 1) defaultMetadata["opentelemetry"] = "enabled" sink.Config.Merge(defaultMetadata) - sink.State = Unknown - sink.Error = "" if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { @@ -475,9 +473,7 @@ func (svc sinkService) ChangeSinkStateInternal(ctx context.Context, sinkID strin } func (svc sinkService) validateBackend(sink *Sink) (be backend.Backend, err error) { - if backend.HaveBackend(sink.Backend) { - sink.State = Unknown - } else { + if !backend.HaveBackend(sink.Backend) { return nil, ErrInvalidBackend } sinkBe := backend.GetBackend(sink.Backend)