Skip to content

Commit

Permalink
MQE: Add support for deriv function (#9858)
Browse files Browse the repository at this point in the history
* Implement deriv in MQE

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Update changelog

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Refactor linearRegression

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix after rebase

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Apply PR review

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Pass head and tail to linearRegression

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add tests for deriv

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Refactor tests to make it more readable

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix Rebase

Signed-off-by: Jon Kartago Lamida <[email protected]>

---------

Signed-off-by: Jon Kartago Lamida <[email protected]>
  • Loading branch information
lamida authored Nov 13, 2024
1 parent 91e0980 commit 98b43bf
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. The support is set to be removed in the next major release. #9453
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9371 #9859
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9371 #9859 #9858
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,7 +2309,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
"count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime),
"deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg),
"deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv),
"exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp),
"floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
Expand Down
67 changes: 67 additions & 0 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,70 @@ func resets(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFu

return resets, true, nil, nil
}

// Deriv is the streaming-engine definition of the PromQL deriv() function.
// It emits one float sample per step (the least-squares slope of the series,
// computed by the deriv step function below) and drops the metric name from
// the output series, matching PromQL semantics for deriv().
var Deriv = FunctionOverRangeVectorDefinition{
	SeriesMetadataFunction: DropSeriesName,
	StepFunc:               deriv,
}

// deriv computes the per-second derivative of the float samples in the
// current range-vector step using simple linear regression. It produces no
// sample when fewer than two float points are in range; histogram samples
// are ignored entirely.
func deriv(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
	head, tail := step.Floats.UnsafePoints()

	// A derivative is only defined over two or more points.
	if len(head)+len(tail) < 2 {
		return 0, false, nil, nil
	}

	// Anchor the regression at the earliest sample's timestamp; deriv only
	// needs the slope, so the intercept is discarded.
	slope, _ := linearRegression(head, tail, head[0].T)
	return slope, true, nil, nil
}

// linearRegression performs a least-squares linear fit over the float samples
// held in head and tail (the two contiguous halves of a ring-buffer view),
// with the x-axis measured in seconds relative to interceptTime. It returns
// the slope (per-second rate of change) and the intercept (the estimated
// value at interceptTime).
//
// All running sums use Kahan compensated summation to limit floating-point
// error. If every sample has the same value, the fit is degenerate: the slope
// is exactly 0 and the intercept is that value, or NaN when the constant
// value is +/-Inf.
func linearRegression(head, tail []promql.FPoint, interceptTime int64) (slope, intercept float64) {
	var (
		n          float64
		sumX, cX   float64
		sumY, cY   float64
		sumXY, cXY float64
		sumX2, cX2 float64
		initY      float64
		constY     bool
	)

	initY = head[0].F
	constY = true
	// Note: the parameter is named isHead (not head) so it does not shadow
	// the outer head slice.
	accumulate := func(points []promql.FPoint, isHead bool) {
		for i, sample := range points {
			// constY stays true only while every y value matches the first
			// sample; the head's first point is the reference itself, so it
			// is excluded from the comparison.
			if constY && (i > 0 || !isHead) && sample.F != initY {
				constY = false
			}
			n += 1.0
			// Convert the timestamp offset from milliseconds to seconds.
			x := float64(sample.T-interceptTime) / 1e3
			sumX, cX = floats.KahanSumInc(x, sumX, cX)
			sumY, cY = floats.KahanSumInc(sample.F, sumY, cY)
			sumXY, cXY = floats.KahanSumInc(x*sample.F, sumXY, cXY)
			sumX2, cX2 = floats.KahanSumInc(x*x, sumX2, cX2)
		}
	}

	accumulate(head, true)
	accumulate(tail, false)

	// Constant series: short-circuit to the exact answer rather than relying
	// on the regression arithmetic (which could produce 0/0).
	if constY {
		if math.IsInf(initY, 0) {
			return math.NaN(), math.NaN()
		}
		return 0, initY
	}

	// Fold the Kahan compensation terms into their sums before use.
	sumX += cX
	sumY += cY
	sumXY += cXY
	sumX2 += cX2

	covXY := sumXY - sumX*sumY/n
	varX := sumX2 - sumX*sumX/n

	slope = covXY / varX
	intercept = sumY/n - slope*sumX/n
	return slope, intercept
}
36 changes: 36 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,39 @@ eval range from 0 to 20m step 1m resets(metric[3m])
{case="nh, sum decreased, no resets"} 0 0 0 0 0
{case="nhcb, no resets"} 0 0 0 0 0 0
{case="nhcb, some resets"} 0 1 1 2 1 1 0

clear

# Testing deriv
# nh stands for native histogram
# nhcb stands for native histogram custom bucket
load 1m
metric{case="all same floats value=0"} 0 0 0 0 0 0 0 0
metric{case="all same floats value=3"} 3 3 3 3 3 3 3 3
metric{case="all NaN"} NaN NaN NaN NaN NaN NaN NaN NaN
metric{case="all Inf"} Inf Inf Inf Inf Inf Inf Inf Inf
metric{case="all floats 1"} 0 1 2 3 _ 5 6 _
metric{case="all floats 2"} 0 1 2 3 2 1 0 _
metric{case="mixed float, NaN and Inf"} Inf 1 2 3 NaN Inf Inf NaN 100 10 8 7 7 6
metric{case="all same histogram"} {{schema:3 sum:0 count:0 buckets:[1 2 1]}} {{schema:3 sum:0 count:0 buckets:[1 2 1]}}
metric{case="nh, count decreased"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}}
metric{case="nh, bucket decreased"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 0 1]}}
metric{case="nh, sum decreased"} {{schema:3 sum:3 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:2 buckets:[1 2 1]}}
metric{case="nh, schema increased"} {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:4 sum:0 count:2 buckets:[1 2 1]}}
metric{case="floats and nh"} 9 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}
metric{case="nhcb stay same"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}}
metric{case="nhcb"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}
metric{case="floats, nh and nhcb"} 0 1 2 3 2 1 0 _ {{schema:3 sum:0 count:2 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 2 1]}} {{schema:3 sum:0 count:1 buckets:[1 0 1]}} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[0 0]}}

