From a4991f24953e8c812ee750e89c91cf363fc17169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Thu, 5 Sep 2024 16:30:20 +0200 Subject: [PATCH] Clean up inactive sessions Not cleaning up the sessions causes quite a big memory leak which in effect causes OOMKilled --- cmd/livepeer/starter/starter.go | 2 +- pm/sender.go | 45 ++++++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 1556b125df..5dde785005 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -922,7 +922,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Info("Broadcaster Deposit: ", eth.FormatUnits(info.Deposit, "ETH")) glog.Info("Broadcaster Reserve: ", eth.FormatUnits(info.Reserve.FundsRemaining, "ETH")) - n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier) + n.Sender = pm.NewSender(ctx, n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier) pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit) if !ok || !pixelsPerUnit.IsInt() { diff --git a/pm/sender.go b/pm/sender.go index 6f6acf0fd4..8c4d4dc402 100644 --- a/pm/sender.go +++ b/pm/sender.go @@ -1,15 +1,22 @@ package pm import ( + "context" "fmt" "math/big" "sync" "sync/atomic" + "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" ) +const ( + sessionCleanupInterval = 10 * time.Minute + sessionInactiveTimeout = 1 * time.Hour +) + // ErrSenderValidation is returned when the sender cannot send tickets type ErrSenderValidation struct { error @@ -46,12 +53,13 @@ type sender struct { maxTotalEV *big.Rat depositMultiplier int - sessions sync.Map + sessions sync.Map + sessionsLastUsage sync.Map } // NewSender creates a new Sender instance. -func NewSender(signer Signer, timeManager TimeManager, senderManager SenderManager, maxEV *big.Rat, maxTotalEV *big.Rat, depositMultiplier int) Sender { - return &sender{ +func NewSender(ctx context.Context, signer Signer, timeManager TimeManager, senderManager SenderManager, maxEV *big.Rat, maxTotalEV *big.Rat, depositMultiplier int) Sender { + s := &sender{ signer: signer, timeManager: timeManager, senderManager: senderManager, @@ -59,6 +67,8 @@ func NewSender(signer Signer, timeManager TimeManager, senderManager SenderManag maxTotalEV: maxTotalEV, depositMultiplier: depositMultiplier, } + s.startSessionsCleanupLoop(ctx) + return s } func (s *sender) StartSession(ticketParams TicketParams) string { @@ -68,6 +78,7 @@ func (s *sender) StartSession(ticketParams TicketParams) string { ticketParams: ticketParams, senderNonce: 0, }) + s.sessionsLastUsage.Store(sessionID, time.Now()) return sessionID } @@ -212,6 +223,34 @@ func (s *sender) loadSession(sessionID string) (*session, error) { if !ok { return nil, errors.Errorf("error loading session: %x", sessionID) } + s.sessionsLastUsage.Store(sessionID, time.Now()) return tempSession.(*session), nil } + +func (s *sender) startSessionsCleanupLoop(ctx context.Context) { + go func() { + t := time.NewTicker(sessionCleanupInterval) + for { + select { + case <-t.C: + s.cleanupSessions() + case <-ctx.Done(): + return + } + } + }() +} + +func (s *sender) cleanupSessions() { + now := time.Now() + s.sessionsLastUsage.Range(func(key, value interface{}) bool { + sessionID := key.(string) + lastUsage := value.(time.Time) + if now.Sub(lastUsage) > sessionInactiveTimeout { + s.sessions.Delete(sessionID) + s.sessionsLastUsage.Delete(sessionID) + } + return true + }) +}