From 2e7191b023a651f56a5bc8e6fbe4f02845319a53 Mon Sep 17 00:00:00 2001 From: Player256 Date: Tue, 1 Aug 2023 14:38:50 +0530 Subject: [PATCH 01/11] added new bench target 'bench-fast' Signed-off-by: Player256 --- Makefile | 9 +++++++++ engine/bench_test.go | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/Makefile b/Makefile index 9d6bce1b..6e30b185 100644 --- a/Makefile +++ b/Makefile @@ -111,6 +111,15 @@ bench-new: benchmarks benchmark: bench-old bench-new @benchstat benchmarks/old.out benchmarks/new.out +.PHONY : bench-fast +bench-fast: benchmarks + @echo "Benchmarking old engine" + @go test ./engine -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 5 -args -fast | sed -u 's/\/old_engine//' > benchmarks/old.out + @go test ./engine -bench 'BenchmarkNativeHistograms/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' >> benchmarks/old.out + @echo "Benchmarking new engine" + @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -args -fast | sed -u 's/\/new_engine//' > benchmarks/new.out + @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out + .PHONY: sync-parser sync-parser: @echo "Cleaning existing directories" diff --git a/engine/bench_test.go b/engine/bench_test.go index 01d8a9ea..6636a198 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -5,6 +5,7 @@ package engine_test import ( "context" + "flag" "fmt" "strconv" "strings" @@ -101,6 +102,10 @@ func BenchmarkSingleQuery(b *testing.B) { } func BenchmarkRangeQuery(b *testing.B) { + + sixHourDatasetOnly := flag.Bool("fast", false, "Only runs six hour dataset") + flag.Parse() + samplesPerHour := 60 * 2 sixHourDataset := setupStorage(b, 1000, 3, 6*samplesPerHour) defer sixHourDataset.Close() @@ -288,6 +293,13 @@ func BenchmarkRangeQuery(b *testing.B) { b.ResetTimer() b.ReportAllocs() + + if *sixHourDatasetOnly { + if tc.test != sixHourDataset { + return + } + } + for i := 0; i < b.N; i++ { qry, err := engine.NewRangeQuery(tc.test.Context(), tc.test.Queryable(), nil, tc.query, start, end, step) testutil.Ok(b, err) @@ -300,6 +312,12 @@ func BenchmarkRangeQuery(b *testing.B) { b.ResetTimer() b.ReportAllocs() + if *sixHourDatasetOnly { + if tc.test != sixHourDataset { + return + } + } + for i := 0; i < b.N; i++ { newResult := executeRangeQuery(b, tc.query, tc.test, start, end, step) testutil.Ok(b, newResult.Err) From 56c8d8bed73e9a31e8d1673176aba6cd432c69ab Mon Sep 17 00:00:00 2001 From: Player256 Date: Wed, 2 Aug 2023 11:21:14 +0530 Subject: [PATCH 02/11] changed -fast to -short Signed-off-by: Player256 --- Makefile | 4 ++-- engine/bench_test.go | 13 ++++--------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 6e30b185..ab28d386 100644 --- a/Makefile +++ b/Makefile @@ -114,10 +114,10 @@ benchmark: bench-old bench-new .PHONY : bench-fast bench-fast: benchmarks @echo "Benchmarking old engine" - @go test ./engine -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 5 -args -fast | sed -u 's/\/old_engine//' > benchmarks/old.out + @go test ./engine -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 5 -short | sed -u 's/\/old_engine//' > benchmarks/old.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' >> benchmarks/old.out @echo "Benchmarking new engine" - @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -args -fast | sed -u 's/\/new_engine//' > benchmarks/new.out + @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out .PHONY: sync-parser diff --git a/engine/bench_test.go b/engine/bench_test.go index 6636a198..39839485 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -5,7 +5,6 @@ package engine_test import ( "context" - "flag" "fmt" "strconv" "strings" @@ -102,10 +101,6 @@ func BenchmarkSingleQuery(b *testing.B) { } func BenchmarkRangeQuery(b *testing.B) { - - sixHourDatasetOnly := flag.Bool("fast", false, "Only runs six hour dataset") - flag.Parse() - samplesPerHour := 60 * 2 sixHourDataset := setupStorage(b, 1000, 3, 6*samplesPerHour) defer sixHourDataset.Close() @@ -294,9 +289,9 @@ func BenchmarkRangeQuery(b *testing.B) { b.ResetTimer() b.ReportAllocs() - if *sixHourDatasetOnly { + if testing.Short() { if tc.test != sixHourDataset { - return + b.Skip() } } @@ -312,9 +307,9 @@ func BenchmarkRangeQuery(b *testing.B) { b.ResetTimer() b.ReportAllocs() - if *sixHourDatasetOnly { + if testing.Short() { if tc.test != sixHourDataset { - return + b.Skip() } } From 2ac3b007b35deab57631be9b702480c0079fb3bc Mon Sep 17 00:00:00 2001 From: Player256 Date: Wed, 2 Aug 2023 12:08:57 +0530 Subject: [PATCH 03/11] benchstat added in bench-fast target Signed-off-by: Player256 --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index ab28d386..f8c930d3 100644 --- a/Makefile +++ b/Makefile @@ -119,6 +119,7 @@ bench-fast: benchmarks @echo "Benchmarking new engine" @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out + @benchstat benchmarks/old.out benchmarks/new.out .PHONY: sync-parser sync-parser: From a4bd15039cdf930eeedd3475e04122eadc730cea Mon Sep 17 00:00:00 2001 From: Player256 Date: Wed, 2 Aug 2023 12:14:52 +0530 Subject: [PATCH 04/11] b Signed-off-by: Player256 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index f8c930d3..81d5ea1a 100644 --- a/Makefile +++ b/Makefile @@ -119,7 +119,7 @@ bench-fast: benchmarks @echo "Benchmarking new engine" @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out - @benchstat benchmarks/old.out benchmarks/new.out + .PHONY: sync-parser sync-parser: From 4ef001a339670a605b715906aaceedaa6a635f2d Mon Sep 17 00:00:00 2001 From: Player256 Date: Wed, 2 Aug 2023 12:15:25 +0530 Subject: [PATCH 05/11] benchstat added in bench-fast target Signed-off-by: Player256 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 81d5ea1a..f8c930d3 100644 --- a/Makefile +++ b/Makefile @@ -119,7 +119,7 @@ bench-fast: benchmarks @echo "Benchmarking new engine" @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out - + @benchstat benchmarks/old.out benchmarks/new.out .PHONY: sync-parser sync-parser: From 53f90202bb42ac42c989c5fd908c28b4479f2d40 Mon Sep 17 00:00:00 2001 From: Nishchay Veer <99465982+nishchay-veer@users.noreply.github.com> Date: Fri, 4 Aug 2023 17:00:22 +0530 Subject: [PATCH 06/11] Time consumed by each operator (#289) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * CPU time consumed by each operator Signed-off-by: nishchay-veer * Time consumed by operator when calling Next the TimingOperator struct wraps another VectorOperator and includes a startTime field to track the start time of each Next method call. Inside the Next method implementation of TimingOperator, it captures the current time using time.Now() and stores it in the startTime field. Then, it calls the Next method of the wrapped operator. Signed-off-by: nishchay-veer * Generalise timing operator for scalar and aggregate Signed-off-by: nishchay-veer * Added recordTime and duration field in numberLiteralSelector added a boolean flag recordTime to enable or disable recording of time taken.If it is enabled, then I have added the code to record the time taken to the duration field. Also modified the constantMetric to include a new label called . Signed-off-by: nishchay-veer * embedding instead of wrapping operator inside of operator The changes are done for capturing observability information for operators in a clean and modular way. By using embedding instead of wrapping, it allows for more granular data capture without creating cross-references between operators. Signed-off-by: nishchay-veer * Added Analyze method in ExplainableQuery interface The Analyze method returns an AnalyzeOutputNode, which represents the analysis of the query execution. This analysis can include performance metrics, such as CPU time, memory usage, or other relevant statistics. The Analyze method allows for capturing observability information during query execution to assess performance Signed-off-by: nishchay-veer * Added analyze function for local testing Signed-off-by: nishchay-veer * Include ObservableVectorOperator in building the operator tree Signed-off-by: nishchay-veer * Minor changes for type assertion Signed-off-by: nishchay-veer * Type casting into model.ObservableVectorOperator Signed-off-by: nishchay-veer * Initialised TimingInformation to avoid nil case Signed-off-by: nishchay-veer * Next call when Analyze query Signed-off-by: nishchay-veer * Fixed some checks failing Signed-off-by: nishchay-veer * Embed struct Signed-off-by: nishchay-veer * Test code for Query Analyze Signed-off-by: nishchay-veer * Embed model.OperatorTelemetry Signed-off-by: nishchay-veer * Assertion function for non-zero CPU Time Signed-off-by: nishchay-veer * Adding model.OperatorTelemetry to each operator Signed-off-by: nishchay-veer * Capturing CPUTime of each operator Signed-off-by: nishchay-veer * engine: actually execute the query Signed-off-by: Giedrius Statkevičius * Find time consumed by each operator Signed-off-by: nishchay-veer * Removed unnecessary type casting Signed-off-by: nishchay-veer * Added analyze function for local testing Signed-off-by: nishchay-veer * Initialised TimingInformation to avoid nil case Signed-off-by: nishchay-veer * Next call when Analyze query Signed-off-by: nishchay-veer * Adding model.OperatorTelemetry to each operator Signed-off-by: nishchay-veer * Capturing CPUTime of each operator Signed-off-by: nishchay-veer * Find time consumed by each operator Signed-off-by: nishchay-veer * Removed unnecessary type casting Signed-off-by: nishchay-veer * Fixed few minor nits Signed-off-by: nishchay-veer * Added model.OperatorTelemetry to noArgFunOperator Signed-off-by: nishchay-veer * Removing type checks for model.TimingInformation Signed-off-by: nishchay-veer * Removed type checks Signed-off-by: nishchay-veer * Removed type checks in TestQueryAnalyze Signed-off-by: nishchay-veer * Fixed few minors issues Signed-off-by: nishchay-veer * modified TestQueryAnalyze to check for child operators Signed-off-by: nishchay-veer * linters check passed Signed-off-by: nishchay-veer * Added operatorTelemetry to each operator Signed-off-by: nishchay-veer * Changed TimingInformation to TrackedTelemetry for recording other telemetry information Signed-off-by: nishchay-veer * Remove nil case in slice of operators Signed-off-by: nishchay-veer * removing unnecessary typecasts Signed-off-by: nishchay-veer * Set name of operators Signed-off-by: nishchay-veer * Removed Explain overheads from AnalyzeOutputNode Signed-off-by: nishchay-veer * added setName() method to Analyze Signed-off-by: nishchay-veer * fixed engine_test Signed-off-by: nishchay-veer * Removed name from NoopTelemetry Signed-off-by: nishchay-veer * Rename CPU Time -> Execution time Signed-off-by: Giedrius Statkevičius * engine: clean up last CPUTime reference Signed-off-by: Giedrius Statkevičius * execution: rename more fields Signed-off-by: Giedrius Statkevičius --------- Signed-off-by: nishchay-veer Signed-off-by: Giedrius Statkevičius Signed-off-by: Nishchay Veer <99465982+nishchay-veer@users.noreply.github.com> Co-authored-by: Giedrius Statkevičius --- engine/engine.go | 89 ++++++++++++++++++---- engine/engine_test.go | 75 ++++++++++++++++++ execution/aggregate/hashaggregate.go | 21 ++++- execution/aggregate/khashaggregate.go | 23 +++++- execution/binary/scalar.go | 28 ++++++- execution/binary/vector.go | 24 +++++- execution/exchange/coalesce.go | 27 ++++++- execution/exchange/concurrent.go | 17 ++++- execution/exchange/dedup.go | 17 ++++- execution/execution.go | 17 +++-- execution/function/absent.go | 13 ++++ execution/function/histogram.go | 17 ++++- execution/function/noarg.go | 10 +++ execution/function/operator.go | 68 +++++++++++++---- execution/function/relabel.go | 16 +++- execution/function/scalar.go | 14 ++++ execution/model/operator.go | 49 ++++++++++++ execution/remote/operator.go | 15 +++- execution/scan/literal_selector.go | 22 +++++- execution/scan/matrix_selector.go | 20 ++++- execution/scan/vector_selector.go | 16 +++- execution/step_invariant/step_invariant.go | 17 +++++ execution/unary/unary.go | 17 ++++- query/options.go | 3 +- 24 files changed, 570 insertions(+), 65 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 9eb48e54..f1342e18 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -5,10 +5,12 @@ package engine import ( "context" + "io" "math" "runtime" "sort" + "strconv" "time" "github.com/cespare/xxhash/v2" @@ -71,6 +73,9 @@ type Opts struct { // FallbackEngine Engine v1.QueryEngine + + // EnableAnalysis enables query analysis. + EnableAnalysis bool } func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer { @@ -211,6 +216,7 @@ func New(opts Opts) *compatibilityEngine { timeout: opts.Timeout, metrics: metrics, extLookbackDelta: opts.ExtLookbackDelta, + enableAnalysis: opts.EnableAnalysis, } } @@ -227,6 +233,7 @@ type compatibilityEngine struct { metrics *engineMetrics extLookbackDelta time.Duration + enableAnalysis bool } func (e *compatibilityEngine) SetQueryLogger(l promql.QueryLogger) { @@ -260,7 +267,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -275,12 +282,13 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - ts: ts, - t: InstantQuery, - resultSort: resultSort, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + ts: ts, + t: InstantQuery, + resultSort: resultSort, + debugWriter: e.debugWriter, }, nil } @@ -311,7 +319,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) @@ -326,10 +334,11 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - t: RangeQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + t: RangeQuery, + debugWriter: e.debugWriter, }, nil } @@ -337,7 +346,12 @@ type ExplainableQuery interface { promql.Query Explain() *ExplainOutputNode - Profile() + Analyze() *AnalyzeOutputNode +} + +type AnalyzeOutputNode struct { + OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"` + Children []AnalyzeOutputNode `json:"children,omitempty"` } type ExplainOutputNode struct { @@ -358,8 +372,25 @@ func (q *Query) Explain() *ExplainOutputNode { return explainVector(q.exec) } -func (q *Query) Profile() { - // TODO(bwplotka): Return profile. +func (q *Query) Analyze() *AnalyzeOutputNode { + if observableRoot, ok := q.exec.(model.ObservableVectorOperator); ok { + return analyzeVector(observableRoot) + } + return nil +} + +func analyzeVector(obsv model.ObservableVectorOperator) *AnalyzeOutputNode { + telemetry, obsVectors := obsv.Analyze() + + var children []AnalyzeOutputNode + for _, vector := range obsVectors { + children = append(children, *analyzeVector(vector)) + } + + return &AnalyzeOutputNode{ + OperatorTelemetry: telemetry, + Children: children, + } } func explainVector(v model.VectorOperator) *ExplainOutputNode { @@ -479,12 +510,17 @@ type compatibilityQuery struct { resultSort resultSorter cancel context.CancelFunc + + debugWriter io.Writer } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { // Handle case with strings early on as this does not need us to process samples. switch e := q.expr.(type) { case *parser.StringLiteral: + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), " ", "") + } return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}} } ret = &promql.Result{ @@ -567,6 +603,9 @@ loop: } sort.Sort(resultMatrix) ret.Value = resultMatrix + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return ret } @@ -688,6 +727,26 @@ func recoverEngine(logger log.Logger, expr parser.Expr, errp *error) { } } +func analyze(w io.Writer, o model.ObservableVectorOperator, indent, indentNext string) { + telemetry, next := o.Analyze() + _, _ = w.Write([]byte(indent)) + _, _ = w.Write([]byte("Operator Time :")) + _, _ = w.Write([]byte(strconv.FormatInt(int64(telemetry.ExecutionTimeTaken()), 10))) + if len(next) == 0 { + _, _ = w.Write([]byte("\n")) + return + } + _, _ = w.Write([]byte(":\n")) + + for i, n := range next { + if i == len(next)-1 { + analyze(w, n, indentNext+"└──", indentNext+" ") + } else { + analyze(w, n, indentNext+"└──", indentNext+" ") + } + } +} + func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) { me, next := o.Explain() _, _ = w.Write([]byte(indent)) diff --git a/engine/engine_test.go b/engine/engine_test.go index 5a22badb..ed35974a 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -21,6 +21,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/efficientgo/core/testutil" @@ -112,6 +113,80 @@ func TestQueryExplain(t *testing.T) { } } +func assertExecutionTimeNonZero(t *testing.T, got *engine.AnalyzeOutputNode) bool { + if got != nil { + if got.OperatorTelemetry.ExecutionTimeTaken() <= 0 { + t.Errorf("expected non-zero ExecutionTime for Operator, got %s ", got.OperatorTelemetry.ExecutionTimeTaken()) + return false + } + for i := range got.Children { + child := got.Children[i] + return got.OperatorTelemetry.ExecutionTimeTaken() > 0 && assertExecutionTimeNonZero(t, &child) + } + } + return true +} + +func TestQueryAnalyze(t *testing.T) { + opts := promql.EngineOpts{Timeout: 1 * time.Hour} + series := storage.MockSeries( + []int64{240, 270, 300, 600, 630, 660}, + []float64{1, 2, 3, 4, 5, 6}, + []string{labels.MetricName, "foo"}, + ) + + start := time.Unix(0, 0) + end := time.Unix(1000, 0) + + for _, tc := range []struct { + query string + }{ + { + query: "foo", + }, + { + query: "time()", + }, + { + query: "sum(foo) by (job)", + }, + { + query: "rate(http_requests_total[30s]) > bool 0", + }, + } { + { + t.Run(tc.query, func(t *testing.T) { + ng := engine.New(engine.Opts{EngineOpts: opts, EnableAnalysis: true}) + ctx := context.Background() + + var ( + query promql.Query + err error + ) + + query, err = ng.NewInstantQuery(ctx, storageWithSeries(series), nil, tc.query, start) + testutil.Ok(t, err) + + queryResults := query.Exec(context.Background()) + testutil.Ok(t, queryResults.Err) + + explainableQuery := query.(engine.ExplainableQuery) + + testutil.Assert(t, assertExecutionTimeNonZero(t, explainableQuery.Analyze())) + + query, err = ng.NewRangeQuery(ctx, storageWithSeries(series), nil, tc.query, start, end, 30*time.Second) + testutil.Ok(t, err) + + queryResults = query.Exec(context.Background()) + testutil.Ok(t, queryResults.Err) + + explainableQuery = query.(engine.ExplainableQuery) + testutil.Assert(t, assertExecutionTimeNonZero(t, explainableQuery.Analyze())) + }) + } + } +} + func TestVectorSelectorWithGaps(t *testing.T) { opts := promql.EngineOpts{ Timeout: 1 * time.Hour, diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index af38025b..9d207dd7 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "sync" + "time" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -37,6 +38,7 @@ type aggregate struct { newAccumulator newAccumulatorFunc stepsBatch int workers worker.Group + model.OperatorTelemetry } func NewHashAggregate( @@ -68,18 +70,29 @@ func NewHashAggregate( newAccumulator: newAccumulator, } a.workers = worker.NewGroup(stepsBatch, a.workerTask) + a.OperatorTelemetry = &model.TrackedTelemetry{} return a, nil } +func (a *aggregate) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + a.SetName("[*aggregate]") + var ops []model.ObservableVectorOperator + if obsnextParamOp, ok := a.paramOp.(model.ObservableVectorOperator); ok { + ops = append(ops, obsnextParamOp) + } + if obsnext, ok := a.next.(model.ObservableVectorOperator); ok { + ops = append(ops, obsnext) + } + return a, ops +} + func (a *aggregate) Explain() (me string, next []model.VectorOperator) { var ops []model.VectorOperator - if a.paramOp != nil { ops = append(ops, a.paramOp) } ops = append(ops, a.next) - if a.by { return fmt.Sprintf("[*aggregate] %v by (%v)", a.aggregation.String(), a.labels), ops } @@ -106,7 +119,7 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } - + start := time.Now() in, err := a.next.Next(ctx) if err != nil { return nil, err @@ -151,7 +164,7 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { result = append(result, output) a.next.GetPool().PutStepVector(vector) } - + a.AddExecutionTimeTaken(time.Since(start)) return result, nil } diff --git a/execution/aggregate/khashaggregate.go b/execution/aggregate/khashaggregate.go index 80dd9878..d1bada91 100644 --- a/execution/aggregate/khashaggregate.go +++ b/execution/aggregate/khashaggregate.go @@ -10,6 +10,7 @@ import ( "math" "sort" "sync" + "time" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -17,6 +18,7 @@ import ( "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) type kAggregate struct { @@ -36,6 +38,7 @@ type kAggregate struct { inputToHeap []*samplesHeap heaps []*samplesHeap compare func(float64, float64) bool + model.OperatorTelemetry } func NewKHashAggregate( @@ -46,6 +49,7 @@ func NewKHashAggregate( by bool, labels []string, stepsBatch int, + opts *query.Options, ) (model.VectorOperator, error) { var compare func(float64, float64) bool @@ -72,7 +76,10 @@ func NewKHashAggregate( compare: compare, params: make([]float64, stepsBatch), } - + a.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + a.OperatorTelemetry = &model.TrackedTelemetry{} + } return a, nil } @@ -81,6 +88,7 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) { if err != nil { return nil, err } + start := time.Now() args, err := a.paramOp.Next(ctx) if err != nil { return nil, err @@ -120,6 +128,7 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) { a.next.GetPool().PutStepVector(vector) } a.next.GetPool().PutVectors(in) + a.AddExecutionTimeTaken(time.Since(start)) return result, nil } @@ -138,6 +147,18 @@ func (a *kAggregate) GetPool() *model.VectorPool { return a.vectorPool } +func (a *kAggregate) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + a.SetName("[*kaggregate]") + next := make([]model.ObservableVectorOperator, 0, 2) + if obsnextParamOp, ok := a.paramOp.(model.ObservableVectorOperator); ok { + next = append(next, obsnextParamOp) + } + if obsnext, ok := a.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return a, next +} + func (a *kAggregate) Explain() (me string, next []model.VectorOperator) { if a.by { return fmt.Sprintf("[*kaggregate] %v by (%v)", a.aggregation.String(), a.labels), []model.VectorOperator{a.paramOp, a.next} diff --git a/execution/binary/scalar.go b/execution/binary/scalar.go index 39d2648b..0c1ebe4d 100644 --- a/execution/binary/scalar.go +++ b/execution/binary/scalar.go @@ -8,12 +8,14 @@ import ( "fmt" "math" "sync" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/function" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) type ScalarSide int @@ -43,6 +45,7 @@ type scalarOperator struct { // Keep the result if both sides are scalars. bothScalars bool + model.OperatorTelemetry } func NewScalar( @@ -52,6 +55,7 @@ func NewScalar( op parser.ItemType, scalarSide ScalarSide, returnBool bool, + opts *query.Options, ) (*scalarOperator, error) { binaryOperation, err := newOperation(op, scalarSide != ScalarSideBoth) if err != nil { @@ -66,7 +70,7 @@ func NewScalar( operandValIdx = 1 } - return &scalarOperator{ + o := &scalarOperator{ pool: pool, next: next, scalar: scalar, @@ -77,7 +81,25 @@ func NewScalar( operandValIdx: operandValIdx, returnBool: returnBool, bothScalars: scalarSide == ScalarSideBoth, - }, nil + } + o.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + o.OperatorTelemetry = &model.TrackedTelemetry{} + } + return o, nil + +} + +func (o *scalarOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*scalarOperator]") + next := make([]model.ObservableVectorOperator, 0, 2) + if obsnext, ok := o.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + if obsnextScalar, ok := o.scalar.(model.ObservableVectorOperator); ok { + next = append(next, obsnextScalar) + } + return o, next } func (o *scalarOperator) Explain() (me string, next []model.VectorOperator) { @@ -99,6 +121,7 @@ func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } + start := time.Now() in, err := o.next.Next(ctx) if err != nil { @@ -158,6 +181,7 @@ func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) { o.next.GetPool().PutVectors(in) o.scalar.GetPool().PutVectors(scalarIn) + o.AddExecutionTimeTaken(time.Since(start)) return out, nil } diff --git a/execution/binary/vector.go b/execution/binary/vector.go index 28dab1a8..76b34413 100644 --- a/execution/binary/vector.go +++ b/execution/binary/vector.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // vectorOperator evaluates an expression between two step vectors. @@ -42,6 +43,7 @@ type vectorOperator struct { // If true then 1/0 needs to be returned instead of the value. returnBool bool + model.OperatorTelemetry } func NewVectorOperator( @@ -51,6 +53,7 @@ func NewVectorOperator( matching *parser.VectorMatching, operation parser.ItemType, returnBool bool, + opts *query.Options, ) (model.VectorOperator, error) { op, err := newOperation(operation, true) if err != nil { @@ -63,7 +66,7 @@ func NewVectorOperator( copy(groupings, matching.MatchingLabels) slices.Sort(groupings) - return &vectorOperator{ + o := &vectorOperator{ pool: pool, lhs: lhs, rhs: rhs, @@ -72,7 +75,24 @@ func NewVectorOperator( operation: op, opType: operation, returnBool: returnBool, - }, nil + } + o.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + o.OperatorTelemetry = &model.TrackedTelemetry{} + } + return o, nil +} + +func (o *vectorOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*vectorOperator]") + next := make([]model.ObservableVectorOperator, 0, 2) + if obsnextParamOp, ok := o.lhs.(model.ObservableVectorOperator); ok { + next = append(next, obsnextParamOp) + } + if obsnext, ok := o.rhs.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return o, next } func (o *vectorOperator) Explain() (me string, next []model.VectorOperator) { diff --git a/execution/exchange/coalesce.go b/execution/exchange/coalesce.go index 81705130..cc8ce857 100644 --- a/execution/exchange/coalesce.go +++ b/execution/exchange/coalesce.go @@ -7,11 +7,13 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/query" ) type errorChan chan error @@ -42,18 +44,36 @@ type coalesce struct { inVectors [][]model.StepVector // sampleOffsets holds per-operator offsets needed to map an input sample ID to an output sample ID. sampleOffsets []uint64 + model.OperatorTelemetry } -func NewCoalesce(pool *model.VectorPool, operators ...model.VectorOperator) model.VectorOperator { - return &coalesce{ +func NewCoalesce(pool *model.VectorPool, opts *query.Options, operators ...model.VectorOperator) model.VectorOperator { + c := &coalesce{ pool: pool, sampleOffsets: make([]uint64, len(operators)), operators: operators, inVectors: make([][]model.StepVector, len(operators)), } + c.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + c.OperatorTelemetry = &model.TrackedTelemetry{} + } + return c +} + +func (c *coalesce) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + c.SetName("[*coalesce]") + obsOperators := make([]model.ObservableVectorOperator, 0, len(c.operators)) + for _, operator := range c.operators { + if obsOperator, ok := operator.(model.ObservableVectorOperator); ok { + obsOperators = append(obsOperators, obsOperator) + } + } + return c, obsOperators } func (c *coalesce) Explain() (me string, next []model.VectorOperator) { + return "[*coalesce]", c.operators } @@ -76,7 +96,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } - + start := time.Now() var err error c.once.Do(func() { err = c.loadSeries(ctx) }) if err != nil { @@ -131,6 +151,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { c.inVectors[opIdx] = nil c.operators[opIdx].GetPool().PutVectors(vectors) } + c.AddExecutionTimeTaken(time.Since(start)) if out == nil { return nil, nil diff --git a/execution/exchange/concurrent.go b/execution/exchange/concurrent.go index e2f46fa7..35640582 100644 --- a/execution/exchange/concurrent.go +++ b/execution/exchange/concurrent.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/thanos-io/promql-engine/execution/model" @@ -23,14 +24,26 @@ type concurrencyOperator struct { next model.VectorOperator buffer chan maybeStepVector bufferSize int + model.OperatorTelemetry } func NewConcurrent(next model.VectorOperator, bufferSize int) model.VectorOperator { - return &concurrencyOperator{ + c := &concurrencyOperator{ next: next, buffer: make(chan maybeStepVector, bufferSize), bufferSize: bufferSize, } + c.OperatorTelemetry = &model.TrackedTelemetry{} + return c +} + +func (c *concurrencyOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + c.SetName(("[*concurrencyOperator]")) + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := c.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return c, next } func (c *concurrencyOperator) Explain() (me string, next []model.VectorOperator) { @@ -52,6 +65,7 @@ func (c *concurrencyOperator) Next(ctx context.Context) ([]model.StepVector, err default: } + start := time.Now() c.once.Do(func() { go c.pull(ctx) go c.drainBufferOnCancel(ctx) @@ -64,6 +78,7 @@ func (c *concurrencyOperator) Next(ctx context.Context) ([]model.StepVector, err if r.err != nil { return nil, r.err } + c.AddExecutionTimeTaken(time.Since(start)) return r.stepVector, nil } diff --git a/execution/exchange/dedup.go b/execution/exchange/dedup.go index 86e428ab..7bcbdf37 100644 --- a/execution/exchange/dedup.go +++ b/execution/exchange/dedup.go @@ -6,6 +6,7 @@ package exchange import ( "context" "sync" + "time" "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/histogram" @@ -37,13 +38,25 @@ type dedupOperator struct { // outputIndex is a slice that is used as an index from input sample ID to output sample ID. outputIndex []uint64 dedupCache dedupCache + model.OperatorTelemetry } func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator) model.VectorOperator { - return &dedupOperator{ + d := &dedupOperator{ next: next, pool: pool, } + d.OperatorTelemetry = &model.TrackedTelemetry{} + return d +} + +func (d *dedupOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + d.SetName("[*dedup]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := d.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return d, next } func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { @@ -52,6 +65,7 @@ func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { if err != nil { return nil, err } + start := time.Now() in, err := d.next.Next(ctx) if err != nil { @@ -91,6 +105,7 @@ func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { } result = append(result, out) } + d.AddExecutionTimeTaken(time.Since(start)) return result, nil } diff --git a/execution/execution.go b/execution/execution.go index a8c9802a..8dd84bfb 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -18,6 +18,7 @@ package execution import ( "context" + "runtime" "sort" "time" @@ -48,7 +49,7 @@ const stepsBatch = 10 // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration) (model.VectorOperator, error) { +func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) { opts := &query.Options{ Context: ctx, Start: mint, @@ -57,6 +58,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min LookbackDelta: lookbackDelta, StepsBatch: stepsBatch, ExtLookbackDelta: extLookbackDelta, + EnableAnalysis: enableAnalysis, } selectorPool := engstore.NewSelectorPool(queryable) hints := storage.SelectHints{ @@ -65,6 +67,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min // TODO(fpetkovski): Adjust the step for sub-queries once they are supported. Step: step.Milliseconds(), } + return newOperator(expr, selectorPool, opts, hints) } @@ -118,7 +121,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } if e.Op == parser.TOPK || e.Op == parser.BOTTOMK { - next, err = aggregate.NewKHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch) + next, err = aggregate.NewKHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch, opts) } else { next, err = aggregate.NewHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch) } @@ -183,7 +186,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } operators[i] = operator } - coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...) + coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...) dedup := exchange.NewDedupOperator(model.NewVectorPool(stepsBatch), coalesce) return exchange.NewConcurrent(dedup, 2), nil @@ -254,7 +257,7 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e operators = append(operators, exchange.NewConcurrent(operator, 2)) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil } func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -287,7 +290,7 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti operators = append(operators, operator) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil } func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -299,7 +302,7 @@ func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select if err != nil { return nil, err } - return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool) + return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) } func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -320,7 +323,7 @@ func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select scalarSide = binary.ScalarSideLeft } - return binary.NewScalar(model.NewVectorPoolWithSize(stepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool) + return binary.NewScalar(model.NewVectorPoolWithSize(stepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts) } // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. diff --git a/execution/function/absent.go b/execution/function/absent.go index 695d17f0..59c55e72 100644 --- a/execution/function/absent.go +++ b/execution/function/absent.go @@ -6,6 +6,7 @@ package function import ( "context" "sync" + "time" "github.com/prometheus/prometheus/model/labels" @@ -19,6 +20,16 @@ type absentOperator struct { series []labels.Labels pool *model.VectorPool next model.VectorOperator + model.OperatorTelemetry +} + +func (o *absentOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*absentOperator]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := o.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return o, next } func (o *absentOperator) Explain() (me string, next []model.VectorOperator) { @@ -74,6 +85,7 @@ func (o *absentOperator) Next(ctx context.Context) ([]model.StepVector, error) { default: } o.loadSeries() + start := time.Now() vectors, err := o.next.Next(ctx) if err != nil { @@ -93,5 +105,6 @@ func (o *absentOperator) Next(ctx context.Context) ([]model.StepVector, error) { o.next.GetPool().PutStepVector(vectors[i]) } o.next.GetPool().PutVectors(vectors) + o.AddExecutionTimeTaken(time.Since(start)) return result, nil } diff --git a/execution/function/histogram.go b/execution/function/histogram.go index 04697d10..bad31fd4 100644 --- a/execution/function/histogram.go +++ b/execution/function/histogram.go @@ -9,6 +9,7 @@ import ( "math" "strconv" "sync" + "time" "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/labels" @@ -44,6 +45,19 @@ type histogramOperator struct { // seriesBuckets are the buckets for each individual conventional histogram series. seriesBuckets []buckets + model.OperatorTelemetry +} + +func (o *histogramOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*functionOperator]") + next := make([]model.ObservableVectorOperator, 0, 2) + if obsScalarOp, ok := o.scalarOp.(model.ObservableVectorOperator); ok { + next = append(next, obsScalarOp) + } + if obsVectorOp, ok := o.vectorOp.(model.ObservableVectorOperator); ok { + next = append(next, obsVectorOp) + } + return o, next } func (o *histogramOperator) Explain() (me string, next []model.VectorOperator) { @@ -71,7 +85,7 @@ func (o *histogramOperator) Next(ctx context.Context) ([]model.StepVector, error return nil, ctx.Err() default: } - + start := time.Now() var err error o.once.Do(func() { err = o.loadSeries(ctx) }) if err != nil { @@ -100,6 +114,7 @@ func (o *histogramOperator) Next(ctx context.Context) ([]model.StepVector, error o.scalarOp.GetPool().PutStepVector(scalar) } o.scalarOp.GetPool().PutVectors(scalars) + o.AddExecutionTimeTaken(time.Since(start)) return o.processInputSeries(vectors) } diff --git a/execution/function/noarg.go b/execution/function/noarg.go index f1091149..fef28130 100644 --- a/execution/function/noarg.go +++ b/execution/function/noarg.go @@ -6,6 +6,7 @@ package function import ( "context" "fmt" + "time" "github.com/prometheus/prometheus/model/labels" @@ -24,9 +25,16 @@ type noArgFunctionOperator struct { vectorPool *model.VectorPool series []labels.Labels sampleIDs []uint64 + model.OperatorTelemetry +} + +func (o *noArgFunctionOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*noArgFunctionOperator]") + return o, []model.ObservableVectorOperator{} } func (o *noArgFunctionOperator) Explain() (me string, next []model.VectorOperator) { + return fmt.Sprintf("[*noArgFunctionOperator] %v()", o.funcExpr.Func.Name), []model.VectorOperator{} } @@ -42,6 +50,7 @@ func (o *noArgFunctionOperator) Next(_ context.Context) ([]model.StepVector, err if o.currentStep > o.maxt { return nil, nil } + start := time.Now() ret := o.vectorPool.GetVectorBatch() for i := 0; i < o.stepsBatch && o.currentStep <= o.maxt; i++ { sv := o.vectorPool.GetStepVector(o.currentStep) @@ -50,6 +59,7 @@ func (o *noArgFunctionOperator) Next(_ context.Context) ([]model.StepVector, err ret = append(ret, sv) o.currentStep += o.step } + o.AddExecutionTimeTaken(time.Since(start)) return ret, nil } diff --git a/execution/function/operator.go b/execution/function/operator.go index b4f69768..7ccc254d 100644 --- a/execution/function/operator.go +++ b/execution/function/operator.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "sync" + "time" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -29,35 +30,51 @@ type functionOperator struct { call functionCall scalarPoints [][]float64 + model.OperatorTelemetry +} + +func SetTelemetry(opts *query.Options) model.OperatorTelemetry { + if opts.EnableAnalysis { + return &model.TrackedTelemetry{} + } + return &model.NoopTelemetry{} } func NewFunctionOperator(funcExpr *parser.Call, nextOps []model.VectorOperator, stepsBatch int, opts *query.Options) (model.VectorOperator, error) { // Some functions need to be handled in special operators + switch funcExpr.Func.Name { case "scalar": return &scalarFunctionOperator{ - next: nextOps[0], - pool: model.NewVectorPoolWithSize(stepsBatch, 1), + next: nextOps[0], + pool: model.NewVectorPoolWithSize(stepsBatch, 1), + OperatorTelemetry: SetTelemetry(opts), }, nil + case "label_join", "label_replace": return &relabelFunctionOperator{ - next: nextOps[0], - funcExpr: funcExpr, + next: nextOps[0], + funcExpr: funcExpr, + OperatorTelemetry: SetTelemetry(opts), }, nil + case "absent": return &absentOperator{ - next: nextOps[0], - pool: model.NewVectorPool(stepsBatch), - funcExpr: funcExpr, + next: nextOps[0], + pool: model.NewVectorPool(stepsBatch), + funcExpr: funcExpr, + OperatorTelemetry: SetTelemetry(opts), }, nil + case "histogram_quantile": return &histogramOperator{ - pool: model.NewVectorPool(stepsBatch), - funcArgs: funcExpr.Args, - once: sync.Once{}, - scalarOp: nextOps[0], - vectorOp: nextOps[1], - scalarPoints: make([]float64, stepsBatch), + pool: model.NewVectorPool(stepsBatch), + funcArgs: funcExpr.Args, + once: sync.Once{}, + scalarOp: nextOps[0], + vectorOp: nextOps[1], + scalarPoints: make([]float64, stepsBatch), + OperatorTelemetry: SetTelemetry(opts), }, nil } @@ -91,7 +108,6 @@ func newNoArgsFunctionOperator(funcExpr *parser.Call, stepsBatch int, opts *quer call: call, vectorPool: model.NewVectorPool(stepsBatch), } - switch funcExpr.Func.Name { case "pi", "time": op.sampleIDs = []uint64{0} @@ -100,6 +116,11 @@ func newNoArgsFunctionOperator(funcExpr *parser.Call, stepsBatch int, opts *quer op.series = []labels.Labels{{}} op.sampleIDs = []uint64{0} } + op.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + op.OperatorTelemetry = &model.TrackedTelemetry{} + } + return op, nil } @@ -127,6 +148,10 @@ func newInstantVectorFunctionOperator(funcExpr *parser.Call, nextOps []model.Vec break } } + f.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + f.OperatorTelemetry = &model.TrackedTelemetry{} + } // Check selector type. switch funcExpr.Args[f.vectorIndex].Type() { @@ -137,6 +162,17 @@ func newInstantVectorFunctionOperator(funcExpr *parser.Call, nextOps []model.Vec } } +func (o *functionOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*functionOperator]") + obsOperators := make([]model.ObservableVectorOperator, 0, len(o.nextOps)) + for _, operator := range o.nextOps { + if obsOperator, ok := operator.(model.ObservableVectorOperator); ok { + obsOperators = append(obsOperators, obsOperator) + } + } + return o, obsOperators +} + func (o *functionOperator) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*functionOperator] %v(%v)", o.funcExpr.Func.Name, o.funcExpr.Args), o.nextOps } @@ -163,7 +199,7 @@ func (o *functionOperator) Next(ctx context.Context) ([]model.StepVector, error) if err := o.loadSeries(ctx); err != nil { return nil, err } - + start := time.Now() // Process non-variadic single/multi-arg instant vector and scalar input functions. // Call next on vector input. vectors, err := o.nextOps[o.vectorIndex].Next(ctx) @@ -223,6 +259,8 @@ func (o *functionOperator) Next(ctx context.Context) ([]model.StepVector, error) } } + o.AddExecutionTimeTaken(time.Since(start)) + return vectors, nil } diff --git a/execution/function/relabel.go b/execution/function/relabel.go index b650e311..5b1600c4 100644 --- a/execution/function/relabel.go +++ b/execution/function/relabel.go @@ -8,6 +8,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/efficientgo/core/errors" prommodel "github.com/prometheus/common/model" @@ -22,6 +23,16 @@ type relabelFunctionOperator struct { funcExpr *parser.Call once sync.Once series []labels.Labels + model.OperatorTelemetry +} + +func (o *relabelFunctionOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*relabelFunctionOperator]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := o.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return o, next } func (o *relabelFunctionOperator) Explain() (me string, next []model.VectorOperator) { @@ -39,7 +50,10 @@ func (o *relabelFunctionOperator) GetPool() *model.VectorPool { } func (o *relabelFunctionOperator) Next(ctx context.Context) ([]model.StepVector, error) { - return o.next.Next(ctx) + start := time.Now() + next, err := o.next.Next(ctx) + o.AddExecutionTimeTaken(time.Since(start)) + return next, err } func (o *relabelFunctionOperator) loadSeries(ctx context.Context) (err error) { diff --git a/execution/function/scalar.go b/execution/function/scalar.go index 4335b6fe..9d6a8553 100644 --- a/execution/function/scalar.go +++ b/execution/function/scalar.go @@ -6,6 +6,7 @@ package function import ( "context" "math" + "time" "github.com/prometheus/prometheus/model/labels" @@ -15,6 +16,16 @@ import ( type scalarFunctionOperator struct { pool *model.VectorPool next model.VectorOperator + model.OperatorTelemetry +} + +func (o *scalarFunctionOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*scalarFunctionOperator]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := o.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return o, next } func (o *scalarFunctionOperator) Explain() (me string, next []model.VectorOperator) { @@ -35,6 +46,7 @@ func (o *scalarFunctionOperator) Next(ctx context.Context) ([]model.StepVector, return nil, ctx.Err() default: } + start := time.Now() in, err := o.next.Next(ctx) if err != nil { return nil, err @@ -55,5 +67,7 @@ func (o *scalarFunctionOperator) Next(ctx context.Context) ([]model.StepVector, o.next.GetPool().PutStepVector(vector) } o.next.GetPool().PutVectors(in) + o.AddExecutionTimeTaken(time.Since(start)) + return result, nil } diff --git a/execution/model/operator.go b/execution/model/operator.go index 40d04eb6..021a36fe 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -5,10 +5,59 @@ package model import ( "context" + "time" "github.com/prometheus/prometheus/model/labels" ) +type NoopTelemetry struct{} + +type TrackedTelemetry struct { + name string + ExecutionTime time.Duration +} + +func (ti *NoopTelemetry) AddExecutionTimeTaken(t time.Duration) {} + +func (ti *TrackedTelemetry) AddExecutionTimeTaken(t time.Duration) { + ti.ExecutionTime += t +} + +func (ti *TrackedTelemetry) Name() string { + return ti.name +} + +func (ti *TrackedTelemetry) SetName(operatorName string) { + ti.name = operatorName +} + +func (ti *NoopTelemetry) Name() string { + return "" +} + +func (ti *NoopTelemetry) SetName(operatorName string) {} + +type OperatorTelemetry interface { + AddExecutionTimeTaken(time.Duration) + ExecutionTimeTaken() time.Duration + SetName(string) + Name() string +} + +func (ti *NoopTelemetry) ExecutionTimeTaken() time.Duration { + return time.Duration(0) +} + +func (ti *TrackedTelemetry) ExecutionTimeTaken() time.Duration { + return ti.ExecutionTime +} + +type ObservableVectorOperator interface { + VectorOperator + OperatorTelemetry + Analyze() (OperatorTelemetry, []ObservableVectorOperator) +} + // VectorOperator performs operations on series in step by step fashion. type VectorOperator interface { // Next yields vectors of samples from all series for one or more execution steps. diff --git a/execution/remote/operator.go b/execution/remote/operator.go index e87b1c50..3e232fa2 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -24,17 +24,28 @@ type Execution struct { opts *query.Options queryRangeStart time.Time vectorSelector model.VectorOperator + model.OperatorTelemetry } func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options) *Execution { storage := newStorageFromQuery(query, opts) - return &Execution{ + e := &Execution{ storage: storage, query: query, opts: opts, queryRangeStart: queryRangeStart, vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1), } + e.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + e.OperatorTelemetry = &model.TrackedTelemetry{} + } + return e +} + +func (e *Execution) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + e.SetName("[*remoteExec]") + return e, nil } func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) { @@ -42,7 +53,9 @@ func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) { } func (e *Execution) Next(ctx context.Context) ([]model.StepVector, error) { + start := time.Now() next, err := e.vectorSelector.Next(ctx) + e.AddExecutionTimeTaken(time.Since(start)) if next == nil { // Closing the storage prematurely can lead to results from the query // engine to be recycled. Because of this, we close the storage only diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index 802f067a..c65192f1 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -7,11 +7,12 @@ import ( "context" "fmt" "sync" + "time" + + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/query" - - "github.com/prometheus/prometheus/model/labels" ) // numberLiteralSelector returns []model.StepVector with same sample value across time range. @@ -27,10 +28,11 @@ type numberLiteralSelector struct { once sync.Once val float64 + model.OperatorTelemetry } func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) *numberLiteralSelector { - return &numberLiteralSelector{ + op := &numberLiteralSelector{ vectorPool: pool, numSteps: opts.NumSteps(), mint: opts.Start.UnixMilli(), @@ -39,6 +41,18 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f currentStep: opts.Start.UnixMilli(), val: val, } + + op.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + op.OperatorTelemetry = &model.TrackedTelemetry{} + } + + return op +} + +func (o *numberLiteralSelector) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*numberLiteralSelector] ") + return o, nil } func (o *numberLiteralSelector) Explain() (me string, next []model.VectorOperator) { @@ -60,6 +74,7 @@ func (o *numberLiteralSelector) Next(ctx context.Context) ([]model.StepVector, e return nil, ctx.Err() default: } + start := time.Now() if o.currentStep > o.maxt { return nil, nil @@ -83,6 +98,7 @@ func (o *numberLiteralSelector) Next(ctx context.Context) ([]model.StepVector, e o.step = 1 } o.currentStep += o.step * int64(o.numSteps) + o.AddExecutionTimeTaken(time.Since(start)) return vectors, nil } diff --git a/execution/scan/matrix_selector.go b/execution/scan/matrix_selector.go index c273847d..5f7cdf71 100644 --- a/execution/scan/matrix_selector.go +++ b/execution/scan/matrix_selector.go @@ -56,6 +56,7 @@ type matrixSelector struct { // Lookback delta for extended range functions. extLookbackDelta int64 + model.OperatorTelemetry } // NewMatrixSelector creates operator which selects vector of series over time. @@ -66,13 +67,14 @@ func NewMatrixSelector( opts *query.Options, selectRange, offset time.Duration, shard, numShard int, + ) (model.VectorOperator, error) { call, ok := rangeVectorFuncs[funcExpr.Func.Name] if !ok { return nil, parse.UnknownFunctionError(funcExpr.Func) } isExtFunction := parse.IsExtFunction(funcExpr.Func.Name) - return &matrixSelector{ + m := &matrixSelector{ storage: selector, call: call, funcExpr: funcExpr, @@ -92,7 +94,18 @@ func NewMatrixSelector( numShards: numShard, extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(), - }, nil + } + m.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + m.OperatorTelemetry = &model.TrackedTelemetry{} + } + + return m, nil +} + +func (o *matrixSelector) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*matrixSelector]") + return o, nil } func (o *matrixSelector) Explain() (me string, next []model.VectorOperator) { @@ -120,6 +133,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } + start := time.Now() if o.currentStep > o.maxt { return nil, nil @@ -203,7 +217,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { o.step = 1 } o.currentStep += o.step * int64(o.numSteps) - + o.AddExecutionTimeTaken(time.Since(start)) return vectors, nil } diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index ddfe802c..4b2c4dd8 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -47,6 +47,7 @@ type vectorSelector struct { shard int numShards int + model.OperatorTelemetry } // NewVectorSelector creates operator which selects vector of series. @@ -57,7 +58,7 @@ func NewVectorSelector( offset time.Duration, shard, numShards int, ) model.VectorOperator { - return &vectorSelector{ + o := &vectorSelector{ storage: selector, vectorPool: pool, @@ -72,6 +73,16 @@ func NewVectorSelector( shard: shard, numShards: numShards, } + o.OperatorTelemetry = &model.NoopTelemetry{} + if queryOpts.EnableAnalysis { + o.OperatorTelemetry = &model.TrackedTelemetry{} + } + return o +} + +func (o *vectorSelector) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + o.SetName("[*vectorSelector]") + return o, nil } func (o *vectorSelector) Explain() (me string, next []model.VectorOperator) { @@ -95,7 +106,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } - + start := time.Now() if o.currentStep > o.maxt { return nil, nil } @@ -140,6 +151,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { o.step = 1 } o.currentStep += o.step * int64(o.numSteps) + o.AddExecutionTimeTaken(time.Since(start)) return vectors, nil } diff --git a/execution/step_invariant/step_invariant.go b/execution/step_invariant/step_invariant.go index b3c3a211..33a2566f 100644 --- a/execution/step_invariant/step_invariant.go +++ b/execution/step_invariant/step_invariant.go @@ -6,6 +6,7 @@ package step_invariant import ( "context" "sync" + "time" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -30,6 +31,16 @@ type stepInvariantOperator struct { step int64 currentStep int64 stepsBatch int + model.OperatorTelemetry +} + +func (u *stepInvariantOperator) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + u.SetName("[*stepInvariantOperator]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := u.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return u, next } func (u *stepInvariantOperator) Explain() (me string, next []model.VectorOperator) { @@ -64,6 +75,10 @@ func NewStepInvariantOperator( case *parser.MatrixSelector, *parser.SubqueryExpr: u.cacheResult = false } + u.OperatorTelemetry = &model.NoopTelemetry{} + if opts.EnableAnalysis { + u.OperatorTelemetry = &model.TrackedTelemetry{} + } return u, nil } @@ -88,6 +103,7 @@ func (u *stepInvariantOperator) Next(ctx context.Context) ([]model.StepVector, e if u.currentStep > u.maxt { return nil, nil } + start := time.Now() select { case <-ctx.Done(): @@ -111,6 +127,7 @@ func (u *stepInvariantOperator) Next(ctx context.Context) ([]model.StepVector, e result = append(result, outVector) u.currentStep += u.step } + u.AddExecutionTimeTaken(time.Since(start)) return result, nil } diff --git a/execution/unary/unary.go b/execution/unary/unary.go index 46b11b47..e1a0a18a 100644 --- a/execution/unary/unary.go +++ b/execution/unary/unary.go @@ -6,6 +6,7 @@ package unary import ( "context" "sync" + "time" "github.com/prometheus/prometheus/model/labels" "gonum.org/v1/gonum/floats" @@ -18,6 +19,7 @@ type unaryNegation struct { once sync.Once series []labels.Labels + model.OperatorTelemetry } func (u *unaryNegation) Explain() (me string, next []model.VectorOperator) { @@ -29,10 +31,20 @@ func NewUnaryNegation( stepsBatch int, ) (model.VectorOperator, error) { u := &unaryNegation{ - next: next, + next: next, + OperatorTelemetry: &model.TrackedTelemetry{}, } + return u, nil } +func (u *unaryNegation) Analyze() (model.OperatorTelemetry, []model.ObservableVectorOperator) { + u.SetName("[*unaryNegation]") + next := make([]model.ObservableVectorOperator, 0, 1) + if obsnext, ok := u.next.(model.ObservableVectorOperator); ok { + next = append(next, obsnext) + } + return u, next +} func (u *unaryNegation) Series(ctx context.Context) ([]labels.Labels, error) { if err := u.loadSeries(ctx); err != nil { @@ -68,7 +80,7 @@ func (u *unaryNegation) Next(ctx context.Context) ([]model.StepVector, error) { return nil, ctx.Err() default: } - + start := time.Now() in, err := u.next.Next(ctx) if err != nil { return nil, err @@ -79,5 +91,6 @@ func (u *unaryNegation) Next(ctx context.Context) ([]model.StepVector, error) { for i := range in { floats.Scale(-1, in[i].Samples) } + u.AddExecutionTimeTaken(time.Since(start)) return in, nil } diff --git a/query/options.go b/query/options.go index 0143eea9..fed4fa26 100644 --- a/query/options.go +++ b/query/options.go @@ -16,7 +16,8 @@ type Options struct { LookbackDelta time.Duration ExtLookbackDelta time.Duration - StepsBatch int64 + StepsBatch int64 + EnableAnalysis bool } func (o *Options) NumSteps() int { From 0918d9a2be28557a93117399b60cf96c6ade469a Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 16 Aug 2023 08:28:37 +0200 Subject: [PATCH 07/11] Propagate warnings through context (#298) * Propagate warnings through context The engine does not return warnings from storage which causes problems with partial response detection in Thanos. I initially thought we had to change the interface of each operator, but @MichaHoffmann had a neat idea to propagate warnings through the context since they have no impact on flow control. Signed-off-by: Filip Petkovski * Fix lint Signed-off-by: Filip Petkovski * Replace fmt.Errorf with errors.New Signed-off-by: Filip Petkovski * Use custom type as key Signed-off-by: Filip Petkovski --------- Signed-off-by: Filip Petkovski --- engine/engine.go | 9 ++++ engine/engine_test.go | 64 +++++++++++++++++++++++++++- execution/storage/series_selector.go | 3 ++ execution/warnings/context.go | 54 +++++++++++++++++++++++ 4 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 execution/warnings/context.go diff --git a/engine/engine.go b/engine/engine.go index f1342e18..12858ba6 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -30,6 +30,7 @@ import ( "github.com/thanos-io/promql-engine/execution" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/parser" ) @@ -515,6 +516,14 @@ type compatibilityQuery struct { } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { + ctx = warnings.NewContext(ctx) + defer func() { + warns := warnings.FromContext(ctx) + if len(warns) > 0 { + ret.Warnings = warns + } + }() + // Handle case with strings early on as this does not need us to process samples. switch e := q.expr.(type) { case *parser.StringLiteral: diff --git a/engine/engine_test.go b/engine/engine_test.go index ed35974a..afb5b08a 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/efficientgo/core/errors" promparser "github.com/prometheus/prometheus/promql/parser" "github.com/google/go-cmp/cmp/cmpopts" @@ -1913,6 +1914,7 @@ func TestQueriesAgainstOldEngine(t *testing.T) { return } + testutil.Equals(t, newResult.Warnings, oldResult.Warnings) testutil.Ok(t, newResult.Err) if hasNaNs(oldResult) { t.Log("Applying comparison with NaN equality.") @@ -2004,6 +2006,58 @@ func mergeWithSampleDedup(series []*mockSeries) []storage.Series { return sset } +func TestWarnings(t *testing.T) { + querier := &storage.MockQueryable{ + MockQuerier: &storage.MockQuerier{ + SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return newWarningsSeriesSet(storage.Warnings{errors.New("test warning")}) + }, + }, + } + + var ( + start = time.UnixMilli(0) + end = time.UnixMilli(600) + step = 30 * time.Second + ) + + cases := []struct { + name string + query string + expectedWarns storage.Warnings + }{ + { + name: "single select call", + query: "http_requests_total", + expectedWarns: storage.Warnings{ + errors.New("test warning"), + }, + }, + { + name: "multiple select calls", + query: `sum(http_requests_total) / sum(http_responses_total)`, + expectedWarns: storage.Warnings{ + errors.New("test warning"), + errors.New("test warning"), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + newEngine := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}}) + q1, err := newEngine.NewRangeQuery(context.Background(), querier, nil, tc.query, start, end, step) + testutil.Ok(t, err) + + res := q1.Exec(context.Background()) + testutil.Ok(t, res.Err) + testutil.WithGoCmp(cmp.Comparer(func(err1, err2 error) bool { + return err1.Error() == err2.Error() + })).Equals(t, tc.expectedWarns, res.Warnings) + }) + } +} + func TestEdgeCases(t *testing.T) { testCases := []struct { name string @@ -4212,6 +4266,7 @@ func (m *mockIterator) Err() error { return nil } type testSeriesSet struct { i int series []storage.Series + warns storage.Warnings } func newTestSeriesSet(series ...storage.Series) storage.SeriesSet { @@ -4221,10 +4276,17 @@ func newTestSeriesSet(series ...storage.Series) storage.SeriesSet { } } +func newWarningsSeriesSet(warns storage.Warnings) storage.SeriesSet { + return &testSeriesSet{ + i: -1, + warns: warns, + } +} + func (s *testSeriesSet) Next() bool { s.i++; return s.i < len(s.series) } func (s *testSeriesSet) At() storage.Series { return s.series[s.i] } func (s *testSeriesSet) Err() error { return nil } -func (s *testSeriesSet) Warnings() storage.Warnings { return nil } +func (s *testSeriesSet) Warnings() storage.Warnings { return s.warns } type slowSeries struct{} diff --git a/execution/storage/series_selector.go b/execution/storage/series_selector.go index e9f0cf27..433fe6a1 100644 --- a/execution/storage/series_selector.go +++ b/execution/storage/series_selector.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/promql-engine/execution/warnings" ) type SeriesSelector interface { @@ -76,6 +78,7 @@ func (o *seriesSelector) loadSeries(ctx context.Context) error { i++ } + warnings.AddToContext(seriesSet.Warnings(), ctx) return seriesSet.Err() } diff --git a/execution/warnings/context.go b/execution/warnings/context.go new file mode 100644 index 00000000..85482247 --- /dev/null +++ b/execution/warnings/context.go @@ -0,0 +1,54 @@ +package warnings + +import ( + "context" + "sync" + + "github.com/prometheus/prometheus/storage" +) + +type warningKey string + +const key warningKey = "promql-warnings" + +type warnings struct { + mu sync.Mutex + warns storage.Warnings +} + +func newWarnings() *warnings { + return &warnings{ + warns: make(storage.Warnings, 0), + } +} + +func (w *warnings) add(warns storage.Warnings) { + w.mu.Lock() + defer w.mu.Unlock() + w.warns = append(w.warns, warns...) +} + +func (w *warnings) get() storage.Warnings { + w.mu.Lock() + defer w.mu.Unlock() + return w.warns +} + +func NewContext(ctx context.Context) context.Context { + return context.WithValue(ctx, key, newWarnings()) +} + +func AddToContext(warns storage.Warnings, ctx context.Context) { + if len(warns) == 0 { + return + } + w, ok := ctx.Value(key).(*warnings) + if !ok { + return + } + w.add(warns) +} + +func FromContext(ctx context.Context) storage.Warnings { + return ctx.Value(key).(*warnings).get() +} From 48ef19d930bacbabf7c44618d97be6832420ee41 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 16 Aug 2023 18:38:54 +0200 Subject: [PATCH 08/11] Propagate warnings from remote engines (#300) This is a follow up commit to https://github.com/thanos-io/promql-engine/pull/298 which enables propagating warnings from remote engines into the distributed query context. Signed-off-by: Filip Petkovski --- engine/distributed_test.go | 31 +++++++++++++++++++++++++++++++ execution/remote/operator.go | 2 ++ 2 files changed, 33 insertions(+) diff --git a/engine/distributed_test.go b/engine/distributed_test.go index ed8ca64e..68e9c31e 100644 --- a/engine/distributed_test.go +++ b/engine/distributed_test.go @@ -10,9 +10,11 @@ import ( "testing" "time" + "github.com/efficientgo/core/errors" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/engine" @@ -329,3 +331,32 @@ func TestDistributedAggregations(t *testing.T) { } } } + +func TestDistributedEngineWarnings(t *testing.T) { + querier := &storage.MockQueryable{ + MockQuerier: &storage.MockQuerier{ + SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return newWarningsSeriesSet(storage.Warnings{errors.New("test warning")}) + }, + }, + } + + opts := engine.Opts{ + EngineOpts: promql.EngineOpts{ + MaxSamples: math.MaxInt64, + Timeout: 1 * time.Minute, + }, + } + remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil) + ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote})) + var ( + start = time.UnixMilli(0) + end = time.UnixMilli(600) + step = 30 * time.Second + ) + q, err := ng.NewRangeQuery(context.Background(), nil, nil, "test", start, end, step) + testutil.Ok(t, err) + + res := q.Exec(context.Background()) + testutil.Equals(t, 1, len(res.Warnings)) +} diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 3e232fa2..f643c5d2 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -15,6 +15,7 @@ import ( "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/scan" engstore "github.com/thanos-io/promql-engine/execution/storage" + "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/query" ) @@ -102,6 +103,7 @@ func (s *storageAdapter) GetSeries(ctx context.Context, _, _ int) ([]engstore.Si func (s *storageAdapter) executeQuery(ctx context.Context) { result := s.query.Exec(ctx) + warnings.AddToContext(result.Warnings, ctx) if result.Err != nil { s.err = result.Err return From 755a3d78b58e859046e19a0435d6029972dda6c4 Mon Sep 17 00:00:00 2001 From: Player256 Date: Thu, 17 Aug 2023 00:55:37 +0530 Subject: [PATCH 09/11] added changes to the bench-fast target --- Makefile | 8 +-- engine/bench_test.go | 131 ++++++++++++++++++++++++++++--------------- 2 files changed, 89 insertions(+), 50 deletions(-) diff --git a/Makefile b/Makefile index f8c930d3..0cb7114b 100644 --- a/Makefile +++ b/Makefile @@ -98,13 +98,13 @@ benchmarks: .PHONY: bench-old bench-old: benchmarks @echo "Benchmarking old engine" - @go test ./... -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' > benchmarks/old.out + @go test ./... -bench 'BenchmarkRangeQuery/slow/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' > benchmarks/old.out @go test ./... -bench 'BenchmarkNativeHistograms/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' >> benchmarks/old.out .PHONY: bench-new bench-new: benchmarks @echo "Benchmarking new engine" - @go test ./... -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' > benchmarks/new.out + @go test ./... -bench 'BenchmarkRangeQuery/slow/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./... -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out .PHONY: benchmark @@ -114,10 +114,10 @@ benchmark: bench-old bench-new .PHONY : bench-fast bench-fast: benchmarks @echo "Benchmarking old engine" - @go test ./engine -bench 'BenchmarkRangeQuery/.*/old_engine' -run none -count 5 -short | sed -u 's/\/old_engine//' > benchmarks/old.out + @go test ./engine -bench 'BenchmarkRangeQuery/fast/.*/old_engine' -run none -count 5 -short | sed -u 's/\/old_engine//' > benchmarks/old.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/old_engine' -run none -count 5 | sed -u 's/\/old_engine//' >> benchmarks/old.out @echo "Benchmarking new engine" - @go test ./engine -bench 'BenchmarkRangeQuery/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out + @go test ./engine -bench 'BenchmarkRangeQuery/fast/.*/new_engine' -run none -count 5 -short | sed -u 's/\/new_engine//' > benchmarks/new.out @go test ./engine -bench 'BenchmarkNativeHistograms/.*/new_engine' -run none -count 5 | sed -u 's/\/new_engine//' >> benchmarks/new.out @benchstat benchmarks/old.out benchmarks/new.out diff --git a/engine/bench_test.go b/engine/bench_test.go index 39839485..3ab9e398 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -102,14 +102,9 @@ func BenchmarkSingleQuery(b *testing.B) { func BenchmarkRangeQuery(b *testing.B) { samplesPerHour := 60 * 2 - sixHourDataset := setupStorage(b, 1000, 3, 6*samplesPerHour) - defer sixHourDataset.Close() - - largeSixHourDataset := setupStorage(b, 10000, 10, 6*samplesPerHour) - defer largeSixHourDataset.Close() - - sevenDaysAndTwoHoursDataset := setupStorage(b, 1000, 3, (7*24+2)*samplesPerHour) - defer sevenDaysAndTwoHoursDataset.Close() + var sixHourDataset *promql.Test + var largeSixHourDataset *promql.Test + var sevenDaysAndTwoHoursDataset *promql.Test start := time.Unix(0, 0) end := start.Add(2 * time.Hour) @@ -272,54 +267,98 @@ func BenchmarkRangeQuery(b *testing.B) { }, } - for _, tc := range cases { - b.Run(tc.name, func(b *testing.B) { - b.ReportAllocs() - b.Run("old_engine", func(b *testing.B) { - opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 50000000, - Timeout: 100 * time.Second, - EnableAtModifier: true, - EnableNegativeOffset: true, - } - engine := promql.NewEngine(opts) + b.Run("fast", func(b *testing.B) { + sixHourDataset = setupStorage(b, 1000, 3, 6*samplesPerHour) + defer sixHourDataset.Close() - b.ResetTimer() + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + if testing.Short() && (tc.test == largeSixHourDataset || tc.test == sevenDaysAndTwoHoursDataset) { + b.Skip() + } b.ReportAllocs() - - if testing.Short() { - if tc.test != sixHourDataset { - b.Skip() + b.Run("old_engine", func(b *testing.B) { + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 50000000, + Timeout: 100 * time.Second, + EnableAtModifier: true, + EnableNegativeOffset: true, } - } + engine := promql.NewEngine(opts) - for i := 0; i < b.N; i++ { - qry, err := engine.NewRangeQuery(tc.test.Context(), tc.test.Queryable(), nil, tc.query, start, end, step) - testutil.Ok(b, err) + b.ResetTimer() + b.ReportAllocs() - oldResult := qry.Exec(tc.test.Context()) - testutil.Ok(b, oldResult.Err) - } + for i := 0; i < b.N; i++ { + qry, err := engine.NewRangeQuery(tc.test.Context(), tc.test.Queryable(), nil, tc.query, start, end, step) + testutil.Ok(b, err) + + oldResult := qry.Exec(tc.test.Context()) + testutil.Ok(b, oldResult.Err) + } + }) + b.Run("new_engine", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + newResult := executeRangeQuery(b, tc.query, tc.test, start, end, step) + testutil.Ok(b, newResult.Err) + } + }) }) - b.Run("new_engine", func(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() + } + }) + + b.Run("slow", func(b *testing.B) { + largeSixHourDataset = setupStorage(b, 10000, 10, 6*samplesPerHour) + defer largeSixHourDataset.Close() + + sevenDaysAndTwoHoursDataset = setupStorage(b, 1000, 3, (7*24+2)*samplesPerHour) + defer sevenDaysAndTwoHoursDataset.Close() - if testing.Short() { - if tc.test != sixHourDataset { - b.Skip() + sixHourDataset = setupStorage(b, 1000, 3, 6*samplesPerHour) + defer sixHourDataset.Close() + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + b.Run("old_engine", func(b *testing.B) { + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 50000000, + Timeout: 100 * time.Second, + EnableAtModifier: true, + EnableNegativeOffset: true, } - } + engine := promql.NewEngine(opts) - for i := 0; i < b.N; i++ { - newResult := executeRangeQuery(b, tc.query, tc.test, start, end, step) - testutil.Ok(b, newResult.Err) - } + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + qry, err := engine.NewRangeQuery(tc.test.Context(), tc.test.Queryable(), nil, tc.query, start, end, step) + testutil.Ok(b, err) + + oldResult := qry.Exec(tc.test.Context()) + testutil.Ok(b, oldResult.Err) + } + }) + b.Run("new_engine", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + newResult := executeRangeQuery(b, tc.query, tc.test, start, end, step) + testutil.Ok(b, newResult.Err) + } + }) }) - }) - } + } + }) } func BenchmarkNativeHistograms(b *testing.B) { From a9d826ac3852e7ba7d05a1577d9b79b5035febd1 Mon Sep 17 00:00:00 2001 From: Player256 Date: Thu, 17 Aug 2023 01:03:43 +0530 Subject: [PATCH 10/11] dco check Signed-off-by: Player256 --- engine/bench_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/bench_test.go b/engine/bench_test.go index 3ab9e398..32e359f4 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -272,7 +272,7 @@ func BenchmarkRangeQuery(b *testing.B) { defer sixHourDataset.Close() for _, tc := range cases { - b.Run(tc.name, func(b *testing.B) { + b.Run(tc.name , func(b *testing.B) { if testing.Short() && (tc.test == largeSixHourDataset || tc.test == sevenDaysAndTwoHoursDataset) { b.Skip() } From b7285f96fbd5fbe8bf48155992609a668b92f880 Mon Sep 17 00:00:00 2001 From: Player256 Date: Thu, 17 Aug 2023 01:15:18 +0530 Subject: [PATCH 11/11] changes made Signed-off-by: Player256 --- engine/bench_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/engine/bench_test.go b/engine/bench_test.go index 32e359f4..395ac4d7 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -276,6 +276,7 @@ func BenchmarkRangeQuery(b *testing.B) { if testing.Short() && (tc.test == largeSixHourDataset || tc.test == sevenDaysAndTwoHoursDataset) { b.Skip() } + b.ResetTimer() b.ReportAllocs() b.Run("old_engine", func(b *testing.B) { opts := promql.EngineOpts{ @@ -324,6 +325,7 @@ func BenchmarkRangeQuery(b *testing.B) { for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { + b.ResetTimer() b.ReportAllocs() b.Run("old_engine", func(b *testing.B) { opts := promql.EngineOpts{