Skip to content

Commit

Permalink
Merge pull request #2722 from orb-community/etaques-patch-17
Browse files Browse the repository at this point in the history
fix: (sinker): idle state based key expire events
  • Loading branch information
lpegoraro authored Oct 6, 2023
2 parents a408cab + 45e1a75 commit c5e125f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 33 deletions.
18 changes: 11 additions & 7 deletions sinker/redis/consumer/sink_key_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package consumer

import (
"context"
"strconv"

"github.com/go-redis/redis/v8"
"github.com/orb-community/orb/sinker/redis/producer"
"go.uber.org/zap"
Expand All @@ -28,16 +30,17 @@ func NewSinkerKeyExpirationListener(l *zap.Logger, cacheRedisClient *redis.Clien
// SubscribeToKeyExpiration to be used to subscribe to the sinker key expiration
func (s *sinkerKeyExpirationListener) SubscribeToKeyExpiration(ctx context.Context) error {
go func() {
pubsub := s.cacheRedisClient.Subscribe(ctx, "__key*__:*")
redisDB := strconv.Itoa(s.cacheRedisClient.Options().DB)
pubsub := s.cacheRedisClient.PSubscribe(ctx, "__keyevent@"+redisDB+"__:expired")
defer func(pubsub *redis.PubSub) {
_ = pubsub.Close()
}(pubsub)
ch := pubsub.Channel()
}(pubsub)
for {
select {
case <-ctx.Done():
return
case msg := <-ch:
default:
msg, _ := pubsub.ReceiveMessage(ctx)
s.logger.Info("key expired", zap.String("key", msg.Payload))
subCtx := context.WithValue(ctx, "msg", msg.Payload)
err := s.ReceiveMessage(subCtx, msg.Payload)
Expand All @@ -54,15 +57,16 @@ func (s *sinkerKeyExpirationListener) SubscribeToKeyExpiration(ctx context.Conte
// ReceiveMessage to be used to receive the message from the sinker key expiration
func (s *sinkerKeyExpirationListener) ReceiveMessage(ctx context.Context, message string) error {
// goroutine
go func(msg string) {
ownerID := message[16:52]
sinkID := message[53:]
go func(msg string) {
ownerID := message[16:51]
sinkID := message[52:]
event := producer.SinkIdleEvent{
OwnerID: ownerID,
SinkID: sinkID,
State: "idle",
Size: "0",
}
s.logger.Info("publishing sink idle event", zap.Any("event", event))
_ = s.idleProducer.PublishSinkIdle(ctx, event)
}(message)
return nil
Expand Down
50 changes: 25 additions & 25 deletions sinker/redis/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package redis_test
import (
"context"
"fmt"
"github.com/orb-community/orb/sinker/redis/consumer"
//"github.com/orb-community/orb/sinker/redis/consumer"
"github.com/orb-community/orb/sinker/redis/producer"
"testing"
"time"
Expand Down Expand Up @@ -58,27 +58,27 @@ func TestSinkActivityStoreAndMessage(t *testing.T) {
logger.Debug("debugging breakpoint")
}

func TestSinkIdle(t *testing.T) {
sinkTTLSvc := producer.NewSinkerKeyService(logger, redisClient)
sinkActivitySvc := producer.NewSinkActivityProducer(logger, redisClient, sinkTTLSvc)
sinkIdleSvc := producer.NewSinkIdleProducer(logger, redisClient)
sinkExpire := consumer.NewSinkerKeyExpirationListener(logger, redisClient, sinkIdleSvc)
event := producer.SinkActivityEvent{
OwnerID: "1",
SinkID: "1",
State: "active",
Size: "40",
Timestamp: time.Now(),
}
ctx := context.WithValue(context.Background(), "test", "TestSinkIdle")
err := sinkExpire.SubscribeToKeyExpiration(ctx)
require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
err = sinkActivitySvc.PublishSinkActivity(ctx, event)
require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
err = sinkTTLSvc.RenewSinkerKeyInternal(ctx, producer.SinkerKey{
OwnerID: "1",
SinkID: "1",
}, 10*time.Second)
require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
_ = OnceReceiver(ctx, "orb.sink_idle")
}
// func TestSinkIdle(t *testing.T) {
// sinkTTLSvc := producer.NewSinkerKeyService(logger, redisClient)
// sinkActivitySvc := producer.NewSinkActivityProducer(logger, redisClient, sinkTTLSvc)
// sinkIdleSvc := producer.NewSinkIdleProducer(logger, redisClient)
// sinkExpire := consumer.NewSinkerKeyExpirationListener(logger, redisClient, sinkIdleSvc)
// event := producer.SinkActivityEvent{
// OwnerID: "1",
// SinkID: "1",
// State: "active",
// Size: "40",
// Timestamp: time.Now(),
// }
// ctx := context.WithValue(context.Background(), "test", "TestSinkIdle")
// err := sinkExpire.SubscribeToKeyExpiration(ctx)
// require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
// err = sinkActivitySvc.PublishSinkActivity(ctx, event)
// require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
// err = sinkTTLSvc.RenewSinkerKeyInternal(ctx, producer.SinkerKey{
// OwnerID: "1",
// SinkID: "1",
// }, 10*time.Second)
// require.NoError(t, err, fmt.Sprintf("unexpected error: %s", err))
// _ = OnceReceiver(ctx, "orb.sink_idle")
// }
3 changes: 2 additions & 1 deletion sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package sinker
import (
"context"
"fmt"
"time"

"github.com/orb-community/orb/sinker/redis/consumer"
"github.com/orb-community/orb/sinker/redis/producer"
"time"

"github.com/go-kit/kit/metrics"
"github.com/go-redis/redis/v8"
Expand Down

0 comments on commit c5e125f

Please sign in to comment.