Skip to content

Commit

Permalink
Add new orchestrator capacities metric
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Oct 14, 2024
1 parent aad1a23 commit b9f3724
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
20 changes: 19 additions & 1 deletion monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type (
kOrchestratorAddress tag.Key
kOrchestratorVersion tag.Key
kFVErrorType tag.Key
kCapability tag.Key
mSegmentSourceAppeared *stats.Int64Measure
mSegmentEmerged *stats.Int64Measure
mSegmentEmergedUnprocessed *stats.Int64Measure
Expand All @@ -138,6 +139,7 @@ type (
mSuccessRatePerStream *stats.Float64Measure
mTranscodeTime *stats.Float64Measure
mTranscodeOverallLatency *stats.Float64Measure
mOrchestratorCapacities *stats.Int64Measure
mUploadTime *stats.Float64Measure
mDownloadTime *stats.Float64Measure
mAuthWebhookTime *stats.Float64Measure
Expand Down Expand Up @@ -293,6 +295,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec")
census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds",
"Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec")
census.mOrchestratorCapacities = stats.Int64("orchestrator_capacities", "Capacity per orchestrator per capability", "tot")
census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec")
census.mDownloadTime = stats.Float64("download_time_seconds", "Download (from orchestrator) time", "sec")
census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms")
Expand Down Expand Up @@ -557,6 +560,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kProfiles}, baseTagsWithOrchInfo...),
Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
},
{
Name: "orchestrator_capacities",
Measure: census.mOrchestratorCapacities,
Description: "Capacity per orchestrator per capability",
TagKeys: append([]tag.Key{census.kProfiles, census.kCapability}, baseTagsWithOrchInfo...),
Aggregation: view.LastValue(),
},
{
Name: "transcode_score",
Measure: census.mTranscodeScore,
Expand Down Expand Up @@ -1422,7 +1432,7 @@ func (cen *censusMetricsCounter) sendSuccess() {
stats.Record(cen.ctx, cen.mSuccessRate.M(cen.successRate()))
}

func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles string, errCode SegmentTranscodeError, orchInfo *lpnet.OrchestratorInfo) {
func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles string, errCode SegmentTranscodeError, orchInfo *lpnet.OrchestratorInfo, orchCapacities map[string]int64) {
census.lock.Lock()
defer census.lock.Unlock()
rctx, err := tag.New(census.ctx, tag.Insert(census.kProfiles, profiles))
Expand All @@ -1441,6 +1451,14 @@ func SegmentFullyTranscoded(ctx context.Context, nonce, seqNo uint64, profiles s
}
census.countSegmentEmerged(ctx, nonce, seqNo)
}

for capability, capacity := range orchCapacities {
if err := stats.RecordWithTags(rctx,
manifestIDTagAndOrchInfo(orchInfo, ctx, tag.Insert(census.kCapability, capability)), census.mOrchestratorCapacities.M(capacity)); err != nil {
clog.Errorf(ctx, "Error recording capacity metric err=%q", err)

Check warning on line 1458 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1456-L1458

Added lines #L1456 - L1458 were not covered by tests
}
}

if errCode == "" {
if err := stats.RecordWithTags(rctx,
manifestIDTagAndOrchInfo(orchInfo, ctx), census.mSegmentTranscodedAllAppeared.M(1)); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions monitor/census_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestLastSegmentTimeout(t *testing.T) {
if sr := census.successRate(); sr != 1 {
t.Fatalf("Success rate should be 1, not %f", sr)
}
SegmentFullyTranscoded(context.Background(), 1, 1, "ps", "", &lpnet.OrchestratorInfo{})
SegmentFullyTranscoded(context.Background(), 1, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil)
if sr := census.successRate(); sr != 1 {
t.Fatalf("Success rate should be 1, not %f", sr)
}
Expand All @@ -74,7 +74,7 @@ func TestLastSegmentTimeout(t *testing.T) {
SegmentEmerged(context.TODO(), 1, 3, 3, 1)
SegmentTranscodeFailed(context.TODO(), SegmentTranscodeErrorSessionEnded, 1, 3, fmt.Errorf("some"), true)
SegmentEmerged(context.TODO(), 1, 4, 3, 1)
SegmentFullyTranscoded(context.Background(), 1, 4, "ps", "", &lpnet.OrchestratorInfo{})
SegmentFullyTranscoded(context.Background(), 1, 4, "ps", "", &lpnet.OrchestratorInfo{}, nil)
if sr := census.successRate(); sr != 0.75 {
t.Fatalf("Success rate should be 0.75, not %f", sr)
}
Expand All @@ -85,7 +85,7 @@ func TestLastSegmentTimeout(t *testing.T) {

StreamCreated("h1", 2)
SegmentEmerged(context.TODO(), 2, 1, 3, 1)
SegmentFullyTranscoded(context.Background(), 2, 1, "ps", "", &lpnet.OrchestratorInfo{})
SegmentFullyTranscoded(context.Background(), 2, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil)
SegmentEmerged(context.TODO(), 2, 2, 3, 1)
StreamEnded(context.TODO(), 2)
if len(census.success) != 1 {
Expand All @@ -111,7 +111,7 @@ func TestLastSegmentTimeout(t *testing.T) {

StreamCreated("h3", 3)
SegmentEmerged(context.TODO(), 3, 1, 3, 1)
SegmentFullyTranscoded(context.Background(), 3, 1, "ps", "", &lpnet.OrchestratorInfo{})
SegmentFullyTranscoded(context.Background(), 3, 1, "ps", "", &lpnet.OrchestratorInfo{}, nil)
SegmentEmerged(context.TODO(), 3, 2, 3, 1)
StreamEnded(context.TODO(), 3)
if len(census.success) != 1 {
Expand Down
12 changes: 11 additions & 1 deletion server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,17 @@ func downloadResults(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSe
}

if monitor.Enabled {
monitor.SegmentFullyTranscoded(ctx, nonce, seg.SeqNo, common.ProfilesNames(sess.Params.Profiles), errCode, sess.OrchestratorInfo)
orchCapacities := make(map[string]int64)
if sess.OrchestratorInfo != nil && sess.OrchestratorInfo.Capabilities != nil {
for capability, capacity := range sess.OrchestratorInfo.Capabilities.Capacities {
capName, err := core.CapabilityToName(core.Capability(capability))
if err != nil {
continue

Check warning on line 1377 in server/broadcast.go

View check run for this annotation

Codecov / codecov/patch

server/broadcast.go#L1372-L1377

Added lines #L1372 - L1377 were not covered by tests
}
orchCapacities[capName] = int64(capacity)

Check warning on line 1379 in server/broadcast.go

View check run for this annotation

Codecov / codecov/patch

server/broadcast.go#L1379

Added line #L1379 was not covered by tests
}
}
monitor.SegmentFullyTranscoded(ctx, nonce, seg.SeqNo, common.ProfilesNames(sess.Params.Profiles), errCode, sess.OrchestratorInfo, orchCapacities)

Check warning on line 1382 in server/broadcast.go

View check run for this annotation

Codecov / codecov/patch

server/broadcast.go#L1382

Added line #L1382 was not covered by tests
}

clog.V(common.DEBUG).Infof(ctx, "Successfully validated segment")
Expand Down

0 comments on commit b9f3724

Please sign in to comment.