Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3185 unify pipeline tag #3191

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@
modelID := p.ModelID
autoCapPrice, err := core.NewAutoConvertedPrice(p.Currency, maxCapabilityPrice, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxPriceForCapability(capName, modelID, price)
monitor.MaxPriceForCapability(monitor.ToPipeline(capName), modelID, price)

Check warning on line 1036 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1036

Added line #L1036 was not covered by tests
}
glog.Infof("Maximum price per unit set to %v wei for capability=%v model_id=%v", price.FloatString(3), p.Pipeline, p.ModelID)
})
Expand Down
17 changes: 11 additions & 6 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,10 +1698,10 @@
}
}

func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
func MaxPriceForCapability(pipeline string, modelName string, maxPrice *big.Rat) {

Check warning on line 1701 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1701

Added line #L1701 was not covered by tests
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName)},

Check warning on line 1704 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1704

Added line #L1704 was not covered by tests
census.mPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1885,13 +1885,13 @@
}

// AIRequestError logs an error in a gateway AI job request.
func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) {
func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {

Check warning on line 1888 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1888

Added line #L1888 was not covered by tests
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}

Check warning on line 1894 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1894

Added line #L1894 was not covered by tests
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
Expand Down Expand Up @@ -1934,9 +1934,9 @@
}

// AIProcessingError logs errors in orchestrator AI job processing.
func AIProcessingError(code string, Pipeline string, Model string, sender string) {
func AIProcessingError(code string, pipeline string, model string, sender string) {

Check warning on line 1937 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1937

Added line #L1937 was not covered by tests
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kSender, sender)},

Check warning on line 1939 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L1939

Added line #L1939 was not covered by tests
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
Expand Down Expand Up @@ -1999,3 +1999,8 @@
clog.Errorf(ctx, "Error recording metrics err=%q", err)
}
}

// ToPipeline converts capability name into pipeline name
func ToPipeline(cap string) string {
return strings.Replace(strings.ToLower(cap), " ", "-", -1)

Check warning on line 2005 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L2004-L2005

Added lines #L2004 - L2005 were not covered by tests
}
18 changes: 9 additions & 9 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,30 +681,30 @@
mw, err := worker.NewSegmentAnything2MultipartWriter(&buf, req)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 684 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L684

Added line #L684 was not covered by tests
}
return nil, err
}

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 692 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L692

Added line #L692 was not covered by tests
}
return nil, err
}

imageRdr, err := req.Image.Reader()
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 700 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L700

Added line #L700 was not covered by tests
}
return nil, err
}
config, _, err := image.DecodeConfig(imageRdr)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 707 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L707

Added line #L707 was not covered by tests
}
return nil, err
}
Expand All @@ -713,7 +713,7 @@
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 716 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L716

Added line #L716 was not covered by tests
}
return nil, err
}
Expand All @@ -724,7 +724,7 @@
took := time.Since(start)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "segment anything 2", *req.ModelId, sess.OrchestratorInfo)
monitor.AIRequestError(err.Error(), "segment-anything-2", *req.ModelId, sess.OrchestratorInfo)

Check warning on line 727 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L727

Added line #L727 was not covered by tests
}
return nil, err
}
Expand All @@ -748,7 +748,7 @@
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}

monitor.AIRequestFinished(ctx, "segment anything 2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
monitor.AIRequestFinished(ctx, "segment-anything-2", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)

Check warning on line 751 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L751

Added line #L751 was not covered by tests
}

return resp.JSON200, nil
Expand Down Expand Up @@ -1116,7 +1116,7 @@
case <-cctx.Done():
err := fmt.Errorf("no orchestrators available within %v timeout", processingRetryTimeout)
if monitor.Enabled {
monitor.AIRequestError(err.Error(), capName, modelID, nil)
monitor.AIRequestError(err.Error(), monitor.ToPipeline(capName), modelID, nil)

Check warning on line 1119 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1119

Added line #L1119 was not covered by tests
}
return nil, &ServiceUnavailableError{err: err}
default:
Expand Down Expand Up @@ -1154,7 +1154,7 @@
if resp == nil {
errMsg := "no orchestrators available"
if monitor.Enabled {
monitor.AIRequestError(errMsg, capName, modelID, nil)
monitor.AIRequestError(errMsg, monitor.ToPipeline(capName), modelID, nil)

Check warning on line 1157 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1157

Added line #L1157 was not covered by tests
}
return nil, &ServiceUnavailableError{err: errors.New(errMsg)}
}
Expand Down
2 changes: 1 addition & 1 deletion server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@
var err error
autoPrice, err = core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxPriceForCapability(core.CapabilityNameLookup[cap], modelID, price)
monitor.MaxPriceForCapability(monitor.ToPipeline(core.CapabilityNameLookup[cap]), modelID, price)

Check warning on line 245 in server/handlers.go

View check run for this annotation

Codecov / codecov/patch

server/handlers.go#L245

Added line #L245 was not covered by tests
}
glog.Infof("Maximum price per unit set to %v wei for capability=%v model_id=%v", price.FloatString(3), pipeline, modelID)
})
Expand Down
Loading