Skip to content

Commit

Permalink
fix: (sinks) update state
Browse files Browse the repository at this point in the history
  • Loading branch information
etaques committed Oct 3, 2023
1 parent d9bc6a0 commit 116d11b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
2 changes: 1 addition & 1 deletion sinks/redis/consumer/sink_status_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *sinkStatusListener) ReceiveMessage(ctx context.Context, message redis.X
gotSink.Error = event.Msg
}
gotSink.State = newState
_, err = s.sinkService.UpdateSinkInternal(ctx, gotSink)
_, err = s.sinkService.UpdateSinkStatusInternal(ctx, gotSink)
if err != nil {
logger.Error("failed to update sink", zap.String("owner_id", event.OwnerID),
zap.String("sink_id", event.SinkID), zap.Error(err))
Expand Down
37 changes: 36 additions & 1 deletion sinks/sinks_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink,
defaultMetadata := make(types.Metadata, 1)
defaultMetadata["opentelemetry"] = "enabled"
sink.Config.Merge(defaultMetadata)
sink.State = Unknown

sink.Error = ""
sink.State = Unknown
if sink.Format == "yaml" {
configDataByte, err := yaml.Marshal(sink.Config)
if err != nil {
Expand Down Expand Up @@ -265,6 +266,40 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink,
return sinkEdited, nil
}

func (svc sinkService) UpdateSinkStatusInternal(ctx context.Context, sink Sink) (Sink, error) {
var currentSink Sink
currentSink, err := svc.sinkRepo.RetrieveById(ctx, sink.ID)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
var cfg Configuration
authType, _ := authentication_type.GetAuthType(currentSink.GetAuthenticationTypeName())
be := backend.GetBackend(currentSink.Backend)

cfg = Configuration {
Authentication: authType,
Exporter: be,
}

currentSink.State = sink.State
currentSink.Error = sink.Error

err = svc.sinkRepo.Update(ctx, currentSink)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
sinkEdited, err := svc.sinkRepo.RetrieveById(ctx, sink.ID)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}
sinkEdited, err = svc.decryptMetadata(cfg, sinkEdited)
if err != nil {
return Sink{}, errors.Wrap(ErrUpdateEntity, err)
}

return sinkEdited, nil
}

func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) (Sink, error) {
skOwnerID, err := svc.identify(token)
if err != nil {
Expand Down

0 comments on commit 116d11b

Please sign in to comment.