Skip to content

Commit

Permalink
Clean up inactive sessions
Browse files Browse the repository at this point in the history
Not cleaning up the sessions causes quite a big memory leak which in effect causes OOMKilled
  • Loading branch information
leszko committed Sep 5, 2024
1 parent 2e55134 commit a4991f2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Info("Broadcaster Deposit: ", eth.FormatUnits(info.Deposit, "ETH"))
glog.Info("Broadcaster Reserve: ", eth.FormatUnits(info.Reserve.FundsRemaining, "ETH"))

n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier)
n.Sender = pm.NewSender(ctx, n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier)

pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
Expand Down
45 changes: 42 additions & 3 deletions pm/sender.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package pm

import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
)

const (
sessionCleanupInterval = 10 * time.Minute
sessionInactiveTimeout = 1 * time.Hour
)

// ErrSenderValidation is returned when the sender cannot send tickets
type ErrSenderValidation struct {
error
Expand Down Expand Up @@ -46,19 +53,22 @@ type sender struct {
maxTotalEV *big.Rat
depositMultiplier int

sessions sync.Map
sessions sync.Map
sessionsLastUsage sync.Map
}

// NewSender creates a new Sender instance.
func NewSender(signer Signer, timeManager TimeManager, senderManager SenderManager, maxEV *big.Rat, maxTotalEV *big.Rat, depositMultiplier int) Sender {
return &sender{
func NewSender(ctx context.Context, signer Signer, timeManager TimeManager, senderManager SenderManager, maxEV *big.Rat, maxTotalEV *big.Rat, depositMultiplier int) Sender {
s := &sender{
signer: signer,
timeManager: timeManager,
senderManager: senderManager,
maxEV: maxEV,
maxTotalEV: maxTotalEV,
depositMultiplier: depositMultiplier,
}
s.startSessionsCleanupLoop(ctx)
return s
}

func (s *sender) StartSession(ticketParams TicketParams) string {
Expand All @@ -68,6 +78,7 @@ func (s *sender) StartSession(ticketParams TicketParams) string {
ticketParams: ticketParams,
senderNonce: 0,
})
s.sessionsLastUsage.Store(sessionID, time.Now())

return sessionID
}
Expand Down Expand Up @@ -212,6 +223,34 @@ func (s *sender) loadSession(sessionID string) (*session, error) {
if !ok {
return nil, errors.Errorf("error loading session: %x", sessionID)
}
s.sessionsLastUsage.Store(sessionID, time.Now())

return tempSession.(*session), nil
}

func (s *sender) startSessionsCleanupLoop(ctx context.Context) {
go func() {
t := time.NewTicker(sessionCleanupInterval)
for {
select {
case <-t.C:
s.cleanupSessions()
case <-ctx.Done():
return
}
}
}()
}

func (s *sender) cleanupSessions() {
now := time.Now()
s.sessionsLastUsage.Range(func(key, value interface{}) bool {
sessionID := key.(string)
lastUsage := value.(time.Time)
if now.Sub(lastUsage) > sessionInactiveTimeout {
s.sessions.Delete(sessionID)
s.sessionsLastUsage.Delete(sessionID)
}
return true
})
}

0 comments on commit a4991f2

Please sign in to comment.