Skip to content

Commit

Permalink
add changes
Browse files Browse the repository at this point in the history
  • Loading branch information
etaques committed Oct 3, 2023
1 parent 3c0eba9 commit 7df6d9e
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions sinks/redis/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package producer

import (
"context"

"github.com/orb-community/orb/sinks/authentication_type"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -104,6 +105,35 @@ func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Si
return es.svc.UpdateSinkInternal(ctx, s)
}

func (es sinksStreamProducer) UpdateSinkStatusInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) {
defer func() {
event := updateSinkEvent{
sinkID: sink.ID,
owner: sink.MFOwnerID,
config: sink.Config,
backend: sink.Backend,
}

encode, err := event.Encode()
if err != nil {
es.logger.Error("error encoding object", zap.Error(err))
}

record := &redis.XAddArgs{
Stream: streamID,
MaxLen: streamLen,
Approx: true,
Values: encode,
}

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinks event store", zap.Error(err))
}
}()
return es.svc.UpdateSinkStatusInternal(ctx, s)
}

func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) {
defer func() {
event := updateSinkEvent{
Expand Down

0 comments on commit 7df6d9e

Please sign in to comment.