Skip to content

Commit

Permalink
Merge branch 'master' into rafal/fix-leaked-known-sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Sep 10, 2024
2 parents b038843 + 45652c4 commit 97bf818
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 50 deletions.
7 changes: 7 additions & 0 deletions pm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Sender interface {
// for creating new tickets
StartSession(ticketParams TicketParams) string

// CleanupSession deletes session from the internal map
CleanupSession(sessionID string)

// CreateTicketBatch returns a ticket batch of the specified size
CreateTicketBatch(sessionID string, size int) (*TicketBatch, error)

Expand Down Expand Up @@ -82,6 +85,10 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) {
return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil
}

// CleanupSession deletes the session with the given ID from the sender's
// internal session map, implementing Sender.CleanupSession. Deleting a
// sessionID that is not present is harmless.
func (s *sender) CleanupSession(sessionID string) {
	s.sessions.Delete(sessionID)
}

func (s *sender) validateSender(info *SenderInfo) error {
maxWithdrawRound := new(big.Int).Add(s.timeManager.LastInitializedRound(), big.NewInt(1))
if info.WithdrawRound.Int64() != 0 && info.WithdrawRound.Cmp(maxWithdrawRound) != 1 {
Expand Down
5 changes: 5 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string {
return args.String(0)
}

// CleanupSession deletes session from the internal map. As a mock, it only
// records the call (and its sessionID argument) for expectation checking.
func (m *MockSender) CleanupSession(sessionID string) {
	m.Called(sessionID)
}

// EV returns the ticket EV for a session
func (m *MockSender) EV(sessionID string) (*big.Rat, error) {
args := m.Called(sessionID)
Expand Down
22 changes: 16 additions & 6 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) {
}

type sessionsCreator func() ([]*BroadcastSession, error)
type sessionsCleanup func(sessionId string)
type SessionPool struct {
mid core.ManifestID

Expand All @@ -101,10 +102,11 @@ type SessionPool struct {
finished bool // set at stream end

createSessions sessionsCreator
cleanupSession sessionsCleanup
sus *suspender
}

func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator,
func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, cleanupSession sessionsCleanup,
sel BroadcastSessionsSelector) *SessionPool {

return &SessionPool{
Expand All @@ -114,6 +116,7 @@ func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender,
sessMap: make(map[string]*BroadcastSession),
sel: sel,
createSessions: createSession,
cleanupSession: cleanupSession,
sus: sus,
}
}
Expand Down Expand Up @@ -378,6 +381,7 @@ func (sp *SessionPool) removeSession(session *BroadcastSession) {
sp.lock.Lock()
defer sp.lock.Unlock()

sp.cleanupSession(session.PMSessionID)
delete(sp.sessMap, session.Transcoder())
}

Expand Down Expand Up @@ -463,11 +467,14 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor
untrustedNumOrchs := int(untrustedPoolSize)
susTrusted := newSuspender()
susUntrusted := newSuspender()
cleanupSession := func(sessionID string) {
node.Sender.CleanupSession(sessionID)
}
createSessionsTrusted := func() ([]*BroadcastSession, error) {
return selectOrchestrator(ctx, node, params, trustedNumOrchs, susTrusted, common.ScoreAtLeast(common.Score_Trusted))
return selectOrchestrator(ctx, node, params, trustedNumOrchs, susTrusted, common.ScoreAtLeast(common.Score_Trusted), cleanupSession)
}
createSessionsUntrusted := func() ([]*BroadcastSession, error) {
return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted))
return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted), cleanupSession)
}
var stakeRdr stakeReader
if node.Eth != nil {
Expand All @@ -476,8 +483,8 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor
bsm := &BroadcastSessionsManager{
mid: params.ManifestID,
VerificationFreq: params.VerificationFreq,
trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, cleanupSession, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, cleanupSession, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
}
bsm.trustedPool.refreshSessions(ctx)
bsm.untrustedPool.refreshSessions(ctx)
Expand Down Expand Up @@ -768,7 +775,7 @@ func (bsm *BroadcastSessionsManager) usingVerified() bool {
}

func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core.StreamParameters, count int, sus *suspender,
scorePred common.ScorePred) ([]*BroadcastSession, error) {
scorePred common.ScorePred, cleanupSession sessionsCleanup) ([]*BroadcastSession, error) {

if n.OrchestratorPool == nil {
clog.Infof(ctx, "No orchestrators specified; not transcoding")
Expand Down Expand Up @@ -836,6 +843,7 @@ func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core.
OrchestratorOS: orchOS,
BroadcasterOS: bcastOS,
Sender: n.Sender,
CleanupSession: cleanupSession,
PMSessionID: sessionID,
Balances: n.Balances,
Balance: balance,
Expand Down Expand Up @@ -1470,7 +1478,9 @@ func updateSession(sess *BroadcastSession, res *ReceivedTranscodeResult) {
// and the next time this BroadcastSession is used, the ticket params will be validated
// during ticket creation in genPayment(). If ticket params validation during ticket
// creation fails, then this BroadcastSession will be removed
oldSession := sess.PMSessionID
sess.PMSessionID = sess.Sender.StartSession(*pmTicketParams(oInfo.TicketParams))
sess.CleanupSession(oldSession)

// Session ID changed so we need to make sure the balance tracks the new session ID
if oldInfo.AuthToken.SessionId != oInfo.AuthToken.SessionId {
Expand Down
20 changes: 9 additions & 11 deletions server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,10 @@ func StubBroadcastSession(transcoder string) *BroadcastSession {
},
OrchestratorScore: common.Score_Trusted,
lock: &sync.RWMutex{},
CleanupSession: func(sessionId string) {},
}
}

func StubBroadcastSessionsManager() *BroadcastSessionsManager {
sess1 := StubBroadcastSession("transcoder1")
sess2 := StubBroadcastSession("transcoder2")

return bsmWithSessList([]*BroadcastSession{sess1, sess2})
}

// selFactoryEmpty builds a fresh, empty LIFO-based selector for use in tests.
func selFactoryEmpty() BroadcastSessionsSelector {
	sel := &LIFOSelector{}
	return sel
}
Expand Down Expand Up @@ -101,6 +95,8 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre
return cloneSessions(sessList), nil
}

var deleteSessions = func(sessionID string) {}

untrustedSessMap := make(map[string]*BroadcastSession)
for _, sess := range untrustedSessList {
untrustedSessMap[sess.OrchestratorInfo.Transcoder] = sess
Expand All @@ -118,11 +114,10 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre
if noRefresh {
createSessions = createSessionsEmpty
createSessionsUntrusted = createSessionsEmpty

}
trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel)
trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel)
trustedPool.sessMap = sessMap
untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, unsel)
untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, deleteSessions, unsel)
untrustedPool.sessMap = untrustedSessMap

