Skip to content

Commit

Permalink
fix suspension
Browse files Browse the repository at this point in the history
  • Loading branch information
ad-astra-video committed Jul 22, 2024
1 parent a725208 commit d94d62b
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ type AISessionPool struct {
sessMap map[string]*BroadcastSession
inUseSess []*BroadcastSession
suspender *suspender
penalty int
mu sync.RWMutex
}

func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender) *AISessionPool {
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender, penalty int) *AISessionPool {
return &AISessionPool{
selector: selector,
sessMap: make(map[string]*BroadcastSession),
suspender: suspender,
penalty: penalty,
mu: sync.RWMutex{},
}
}
Expand Down Expand Up @@ -122,10 +124,17 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
delete(pool.sessMap, sess.Transcoder())
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)

// Magic number for now
penalty := 3
penalty := 0
// If this method is called assume that the orch should be suspended
// as well
// as well. Since AISessionManager re-uses the pools the suspension
// penalty needs to consider the current suspender count to set the penalty
last_count, ok := pool.suspender.list[sess.Transcoder()]
if ok {
penalty = pool.suspender.count - last_count + pool.penalty
} else {
penalty = pool.suspender.count + pool.penalty
}

pool.suspender.suspend(sess.Transcoder(), penalty)
}

Expand All @@ -152,12 +161,14 @@ type AISessionSelector struct {
// The time until the pools should be refreshed with orchs from discovery
ttl time.Duration
lastRefreshTime time.Time
initialPoolSize int

cap core.Capability
modelID string

node *core.LivepeerNode
suspender *suspender
penalty int
os drivers.OSSession
}

Expand All @@ -172,8 +183,9 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
// The latency score in this context is just the latency of the last completed request for a session
// The "good enough" latency score is set to 0.0 so the selector will always select unknown sessions first
minLS := 0.0
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
penalty := 3
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender, penalty)
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender, penalty)
sel := &AISessionSelector{
warmPool: warmPool,
coldPool: coldPool,
Expand All @@ -182,6 +194,7 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
modelID: modelID,
node: node,
suspender: suspender,
penalty: penalty,
os: drivers.NodeStorage.NewSession(strconv.Itoa(int(cap)) + "_" + modelID),
}

Expand All @@ -196,7 +209,17 @@ func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
shouldRefreshSelector := func() bool {
// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
// 1/2 the total # of orchs that can be queried during discovery
discoveryPoolSize := sel.node.OrchestratorPool.Size()
discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))

if (sel.warmPool.Size() + sel.coldPool.Size()) == 0 {
//release all orchestrators from suspension and try refresh
//if penalty in
clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
for i := 0; i < sel.penalty; i++ {
sel.suspender.signalRefresh()
}
}

if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
return true
}
Expand Down Expand Up @@ -257,6 +280,7 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {

var warmSessions []*BroadcastSession
var coldSessions []*BroadcastSession

for _, sess := range sessions {
// If the constraints are missing for this capability skip this session
constraints, ok := sess.OrchestratorInfo.Capabilities.Constraints[uint32(sel.cap)]
Expand All @@ -279,6 +303,7 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {

sel.warmPool.Add(warmSessions)
sel.coldPool.Add(coldSessions)
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)

sel.lastRefreshTime = time.Now()

Expand Down

0 comments on commit d94d62b

Please sign in to comment.