diff --git a/pm/sender.go b/pm/sender.go index ebeb09e2c6..6f6acf0fd4 100644 --- a/pm/sender.go +++ b/pm/sender.go @@ -22,9 +22,6 @@ type Sender interface { // for creating new tickets StartSession(ticketParams TicketParams) string - // CleanupSession deletes session from the internal map - CleanupSession(sessionID string) - // CreateTicketBatch returns a ticket batch of the specified size CreateTicketBatch(sessionID string, size int) (*TicketBatch, error) @@ -85,10 +82,6 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) { return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil } -func (s *sender) CleanupSession(sessionID string) { - s.sessions.Delete(sessionID) -} - func (s *sender) validateSender(info *SenderInfo) error { maxWithdrawRound := new(big.Int).Add(s.timeManager.LastInitializedRound(), big.NewInt(1)) if info.WithdrawRound.Int64() != 0 && info.WithdrawRound.Cmp(maxWithdrawRound) != 1 { diff --git a/pm/stub.go b/pm/stub.go index 549f386472..a3e0f7f224 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -511,11 +511,6 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string { return args.String(0) } -// CleanupSession deletes session from the internal ma -func (m *MockSender) CleanupSession(sessionID string) { - m.Called(sessionID) -} - // EV returns the ticket EV for a session func (m *MockSender) EV(sessionID string) (*big.Rat, error) { args := m.Called(sessionID) diff --git a/server/broadcast.go b/server/broadcast.go index a3e8cf64b9..bcd0a54658 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -85,7 +85,6 @@ func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) { } type sessionsCreator func() ([]*BroadcastSession, error) -type sessionsCleanup func(sessionId string) type SessionPool struct { mid core.ManifestID @@ -102,11 +101,10 @@ type SessionPool struct { finished bool // set at stream end createSessions sessionsCreator - cleanupSession sessionsCleanup sus *suspender } -func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, cleanupSession sessionsCleanup, +func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, sel BroadcastSessionsSelector) *SessionPool { return &SessionPool{ @@ -116,7 +114,6 @@ func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, sessMap: make(map[string]*BroadcastSession), sel: sel, createSessions: createSession, - cleanupSession: cleanupSession, sus: sus, } } @@ -381,7 +378,6 @@ func (sp *SessionPool) removeSession(session *BroadcastSession) { sp.lock.Lock() defer sp.lock.Unlock() - sp.cleanupSession(session.PMSessionID) delete(sp.sessMap, session.Transcoder()) } @@ -467,14 +463,11 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor untrustedNumOrchs := int(untrustedPoolSize) susTrusted := newSuspender() susUntrusted := newSuspender() - cleanupSession := func(sessionID string) { - node.Sender.CleanupSession(sessionID) - } createSessionsTrusted := func() ([]*BroadcastSession, error) { - return selectOrchestrator(ctx, node, params, trustedNumOrchs, susTrusted, common.ScoreAtLeast(common.Score_Trusted), cleanupSession) + return selectOrchestrator(ctx, node, params, trustedNumOrchs, susTrusted, common.ScoreAtLeast(common.Score_Trusted)) } createSessionsUntrusted := func() ([]*BroadcastSession, error) { - return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted), cleanupSession) + return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted)) } var stakeRdr stakeReader if node.Eth != nil { @@ -483,8 +476,8 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor bsm := &BroadcastSessionsManager{ mid: params.ManifestID, VerificationFreq: params.VerificationFreq, - trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, cleanupSession, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), - untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, cleanupSession, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), + trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), + untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), } bsm.trustedPool.refreshSessions(ctx) bsm.untrustedPool.refreshSessions(ctx) @@ -775,7 +768,7 @@ func (bsm *BroadcastSessionsManager) usingVerified() bool { } func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core.StreamParameters, count int, sus *suspender, - scorePred common.ScorePred, cleanupSession sessionsCleanup) ([]*BroadcastSession, error) { + scorePred common.ScorePred) ([]*BroadcastSession, error) { if n.OrchestratorPool == nil { clog.Infof(ctx, "No orchestrators specified; not transcoding") @@ -843,7 +836,6 @@ func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core. OrchestratorOS: orchOS, BroadcasterOS: bcastOS, Sender: n.Sender, - CleanupSession: cleanupSession, PMSessionID: sessionID, Balances: n.Balances, Balance: balance, @@ -1478,9 +1470,7 @@ func updateSession(sess *BroadcastSession, res *ReceivedTranscodeResult) { // and the next time this BroadcastSession is used, the ticket params will be validated // during ticket creation in genPayment(). If ticket params validation during ticket // creation fails, then this BroadcastSession will be removed - oldSession := sess.PMSessionID sess.PMSessionID = sess.Sender.StartSession(*pmTicketParams(oInfo.TicketParams)) - sess.CleanupSession(oldSession) // Session ID changed so we need to make sure the balance tracks the new session ID if oldInfo.AuthToken.SessionId != oInfo.AuthToken.SessionId { diff --git a/server/broadcast_test.go b/server/broadcast_test.go index 372d25205a..669ff06313 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -61,10 +61,16 @@ func StubBroadcastSession(transcoder string) *BroadcastSession { }, OrchestratorScore: common.Score_Trusted, lock: &sync.RWMutex{}, - CleanupSession: func(sessionId string) {}, } } +func StubBroadcastSessionsManager() *BroadcastSessionsManager { + sess1 := StubBroadcastSession("transcoder1") + sess2 := StubBroadcastSession("transcoder2") + + return bsmWithSessList([]*BroadcastSession{sess1, sess2}) +} + func selFactoryEmpty() BroadcastSessionsSelector { return &LIFOSelector{} } @@ -95,8 +101,6 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre return cloneSessions(sessList), nil } - var deleteSessions = func(sessionID string) {} - untrustedSessMap := make(map[string]*BroadcastSession) for _, sess := range untrustedSessList { untrustedSessMap[sess.OrchestratorInfo.Transcoder] = sess @@ -114,10 +118,11 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre if noRefresh { createSessions = createSessionsEmpty createSessionsUntrusted = createSessionsEmpty + } - trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel) + trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel) trustedPool.sessMap = sessMap - untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, deleteSessions, unsel) + untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, unsel) untrustedPool.sessMap = untrustedSessMap return &BroadcastSessionsManager{ @@ -388,7 +393,6 @@ func TestSelectSession_MultipleInFlight2(t *testing.T) { sess := StubBroadcastSession(ts.URL) sender := &pm.MockSender{} sender.On("StartSession", mock.Anything).Return("foo").Times(3) - sender.On("StopSession", mock.Anything).Times(3) sender.On("EV", mock.Anything).Return(big.NewRat(1000000, 1), nil) sender.On("CreateTicketBatch", mock.Anything, mock.Anything).Return(defaultTicketBatch(), nil) sender.On("ValidateTicketParams", mock.Anything).Return(nil) @@ -1121,9 +1125,7 @@ func TestUpdateSession(t *testing.T) { balances := core.NewAddressBalances(5 * time.Minute) defer balances.StopCleanup() - sess := &BroadcastSession{PMSessionID: "foo", LatencyScore: 1.1, Balances: balances, lock: &sync.RWMutex{}, CleanupSession: func(sessionID string) { - - }} + sess := &BroadcastSession{PMSessionID: "foo", LatencyScore: 1.1, Balances: balances, lock: &sync.RWMutex{}} res := &ReceivedTranscodeResult{ LatencyScore: 2.1, } diff --git a/server/mediaserver_test.go b/server/mediaserver_test.go index 09f4f53370..8a7db8659f 100644 --- a/server/mediaserver_test.go +++ b/server/mediaserver_test.go @@ -115,6 +115,31 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) { return S, cancel } +func setupServerWithCancelAndPorts() (*LivepeerServer, context.CancelFunc) { + drivers.NodeStorage = drivers.NewMemoryDriver(nil) + ctx, cancel := context.WithCancel(context.Background()) + var S *LivepeerServer + if S == nil { + httpPushResetTimer = func() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + pushResetWg.Add(1) + wrapCancel := func() { + cancel() + pushResetWg.Done() + } + return ctx, wrapCancel + } + n, _ := core.NewLivepeerNode(nil, "./tmp", nil) + S, _ = NewLivepeerServer("127.0.0.1:2938", n, true, "") + go S.StartMediaServer(ctx, "127.0.0.1:9080") + go func() { + srv := &http.Server{Addr: "127.0.0.1:9938"} + S.StartCliWebserver(srv) + }() + } + return S, cancel +} + // since we have test that checks that there is no goroutine // left running after using RTMP connection - we have to properly // close connections in all the tests that are using them @@ -146,8 +171,6 @@ func (d *stubDiscovery) GetInfos() []common.OrchestratorLocalInfo { return nil } -var cleanupSessions = func(sessionID string) {} - func (d *stubDiscovery) GetOrchestrators(ctx context.Context, num int, sus common.Suspender, caps common.CapabilityComparator, scorePred common.ScorePred) (common.OrchestratorDescriptors, error) { @@ -214,14 +237,14 @@ func TestSelectOrchestrator(t *testing.T) { mid := core.RandomManifestID() storage := drivers.NodeStorage.NewSession(string(mid)) sp := &core.StreamParameters{ManifestID: mid, Profiles: []ffmpeg.VideoProfile{ffmpeg.P360p30fps16x9}, OS: storage} - if _, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions); err != errDiscovery { + if _, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)); err != errDiscovery { t.Error("Expected error with discovery") } sd := &stubDiscovery{} // Discovery returned no orchestrators s.LivepeerNode.OrchestratorPool = sd - if sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions); sess != nil || err != errNoOrchs { + if sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)); sess != nil || err != errNoOrchs { t.Error("Expected nil session") } @@ -232,7 +255,7 @@ func TestSelectOrchestrator(t *testing.T) { {PriceInfo: &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1}, TicketParams: &net.TicketParams{}, AuthToken: authToken0}, {PriceInfo: &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1}, TicketParams: &net.TicketParams{}, AuthToken: authToken1}, } - sess, _ := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions) + sess, _ := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)) if len(sess) != len(sd.infos) { t.Error("Expected session length of 2") @@ -267,7 +290,7 @@ func TestSelectOrchestrator(t *testing.T) { externalStorage := drivers.NodeStorage.NewSession(string(mid)) sp.OS = externalStorage - sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions) + sess, err := selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)) assert.Nil(err) // B should initialize new OS session using auth token sessionID @@ -355,7 +378,7 @@ func TestSelectOrchestrator(t *testing.T) { expSessionID2 := "bar" sender.On("StartSession", mock.Anything).Return(expSessionID2).Once() - sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0), cleanupSessions) + sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), common.ScoreAtLeast(0)) require.Nil(err) assert.Len(sess, 2) @@ -384,7 +407,7 @@ func TestSelectOrchestrator(t *testing.T) { // Skip orchestrator if missing auth token sd.infos[0].AuthToken = nil - sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }, cleanupSessions) + sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }) require.Nil(err) assert.Len(sess, 1) @@ -394,7 +417,7 @@ func TestSelectOrchestrator(t *testing.T) { sd.infos[0].AuthToken = &net.AuthToken{} sd.infos[0].TicketParams = nil - sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }, cleanupSessions) + sess, err = selectOrchestrator(context.TODO(), s.LivepeerNode, sp, 4, newSuspender(), func(float32) bool { return true }) require.Nil(err) assert.Len(sess, 1) diff --git a/server/rpc.go b/server/rpc.go index 6c5d24637c..097c61e709 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -117,7 +117,6 @@ type BroadcastSession struct { OrchestratorInfo *net.OrchestratorInfo OrchestratorOS drivers.OSSession PMSessionID string - CleanupSession sessionsCleanup Balance Balance InitialPrice *net.PriceInfo } diff --git a/server/sessionpool_test.go b/server/sessionpool_test.go index 832f7016d3..49a9f4f44b 100644 --- a/server/sessionpool_test.go +++ b/server/sessionpool_test.go @@ -51,8 +51,7 @@ func poolWithSessList(sessList []*BroadcastSession) *sessionPoolLIFO { // return sessList, nil return nil, nil } - var deleteSessions = func(sessionID string) {} - pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel) + pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel) pool.sessMap = sessMap return newSessionPoolLIFO(pool) }