From b9f37243d59e9a8d2e530dfba5429b140896288f Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 14 Oct 2024 12:59:13 +0100 Subject: [PATCH] Add new orchestrator capacities metric --- monitor/census.go | 20 +++++++++++++++++++- monitor/census_test.go | 8 ++++---- server/broadcast.go | 12 +++++++++++- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index f9dae6babb..29911b99c6 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -114,6 +114,7 @@ type ( kOrchestratorAddress tag.Key kOrchestratorVersion tag.Key kFVErrorType tag.Key + kCapability tag.Key mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -138,6 +139,7 @@ type ( mSuccessRatePerStream *stats.Float64Measure mTranscodeTime *stats.Float64Measure mTranscodeOverallLatency *stats.Float64Measure + mOrchestratorCapacities *stats.Int64Measure mUploadTime *stats.Float64Measure mDownloadTime *stats.Float64Measure mAuthWebhookTime *stats.Float64Measure @@ -293,6 +295,7 @@ func InitCensus(nodeType NodeType, version string) { census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") + census.mOrchestratorCapacities = stats.Int64("orchestrator_capacities", "Capacity per orchestrator per capability", "tot") census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec") census.mDownloadTime = stats.Float64("download_time_seconds", "Download (from orchestrator) time", "sec") census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms") @@ -557,6 +560,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kProfiles}, baseTagsWithOrchInfo...), Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, + { + Name: "orchestrator_capacities", + Measure: census.mOrchestratorCapacities, + Description: "Capacity per orchestrator per capability", + TagKeys: append([]tag.Key{census.kProfiles, census.kCapability}, baseTagsWithOrchInfo...), + Aggregation: view.LastValue(), + }, { Name: "transcode_score", Measure: census.mTranscodeScore, @@ -1422,7 +1432,7 @@ func (cen *censusMetricsCounter) sendSuccess() { stats.Record(cen.ctx, cen.mSuccessRate.M(cen.successRate())) } -func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles string, errCode SegmentTranscodeError, orchInfo *lpnet.OrchestratorInfo) { +func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles string, errCode SegmentTranscodeError, orchInfo *lpnet.OrchestratorInfo, orchCapacities map[string]int64) { census.lock.Lock() defer census.lock.Unlock() rctx, err := tag.New(census.ctx, tag.Insert(census.kProfiles, profiles)) @@ -1441,6 +1451,14 @@ func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles s } census.countSegmentEmerged(ctx, nonce, seqNo) } + + for capability, capacity := range orchCapacities { + if err := stats.RecordWithTags(rctx, + manifestIDTagAndOrchInfo(orchInfo, ctx, tag.Insert(census.kCapability, capability)), census.mOrchestratorCapacities.M(capacity)); err != nil { + clog.Errorf(ctx, "Error recording capacity metric err=%q", err) + } + } + if errCode == "" { if err := stats.RecordWithTags(rctx, manifestIDTagAndOrchInfo(orchInfo, ctx), census.mSegmentTranscodedAllAppeared.M(1)); err != nil { diff --git a/monitor/census_test.go b/monitor/census_test.go index 22c65b92c1..92a396ad85 100644 --- a/monitor/census_test.go +++ b/monitor/census_test.go @@ -62,7 +62,7 @@ func TestLastSegmentTimeout(t *testing.T) { if sr := census.successRate(); sr != 1 { t.Fatalf("Success rate should be 1, not %f", sr) } - SegmentFullyTranscoded(context.Background(), 1, 1, "ps", "", &lpnet.OrchestratorInfo{}) + SegmentFullyTranscoded(context.Background(), 1, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil) if sr := census.successRate(); sr != 1 { t.Fatalf("Success rate should be 1, not %f", sr) } @@ -74,7 +74,7 @@ func TestLastSegmentTimeout(t *testing.T) { SegmentEmerged(context.TODO(), 1, 3, 3, 1) SegmentTranscodeFailed(context.TODO(), SegmentTranscodeErrorSessionEnded, 1, 3, fmt.Errorf("some"), true) SegmentEmerged(context.TODO(), 1, 4, 3, 1) - SegmentFullyTranscoded(context.Background(), 1, 4, "ps", "", &lpnet.OrchestratorInfo{}) + SegmentFullyTranscoded(context.Background(), 1, 4, "ps", "", &lpnet.OrchestratorInfo{}, nil) if sr := census.successRate(); sr != 0.75 { t.Fatalf("Success rate should be 0.75, not %f", sr) } @@ -85,7 +85,7 @@ func TestLastSegmentTimeout(t *testing.T) { StreamCreated("h1", 2) SegmentEmerged(context.TODO(), 2, 1, 3, 1) - SegmentFullyTranscoded(context.Background(), 2, 1, "ps", "", &lpnet.OrchestratorInfo{}) + SegmentFullyTranscoded(context.Background(), 2, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil) SegmentEmerged(context.TODO(), 2, 2, 3, 1) StreamEnded(context.TODO(), 2) if len(census.success) != 1 { @@ -111,7 +111,7 @@ func TestLastSegmentTimeout(t *testing.T) { StreamCreated("h3", 3) SegmentEmerged(context.TODO(), 3, 1, 3, 1) - SegmentFullyTranscoded(context.Background(), 3, 1, "ps", "", &lpnet.OrchestratorInfo{}) + SegmentFullyTranscoded(context.Background(), 3, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil) SegmentEmerged(context.TODO(), 3, 2, 3, 1) StreamEnded(context.TODO(), 3) if len(census.success) != 1 { diff --git a/server/broadcast.go b/server/broadcast.go index 5e9c134685..c04cf93ab8 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -1369,7 +1369,17 @@ func downloadResults(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSe } if monitor.Enabled { - monitor.SegmentFullyTranscoded(ctx, nonce, seg.SeqNo, common.ProfilesNames(sess.Params.Profiles), errCode, sess.OrchestratorInfo) + orchCapacities := make(map[string]int64) + if sess.OrchestratorInfo != nil && sess.OrchestratorInfo.Capabilities != nil { + for capability, capacity := range sess.OrchestratorInfo.Capabilities.Capacities { + capName, err := core.CapabilityToName(core.Capability(capability)) + if err != nil { + continue + } + orchCapacities[capName] = int64(capacity) + } + } + monitor.SegmentFullyTranscoded(ctx, nonce, seg.SeqNo, common.ProfilesNames(sess.Params.Profiles), errCode, sess.OrchestratorInfo, orchCapacities) } clog.V(common.DEBUG).Infof(ctx, "Successfully validated segment")