Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Orchestrator Swaps #2885

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude []
if len(session.SegsInFlight) == 0 {
if session.LatencyScore > 0 && session.LatencyScore <= SELECTOR_LATENCY_SCORE_THRESHOLD {
clog.PublicInfof(ctx,
"Selecting new orchestrator, reason=%v",
"Reusing Orchestrator, reason=%v",
fmt.Sprintf(
"performance: no segments in flight, latency score of %v < %v",
session.LatencyScore,
Expand All @@ -228,6 +228,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude []

return session
}
clog.PublicInfof(ctx,
"Swapping Orchestrator, reason=%v",
fmt.Sprintf(
"performance: no segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)
}

// A session with segments in flight might be selectable under certain conditions
Expand All @@ -247,7 +255,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude []

if timeInFlight < maxTimeInFlight {
clog.PublicInfof(ctx,
"Selected orchestrator reason=%v",
"Reusing orchestrator reason=%v",
fmt.Sprintf(
"performance: segments in flight, latency score of %v < %v",
session.LatencyScore,
Expand All @@ -257,6 +265,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude []

return session
}
clog.PublicInfof(ctx,
"Swapping Orchestrator, reason=%v",
fmt.Sprintf(
"performance: no segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func TestTranscodeSegment_CompleteSession(t *testing.T) {
// Create stub server
ts, mux := stubTLSServer()
defer ts.Close()
transcodeDelay := 100 * time.Millisecond
transcodeDelay := 1500 * time.Millisecond
mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
time.Sleep(transcodeDelay)
w.WriteHeader(http.StatusOK)
Expand Down
5 changes: 4 additions & 1 deletion server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,13 @@ func SubmitSegment(ctx context.Context, sess *BroadcastSession, seg *stream.HLSS
clog.Infof(ctx, "Successfully transcoded segment segName=%s seqNo=%d orch=%s dur=%s",
seg.Name, seg.SeqNo, ti.Transcoder, transcodeDur)

// Use 1.5s for segments that are shorter than 1.5s
// Otherwise, the latency score is too high which results in a high number session swaps
segDuration := math.Max(1.5, seg.Duration)
return &ReceivedTranscodeResult{
TranscodeData: tdata,
Info: tr.Info,
LatencyScore: tookAllDur.Seconds() / seg.Duration,
LatencyScore: tookAllDur.Seconds() / segDuration,
}, nil
}

Expand Down
Loading