Skip to content

Commit

Permalink
Merge pull request #32 from GiedriusS/v2.39.0_for_thanos_fixes
Browse files Browse the repository at this point in the history
Port our changes to v2.39.0
  • Loading branch information
GiedriusS authored Oct 17, 2022
2 parents f7a7b18 + 710465b commit 465a7ad
Show file tree
Hide file tree
Showing 9 changed files with 503 additions and 25 deletions.
17 changes: 17 additions & 0 deletions prompb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,22 @@

package prompb

import (
"sync"
)

// T returns the sample's timestamp, satisfying the engine's generic
// sample accessor interface.
func (m Sample) T() int64 { return m.Timestamp }
// V returns the sample's value, satisfying the engine's generic
// sample accessor interface.
func (m Sample) V() float64 { return m.Value }

// PooledMarshal marshals the response, reusing a *[]byte from p when the
// pooled buffer is large enough for the encoded message. When the pool
// yields nothing usable it falls back to a plain Marshal, which allocates.
// The returned slice may alias the pooled buffer; the caller is responsible
// for returning it to the pool once done.
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
	size := r.Size()
	buf, ok := p.Get().(*[]byte)
	if !ok || cap(*buf) < size {
		// Pool miss or undersized buffer: allocate via the generated marshaler.
		return r.Marshal()
	}
	n, err := r.MarshalToSizedBuffer((*buf)[:size])
	if err != nil {
		return nil, err
	}
	return (*buf)[:n], nil
}
100 changes: 87 additions & 13 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,16 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS
start = start - offsetMilliseconds
end = end - offsetMilliseconds

f, ok := parser.Functions[extractFuncFromPath(path)]
if ok && f.ExtRange {
// Buffer more so that we could reasonably
// inject a zero if there is only one point.
if extractFuncFromPath(path) == "xincrease" {
start -= durationMilliseconds(1 * 24 * time.Hour)
}
start -= durationMilliseconds(ng.lookbackDelta)
}

return start, end
}

Expand Down Expand Up @@ -1024,6 +1034,8 @@ type EvalNodeHelper struct {
rightSigs map[string]Sample
matchedSigs map[string]map[uint64]struct{}
resultMetric map[string]labels.Labels

metricAppeared int64
}

// DropMetricName is a cached version of DropMetricName.
Expand Down Expand Up @@ -1376,18 +1388,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
offset := durationMilliseconds(selVS.Offset)
selRange := durationMilliseconds(sel.Range)

stepRange := selRange
if stepRange > ev.interval {
stepRange = ev.interval
}
bufferRange := selRange

if e.Func.ExtRange {
bufferRange += durationMilliseconds(ev.lookbackDelta)
stepRange += durationMilliseconds(ev.lookbackDelta)
}

if e.Func.Name == "xincrease" {
bufferRange += durationMilliseconds(1 * 24 * time.Hour)
}
// Reuse objects across steps to save memory allocations.
points := getPointSlice(16)
inMatrix := make(Matrix, 1)
inArgs[matrixArgIndex] = inMatrix
enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time.
it := storage.NewBuffer(selRange)
it := storage.NewBuffer(bufferRange)
for i, s := range selVS.Series {
enh.metricAppeared = -1
ev.currentSamples -= len(points)
points = points[:0]
it.Reset(s.Iterator())
Expand All @@ -1414,13 +1438,21 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
otherInArgs[j][0].V = otherArgs[j][0].Points[step].V
}
}

maxt := ts - offset
mint := maxt - selRange

var metricAppeared int64 = -1
// Evaluate the matrix selector for this series for this step.
points = ev.matrixIterSlice(it, mint, maxt, points)
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points, &metricAppeared, e.Func.Name)
if len(points) == 0 {
enh.metricAppeared = -1
continue
}
if enh.metricAppeared == -1 && metricAppeared != -1 {
enh.metricAppeared = metricAppeared
}

inMatrix[0].Points = points
enh.Ts = ts
// Make the function call.
Expand Down Expand Up @@ -1809,7 +1841,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
Metric: series[i].Labels(),
}

ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16), nil, "")
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, int64(len(ss.Points)))

