MQE: Add support for avg aggregation (#9201)
* MQE: Add support for avg aggregation

* Optimise avg to not copy histograms unless needed

* Reorder sum for consistency and readability

* Update CHANGELOG

* Add benchmarks

* Fix variable naming

* Add extra tests

* Split out collecting histograms+floats

* Shortcut if no data for series

* Only allocate incrementalMeans if needed

* Fix typo

* Fix typo
jhesketh authored Sep 6, 2024
1 parent 2bc501a commit c328187
Showing 9 changed files with 519 additions and 92 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -32,7 +32,7 @@
* [CHANGE] Querier: allow wrapping errors with context errors only when the former actually correspond to `context.Canceled` and `context.DeadlineExceeded`. #9175
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9194 #9196 #9212
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9194 #9196 #9201 #9212
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read paths are decoupled through a Kafka-compatible backend. The write path and Kafka load are a function of the incoming write traffic, while the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
321 changes: 321 additions & 0 deletions pkg/streamingpromql/aggregations/avg.go
@@ -0,0 +1,321 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package aggregations

import (
	"math"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/promql"

	"github.com/grafana/mimir/pkg/streamingpromql/floats"
	"github.com/grafana/mimir/pkg/streamingpromql/functions"
	"github.com/grafana/mimir/pkg/streamingpromql/limiting"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type AvgAggregationGroup struct {
	floats                 []float64
	floatMeans             []float64
	floatCompensatingMeans []float64 // Mean, or "compensating value" for Kahan summation.
	incrementalMeans       []bool    // True after reverting to incremental calculation of the mean value.
	floatPresent           []bool
	histograms             []*histogram.FloatHistogram
	histogramPointCount    int

	// groupSeriesCounts tracks how many series have contributed a value to the group at each point.
	// This is necessary to do per point (rather than simply counting the series in the group), as a
	// series may have stale or non-existent values that do not count towards the total.
	groupSeriesCounts []float64
}

func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
	defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
	if len(data.Floats) == 0 && len(data.Histograms) == 0 {
		// Nothing to do.
		return nil
	}

	var err error

	if g.groupSeriesCounts == nil {
		g.groupSeriesCounts, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}
		g.groupSeriesCounts = g.groupSeriesCounts[:steps]
	}

	err = g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker)
	if err != nil {
		return err
	}
	err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc)
	if err != nil {
		return err
	}

	return nil
}

func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
	var err error
	if len(data.Floats) > 0 && g.floats == nil {
		// First series with float values for this group, populate it.
		g.floats, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}

		g.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}

		g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}

		g.floats = g.floats[:steps]
		g.floatCompensatingMeans = g.floatCompensatingMeans[:steps]
		g.floatPresent = g.floatPresent[:steps]
	}

	for _, p := range data.Floats {
		idx := (p.T - start) / interval
		g.groupSeriesCounts[idx]++
		if !g.floatPresent[idx] {
			// The first point is simply taken as the value.
			g.floats[idx] = p.F
			g.floatPresent[idx] = true
			continue
		}

		if g.incrementalMeans == nil || !g.incrementalMeans[idx] {
			newV, newC := floats.KahanSumInc(p.F, g.floats[idx], g.floatCompensatingMeans[idx])
			if !math.IsInf(newV, 0) {
				// The sum doesn't overflow, so we propagate it to the
				// group struct and continue with the regular
				// calculation of the mean value.
				g.floats[idx], g.floatCompensatingMeans[idx] = newV, newC
				continue
			}
			// If we are here, we know that the sum _would_ overflow. So
			// instead of continuing to sum up, we revert to incremental
			// calculation of the mean value from here on.
			if g.floatMeans == nil {
				g.floatMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
				if err != nil {
					return err
				}
				g.floatMeans = g.floatMeans[:steps]
			}
			if g.incrementalMeans == nil {
				// First time we are using an incremental mean. Track which samples will be incremental.
				g.incrementalMeans, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
				if err != nil {
					return err
				}
				g.incrementalMeans = g.incrementalMeans[:steps]
			}
			g.incrementalMeans[idx] = true
			g.floatMeans[idx] = g.floats[idx] / (g.groupSeriesCounts[idx] - 1)
			g.floatCompensatingMeans[idx] /= g.groupSeriesCounts[idx] - 1
		}
		if math.IsInf(g.floatMeans[idx], 0) {
			if math.IsInf(p.F, 0) && (g.floatMeans[idx] > 0) == (p.F > 0) {
				// The `floatMeans[idx]` and `p.F` values are `Inf` of the same sign. They
				// can't be subtracted, but the value of `floatMeans[idx]` is already
				// correct.
				continue
			}
			if !math.IsInf(p.F, 0) && !math.IsNaN(p.F) {
				// At this stage, the mean is infinite. If the added value
				// is neither Inf nor NaN, we can keep that mean value.
				// This is required because our calculation below subtracts
				// the mean value, which would look like Inf += x - Inf and
				// end up as NaN.
				continue
			}
		}
		currentMean := g.floatMeans[idx] + g.floatCompensatingMeans[idx]
		g.floatMeans[idx], g.floatCompensatingMeans[idx] = floats.KahanSumInc(
			p.F/g.groupSeriesCounts[idx]-currentMean/g.groupSeriesCounts[idx],
			g.floatMeans[idx],
			g.floatCompensatingMeans[idx],
		)
	}
	return nil
}

func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
	var err error
	if len(data.Histograms) > 0 && g.histograms == nil {
		// First series with histogram values for this group, populate it.
		g.histograms, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}
		g.histograms = g.histograms[:steps]
	}

	var lastUncopiedHistogram *histogram.FloatHistogram

	for i, p := range data.Histograms {
		idx := (p.T - start) / interval
		g.groupSeriesCounts[idx]++

		if g.histograms[idx] == invalidCombinationOfHistograms {
			// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
			continue
		}

		if g.histograms[idx] == nil {
			if lastUncopiedHistogram == p.H {
				// We've already used this histogram for a previous point due to lookback.
				// Make a copy of it so we don't modify the other point.
				g.histograms[idx] = p.H.Copy()
				g.histogramPointCount++
				continue
			}
			// This is the first time we have seen this histogram.
			// It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points.
			g.histograms[idx] = p.H
			g.histogramPointCount++
			lastUncopiedHistogram = p.H
			continue
		}

		// Check if the next point in data.Histograms is the same as the current point (due to lookback).
		// If it is, create a copy before modifying it.
		toAdd := p.H
		if i+1 < len(data.Histograms) && data.Histograms[i+1].H == p.H {
			toAdd = p.H.Copy()
		}

		_, err = toAdd.Sub(g.histograms[idx])
		if err != nil {
			// Unable to subtract histograms (likely due to an invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
			g.histograms[idx] = invalidCombinationOfHistograms
			g.histogramPointCount--

			if err := functions.NativeHistogramErrorToAnnotation(err, emitAnnotationFunc); err != nil {
				// Unknown error: we couldn't convert the error to an annotation. Give up.
				return err
			}
			continue
		}

		toAdd.Div(g.groupSeriesCounts[idx])
		_, err = g.histograms[idx].Add(toAdd)
		if err != nil {
			// Unable to add histograms together (likely due to an invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
			g.histograms[idx] = invalidCombinationOfHistograms
			g.histogramPointCount--

			if err := functions.NativeHistogramErrorToAnnotation(err, emitAnnotationFunc); err != nil {
				// Unknown error: we couldn't convert the error to an annotation. Give up.
				return err
			}
			continue
		}
	}
	return nil
}

// reconcileAndCountFloatPoints returns the number of points with a float present.
// It also takes the opportunity, whilst looping through the floats, to check if
// there is a conflicting histogram present at the same point. If both are present,
// that point must be removed from the output vector entirely, so this method
// removes the float and histogram where they conflict.
func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
	// It would be possible to calculate the number of points when constructing
	// the series groups. However, it requires checking each point at each input
	// series, which is more costly than looping again here and just checking each
	// point of the already grouped series.
	// See: https://github.com/grafana/mimir/pull/8442
	// We also take two different approaches here: one with extra checks if we
	// have both Floats and Histograms present, and one without these checks
	// so we don't have to do it at every point.
	floatPointCount := 0
	haveMixedFloatsAndHistograms := false
	if len(g.floatPresent) > 0 && len(g.histograms) > 0 {
		for idx, present := range g.floatPresent {
			if present {
				if g.histograms[idx] != nil {
					// If there is a mix of histogram samples and float samples, the corresponding vector
					// element is removed from the output vector entirely and a warning annotation is emitted.
					g.floatPresent[idx] = false
					g.histograms[idx] = nil
					g.histogramPointCount--

					haveMixedFloatsAndHistograms = true
				} else {
					floatPointCount++
				}
			}
		}
	} else {
		for _, p := range g.floatPresent {
			if p {
				floatPointCount++
			}
		}
	}
	return floatPointCount, haveMixedFloatsAndHistograms
}

func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
	floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
	var floatPoints []promql.FPoint
	var err error

	if floatPointCount > 0 {
		floatPoints, err = types.FPointSlicePool.Get(floatPointCount, memoryConsumptionTracker)
		if err != nil {
			return types.InstantVectorSeriesData{}, hasMixedData, err
		}

		for i, havePoint := range g.floatPresent {
			if havePoint {
				t := start + int64(i)*interval
				var f float64
				if g.incrementalMeans != nil && g.incrementalMeans[i] {
					f = g.floatMeans[i] + g.floatCompensatingMeans[i]
				} else {
					f = (g.floats[i] + g.floatCompensatingMeans[i]) / g.groupSeriesCounts[i]
				}
				floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
			}
		}
	}

	var histogramPoints []promql.HPoint
	if g.histogramPointCount > 0 {
		histogramPoints, err = types.HPointSlicePool.Get(g.histogramPointCount, memoryConsumptionTracker)
		if err != nil {
			return types.InstantVectorSeriesData{}, hasMixedData, err
		}

		for i, h := range g.histograms {
			if h != nil && h != invalidCombinationOfHistograms {
				t := start + int64(i)*interval
				histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
			}
		}
	}

	types.Float64SlicePool.Put(g.floats, memoryConsumptionTracker)
	types.Float64SlicePool.Put(g.floatMeans, memoryConsumptionTracker)
	types.Float64SlicePool.Put(g.floatCompensatingMeans, memoryConsumptionTracker)
	types.BoolSlicePool.Put(g.floatPresent, memoryConsumptionTracker)
	types.HistogramSlicePool.Put(g.histograms, memoryConsumptionTracker)
	types.BoolSlicePool.Put(g.incrementalMeans, memoryConsumptionTracker)
	types.Float64SlicePool.Put(g.groupSeriesCounts, memoryConsumptionTracker)

	return types.InstantVectorSeriesData{Floats: floatPoints, Histograms: histogramPoints}, hasMixedData, nil
}
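
The float path above packs several ideas together. Below is a minimal, self-contained sketch of the same technique applied to a single series of values: Kahan-compensated summation until the running sum would overflow, then reverting to an incremental mean, exactly as accumulateFloats does per step. The kahanSumInc helper is assumed to behave like Mimir's floats.KahanSumInc (adapted from Prometheus), and histogramMeanStep is a hypothetical helper illustrating the per-point histogram update (mean += (h - mean) / count); neither is code from this commit.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/histogram"
)

// kahanSumInc adds inc to sum with compensation term c. It is assumed to
// match the behaviour of Mimir's floats.KahanSumInc.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	switch {
	case math.IsInf(t, 0):
		c = 0 // Overflow: the compensation term is no longer meaningful.
	case math.Abs(sum) >= math.Abs(inc):
		c += (sum - t) + inc
	default:
		c += (inc - t) + sum
	}
	return t, c
}

// average mirrors the shape of accumulateFloats for one series of values.
func average(values []float64) float64 {
	if len(values) == 0 {
		return math.NaN()
	}
	var sum, comp, mean float64
	incremental := false
	for i, v := range values {
		count := float64(i + 1)
		if i == 0 {
			sum = v // The first value is simply taken as-is.
			continue
		}
		if !incremental {
			newSum, newComp := kahanSumInc(v, sum, comp)
			if !math.IsInf(newSum, 0) {
				sum, comp = newSum, newComp
				continue
			}
			// The plain sum would overflow: switch to maintaining the mean
			// of the first count-1 values directly, then fold v in below.
			incremental = true
			mean = sum / (count - 1)
			comp /= count - 1
		}
		if math.IsInf(mean, 0) {
			if math.IsInf(v, 0) && (mean > 0) == (v > 0) {
				continue // Inf of the same sign: the mean is already correct.
			}
			if !math.IsInf(v, 0) && !math.IsNaN(v) {
				continue // A finite value cannot change an infinite mean.
			}
		}
		// Incremental update: mean += v/count - mean/count.
		cur := mean + comp
		mean, comp = kahanSumInc(v/count-cur/count, mean, comp)
	}
	if incremental {
		return mean + comp
	}
	return (sum + comp) / float64(len(values))
}

// histogramMeanStep is a hypothetical helper showing the per-point histogram
// update used above: mean += (h - mean) / count. The engine additionally
// avoids the Copy whenever it knows h is not shared due to lookback.
func histogramMeanStep(mean, h *histogram.FloatHistogram, count float64) (*histogram.FloatHistogram, error) {
	delta := h.Copy() // Don't mutate the caller's histogram.
	if _, err := delta.Sub(mean); err != nil {
		return nil, err
	}
	delta.Div(count)
	return mean.Add(delta)
}

func main() {
	fmt.Println(average([]float64{1, 2, 3, 4}))                       // 2.5
	fmt.Println(average([]float64{math.MaxFloat64, math.MaxFloat64})) // MaxFloat64, despite the sum overflowing
}

Because most groups never overflow, the engine allocates floatMeans and incrementalMeans lazily; the sketch's incremental branch corresponds to those lazily allocated slices.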
1 change: 1 addition & 0 deletions pkg/streamingpromql/aggregations/common.go
@@ -22,6 +22,7 @@ type AggregationGroup interface {
type AggregationGroupFactory func() AggregationGroup

var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
+	parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
	parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
	parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
	parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
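
For context, this map is how the engine picks an implementation for each aggregation operation. A call site might look like the following sketch; the surrounding names (expr and its Op field, as on parser.AggregateExpr) are illustrative assumptions, not code from this commit.

// Sketch of a hypothetical lookup site for the factory map.
factory, ok := AggregationGroupFactories[expr.Op]
if !ok {
	return nil, fmt.Errorf("aggregation operation %v is not supported", expr.Op)
}
group := factory() // e.g. a fresh *AvgAggregationGroup when expr.Op == parser.AVG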
2 changes: 1 addition & 1 deletion pkg/streamingpromql/aggregations/min_max.go
@@ -51,7 +51,7 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {

func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
	if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
-		// Even if we have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
+		// Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
		// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711

		var err error