Skip to content

Commit

Permalink
MQE: fix or where both sides have series with the same labels (#9874)
Browse files Browse the repository at this point in the history
* MQE: fix `or` where both sides have series with the same labels

* Add changelog entry

# Conflicts:
#	CHANGELOG.md

* Address PR feedback: move `operators.NewDeduplicateAndMerge` to `NewOrBinaryOperation` to make test stronger
  • Loading branch information
charleskorn authored Nov 12, 2024
1 parent 765c525 commit b873372
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
Expand Down
12 changes: 10 additions & 2 deletions pkg/streamingpromql/operators/binops/or_binary_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/operators"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand Down Expand Up @@ -46,15 +47,17 @@ func NewOrBinaryOperation(
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
timeRange types.QueryTimeRange,
expressionPosition posrange.PositionRange,
) *OrBinaryOperation {
return &OrBinaryOperation{
) types.InstantVectorOperator {
o := &OrBinaryOperation{
Left: left,
Right: right,
VectorMatching: vectorMatching,
MemoryConsumptionTracker: memoryConsumptionTracker,
timeRange: timeRange,
expressionPosition: expressionPosition,
}

return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

func (o *OrBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
Expand Down Expand Up @@ -144,6 +147,11 @@ func (o *OrBinaryOperation) computeSeriesOutputOrder(leftMetadata []types.Series
// We can return left series as soon as they're read, given they are returned unmodified (we just need to store sample presence
// information so we can filter the corresponding right side series later on).
//
// We deliberately ignore the case where series on both sides have the same labels: this makes the logic here much simpler, and
// we rely on DeduplicateAndMerge to merge series when required. This does come at a slight performance cost, so we could revisit this
// in the future if profiles show this is problematic. DeduplicateAndMerge should never produce a conflict, as the filtering done here
// should ensure there is only one value for each time step for each set of series with the same labels.
//
// A simpler version of this would be to just return all left side series first, then all right side series.
// However, if we do that, we will always need to hold presence bitmaps for every group in memory until we've read all left side series.
// By sorting the series so we return series from the right as soon as we've seen all of the corresponding series from the left, we
Expand Down
46 changes: 46 additions & 0 deletions pkg/streamingpromql/operators/binops/or_binary_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,52 @@ func TestOrBinaryOperationSorting(t *testing.T) {
labels.FromStrings("right", "6", "group", "3"),
},
},
// OrBinaryOperation does not handle the case where both sides contain series with identical labels, and
// instead relies on DeduplicateAndMerge to handle merging series with identical labels.
// Given NewOrBinaryOperation wraps the OrBinaryOperation in a DeduplicateAndMerge, we can still test this
// here.
"same series on both sides, one series": {
leftSeries: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
},
rightSeries: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
},

expectedOutputSeriesOrder: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
},
},
"same series on both sides, multiple series in same order": {
leftSeries: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
labels.FromStrings("series", "2", "group", "2"),
},
rightSeries: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
labels.FromStrings("series", "2", "group", "2"),
},

expectedOutputSeriesOrder: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
labels.FromStrings("series", "2", "group", "2"),
},
},
"same series on both sides, multiple series in different order": {
leftSeries: []labels.Labels{
labels.FromStrings("series", "1", "group", "1"),
labels.FromStrings("series", "2", "group", "2"),
},
rightSeries: []labels.Labels{
labels.FromStrings("series", "2", "group", "2"),
labels.FromStrings("series", "1", "group", "1"),
},

expectedOutputSeriesOrder: []labels.Labels{
labels.FromStrings("series", "2", "group", "2"),
labels.FromStrings("series", "1", "group", "1"),
},
},
}

for name, testCase := range testCases {
Expand Down
10 changes: 10 additions & 0 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -998,3 +998,13 @@ eval range from 0 to 24m step 6m right_side or ignoring(pod, idx) left_side
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

clear

load 6m
series_1 _ 1 2 _ 4 5 _ 7 _
series_2 9 _ _ 9 _ _ 9 _ 9

# Test the case where both sides of 'or' contain series with the same labels.
eval range from 0 to 48m step 6m min(series_1) or min(series_2)
{} 9 1 2 9 4 5 9 7 9

0 comments on commit b873372

Please sign in to comment.