eval range from 0 to 20m step 1m deriv(metric[3m])
{case="all Inf"} _ NaN NaN NaN NaN NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN NaN NaN NaN NaN
{case="all floats 1"} _ 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.016666666666666666
{case="all floats 2"} _ 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.006666666666666667 -0.006666666666666667 -0.016666666666666666 -0.016666666666666666 -0.016666666666666666
{case="all same floats value=0"} _ 0 0 0 0 0 0 0 0 0
{case="all same floats value=3"} _ 0 0 0 0 0 0 0 0 0
{case="floats and nh"} _ -0.13333333333333333 -0.058333333333333334 -0.028333333333333332 0.016666666666666666 0.016666666666666666
{case="floats, nh and nhcb"} _ 0.016666666666666666 0.016666666666666666 0.016666666666666666 0.006666666666666667 -0.006666666666666667 -0.016666666666666666 -0.016666666666666666 -0.016666666666666666
{case="mixed float, NaN and Inf"} _ NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN -0.4683333333333333 -0.016666666666666666 -0.01 -0.008333333333333333 -0.016666666666666666

clear
10 changes: 4 additions & 6 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,12 @@ load 5m
eval instant at 50m rate(http_requests{group="canary", instance="1", job="app-server"}[50m])
{group="canary", instance="1", job="app-server"} 0.26666666666666666

# Unsupported by streaming engine.
# eval instant at 50m deriv(http_requests{group="canary", instance="1", job="app-server"}[50m])
# {group="canary", instance="1", job="app-server"} 0.26666666666666666
eval instant at 50m deriv(http_requests{group="canary", instance="1", job="app-server"}[50m])
{group="canary", instance="1", job="app-server"} 0.26666666666666666

# deriv should return correct result.
# Unsupported by streaming engine.
# eval instant at 50m deriv(testcounter_reset_middle[100m])
# {} 0.010606060606060607
eval instant at 50m deriv(testcounter_reset_middle[100m])
{} 0.010606060606060607

# predict_linear should return correct result.
# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000]
Expand Down

0 comments on commit 98b43bf

Please sign in to comment.