if len(ss.Points) > 0 {
Expand All @@ -1829,21 +1861,43 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point, metricAppeared *int64, functionName string) []Point {
var extMint int64
if functionName == "xincrease" {
extMint = mint - durationMilliseconds(4*24*time.Hour)
} else {
extMint = mint - durationMilliseconds(ev.lookbackDelta)
}

if len(out) > 0 && out[len(out)-1].T >= mint {
// There is an overlap between previous and current ranges, retain common
// points. In most such cases:
// (a) the overlap is significantly larger than the eval step; and/or
// (b) the number of samples is relatively small.
// so a linear search will be as fast as a binary search.
var drop int
for drop = 0; out[drop].T < mint; drop++ {
if !extRange {
for drop = 0; out[drop].T < mint; drop++ {
}
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
// This is an argument to an extended range function, first go past mint.
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
}
// Then, go back one sample if within lookbackDelta of mint.
if drop > 0 && out[drop-1].T >= extMint {
drop--
}
if out[len(out)-1].T >= mint {
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
}
}

ev.currentSamples -= drop
copy(out, out[drop:])
out = out[:len(out)-drop]
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
ev.currentSamples -= len(out)
out = out[:0]
Expand All @@ -1857,18 +1911,38 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
}

buf := it.Buffer()
appendedPointBeforeMint := len(out) > 0
for buf.Next() {
t, v := buf.At()
if value.IsStaleNaN(v) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
if metricAppeared != nil && *metricAppeared == -1 {
*metricAppeared = t
}
if !extRange {
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
} else {
// This is the argument to an extended range function: if any point
// exists at or before range start, add it and then keep replacing
// it with later points while not yet (strictly) inside the range.
if t > mint || !appendedPointBeforeMint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
appendedPointBeforeMint = true
} else {
out[len(out)-1] = Point{T: t, V: v}
}
ev.currentSamples++
out = append(out, Point{T: t, V: v})
}
}
// The seeked sample might also be in the range.
Expand Down
135 changes: 135 additions & 0 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package promql
import (
"fmt"
"math"
"os"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -136,6 +137,97 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
})
}

// extendedRate is a utility function for xrate/xincrease/xdelta.
// It calculates the rate (allowing for counter resets if isCounter is true),
// taking into account the last sample before the range start, and returns
// the result as either per-second (if isRate is true) or overall.
// extendedRate is a utility function for xrate/xincrease/xdelta.
// It calculates the rate (allowing for counter resets if isCounter is true),
// taking into account the last sample before the range start, and returns
// the result as either per-second (if isRate is true) or overall.
//
// Flag combinations used by the wrappers:
//   xdelta:    isCounter=false, isRate=false
//   xrate:     isCounter=true,  isRate=true
//   xincrease: isCounter=true,  isRate=false
func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
	ms := args[0].(*parser.MatrixSelector)
	vs := ms.VectorSelector.(*parser.VectorSelector)

	var (
		samples    = vals[0].(Matrix)[0]
		rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
		rangeEnd   = enh.Ts - durationMilliseconds(vs.Offset)
	)

	points := samples.Points
	if len(points) == 0 {
		// No samples in the (extended) window: emit nothing for this series.
		return enh.Out
	}

	// Determine whether every sample in the window has the same value;
	// used below for the xincrease flat-series special case.
	sameVals := true
	for i := range points {
		if i > 0 && points[i-1].V != points[i].V {
			sameVals = false
			break
		}
	}

	// xincrease only (isCounter && !isRate): if the series has been flat and
	// first appeared recently enough that one range length after its first
	// appearance still covers the evaluation time, report the raw value
	// rather than an extrapolated increase.
	// NOTE(review): relies on enh.metricAppeared being populated by the
	// evaluator for extended-range functions; -1 means "unknown/not seen".
	until := enh.metricAppeared + durationMilliseconds(ms.Range)
	if isCounter && !isRate && sameVals && enh.metricAppeared != -1 {
		if enh.Ts-durationMilliseconds(vs.Offset) <= until || (vs.Timestamp != nil && *vs.Timestamp <= until) {
			return append(enh.Out, Sample{
				Point: Point{V: points[0].V},
			})
		}
	}

	// Average spacing between the samples actually present in the window.
	// NOTE(review): with a single point this is 0/0 = NaN; every comparison
	// against it below then evaluates false, so no adjustment is applied —
	// confirm this is the intended behavior for one-sample windows.
	sampledRange := float64(points[len(points)-1].T - points[0].T)
	averageInterval := sampledRange / float64(len(points)-1)

	firstPoint := 0
	// Only do this for not xincrease.
	if !(isCounter && !isRate) {
		// If the point before the range is too far from rangeStart, drop it.
		if float64(rangeStart-points[0].T) > averageInterval {
			if len(points) < 3 {
				return enh.Out
			}
			firstPoint = 1
			sampledRange = float64(points[len(points)-1].T - points[1].T)
			averageInterval = sampledRange / float64(len(points)-2)
		}
	}

	var (
		counterCorrection float64
		lastValue         float64
	)
	if isCounter {
		// Compensate for counter resets: whenever the value drops, add the
		// pre-reset value back so the cumulative increase stays monotonic.
		for i := firstPoint; i < len(points); i++ {
			sample := points[i]
			if sample.V < lastValue {
				counterCorrection += lastValue
			}
			lastValue = sample.V
		}
	}
	resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection

	// Duration between last sample and boundary of range.
	durationToEnd := float64(rangeEnd - points[len(points)-1].T)

	// If the points cover the whole range (i.e. they start just before the
	// range start and end just before the range end) adjust the value from
	// the sampled range to the requested range.
	// Only do this for not xincrease.
	if !(isCounter && !isRate) {
		if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
			adjustToRange := float64(durationMilliseconds(ms.Range))
			resultValue = resultValue * (adjustToRange / sampledRange)
		}
	}

	if isRate {
		// Convert the overall change into a per-second rate.
		resultValue = resultValue / ms.Range.Seconds()
	}

	return append(enh.Out, Sample{
		Point: Point{V: resultValue},
	})
}

