Skip to content

Commit

Permalink
[3185] Unify pipeline tag on prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Oct 2, 2024
1 parent 93bec37 commit 5abfd40
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
25 changes: 16 additions & 9 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ func MaxTranscodingPrice(maxPrice *big.Rat) {
func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
[]tag.Mutator{tag.Insert(census.kPipeline, normilizePipelineTag(cap)), tag.Insert(census.kModelName, modelName)},
census.mPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1805,9 +1805,10 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string

// AIRequestFinished records gateway AI job request metrics.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
pipelineTag := normilizePipelineTag(pipeline)
census.recordModelRequested(pipelineTag, model)
census.recordAIRequestLatencyScore(pipelineTag, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit)
}

// recordAIRequestLatencyScore records the latency score for an AI job request.
Expand Down Expand Up @@ -1845,7 +1846,7 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normilizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
Expand All @@ -1858,9 +1859,11 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.

// AIJobProcessed records orchestrator AI job processing metrics.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
pipelineTag := normilizePipelineTag(pipeline)

census.recordModelRequested(pipelineTag, model)
census.recordAIJobLatencyScore(pipelineTag, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipelineTag, model, jobInfo.PricePerUnit)
}

// recordAIJobLatencyScore records the latency score for a processed AI job.
Expand Down Expand Up @@ -1890,7 +1893,7 @@ func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model
// AIProcessingError logs errors in orchestrator AI job processing.
func AIProcessingError(code string, Pipeline string, Model string, sender string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, normilizePipelineTag(Pipeline)), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
Expand Down Expand Up @@ -1924,3 +1927,7 @@ func FastVerificationFailed(ctx context.Context, uri string, errtype int) {
clog.Errorf(ctx, "Error recording metrics err=%q", err)
}
}

// normilizePipelineTag converts a pipeline name into the form used for the
// pipeline metric tag: the name is lower-cased and every space is replaced
// with a hyphen, e.g. "Segment Anything 2" becomes "segment-anything-2".
func normilizePipelineTag(pipeline string) string {
	lowered := strings.ToLower(pipeline)
	return strings.ReplaceAll(lowered, " ", "-")
}
14 changes: 7 additions & 7 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,30 +633,30 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
mw, err := worker.NewSegmentAnything2MultipartWriter(&buf, req)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

imageRdr, err := req.Image.Reader()
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
config, _, err := image.DecodeConfig(imageRdr)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -665,7 +665,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -676,7 +676,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
took := time.Since(start)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
Expand All @@ -700,7 +700,7 @@ func submitSegmentAnything2(ctx context.Context, params aiRequestParams, sess *A
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}

monitor.AIRequestFinished(ctx, "segment anything 2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
monitor.AIRequestFinished(ctx, "segment-anything-2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return resp.JSON200, nil
Expand Down

0 comments on commit 5abfd40

Please sign in to comment.