From 0150a9e8e01444c995838fe3eefb40c1f4f63e7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 9 Oct 2023 15:38:22 +0200 Subject: [PATCH 1/3] Add / fix logs for swapping Orchestrator --- server/broadcast.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/server/broadcast.go b/server/broadcast.go index e95da3af99..d432810bf2 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -218,7 +218,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] if len(session.SegsInFlight) == 0 { if session.LatencyScore > 0 && session.LatencyScore <= SELECTOR_LATENCY_SCORE_THRESHOLD { clog.PublicInfof(ctx, - "Selecting new orchestrator, reason=%v", + "Reusing Orchestrator, reason=%v", fmt.Sprintf( "performance: no segments in flight, latency score of %v < %v", session.LatencyScore, @@ -228,6 +228,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] return session } + clog.PublicInfof(ctx, + "Swapping Orchestrator, reason=%v", + fmt.Sprintf( + "performance: no segments in flight, latency score of %v < %v", + session.LatencyScore, + durMult, + ), + ) } // A session with segments in flight might be selectable under certain conditions @@ -247,7 +255,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] if timeInFlight < maxTimeInFlight { clog.PublicInfof(ctx, - "Selected orchestrator reason=%v", + "Reusing orchestrator reason=%v", fmt.Sprintf( "performance: segments in flight, latency score of %v < %v", session.LatencyScore, @@ -257,6 +265,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] return session } + clog.PublicInfof(ctx, + "Swapping Orchestrator, reason=%v", + fmt.Sprintf( + "performance: no segments in flight, latency score of %v < %v", + session.LatencyScore, + durMult, + ), + ) } } return nil From 1fea43a8e17977965418972e278d56ff3cb9de3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 9 Oct 2023 15:43:04 +0200 Subject: [PATCH 2/3] Use min seg duration for latency score calculation --- server/segment_rpc.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/segment_rpc.go b/server/segment_rpc.go index b35409d3e1..4b60320a29 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -629,10 +629,13 @@ func SubmitSegment(ctx context.Context, sess *BroadcastSession, seg *stream.HLSS clog.Infof(ctx, "Successfully transcoded segment segName=%s seqNo=%d orch=%s dur=%s", seg.Name, seg.SeqNo, ti.Transcoder, transcodeDur) + // Use 1.5s for segments that are shorter than 1.5s + // Otherwise, the latency score is too high which results in a high number session swaps + segDuration := math.Max(1.5, seg.Duration) return &ReceivedTranscodeResult{ TranscodeData: tdata, Info: tr.Info, - LatencyScore: tookAllDur.Seconds() / seg.Duration, + LatencyScore: tookAllDur.Seconds() / segDuration, }, nil } From 6454a6aa1a7dc18317a1ae27451be7cda41926c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 9 Oct 2023 20:34:27 +0200 Subject: [PATCH 3/3] Fix unit test --- server/broadcast_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/broadcast_test.go b/server/broadcast_test.go index 2da5ba1119..669ff06313 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -735,7 +735,7 @@ func TestTranscodeSegment_CompleteSession(t *testing.T) { // Create stub server ts, mux := stubTLSServer() defer ts.Close() - transcodeDelay := 100 * time.Millisecond + transcodeDelay := 1500 * time.Millisecond mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) { time.Sleep(transcodeDelay) w.WriteHeader(http.StatusOK)