// === delta(Matrix parser.ValueTypeMatrix) Vector ===
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, false, false)
Expand All @@ -151,6 +243,21 @@ func funcIncrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
return extrapolatedRate(vals, args, enh, true, false)
}

// === xdelta(Matrix parser.ValueTypeMatrix) Vector ===
func funcXdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	// Plain difference over the extended range: not a counter, not per-second.
	const isCounter, isRate = false, false
	return extendedRate(vals, args, enh, isCounter, isRate)
}

// === xrate(node parser.ValueTypeMatrix) Vector ===
func funcXrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	// Counter-reset-aware, per-second rate over the extended range.
	const isCounter, isRate = true, true
	return extendedRate(vals, args, enh, isCounter, isRate)
}

// === xincrease(node parser.ValueTypeMatrix) Vector ===
func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	// Counter-reset-aware total increase (not divided by the range duration).
	const isCounter, isRate = true, false
	return extendedRate(vals, args, enh, isCounter, isRate)
}

// === irate(node parser.ValueTypeMatrix) Vector ===
func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return instantValue(vals, enh.Out, true)
Expand Down Expand Up @@ -1142,6 +1249,9 @@ var FunctionCalls = map[string]FunctionCall{
"time": funcTime,
"timestamp": funcTimestamp,
"vector": funcVector,
"xdelta": funcXdelta,
"xincrease": funcXincrease,
"xrate": funcXrate,
"year": funcYear,
}

Expand All @@ -1160,6 +1270,31 @@ var AtModifierUnsafeFunctions = map[string]struct{}{
"timestamp": {},
}

func init() {
	// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
	// with xrate functions. This allows for a drop-in replacement and Grafana
	// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
	if os.Getenv("REPLACE_RATE_FUNCS") != "1" {
		return
	}
	for _, name := range []string{"delta", "increase", "rate"} {
		extended := "x" + name

		// Re-point the evaluator's call table at the extended implementation
		// and drop the x-prefixed key.
		FunctionCalls[name] = FunctionCalls[extended]
		delete(FunctionCalls, extended)

		// Do the same for the parser's function registry; the registered
		// Function is shared, so rename it to match its new key.
		parser.Functions[name] = parser.Functions[extended]
		parser.Functions[name].Name = name
		delete(parser.Functions, extended)
	}
	fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
}

type vectorByValueHeap Vector

func (s vectorByValueHeap) Len() int {
Expand Down
Loading

0 comments on commit 465a7ad

Please sign in to comment.