Skip to content

Commit

Permalink
add changes
Browse files Browse the repository at this point in the history
  • Loading branch information
etaques committed Oct 4, 2023
1 parent 6b05013 commit 37fa6c1
Show file tree
Hide file tree
Showing 6 changed files with 1 addition and 60 deletions.
13 changes: 0 additions & 13 deletions sinks/api/http/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,6 @@ func (l loggingMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink)
return l.svc.UpdateSinkInternal(ctx, s)
}

func (l loggingMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) {
defer func(begin time.Time) {
if err != nil {
l.logger.Warn("method call: edit_sink_status_internal",
zap.Error(err),
zap.Duration("duration", time.Since(begin)))
} else {
l.logger.Debug("method call: edit_sink_status_internal",
zap.Duration("duration", time.Since(begin)))
}
}(time.Now())
return l.svc.UpdateSinkStatusInternal(ctx, s)
}
func (l loggingMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (_ sinks.Page, err error) {
defer func(begin time.Time) {
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions sinks/api/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ func (m metricsMiddleware) UpdateSinkInternal(ctx context.Context, s sinks.Sink)
return m.svc.UpdateSinkInternal(ctx, s)
}

func (m metricsMiddleware) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) {

return m.svc.UpdateSinkInternal(ctx, s)
}

func (m metricsMiddleware) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sink sinks.Page, err error) {
ownerID, err := m.identify(token)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sinks/redis/consumer/sink_status_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *sinkStatusListener) ReceiveMessage(ctx context.Context, message redis.X
gotSink.Error = event.Msg
}
gotSink.State = newState
_, err = s.sinkService.UpdateSinkStatusInternal(ctx, gotSink)
err = s.sinkService.ChangeSinkStateInternal(ctx, gotSink.ID, gotSink.Error, gotSink.MFOwnerID, gotSink.State)
if err != nil {
logger.Error("failed to update sink", zap.String("owner_id", event.OwnerID),
zap.String("sink_id", event.SinkID), zap.Error(err))
Expand Down
5 changes: 0 additions & 5 deletions sinks/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si
return es.svc.UpdateSinkInternal(ctx, s)
}

func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) {

return es.svc.UpdateSinkStatusInternal(ctx, s)
}

func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) {
defer func() {
event := updateSinkEvent{
Expand Down
2 changes: 0 additions & 2 deletions sinks/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ type SinkService interface {
UpdateSink(ctx context.Context, token string, s Sink) (Sink, error)
// UpdateSinkInternal by id
UpdateSinkInternal(ctx context.Context, s Sink) (Sink, error)
// UpdateSinkStatusInternal by id
UpdateSinkStatusInternal(ctx context.Context, s Sink) (Sink, error)
// ListSinks retrieves data about sinks
ListSinks(ctx context.Context, token string, pm PageMetadata) (Page, error)
// ListSinksInternal retrieves data from sinks filtered by SinksFilter for Services like Maestro, to build DeploymentEntries
Expand Down
34 changes: 0 additions & 34 deletions sinks/sinks_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,40 +265,6 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink,
return sinkEdited, nil
}

func (svc sinkService) UpdateSinkStatusInternal(ctx context.Context, sink Sink) (Sink, error) {
var currentSink Sink
currentSink, err := svc.sinkRepo.RetrieveById(ctx, sink.ID)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
var cfg Configuration
authType, _ := authentication_type.GetAuthType(currentSink.GetAuthenticationTypeName())
be := backend.GetBackend(currentSink.Backend)

cfg = Configuration {
Authentication: authType,
Exporter: be,
}

err = svc.sinkRepo.UpdateSinkState(ctx, sink.ID, sink.Error, currentSink.MFOwnerID, sink.State)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
sinkEdited, err := svc.sinkRepo.RetrieveById(ctx, sink.ID)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
sinkEdited, err = svc.decryptMetadata(cfg, sinkEdited)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}

return sinkEdited, nil
}

func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) (Sink, error) {
skOwnerID, err := svc.identify(token)
if err != nil {
Expand Down

0 comments on commit 37fa6c1

Please sign in to comment.