From 6d7e5dfa7e78f8f7147142f9704a956bb914b1cd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 17 Oct 2022 12:47:32 +0300
Subject: [PATCH 1/2] *: port changes from v2.37.0
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Generated with:

```
git diff --patch v2.37.0...v2.37.0_for_thanos
```

Signed-off-by: Giedrius Statkevičius
---
 promql/engine.go               | 100 +++++++++++++--
 promql/functions.go            | 135 +++++++++++++++++++++
 promql/parser/functions.go     |  19 +++
 promql/testdata/functions.test | 214 +++++++++++++++++++++++++++++++--
 web/api/v1/api.go              |  20 +++
 web/api/v1/api_test.go         |   8 ++
 6 files changed, 473 insertions(+), 23 deletions(-)

diff --git a/promql/engine.go b/promql/engine.go
index e7da2a96fbb..dc06d25947d 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -821,6 +821,16 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS
 	start = start - offsetMilliseconds
 	end = end - offsetMilliseconds
 
+	f, ok := parser.Functions[extractFuncFromPath(path)]
+	if ok && f.ExtRange {
+		// Buffer more so that we can reasonably
+		// inject a zero if there is only one point.
+		if extractFuncFromPath(path) == "xincrease" {
+			start -= durationMilliseconds(1 * 24 * time.Hour)
+		}
+		start -= durationMilliseconds(ng.lookbackDelta)
+	}
+
 	return start, end
 }
 
@@ -1024,6 +1034,8 @@ type EvalNodeHelper struct {
 	rightSigs    map[string]Sample
 	matchedSigs  map[string]map[uint64]struct{}
 	resultMetric map[string]labels.Labels
+
+	metricAppeared int64
 }
 
 // DropMetricName is a cached version of DropMetricName.
@@ -1376,18 +1388,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 		mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
 		offset := durationMilliseconds(selVS.Offset)
 		selRange := durationMilliseconds(sel.Range)
+
 		stepRange := selRange
 		if stepRange > ev.interval {
 			stepRange = ev.interval
 		}
+		bufferRange := selRange
+
+		if e.Func.ExtRange {
+			bufferRange += durationMilliseconds(ev.lookbackDelta)
+			stepRange += durationMilliseconds(ev.lookbackDelta)
+		}
+
+		if e.Func.Name == "xincrease" {
+			bufferRange += durationMilliseconds(1 * 24 * time.Hour)
+		}
 		// Reuse objects across steps to save memory allocations.
 		points := getPointSlice(16)
 		inMatrix := make(Matrix, 1)
 		inArgs[matrixArgIndex] = inMatrix
 		enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
 		// Process all the calls for one time series at a time.
-		it := storage.NewBuffer(selRange)
+		it := storage.NewBuffer(bufferRange)
 		for i, s := range selVS.Series {
+			enh.metricAppeared = -1
 			ev.currentSamples -= len(points)
 			points = points[:0]
 			it.Reset(s.Iterator())
@@ -1414,13 +1438,21 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 					otherInArgs[j][0].V = otherArgs[j][0].Points[step].V
 				}
 			}
+
 			maxt := ts - offset
 			mint := maxt - selRange
+
+			var metricAppeared int64 = -1
 			// Evaluate the matrix selector for this series for this step.
-			points = ev.matrixIterSlice(it, mint, maxt, points)
+			points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points, &metricAppeared, e.Func.Name)
 			if len(points) == 0 {
+				enh.metricAppeared = -1
 				continue
 			}
+			if enh.metricAppeared == -1 && metricAppeared != -1 {
+				enh.metricAppeared = metricAppeared
+			}
+
 			inMatrix[0].Points = points
 			enh.Ts = ts
 			// Make the function call.
@@ -1809,7 +1841,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
 			Metric: series[i].Labels(),
 		}
 
-		ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
+		ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16), nil, "")
 		ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, int64(len(ss.Points)))
 
 		if len(ss.Points) > 0 {
@@ -1829,7 +1861,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
 // values). Any such points falling before mint are discarded; points that fall
 // into the [mint, maxt] range are retained; only points with later timestamps
 // are populated from the iterator.
-func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
+func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point, metricAppeared *int64, functionName string) []Point {
+	var extMint int64
+	if functionName == "xincrease" {
+		extMint = mint - durationMilliseconds(4*24*time.Hour)
+	} else {
+		extMint = mint - durationMilliseconds(ev.lookbackDelta)
+	}
+
 	if len(out) > 0 && out[len(out)-1].T >= mint {
 		// There is an overlap between previous and current ranges, retain common
 		// points. In most such cases:
@@ -1837,13 +1876,28 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
 		//   (b) the number of samples is relatively small.
 		// so a linear search will be as fast as a binary search.
 		var drop int
-		for drop = 0; out[drop].T < mint; drop++ {
+		if !extRange {
+			for drop = 0; out[drop].T < mint; drop++ {
+			}
+			// Only append points with timestamps after the last timestamp we have.
+			mint = out[len(out)-1].T + 1
+		} else {
+			// This is an argument to an extended range function; first, go past mint.
+			for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
+			}
+			// Then, go back one sample if it is within lookbackDelta of mint.
+			if drop > 0 && out[drop-1].T >= extMint {
+				drop--
+			}
+			if out[len(out)-1].T >= mint {
+				// Only append points with timestamps after the last timestamp we have.
+				mint = out[len(out)-1].T + 1
+			}
 		}
+
 		ev.currentSamples -= drop
 		copy(out, out[drop:])
 		out = out[:len(out)-drop]
-		// Only append points with timestamps after the last timestamp we have.
-		mint = out[len(out)-1].T + 1
 	} else {
 		ev.currentSamples -= len(out)
 		out = out[:0]
@@ -1857,18 +1911,38 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
 	}
 
 	buf := it.Buffer()
+	appendedPointBeforeMint := len(out) > 0
 	for buf.Next() {
 		t, v := buf.At()
 		if value.IsStaleNaN(v) {
 			continue
 		}
-		// Values in the buffer are guaranteed to be smaller than maxt.
-		if t >= mint {
-			if ev.currentSamples >= ev.maxSamples {
-				ev.error(ErrTooManySamples(env))
+		if metricAppeared != nil && *metricAppeared == -1 {
+			*metricAppeared = t
+		}
+		if !extRange {
+			// Values in the buffer are guaranteed to be smaller than maxt.
+			if t >= mint {
+				if ev.currentSamples >= ev.maxSamples {
+					ev.error(ErrTooManySamples(env))
+				}
+				out = append(out, Point{T: t, V: v})
+				ev.currentSamples++
+			}
+		} else {
+			// This is the argument to an extended range function: if any point
+			// exists at or before range start, add it and then keep replacing
+			// it with later points while not yet (strictly) inside the range.
+			if t > mint || !appendedPointBeforeMint {
+				if ev.currentSamples >= ev.maxSamples {
+					ev.error(ErrTooManySamples(env))
+				}
+				out = append(out, Point{T: t, V: v})
+				ev.currentSamples++
+				appendedPointBeforeMint = true
+			} else {
+				out[len(out)-1] = Point{T: t, V: v}
 			}
-			ev.currentSamples++
-			out = append(out, Point{T: t, V: v})
 		}
 	}
 	// The seeked sample might also be in the range.
diff --git a/promql/functions.go b/promql/functions.go
index e25f3f119e1..053df08e64f 100644
--- a/promql/functions.go
+++ b/promql/functions.go
@@ -16,6 +16,7 @@ package promql
 import (
 	"fmt"
 	"math"
+	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -136,6 +137,97 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
 	})
 }
 
+// extendedRate is a utility function for xrate/xincrease/xdelta.
+// It calculates the rate (allowing for counter resets if isCounter is true),
+// taking into account the last sample before the range start, and returns
+// the result as either per-second (if isRate is true) or overall.
+func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
+	ms := args[0].(*parser.MatrixSelector)
+	vs := ms.VectorSelector.(*parser.VectorSelector)
+
+	var (
+		samples    = vals[0].(Matrix)[0]
+		rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
+		rangeEnd   = enh.Ts - durationMilliseconds(vs.Offset)
+	)
+
+	points := samples.Points
+	if len(points) == 0 {
+		return enh.Out
+	}
+
+	sameVals := true
+	for i := range points {
+		if i > 0 && points[i-1].V != points[i].V {
+			sameVals = false
+			break
+		}
+	}
+
+	until := enh.metricAppeared + durationMilliseconds(ms.Range)
+	if isCounter && !isRate && sameVals && enh.metricAppeared != -1 {
+		if enh.Ts-durationMilliseconds(vs.Offset) <= until || (vs.Timestamp != nil && *vs.Timestamp <= until) {
+			return append(enh.Out, Sample{
+				Point: Point{V: points[0].V},
+			})
+		}
+	}
+
+	sampledRange := float64(points[len(points)-1].T - points[0].T)
+	averageInterval := sampledRange / float64(len(points)-1)
+
+	firstPoint := 0
+	// Skip this for xincrease.
+	if !(isCounter && !isRate) {
+		// If the point before the range is too far from rangeStart, drop it.
+		if float64(rangeStart-points[0].T) > averageInterval {
+			if len(points) < 3 {
+				return enh.Out
+			}
+			firstPoint = 1
+			sampledRange = float64(points[len(points)-1].T - points[1].T)
+			averageInterval = sampledRange / float64(len(points)-2)
+		}
+	}
+
+	var (
+		counterCorrection float64
+		lastValue         float64
+	)
+	if isCounter {
+		for i := firstPoint; i < len(points); i++ {
+			sample := points[i]
+			if sample.V < lastValue {
+				counterCorrection += lastValue
+			}
+			lastValue = sample.V
+		}
+	}
+	resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection
+
+	// Duration between last sample and boundary of range.
+	durationToEnd := float64(rangeEnd - points[len(points)-1].T)
+
+	// If the points cover the whole range (i.e. they start just before the
+	// range start and end just before the range end) adjust the value from
+	// the sampled range to the requested range.
+	// Skip this for xincrease.
+	if !(isCounter && !isRate) {
+		if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
+			adjustToRange := float64(durationMilliseconds(ms.Range))
+			resultValue = resultValue * (adjustToRange / sampledRange)
+		}
+	}
+
+	if isRate {
+		resultValue = resultValue / ms.Range.Seconds()
+	}
+
+	return append(enh.Out, Sample{
+		Point: Point{V: resultValue},
+	})
+}
+
 // === delta(Matrix parser.ValueTypeMatrix) Vector ===
 func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
 	return extrapolatedRate(vals, args, enh, false, false)
@@ -151,6 +243,21 @@ func funcIncrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
 	return extrapolatedRate(vals, args, enh, true, false)
 }
 
+// === xdelta(Matrix parser.ValueTypeMatrix) Vector ===
+func funcXdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
+	return extendedRate(vals, args, enh, false, false)
+}
+
+// === xrate(node parser.ValueTypeMatrix) Vector ===
+func funcXrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
+	return extendedRate(vals, args, enh, true, true)
+}
+
+// === xincrease(node parser.ValueTypeMatrix) Vector ===
+func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
+	return extendedRate(vals, args, enh, true, false)
+}
+
 // === irate(node parser.ValueTypeMatrix) Vector ===
 func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
 	return instantValue(vals, enh.Out, true)
@@ -1142,6 +1249,9 @@ var FunctionCalls = map[string]FunctionCall{
 	"time":               funcTime,
 	"timestamp":          funcTimestamp,
 	"vector":             funcVector,
+	"xdelta":             funcXdelta,
+	"xincrease":          funcXincrease,
+	"xrate":              funcXrate,
 	"year":               funcYear,
 }
 
@@ -1160,6 +1270,31 @@ var AtModifierUnsafeFunctions = map[string]struct{}{
 	"timestamp": {},
 }
 
+func init() {
+	// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
+	// with the xrate variants. This allows for a drop-in replacement;
+	// Grafana auto-completion, Prometheus tooling, Thanos, etc. should still
+	// work as expected.
+	if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
+		FunctionCalls["delta"] = FunctionCalls["xdelta"]
+		FunctionCalls["increase"] = FunctionCalls["xincrease"]
+		FunctionCalls["rate"] = FunctionCalls["xrate"]
+		delete(FunctionCalls, "xdelta")
+		delete(FunctionCalls, "xincrease")
+		delete(FunctionCalls, "xrate")
+
+		parser.Functions["delta"] = parser.Functions["xdelta"]
+		parser.Functions["increase"] = parser.Functions["xincrease"]
+		parser.Functions["rate"] = parser.Functions["xrate"]
+		parser.Functions["delta"].Name = "delta"
+		parser.Functions["increase"].Name = "increase"
+		parser.Functions["rate"].Name = "rate"
+		delete(parser.Functions, "xdelta")
+		delete(parser.Functions, "xincrease")
+		delete(parser.Functions, "xrate")
+		fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
+	}
+}
+
 type vectorByValueHeap Vector
 
 func (s vectorByValueHeap) Len() int {
diff --git a/promql/parser/functions.go b/promql/parser/functions.go
index 92afff8b238..011dc8f31c7 100644
--- a/promql/parser/functions.go
+++ b/promql/parser/functions.go
@@ -20,6 +20,7 @@ type Function struct {
 	ArgTypes   []ValueType
 	Variadic   int
 	ReturnType ValueType
+	ExtRange   bool
 }
 
 // Functions is a list of all functions supported by PromQL, including their types.
@@ -363,6 +364,24 @@ var Functions = map[string]*Function{
 		ArgTypes:   []ValueType{ValueTypeScalar},
 		ReturnType: ValueTypeVector,
 	},
+	"xdelta": {
+		Name:       "xdelta",
+		ArgTypes:   []ValueType{ValueTypeMatrix},
+		ReturnType: ValueTypeVector,
+		ExtRange:   true,
+	},
+	"xincrease": {
+		Name:       "xincrease",
+		ArgTypes:   []ValueType{ValueTypeMatrix},
+		ReturnType: ValueTypeVector,
+		ExtRange:   true,
+	},
+	"xrate": {
+		Name:       "xrate",
+		ArgTypes:   []ValueType{ValueTypeMatrix},
+		ReturnType: ValueTypeVector,
+		ExtRange:   true,
+	},
 	"year": {
 		Name:       "year",
 		ArgTypes:   []ValueType{ValueTypeVector},
diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test
index 02e6a32190e..839b3dd64a5 100644
--- a/promql/testdata/functions.test
+++ b/promql/testdata/functions.test
@@ -1,3 +1,151 @@
+# Comparison of rate vs xrate.
+
+load 5s
+	http_requests{path="/foo"}	1 1 1 2 2 2 2 2 3 3 3
+	http_requests{path="/bar"}	1 2 3 4 5 6 7 8 9 10 11
+
+
+#
+# Timeseries starts inside range, (presumably) goes on after range end.
+#
+
+# 1. Reference eval, aligned with collection.
+eval instant at 25s rate(http_requests[50s])
+	{path="/foo"} .022
+	{path="/bar"} .12
+
+eval instant at 25s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .1
+
+# 2. Eval 1 second earlier compared to (1).
+# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase);
+# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample).
+# XXX Seeing ~20% jump for path="/foo"
+eval instant at 24s rate(http_requests[50s])
+	{path="/foo"} .0265
+	{path="/bar"} .116
+
+eval instant at 24s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .08
+
+# 3. Eval 1 second later compared to (1).
+# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase).
+# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase).
+# XXX Higher instead of lower for both.
+eval instant at 26s rate(http_requests[50s])
+	{path="/foo"} .0228
+	{path="/bar"} .124
+
+eval instant at 26s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .1
+
+
+#
+# Timeseries starts before range, ends within range.
+#
+
+# 4. Reference eval, aligned with collection.
+eval instant at 75s rate(http_requests[50s])
+	{path="/foo"} .022
+	{path="/bar"} .11
+
+eval instant at 75s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .1
+
+# 5. Eval 1s earlier compared to (4).
+# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase).
+# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase).
+# XXX Higher instead of lower for both.
+eval instant at 74s rate(http_requests[50s])
+	{path="/foo"} .0228
+	{path="/bar"} .114
+
+# XXX Higher instead of lower for {path="/bar"}.
+eval instant at 74s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .12
+
+# 6. Eval 1s later compared to (4). Rate/increase (should be) fractionally smaller.
+# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase);
+# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample).
+# XXX Seeing ~20% jump for path="/foo", decrease instead of increase for path="/bar".
+eval instant at 76s rate(http_requests[50s])
+	{path="/foo"} .0265
+	{path="/bar"} .106
+
+eval instant at 76s xrate(http_requests[50s])
+	{path="/foo"} .02
+	{path="/bar"} .1
+
+#
+# Evaluation of 10 second rate every 10 seconds, not aligned with collection.
+#
+
+eval instant at 9s rate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+eval instant at 19s rate(http_requests[10s])
+	{path="/foo"} 0.2
+	{path="/bar"} 0.2
+
+eval instant at 29s rate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+eval instant at 39s rate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+# XXX Missed an increase in path="/foo" between timestamps 35 and 40 (both in this eval and the one before).
+eval instant at 49s rate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+eval instant at 9s xrate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.1
+
+eval instant at 19s xrate(http_requests[10s])
+	{path="/foo"} 0.1
+	{path="/bar"} 0.2
+
+eval instant at 29s xrate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+eval instant at 39s xrate(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 0.2
+
+# XXX Sees the increase in path="/foo" between timestamps 35 and 40.
+eval instant at 49s xrate(http_requests[10s])
+	{path="/foo"} .1
+	{path="/bar"} 0.2
+
+# xincrease injects a zero if there is only one sample in the given time range.
+eval instant at 1s xincrease(http_requests[5s])
+	{path="/foo"} 1
+	{path="/bar"} 1
+
+
+
+# xincrease does not inject anything at the end of the given time range if there are
+# two or more samples.
+eval instant at 55s xincrease(http_requests[10s])
+	{path="/foo"} 0
+	{path="/bar"} 1
+
+clear
+
+
+
+
+
 # Testdata for resets() and changes().
 load 5m
 	http_requests{path="/foo"}	1 2 3 0 1 0 0 1 2 0
@@ -67,23 +215,54 @@ eval instant at 15m changes(x[15m])
 
 clear
 
-# Tests for increase().
-load 5m
+# Tests for increase()/xincrease()/xrate().
+load 5s
 	http_requests{path="/foo"}	0+10x10
-	http_requests{path="/bar"}	0+10x5 0+10x5
+	http_requests{path="/bar"}	0+10x5 0+10x4
 
 # Tests for increase().
-eval instant at 50m increase(http_requests[50m])
+eval instant at 50s increase(http_requests[50s])
 	{path="/foo"} 100
 	{path="/bar"} 90
 
-eval instant at 50m increase(http_requests[100m])
+eval instant at 50s increase(http_requests[100s])
 	{path="/foo"} 100
 	{path="/bar"} 90
 
+# Tests for xincrease().
+eval instant at 50s xincrease(http_requests[50s])
+	{path="/foo"} 100
+	{path="/bar"} 90
+
+eval instant at 50s xincrease(http_requests[5s])
+	{path="/foo"} 10
+	{path="/bar"} 10
+
+eval instant at 50s xincrease(http_requests[3s])
+	{path="/foo"} 10
+	{path="/bar"} 10
+
+
+# Tests for xrate().
+eval instant at 50s xrate(http_requests[50s])
+	{path="/foo"} 2
+	{path="/bar"} 1.8
+
+eval instant at 50s xrate(http_requests[100s])
+	{path="/foo"} 1
+	{path="/bar"} 0.9
+
+eval instant at 50s xrate(http_requests[5s])
+	{path="/foo"} 2
+	{path="/bar"} 2
+
+eval instant at 50s xrate(http_requests[3s])
+	{path="/foo"} 2
+	{path="/bar"} 2
+
 clear
 
-# Test for increase() with counter reset.
+# Test for increase()/xincrease() with counter reset.
 # When the counter is reset, it always starts at 0.
 # So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2.
 # Prometheus assumes it missed the intermediate values 0 and 1.
@@ -91,7 +270,10 @@ load 5m
 	http_requests{path="/foo"}	0 1 2 3 2 3 4
 
 eval instant at 30m increase(http_requests[30m])
-	{path="/foo"} 7
+	{path="/foo"} 7
+
+eval instant at 30m xincrease(http_requests[30m])
+	{path="/foo"} 7
 
 clear
 
@@ -169,15 +351,27 @@ eval instant at 30m irate(http_requests[50m])
 
 clear
 
-# Tests for delta().
+# Tests for delta()/xdelta().
 load 5m
-	http_requests{path="/foo"}	0 50 100 150 200
-	http_requests{path="/bar"}	200 150 100 50 0
+	http_requests{path="/foo"}	0 50 300 150 200
+	http_requests{path="/bar"}	200 150 300 50 0
 
 eval instant at 20m delta(http_requests[20m])
 	{path="/foo"} 200
 	{path="/bar"} -200
 
+eval instant at 20m xdelta(http_requests[20m])
+	{path="/foo"} 200
+	{path="/bar"} -200
+
+eval instant at 20m xdelta(http_requests[19m])
+	{path="/foo"} 190
+	{path="/bar"} -190
+
+eval instant at 20m xdelta(http_requests[1m])
+	{path="/foo"} 10
+	{path="/bar"} -10
+
 clear
 
 # Tests for idelta().
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index bd040912ef0..3ae4781ea93 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -726,6 +726,10 @@ var (
 	maxTimeFormatted = maxTime.Format(time.RFC3339Nano)
 )
 
+type seriesOnlyCountResult struct {
+	MetricsCount uint64 `json:"metrics_count"`
+}
+
 func (api *API) series(r *http.Request) (result apiFuncResult) {
 	if err := r.ParseForm(); err != nil {
 		return apiFuncResult{nil, &apiError{errorBadData, errors.Wrapf(err, "error parsing form values")}, nil, nil}
@@ -784,6 +788,22 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
 		set = q.Select(false, hints, matcherSets[0]...)
 	}
 
+	if r.Form.Get("only_count") == "1" {
+		var count uint64
+
+		for set.Next() {
+			count++
+		}
+
+		warnings := set.Warnings()
+		if set.Err() != nil {
+			return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, closer}
+		}
+
+		return apiFuncResult{seriesOnlyCountResult{MetricsCount: count}, nil, warnings, closer}
+	}
+
 	metrics := []labels.Labels{}
 	for set.Next() {
 		metrics = append(metrics, set.At().Labels())
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index d672807d3f8..06e60fa4c64 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -953,6 +953,14 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
 				labels.FromStrings("__name__", "test_metric2", "foo", "boo"),
 			},
 		},
+		{
+			endpoint: api.series,
+			query: url.Values{
+				"match[]":    []string{`test_metric2`},
+				"only_count": []string{"1"},
+			},
+			response: uint64(1),
+		},
 		{
 			endpoint: api.series,
 			query: url.Values{

From 710465b6efd18a78141ecfefe5abc63f0231341c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Tue, 27 Sep 2022 13:59:15 +0300
Subject: [PATCH 2/2] remote/read_handler: pool input to Marshal()
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Use a sync.Pool to reuse byte slices between calls to Marshal() in the
remote read handler.
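
A rough sketch of the resulting call pattern (illustrative only: `pool`,
`resp`, and `stream` below are stand-ins rather than identifiers taken
verbatim from this diff):

```
pool := &sync.Pool{}

b, err := resp.PooledMarshal(pool) // reuses a pooled *[]byte when it is large enough
if err != nil {
	return err
}
if _, err := stream.Write(b); err != nil { // the frame is flushed right away
	return err
}
pool.Put(&b) // safe to reuse: the frame has already been written out
```
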
Signed-off-by: Giedrius Statkevičius
---
 prompb/custom.go               | 17 +++++++++++++++++
 storage/remote/codec.go        | 11 +++++++++--
 storage/remote/read_handler.go |  4 ++++
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/prompb/custom.go b/prompb/custom.go
index 0b3820c4d28..4b07187bd28 100644
--- a/prompb/custom.go
+++ b/prompb/custom.go
@@ -13,5 +13,22 @@
 
 package prompb
 
+import (
+	"sync"
+)
+
 func (m Sample) T() int64   { return m.Timestamp }
 func (m Sample) V() float64 { return m.Value }
+
+func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
+	size := r.Size()
+	data, ok := p.Get().(*[]byte)
+	if ok && cap(*data) >= size {
+		n, err := r.MarshalToSizedBuffer((*data)[:size])
+		if err != nil {
+			return nil, err
+		}
+		return (*data)[:n], nil
+	}
+	return r.Marshal()
+}
diff --git a/storage/remote/codec.go b/storage/remote/codec.go
index 5d78107bb96..0bd05dd22e1 100644
--- a/storage/remote/codec.go
+++ b/storage/remote/codec.go
@@ -20,6 +20,7 @@ import (
 	"net/http"
 	"sort"
 	"strings"
+	"sync"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
@@ -191,6 +192,7 @@ func StreamChunkedReadResponses(
 	ss storage.ChunkSeriesSet,
 	sortedExternalLabels []prompb.Label,
 	maxBytesInFrame int,
+	marshalPool *sync.Pool,
 ) (storage.Warnings, error) {
 	var (
 		chks []prompb.Chunk
@@ -232,12 +234,14 @@ func StreamChunkedReadResponses(
 				continue
 			}
 
-			b, err := proto.Marshal(&prompb.ChunkedReadResponse{
+			resp := &prompb.ChunkedReadResponse{
 				ChunkedSeries: []*prompb.ChunkedSeries{
 					{Labels: lbls, Chunks: chks},
 				},
 				QueryIndex: queryIndex,
-			})
+			}
+
+			b, err := resp.PooledMarshal(marshalPool)
 			if err != nil {
 				return ss.Warnings(), fmt.Errorf("marshal ChunkedReadResponse: %w", err)
 			}
@@ -245,6 +249,9 @@ func StreamChunkedReadResponses(
 			if _, err := stream.Write(b); err != nil {
 				return ss.Warnings(), fmt.Errorf("write to stream: %w", err)
 			}
+
+			// The Write() is immediately flushed, so it is safe to return the buffer to the pool.
+			marshalPool.Put(&b)
 			chks = chks[:0]
 		}
 		if err := iter.Err(); err != nil {
diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go
index e1f1df21c19..116eb9596c4 100644
--- a/storage/remote/read_handler.go
+++ b/storage/remote/read_handler.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"net/http"
 	"sort"
+	"sync"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -37,6 +38,7 @@ type readHandler struct {
 	remoteReadMaxBytesInFrame int
 	remoteReadGate            *gate.Gate
 	queries                   prometheus.Gauge
+	marshalPool               *sync.Pool
 }
 
 // NewReadHandler creates a http.Handler that accepts remote read requests and
@@ -49,6 +51,7 @@ func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storag
 		remoteReadSampleLimit:     remoteReadSampleLimit,
 		remoteReadGate:            gate.New(remoteReadConcurrencyLimit),
 		remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
+		marshalPool:               &sync.Pool{},
 
 		queries: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: "prometheus",
@@ -225,6 +228,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
 			querier.Select(true, hints, filteredMatchers...),
 			sortedExternalLabels,
 			h.remoteReadMaxBytesInFrame,
+			h.marshalPool,
 		)
 		if err != nil {
 			return err
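
For reference, a minimal client-side sketch of the `only_count` series-API
extension from PATCH 1/2 (illustrative only: the server address and the `up`
matcher are assumptions, and it presumes a Prometheus built with these
patches; the `metrics_count` field comes from `seriesOnlyCountResult` above):

```
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("match[]", "up")   // any series matcher
	q.Set("only_count", "1") // ask only for the number of matching series

	// Hypothetical local server address.
	resp, err := http.Get("http://localhost:9090/api/v1/series?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The data payload is a seriesOnlyCountResult, {"metrics_count": N},
	// wrapped in the usual {"status": ..., "data": ...} API envelope.
	var out struct {
		Data struct {
			MetricsCount uint64 `json:"metrics_count"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Println("matching series:", out.Data.MetricsCount)
}
```

Likewise, starting the patched server with the environment variable
`REPLACE_RATE_FUNCS=1` activates the `init()` swap in `promql/functions.go`,
so plain `rate`/`increase`/`delta` queries dispatch to the extended-range
implementations.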