return &BroadcastSessionsManager{
Expand Down Expand Up @@ -393,6 +388,7 @@ func TestSelectSession_MultipleInFlight2(t *testing.T) {
sess := StubBroadcastSession(ts.URL)
sender := &pm.MockSender{}
sender.On("StartSession", mock.Anything).Return("foo").Times(3)
sender.On("StopSession", mock.Anything).Times(3)
sender.On("EV", mock.Anything).Return(big.NewRat(1000000, 1), nil)
sender.On("CreateTicketBatch", mock.Anything, mock.Anything).Return(defaultTicketBatch(), nil)
sender.On("ValidateTicketParams", mock.Anything).Return(nil)
Expand Down Expand Up @@ -1125,7 +1121,9 @@ func TestUpdateSession(t *testing.T) {

balances := core.NewAddressBalances(5 * time.Minute)
defer balances.StopCleanup()
sess := &BroadcastSession{PMSessionID: "foo", LatencyScore: 1.1, Balances: balances, lock: &sync.RWMutex{}}
sess := &BroadcastSession{PMSessionID: "foo", LatencyScore: 1.1, Balances: balances, lock: &sync.RWMutex{}, CleanupSession: func(sessionID string) {

}}
res := &ReceivedTranscodeResult{
LatencyScore: 2.1,
}
Expand Down
41 changes: 9 additions & 32 deletions server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,6 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) {
return S, cancel
}

func setupServerWithCancelAndPorts() (*LivepeerServer, context.CancelFunc) {
drivers.NodeStorage = drivers.NewMemoryDriver(nil)
ctx, cancel := context.WithCancel(context.Background())
var S *LivepeerServer
if S == nil {
httpPushResetTimer = func() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
pushResetWg.Add(1)
wrapCancel := func() {
cancel()
pushResetWg.Done()
}
return ctx, wrapCancel
}
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
S, _ = NewLivepeerServer("127.0.0.1:2938", n, true, "")
go S.StartMediaServer(ctx, "127.0.0.1:9080")
go func() {
srv := &http.Server{Addr: "127.0.0.1:9938"}
S.StartCliWebserver(srv)
}()
}
return S, cancel
}

// since we have test that checks that there is no goroutine
// left running after using RTMP connection - we have to properly
// close connections in all the tests that are using them
Expand Down Expand Up @@ -171,6 +146,8 @@ func (d *stubDiscovery) GetInfos() []common.OrchestratorLocalInfo {
return nil
}

var cleanupSessions = func(sessionID string) {}

func (d *stubDiscovery) GetOrchestrators(ctx context.Context, num int, sus common.Suspender, caps common.CapabilityComparator,
scorePred common.ScorePred) (common.OrchestratorDescriptors, error) {

Expand Down Expand Up @@ -237,14 +214,14 @@ func TestSelectOrchestrator(t *testing.T) {
mid := core.RandomManifestID()
storage := drivers.NodeStorage.NewSession(string(mid))
sp := &core.StreamParameters{ManifestID: mid, Profiles: []ffmpeg.VideoProfile{ffmpeg.P360p30fps16x9}, OS: storage}
if _, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)); err != errDiscovery {
if _, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions); err != errDiscovery {
t.Error("Expected error with discovery")
}

sd := &stubDiscovery{}
// Discovery returned no orchestrators
s.LivepeerNode.OrchestratorPool = sd
if sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)); sess != nil || err != errNoOrchs {
if sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions); sess != nil || err != errNoOrchs {
t.Error("Expected nil session")
}

