Skip to content

Commit

Permalink
[3185] Unify pipeline tag on prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Oct 2, 2024
1 parent 93bec37 commit 731b972
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
14 changes: 9 additions & 5 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,9 +1653,10 @@ func MaxTranscodingPrice(maxPrice *big.Rat) {
}

func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
pipelineTag := strings.Replace(strings.ToLower(cap), " ", "-", -1)
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
[]tag.Mutator{tag.Insert(census.kPipeline, pipelineTag), tag.Insert(census.kModelName, modelName)},
census.mPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1844,8 +1845,9 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}
pipelineTag := strings.Replace(strings.ToLower(Pipeline), " ", "-", -1)

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipelineTag), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
Expand All @@ -1858,9 +1860,11 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.

// AIJobProcessed records orchestrator AI job processing metrics.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
pipelineTag := strings.Replace(strings.ToLower(pipeline), " ", "-", -1)

census.recordModelRequested(pipelineTag, model)
census.recordAIJobLatencyScore(pipelineTag, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit)
}

// recordAIJobLatencyScore records the latency score for a processed AI job.
Expand Down
14 changes: 7 additions & 7 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,30 +633,30 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
mw, err := worker.NewSegmentAnything2MultipartWriter(&buf, req)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

imageRdr, err := req.Image.Reader()
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
config, _, err := image.DecodeConfig(imageRdr)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -665,7 +665,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -676,7 +676,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
took := time.Since(start)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -700,7 +700,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}

monitor.AIRequestFinished(ctx, "segment anything 2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
monitor.AIRequestFinished(ctx, "segment-anything-2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return resp.JSON200, nil
Expand Down

0 comments on commit 731b972

Please sign in to comment.