Skip to content

Commit

Permalink
Fix range rewrite middleware failing to rewrite queries w/ offset (#3926
Browse files Browse the repository at this point in the history
)

* Fix range rewrite middleware failing to rewrite queries w/ offset

This adds a pointer to the PrometheusEngine to the range rewrite middleware.
We use this engine instance to run the query against a fake Queryable instance
in order to have Prometheus calculate the actual start/end time of the query for us. This is an optional parameter to the middleware and we fall back to using the start/end time on failure.

- Add related query options to QueryStorageMetadataAttributes

* HFix missing options.PrometheusEngine value that was recently removed

* Respond to PR comments

- Refactor tests to use helper functions
- Add comment to leverage dynamic lookback in prometheus when available

* Move query rewriting after lookback calculation

This allows us to take into account the possibility of a lookback rewrite
when we calculate range rewriting

* Fix linter errors

* Fix some style issues
  • Loading branch information
marcushill authored Nov 19, 2021
1 parent 70a91b9 commit 565c655
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 28 deletions.
5 changes: 4 additions & 1 deletion src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"encoding/json"
"fmt"
"net/http"
_ "net/http/pprof" // needed for pprof handler registration

// needed for pprof handler registration
_ "net/http/pprof"
"time"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -495,6 +497,7 @@ func (h *Handler) RegisterRoutes() error {
ResolutionMultiplier: h.middlewareConfig.Prometheus.ResolutionMultiplier,
DefaultLookback: h.options.DefaultLookback(),
Storage: h.options.Storage(),
PrometheusEngineFn: h.options.PrometheusEngineFn(),
},
}
override := h.registry.MiddlewareOpts(route)
Expand Down
171 changes: 148 additions & 23 deletions src/query/api/v1/middleware/rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,28 @@ package middleware

import (
"bytes"
"context"
"errors"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/gorilla/mux"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
promstorage "github.com/prometheus/prometheus/storage"
"go.uber.org/zap"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/storage"
xhttp "github.com/m3db/m3/src/x/net/http"
xtime "github.com/m3db/m3/src/x/time"
)

var errIgnorableQuerierError = errors.New("ignorable error")