Expand All @@ -255,7 +232,7 @@ func TestSelectOrchestrator(t *testing.T) {
{PriceInfo: &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1}, TicketParams: &net.TicketParams{}, AuthToken: authToken0},
{PriceInfo: &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1}, TicketParams: &net.TicketParams{}, AuthToken: authToken1},
}
sess, _ := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0))
sess, _ := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions)

if len(sess) != len(sd.infos) {
t.Error("Expected session length of 2")
Expand Down Expand Up @@ -290,7 +267,7 @@ func TestSelectOrchestrator(t *testing.T) {
externalStorage := drivers.NodeStorage.NewSession(string(mid))
sp.OS = externalStorage

sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0))
sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions)
assert.Nil(err)

// B should initialize new OS session using auth token sessionID
Expand Down Expand Up @@ -378,7 +355,7 @@ func TestSelectOrchestrator(t *testing.T) {
expSessionID2 := "bar"
sender.On("StartSession", mock.Anything).Return(expSessionID2).Once()

sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0))
sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions)
require.Nil(err)

assert.Len(sess, 2)
Expand Down Expand Up @@ -407,7 +384,7 @@ func TestSelectOrchestrator(t *testing.T) {
// Skip orchestrator if missing auth token
sd.infos[0].AuthToken = nil

sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true })
sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }, cleanupSessions)
require.Nil(err)

assert.Len(sess, 1)
Expand All @@ -417,7 +394,7 @@ func TestSelectOrchestrator(t *testing.T) {
sd.infos[0].AuthToken = &net.AuthToken{}
sd.infos[0].TicketParams = nil

sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true })
sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }, cleanupSessions)
require.Nil(err)

assert.Len(sess, 1)
Expand Down
1 change: 1 addition & 0 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type BroadcastSession struct {
OrchestratorInfo *net.OrchestratorInfo
OrchestratorOS drivers.OSSession
PMSessionID string
CleanupSession sessionsCleanup
Balance Balance
InitialPrice *net.PriceInfo
}
Expand Down
3 changes: 2 additions & 1 deletion server/sessionpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func poolWithSessList(sessList []*BroadcastSession) *sessionPoolLIFO {
// return sessList, nil
return nil, nil
}
pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel)
var deleteSessions = func(sessionID string) {}
pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel)
pool.sessMap = sessMap
return newSessionPoolLIFO(pool)
}
Expand Down

0 comments on commit 97bf818

Please sign in to comment.