From 93fe5ead622b3a969d5a090d54d2b0136efaae58 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 14:14:48 -0300 Subject: [PATCH 01/12] set sink status when deployment status change --- maestro/deployment/service.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/maestro/deployment/service.go b/maestro/deployment/service.go index 13c0e0552..53b2abc0b 100644 --- a/maestro/deployment/service.go +++ b/maestro/deployment/service.go @@ -3,13 +3,14 @@ package deployment import ( "context" "errors" + "time" + "github.com/orb-community/orb/maestro/config" "github.com/orb-community/orb/maestro/kubecontrol" "github.com/orb-community/orb/maestro/password" "github.com/orb-community/orb/maestro/redis/producer" "github.com/orb-community/orb/pkg/types" "go.uber.org/zap" - "time" ) const AuthenticationKey = "authentication" @@ -253,7 +254,10 @@ func (d *deploymentService) UpdateStatus(ctx context.Context, ownerID string, si d.logger.Info("updated deployment status", zap.String("ownerID", updated.OwnerID), zap.String("sinkID", updated.SinkID), zap.String("status", updated.LastStatus), zap.String("errorMessage", updated.LastErrorMessage)) - + err = d.maestroProducer.PublishSinkStatus(ctx, updated.OwnerID, updated.SinkID, updated.LastStatus, "") + if err != nil { + return err + } return nil } From a47e062c4afc0915d9ea2c77e1b5544003431016 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 16:12:59 -0300 Subject: [PATCH 02/12] fix: (maestro) monitor to kill orphaned otelcollectors --- maestro/monitor/monitor.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index b6e20463a..8e55b7ba7 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -5,12 +5,13 @@ import ( "context" "encoding/json" "errors" - "github.com/orb-community/orb/maestro/deployment" - "github.com/orb-community/orb/maestro/redis/producer" "io" "strings" "time" + "github.com/orb-community/orb/maestro/deployment" + "github.com/orb-community/orb/maestro/redis/producer" + maestroconfig "github.com/orb-community/orb/maestro/config" "github.com/orb-community/orb/maestro/kubecontrol" sinkspb "github.com/orb-community/orb/sinks/pb" @@ -172,7 +173,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { deploymentName := "otel-" + sinkId svc.logger.Debug("compare deploymentName with collector name", zap.String("deploy name", deploymentName), zap.String("collector name", collector.Name)) - err = svc.kubecontrol.KillOtelCollector(ctx, collector.Name, sinkId) + err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId) if err != nil { svc.logger.Error("error removing otel collector", zap.Error(err)) } From d9bc6a07041b244cae1d6ee95c90ada4cf507b85 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 16:32:58 -0300 Subject: [PATCH 03/12] fix: (sinks) update state --- sinks/redis/consumer/sink_status_listener.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sinks/redis/consumer/sink_status_listener.go b/sinks/redis/consumer/sink_status_listener.go index fd47601cd..efd3a77b1 100644 --- a/sinks/redis/consumer/sink_status_listener.go +++ b/sinks/redis/consumer/sink_status_listener.go @@ -3,6 +3,7 @@ package consumer import ( "context" "fmt" + "github.com/go-redis/redis/v8" "github.com/orb-community/orb/sinks" redis2 "github.com/orb-community/orb/sinks/redis" @@ -84,6 +85,7 @@ func (s *sinkStatusListener) ReceiveMessage(ctx context.Context, message redis.X if newState == sinks.Error || newState == sinks.ProvisioningError || newState == sinks.Warning { gotSink.Error = event.Msg } + gotSink.State = newState _, err = s.sinkService.UpdateSinkInternal(ctx, gotSink) if err != nil { logger.Error("failed to update sink", zap.String("owner_id", event.OwnerID), From 116d11ba94c035c79315791019959493a5db35c5 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 18:02:58 -0300 Subject: [PATCH 04/12] fix: (sinks) update state --- sinks/redis/consumer/sink_status_listener.go | 2 +- sinks/sinks_service.go | 37 +++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/sinks/redis/consumer/sink_status_listener.go b/sinks/redis/consumer/sink_status_listener.go index efd3a77b1..57a1fc34b 100644 --- a/sinks/redis/consumer/sink_status_listener.go +++ b/sinks/redis/consumer/sink_status_listener.go @@ -86,7 +86,7 @@ func (s *sinkStatusListener) ReceiveMessage(ctx context.Context, message redis.X gotSink.Error = event.Msg } gotSink.State = newState - _, err = s.sinkService.UpdateSinkInternal(ctx, gotSink) + _, err = s.sinkService.UpdateSinkStatusInternal(ctx, gotSink) if err != nil { logger.Error("failed to update sink", zap.String("owner_id", event.OwnerID), zap.String("sink_id", event.SinkID), zap.Error(err)) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index faf942bc1..273e67508 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -218,8 +218,9 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, defaultMetadata := make(types.Metadata, 1) defaultMetadata["opentelemetry"] = "enabled" sink.Config.Merge(defaultMetadata) - sink.State = Unknown + sink.Error = "" + sink.State = Unknown if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { @@ -265,6 +266,40 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, return sinkEdited, nil } +func (svc sinkService) UpdateSinkStatusInternal(ctx context.Context, sink Sink) (Sink, error) { + var currentSink Sink + currentSink, err := svc.sinkRepo.RetrieveById(ctx, sink.ID) + if err != nil { + return Sink{}, errors.Wrap(ErrUpdateEntity, err) + } + var cfg Configuration + authType, _ := authentication_type.GetAuthType(currentSink.GetAuthenticationTypeName()) + be := backend.GetBackend(currentSink.Backend) + + cfg = Configuration { + Authentication: authType, + Exporter: be, + } + + currentSink.State = sink.State + currentSink.Error = sink.Error + + err = svc.sinkRepo.Update(ctx, currentSink) + if err != nil { + return Sink{}, errors.Wrap(ErrUpdateEntity, err) + } + sinkEdited, err := svc.sinkRepo.RetrieveById(ctx, sink.ID) + if err != nil { + return Sink{}, errors.Wrap(ErrUpdateEntity, err) + } + sinkEdited, err = svc.decryptMetadata(cfg, sinkEdited) + if err != nil { + return Sink{}, errors.Wrap(ErrUpdateEntity, err) + } + + return sinkEdited, nil +} + func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) (Sink, error) { skOwnerID, err := svc.identify(token) if err != nil { From 3c0eba92108191cee41e828d8bb382d32e0c497c Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 18:11:26 -0300 Subject: [PATCH 05/12] add changes --- sinks/api/http/logging.go | 17 ++++++++++++++++- sinks/api/http/metrics.go | 8 +++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sinks/api/http/logging.go b/sinks/api/http/logging.go index 04527b41e..aa6f4eb60 100644 --- a/sinks/api/http/logging.go +++ b/sinks/api/http/logging.go @@ -6,11 +6,12 @@ package http import ( "context" + "time" + "github.com/orb-community/orb/sinks" "github.com/orb-community/orb/sinks/authentication_type" "github.com/orb-community/orb/sinks/backend" "go.uber.org/zap" - "time" ) var _ sinks.SinkService = (*loggingMiddleware)(nil) @@ -90,6 +91,20 @@ func (l loggingMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink) return l.svc.UpdateSinkInternal(ctx, s) } +func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { + defer func(begin time.Time) { + if err != nil { + l.logger.Warn("method call: edit_internal_sink", + zap.Error(err), + zap.Duration("duration", time.Since(begin))) + } else { + l.logger.Debug("method call: edit_internal_sink", + zap.Duration("duration", time.Since(begin))) + } + }(time.Now()) + return l.svc.UpdateSinkInternal(ctx, s) +} + func (l loggingMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (_ sinks.Page, err error) { defer func(begin time.Time) { if err != nil { diff --git a/sinks/api/http/metrics.go b/sinks/api/http/metrics.go index 51bac1ee3..b1aae24a3 100644 --- a/sinks/api/http/metrics.go +++ b/sinks/api/http/metrics.go @@ -6,6 +6,8 @@ package http import ( "context" + "time" + "github.com/go-kit/kit/metrics" "github.com/mainflux/mainflux" "github.com/orb-community/orb/pkg/errors" @@ -13,7 +15,6 @@ import ( "github.com/orb-community/orb/sinks/authentication_type" "github.com/orb-community/orb/sinks/backend" "go.uber.org/zap" - "time" ) var _ sinks.SinkService = (*metricsMiddleware)(nil) @@ -96,6 +97,11 @@ func (m metricsMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink) return m.svc.UpdateSinkInternal(ctx, s) } +func (m metricsMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { + + return m.svc.UpdateSinkInternal(ctx, s) +} + func (m metricsMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sink sinks.Page, err error) { ownerID, err := m.identify(token) if err != nil { From 7df6d9e9c9777470349e4bed6b896be1511d8efc Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 18:17:38 -0300 Subject: [PATCH 06/12] add changes --- sinks/redis/producer/streams.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/sinks/redis/producer/streams.go b/sinks/redis/producer/streams.go index fb031a256..ce9c4fb17 100644 --- a/sinks/redis/producer/streams.go +++ b/sinks/redis/producer/streams.go @@ -10,6 +10,7 @@ package producer import ( "context" + "github.com/orb-community/orb/sinks/authentication_type" "github.com/go-redis/redis/v8" @@ -104,6 +105,35 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si return es.svc.UpdateSinkInternal(ctx, s) } +func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { + defer func() { + event := updateSinkEvent{ + sinkID: sink.ID, + owner: sink.MFOwnerID, + config: sink.Config, + backend: sink.Backend, + } + + encode, err := event.Encode() + if err != nil { + es.logger.Error("error encoding object", zap.Error(err)) + } + + record := &redis.XAddArgs{ + Stream: streamID, + MaxLen: streamLen, + Approx: true, + Values: encode, + } + + err = es.client.XAdd(ctx, record).Err() + if err != nil { + es.logger.Error("error sending event to sinks event store", zap.Error(err)) + } + }() + return es.svc.UpdateSinkStatusInternal(ctx, s) +} + func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := updateSinkEvent{ From b160603329efe83c55498b394c0346e809d37db3 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 20:50:18 -0300 Subject: [PATCH 07/12] add changes --- sinks/redis/producer/streams.go | 26 +------------------------- sinks/sinks_service.go | 8 ++++---- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/sinks/redis/producer/streams.go b/sinks/redis/producer/streams.go index ce9c4fb17..53d7d846e 100644 --- a/sinks/redis/producer/streams.go +++ b/sinks/redis/producer/streams.go @@ -106,31 +106,7 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si } func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { - defer func() { - event := updateSinkEvent{ - sinkID: sink.ID, - owner: sink.MFOwnerID, - config: sink.Config, - backend: sink.Backend, - } - - encode, err := event.Encode() - if err != nil { - es.logger.Error("error encoding object", zap.Error(err)) - } - - record := &redis.XAddArgs{ - Stream: streamID, - MaxLen: streamLen, - Approx: true, - Values: encode, - } - - err = es.client.XAdd(ctx, record).Err() - if err != nil { - es.logger.Error("error sending event to sinks event store", zap.Error(err)) - } - }() + return es.svc.UpdateSinkStatusInternal(ctx, s) } diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index 273e67508..eb57f838c 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -281,10 +281,10 @@ func (svc sinkService) UpdateSinkStatusInternal(ctx context.Context, sink Sink) Exporter: be, } - currentSink.State = sink.State - currentSink.Error = sink.Error - - err = svc.sinkRepo.Update(ctx, currentSink) + err = svc.sinkRepo.UpdateSinkState(ctx, sink.ID, sink.Error, currentSink.MFOwnerID, sink.State) + if err != nil { + return Sink{}, errors.Wrap(ErrUpdateEntity, err) + } if err != nil { return Sink{}, errors.Wrap(ErrUpdateEntity, err) } From 9eb3a5608026e4697cb67cdbb3912107bd37d17b Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 20:55:25 -0300 Subject: [PATCH 08/12] add changes --- sinks/api/http/logging.go | 4 ++-- sinks/sinks_service.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sinks/api/http/logging.go b/sinks/api/http/logging.go index aa6f4eb60..64a301920 100644 --- a/sinks/api/http/logging.go +++ b/sinks/api/http/logging.go @@ -94,11 +94,11 @@ func (l loggingMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink) func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { defer func(begin time.Time) { if err != nil { - l.logger.Warn("method call: edit_internal_sink", + l.logger.Warn("method call: edit_sink_status_internal", zap.Error(err), zap.Duration("duration", time.Since(begin))) } else { - l.logger.Debug("method call: edit_internal_sink", + l.logger.Debug("method call: edit_sink_status_internal", zap.Duration("duration", time.Since(begin))) } }(time.Now()) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index eb57f838c..a8070b590 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -218,9 +218,9 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, defaultMetadata := make(types.Metadata, 1) defaultMetadata["opentelemetry"] = "enabled" sink.Config.Merge(defaultMetadata) - - sink.Error = "" sink.State = Unknown + sink.Error = "" + if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { From b15daba3c81c71c7c1203031849a9baf255b897d Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 20:57:13 -0300 Subject: [PATCH 09/12] add changes --- sinks/sinks_service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index a8070b590..c17288e4e 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -220,7 +220,6 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, sink.Config.Merge(defaultMetadata) sink.State = Unknown sink.Error = "" - if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { From 8ca75e55a9cab4748441b2ad22985886b9b1bdf5 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 21:00:14 -0300 Subject: [PATCH 10/12] add changes --- sinks/api/http/logging.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sinks/api/http/logging.go b/sinks/api/http/logging.go index 64a301920..f7163e7ea 100644 --- a/sinks/api/http/logging.go +++ b/sinks/api/http/logging.go @@ -102,8 +102,7 @@ func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks zap.Duration("duration", time.Since(begin))) } }(time.Now()) - return l.svc.UpdateSinkInternal(ctx, s) -} + return l.svc.UpdateSinkStatusInternal(ctx, s) func (l loggingMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (_ sinks.Page, err error) { defer func(begin time.Time) { From 6b05013ded8abda9590e77a8cb5e346af1f4ce37 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 21:09:57 -0300 Subject: [PATCH 11/12] add changes --- sinks/api/http/logging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sinks/api/http/logging.go b/sinks/api/http/logging.go index f7163e7ea..aaf2fab74 100644 --- a/sinks/api/http/logging.go +++ b/sinks/api/http/logging.go @@ -103,7 +103,7 @@ func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks } }(time.Now()) return l.svc.UpdateSinkStatusInternal(ctx, s) - +} func (l loggingMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (_ sinks.Page, err error) { defer func(begin time.Time) { if err != nil { From 37fa6c173d465101cf17bf564dee3a5879ac00d6 Mon Sep 17 00:00:00 2001 From: etaques Date: Tue, 3 Oct 2023 21:44:48 -0300 Subject: [PATCH 12/12] add changes --- sinks/api/http/logging.go | 13 -------- sinks/api/http/metrics.go | 5 --- sinks/redis/consumer/sink_status_listener.go | 2 +- sinks/redis/producer/streams.go | 5 --- sinks/sinks.go | 2 -- sinks/sinks_service.go | 34 -------------------- 6 files changed, 1 insertion(+), 60 deletions(-) diff --git a/sinks/api/http/logging.go b/sinks/api/http/logging.go index aaf2fab74..2bde997f0 100644 --- a/sinks/api/http/logging.go +++ b/sinks/api/http/logging.go @@ -91,19 +91,6 @@ func (l loggingMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink) return l.svc.UpdateSinkInternal(ctx, s) } -func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { - defer func(begin time.Time) { - if err != nil { - l.logger.Warn("method call: edit_sink_status_internal", - zap.Error(err), - zap.Duration("duration", time.Since(begin))) - } else { - l.logger.Debug("method call: edit_sink_status_internal", - zap.Duration("duration", time.Since(begin))) - } - }(time.Now()) - return l.svc.UpdateSinkStatusInternal(ctx, s) -} func (l loggingMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (_ sinks.Page, err error) { defer func(begin time.Time) { if err != nil { diff --git a/sinks/api/http/metrics.go b/sinks/api/http/metrics.go index b1aae24a3..7ef0edcfb 100644 --- a/sinks/api/http/metrics.go +++ b/sinks/api/http/metrics.go @@ -97,11 +97,6 @@ func (m metricsMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink) return m.svc.UpdateSinkInternal(ctx, s) } -func (m metricsMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { - - return m.svc.UpdateSinkInternal(ctx, s) -} - func (m metricsMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sink sinks.Page, err error) { ownerID, err := m.identify(token) if err != nil { diff --git a/sinks/redis/consumer/sink_status_listener.go b/sinks/redis/consumer/sink_status_listener.go index 5dc97fe8c..55fe32730 100644 --- a/sinks/redis/consumer/sink_status_listener.go +++ b/sinks/redis/consumer/sink_status_listener.go @@ -89,7 +89,7 @@ func (s *sinkStatusListener) ReceiveMessage(ctx context.Context, message redis.X gotSink.Error = event.Msg } gotSink.State = newState - _, err = s.sinkService.UpdateSinkStatusInternal(ctx, gotSink) + err = s.sinkService.ChangeSinkStateInternal(ctx, gotSink.ID, gotSink.Error, gotSink.MFOwnerID, gotSink.State) if err != nil { logger.Error("failed to update sink", zap.String("owner_id", event.OwnerID), zap.String("sink_id", event.SinkID), zap.Error(err)) diff --git a/sinks/redis/producer/streams.go b/sinks/redis/producer/streams.go index 53d7d846e..01b68dff4 100644 --- a/sinks/redis/producer/streams.go +++ b/sinks/redis/producer/streams.go @@ -105,11 +105,6 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si return es.svc.UpdateSinkInternal(ctx, s) } -func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { - - return es.svc.UpdateSinkStatusInternal(ctx, s) -} - func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := updateSinkEvent{ diff --git a/sinks/sinks.go b/sinks/sinks.go index 9f69ef452..a5e8bf29e 100644 --- a/sinks/sinks.go +++ b/sinks/sinks.go @@ -159,8 +159,6 @@ type SinkService interface { UpdateSink(ctx context.Context, token string, s Sink) (Sink, error) // UpdateSinkInternal by id UpdateSinkInternal(ctx context.Context, s Sink) (Sink, error) - // UpdateSinkStatusInternal by id - UpdateSinkStatusInternal(ctx context.Context, s Sink) (Sink, error) // ListSinks retrieves data about sinks ListSinks(ctx context.Context, token string, pm PageMetadata) (Page, error) // ListSinksInternal retrieves data from sinks filtered by SinksFilter for Services like Maestro, to build DeploymentEntries diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index c17288e4e..faf942bc1 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -265,40 +265,6 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, return sinkEdited, nil } -func (svc sinkService) UpdateSinkStatusInternal(ctx context.Context, sink Sink) (Sink, error) { - var currentSink Sink - currentSink, err := svc.sinkRepo.RetrieveById(ctx, sink.ID) - if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) - } - var cfg Configuration - authType, _ := authentication_type.GetAuthType(currentSink.GetAuthenticationTypeName()) - be := backend.GetBackend(currentSink.Backend) - - cfg = Configuration { - Authentication: authType, - Exporter: be, - } - - err = svc.sinkRepo.UpdateSinkState(ctx, sink.ID, sink.Error, currentSink.MFOwnerID, sink.State) - if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) - } - if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) - } - sinkEdited, err := svc.sinkRepo.RetrieveById(ctx, sink.ID) - if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) - } - sinkEdited, err = svc.decryptMetadata(cfg, sinkEdited) - if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) - } - - return sinkEdited, nil -} - func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) (Sink, error) { skOwnerID, err := svc.identify(token) if err != nil {