// PrometheusRangeRewriteOptions are the options for the prometheus range rewriting middleware.
type PrometheusRangeRewriteOptions struct { // nolint:maligned
Enabled bool
Expand All @@ -45,6 +52,11 @@ type PrometheusRangeRewriteOptions struct { // nolint:maligned
ResolutionMultiplier int
DefaultLookback time.Duration
Storage storage.Storage

// TODO(marcus): There's a conversation with Prometheus about supporting dynamic lookback.
// We can replace this with a single engine reference if that work is ever completed.
// https://groups.google.com/g/prometheus-developers/c/9wzuobfLMV8
PrometheusEngineFn func(time.Duration) (*promql.Engine, error)
}

// PrometheusRangeRewrite is middleware that, when enabled, will rewrite the query parameter
Expand All @@ -67,7 +79,7 @@ func PrometheusRangeRewrite(opts Options) mux.MiddlewareFunc {
}

logger := opts.InstrumentOpts.Logger()
if err := rewriteRangeDuration(r, mwOpts, logger); err != nil {
if err := RewriteRangeDuration(r, mwOpts, logger); err != nil {
logger.Error("could not rewrite range", zap.Error(err))
xhttp.WriteError(w, err)
return
Expand All @@ -84,7 +96,8 @@ const (
lookbackParam = handleroptions.LookbackParam
)

func rewriteRangeDuration(
// RewriteRangeDuration is the driver function for the PrometheusRangeRewrite middleware
func RewriteRangeDuration(
r *http.Request,
opts PrometheusRangeRewriteOptions,
logger *zap.Logger,
Expand Down Expand Up @@ -118,33 +131,39 @@ func rewriteRangeDuration(
return err
}

attrs, err := store.QueryStorageMetadataAttributes(ctx, params.start, params.end, fetchOpts)
// Get the appropriate time range before updating the lookback
// This is necessary to cover things like the offset and `@` modifiers.
startTime, endTime := getQueryBounds(opts, params, fetchOpts, logger)
res, err := findLargestQueryResolution(ctx, store, fetchOpts, startTime, endTime)
if err != nil {
return err
}

// Find the largest resolution
var res time.Duration
for _, attr := range attrs {
if attr.Resolution > res {
res = attr.Resolution
}
}

// Largest resolution is 0 which means we're routing to the unaggregated namespace.
// Unaggregated namespace can service all requests, so return.
if res == 0 {
return nil
}

// Rewrite ranges within the query, if necessary
updatedLookback, updateLookback := maybeUpdateLookback(params, res, opts)
originalLookback := params.lookback

// We use the lookback as a part of bounds calculation
// If the lookback had changed, we need to recalculate the bounds
if updateLookback {
params.lookback = updatedLookback
startTime, endTime = getQueryBounds(opts, params, fetchOpts, logger)
res, err = findLargestQueryResolution(ctx, store, fetchOpts, startTime, endTime)
if err != nil {
return err
}
}

// parse the query so that we can manipulate it
expr, err := parser.ParseExpr(params.query)
if err != nil {
return err
}

updateQuery, updatedQuery := maybeRewriteRangeInQuery(params.query, expr, res, opts.ResolutionMultiplier)
updateLookback, updatedLookback := maybeUpdateLookback(params, res, opts)
updatedQuery, updateQuery := maybeRewriteRangeInQuery(params.query, expr, res, opts.ResolutionMultiplier)

if !updateQuery && !updateLookback {
return nil
Expand Down Expand Up @@ -183,12 +202,77 @@ func rewriteRangeDuration(
logger.Debug("rewrote duration values in request",
zap.String("originalQuery", params.query),
zap.String("updatedQuery", updatedQuery),
zap.Duration("originalLookback", params.lookback),
zap.Duration("originalLookback", originalLookback),
zap.Duration("updatedLookback", updatedLookback))

return nil
}

func findLargestQueryResolution(ctx context.Context,
store storage.Storage,
fetchOpts *storage.FetchOptions,
startTime time.Time,
endTime time.Time,
) (time.Duration, error) {
attrs, err := store.QueryStorageMetadataAttributes(ctx, startTime, endTime, fetchOpts)
if err != nil {
return 0, err
}

// Find the largest resolution
var res time.Duration
for _, attr := range attrs {
if attr.Resolution > res {
res = attr.Resolution
}
}
return res, nil
}

// Using the prometheus engine in this way should be considered
// optional and best effort. Fall back to the frequently accurate logic
// of using the start and end time in the request
func getQueryBounds(
opts PrometheusRangeRewriteOptions,
params params,
fetchOpts *storage.FetchOptions,
logger *zap.Logger,
) (start time.Time, end time.Time) {
start = params.start
end = params.end
if opts.PrometheusEngineFn == nil {
return start, end
}

lookback := opts.DefaultLookback
if params.isLookbackSet {
lookback = params.lookback
}
engine, err := opts.PrometheusEngineFn(lookback)
if err != nil {
logger.Debug("Found an error when getting a Prom engine to "+
"calculate start/end time for query rewriting. Falling back to request start/end time",
zap.String("originalQuery", params.query),
zap.Duration("lookbackDuration", lookback))
return start, end
}

queryable := fakeQueryable{
engine: engine,
instant: opts.Instant,
}
err = queryable.calculateQueryBounds(params.query, params.start, params.end, fetchOpts.Step)
if err != nil {
logger.Debug("Found an error when using the Prom engine to "+
"calculate start/end time for query rewriting. Falling back to request start/end time",
zap.String("originalQuery", params.query))
return start, end
}
// calculates the query boundaries in roughly the same way as prometheus
start, end = queryable.getQueryBounds()
return start, end
}

type params struct {
query string
start, end time.Time
Expand Down Expand Up @@ -226,7 +310,7 @@ func extractParams(r *http.Request, instant bool) (params, error) {
}, nil
}

func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration, multiplier int) (bool, string) {
func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration, multiplier int) (string, bool) {
updated := false // nolint: ifshort
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
// nolint:gocritic
Expand All @@ -241,16 +325,16 @@ func maybeRewriteRangeInQuery(query string, expr parser.Node, res time.Duration,
})

if updated {
return true, expr.String()
return expr.String(), true
}
return false, query
return query, false
}

func maybeUpdateLookback(
params params,
maxResolution time.Duration,
opts PrometheusRangeRewriteOptions,
) (bool, time.Duration) {
) (time.Duration, bool) {
var (
lookback = params.lookback
resolutionBasedLookback = maxResolution * time.Duration(opts.ResolutionMultiplier) // nolint: durationcheck
Expand All @@ -259,7 +343,48 @@ func maybeUpdateLookback(
lookback = opts.DefaultLookback
}
if lookback < resolutionBasedLookback {
return true, resolutionBasedLookback
return resolutionBasedLookback, true
}
return lookback, false
}

type fakeQueryable struct {
engine *promql.Engine
instant bool
calculatedStartTime time.Time
calculatedEndTime time.Time
}

func (f *fakeQueryable) Querier(ctx context.Context, mint, maxt int64) (promstorage.Querier, error) {
f.calculatedStartTime = xtime.FromUnixMillis(mint)
f.calculatedEndTime = xtime.FromUnixMillis(maxt)
// fail here to cause prometheus to give up on query execution
return nil, errIgnorableQuerierError
}

func (f *fakeQueryable) calculateQueryBounds(
q string,
start time.Time,
end time.Time,
step time.Duration,
) (err error) {
var query promql.Query
if f.instant {
// startTime and endTime are the same for instant queries
query, err = f.engine.NewInstantQuery(f, q, start)
} else {
query, err = f.engine.NewRangeQuery(f, q, start, end, step)
}
if err != nil {
return err
}
// The result returned by Exec will be an error, but that's expected
if res := query.Exec(context.Background()); !errors.Is(res.Err, errIgnorableQuerierError) {
return err
}
return false, lookback
return nil
}

func (f *fakeQueryable) getQueryBounds() (startTime time.Time, endTime time.Time) {
return f.calculatedStartTime, f.calculatedEndTime
}
Loading

0 comments on commit 565c655

Please sign in to comment.