diff --git a/sinks/redis/producer/streams.go b/sinks/redis/producer/streams.go index fb031a256..ce9c4fb17 100644 --- a/sinks/redis/producer/streams.go +++ b/sinks/redis/producer/streams.go @@ -10,6 +10,7 @@ package producer import ( "context" + "github.com/orb-community/orb/sinks/authentication_type" "github.com/go-redis/redis/v8" @@ -104,6 +105,35 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si return es.svc.UpdateSinkInternal(ctx, s) } +func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { + defer func() { + event := updateSinkEvent{ + sinkID: sink.ID, + owner: sink.MFOwnerID, + config: sink.Config, + backend: sink.Backend, + } + + encode, err := event.Encode() + if err != nil { + es.logger.Error("error encoding object", zap.Error(err)) + } + + record := &redis.XAddArgs{ + Stream: streamID, + MaxLen: streamLen, + Approx: true, + Values: encode, + } + + err = es.client.XAdd(ctx, record).Err() + if err != nil { + es.logger.Error("error sending event to sinks event store", zap.Error(err)) + } + }() + return es.svc.UpdateSinkStatusInternal(ctx, s) +} + func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := updateSinkEvent{