feat: reduce GC by not passing the whole struct to compile and process TraceQL metrics #4029
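Every file in this diff follows the same pattern: functions that previously took a *tempopb.QueryRangeRequest now take only the Start, End, Step, and Query values they actually read. A minimal, illustrative sketch of why that helps the garbage collector — the queryRangeRequest type below is a stand-in, not the real tempopb proto:

```go
package main

import "fmt"

// Stand-in for tempopb.QueryRangeRequest; the real proto carries more fields.
type queryRangeRequest struct {
	Start, End, Step uint64
	Query            string
	// ... fields the combiner/evaluator never reads
}

// Before: holding the pointer keeps the entire request (and anything it
// references) alive for as long as the combiner lives.
type combinerBefore struct{ req *queryRangeRequest }

// After: only the needed scalars are copied in, so the request can be
// collected as soon as the caller is done with it, and the combiner itself
// is pointer-free.
type combinerAfter struct{ start, end, step uint64 }

func main() {
	req := &queryRangeRequest{Start: 0, End: 3_600_000_000_000, Step: 60_000_000_000, Query: "{} | rate()"}
	c := combinerAfter{start: req.Start, end: req.End, step: req.Step}
	fmt.Println(c.start, c.end, c.step) // req is no longer reachable through c
}
```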

10 changes: 5 additions & 5 deletions modules/frontend/combiner/metrics_query_range.go
@@ -11,8 +11,8 @@ import (
var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)

// NewQueryRange returns a query range combiner.
-func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, error) {
-combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, trackDiffs)
+func NewQueryRange(start, end, step uint64, query string, trackDiffs bool) (Combiner, error) {
+combiner, err := traceql.QueryRangeCombinerFor(start, end, step, query, traceql.AggregateModeFinal, trackDiffs)
if err != nil {
return nil, err
}
@@ -21,7 +21,7 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
httpStatusCode: 200,
new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
-combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error {
+combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, _ PipelineResponse) error {
if partial.Metrics != nil {
// this is a coordination between the sharder and combiner. the sharder returns one response with summary metrics
// only. the combiner correctly takes and accumulates that job. however, if the response has no jobs this is
@@ -54,8 +54,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
}, nil
}

-func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
-c, err := NewQueryRange(req, trackDiffs)
+func NewTypedQueryRange(start, end, step uint64, query string, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
+c, err := NewQueryRange(start, end, step, query, trackDiffs)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions modules/frontend/metrics_query_handler.go
@@ -49,7 +49,7 @@ func newQueryInstantStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTri
httpReq = httpReq.Clone(ctx)

var finalResponse *tempopb.QueryInstantResponse
-c, err := combiner.NewTypedQueryRange(qr, true)
+c, err := combiner.NewTypedQueryRange(qr.Start, qr.End, qr.Step, qr.Query, true)
if err != nil {
return err
}
@@ -112,7 +112,7 @@ func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripp
req.URL.Path = strings.ReplaceAll(req.URL.Path, api.PathMetricsQueryInstant, api.PathMetricsQueryRange)
req = api.BuildQueryRangeRequest(req, qr)

-combiner, err := combiner.NewTypedQueryRange(qr, false)
+combiner, err := combiner.NewTypedQueryRange(qr.Start, qr.End, qr.Step, qr.Query, false)
if err != nil {
level.Error(logger).Log("msg", "query instant: query range combiner failed", "err", err)
return &http.Response{
4 changes: 2 additions & 2 deletions modules/frontend/metrics_query_range_handler.go
@@ -37,7 +37,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
start := time.Now()

var finalResponse *tempopb.QueryRangeResponse
-c, err := combiner.NewTypedQueryRange(req, true)
+c, err := combiner.NewTypedQueryRange(req.Start, req.End, req.Step, req.Query, true)
if err != nil {
return err
}
@@ -83,7 +83,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
logQueryRangeRequest(logger, tenant, queryRangeReq)

// build and use roundtripper
-combiner, err := combiner.NewTypedQueryRange(queryRangeReq, false)
+combiner, err := combiner.NewTypedQueryRange(queryRangeReq.Start, queryRangeReq.End, queryRangeReq.Step, queryRangeReq.Query, false)
if err != nil {
level.Error(logger).Log("msg", "query range: query range combiner failed", "err", err)
return &http.Response{
6 changes: 3 additions & 3 deletions modules/frontend/metrics_query_range_sharder.go
@@ -83,7 +83,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(errors.New("step must be greater than 0")), nil
}

-traceql.AlignRequest(req)
+req.Start, req.End = traceql.AlignRequest(req.Start, req.End, req.Step)

// calculate and enforce max search duration
// Note: this is checked after alignment for consistency.
@@ -168,7 +168,7 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
// Make a copy and limit to backend time range.
// Preserve instant nature of request if needed
backendReq := searchReq
-traceql.TrimToBefore(&backendReq, cutoff)
+backendReq.Start, backendReq.End = traceql.TrimToBefore(backendReq.Start, backendReq.End, backendReq.Step, cutoff)

// If empty window then no need to search backend
if backendReq.Start == backendReq.End {
@@ -292,7 +292,7 @@ func max(a, b uint32) uint32 {
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, cutoff time.Time) *http.Request {
-traceql.TrimToAfter(&searchReq, cutoff)
+searchReq.Start, searchReq.End = traceql.TrimToAfter(searchReq.Start, searchReq.End, searchReq.Step, cutoff)

// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
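
With this change the sharder reassigns Start/End itself instead of letting AlignRequest, TrimToBefore, and TrimToAfter mutate the request in place. Below is a rough sketch of what a step-alignment helper of this shape typically does; the body is an assumption for illustration, not the actual traceql.AlignRequest:

```go
package main

import "fmt"

// Hypothetical alignment helper with the same shape as traceql.AlignRequest
// after this change: take start/end/step (nanoseconds), return an adjusted
// window. Here start is snapped down and end snapped up to step boundaries;
// Tempo's exact rules may differ.
func alignToStep(start, end, step uint64) (uint64, uint64) {
	if step == 0 {
		return start, end
	}
	start -= start % step
	if rem := end % step; rem != 0 {
		end += step - rem
	}
	return start, end
}

func main() {
	start, end := alignToStep(1_000_000_007, 1_000_000_093, 30)
	fmt.Println(start, end) // 999999990 1000000110
}
```
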
2 changes: 1 addition & 1 deletion modules/generator/instance.go
@@ -408,7 +408,7 @@ func (i *instance) QueryRange(ctx context.Context, req *tempopb.QueryRangeReques
return resp, err
}

-rr := r.ToProto(req)
+rr := r.ToProto(req.Start, req.End, req.Step)
return &tempopb.QueryRangeResponse{
Series: rr,
}, nil
10 changes: 5 additions & 5 deletions modules/generator/processor/localblocks/query_range.go
@@ -55,7 +55,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
// Compile the raw version of the query for wal blocks
// These aren't cached and we put them all into the same evaluator
// for efficiency.
-eval, err := e.CompileMetricsQueryRange(req, int(req.Exemplars), timeOverlapCutoff, unsafe)
+eval, err := e.CompileMetricsQueryRange(req.Start, req.End, req.Step, req.Query, int(req.Exemplars), timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}
@@ -64,7 +64,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
// which can be cached. But we need their results separately so they are
// computed separately.
overallEvalMtx := sync.Mutex{}
-overallEval, err := traceql.NewEngine().CompileMetricsQueryRangeNonRaw(req, traceql.AggregateModeSum)
+overallEval, err := traceql.NewEngine().CompileMetricsQueryRangeNonRaw(req.Start, req.End, req.Step, req.Query, traceql.AggregateModeSum)
if err != nil {
return nil, err
}
@@ -141,7 +141,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
}

// Combine the uncacheable results into the overall results
-walResults := eval.Results().ToProto(req)
+walResults := eval.Results().ToProto(req.Start, req.End, req.Step)
overallEval.ObserveSeries(walResults)

return overallEval.Results(), nil
@@ -192,7 +192,7 @@ func (p *Processor) queryRangeCompleteBlock(ctx context.Context, b *ingester.Loc
}

// Not in cache or not cacheable, so execute
-eval, err := traceql.NewEngine().CompileMetricsQueryRange(&req, exemplars, timeOverlapCutoff, unsafe)
+eval, err := traceql.NewEngine().CompileMetricsQueryRange(req.Start, req.End, req.Step, req.Query, exemplars, timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}
@@ -204,7 +204,7 @@
return nil, err
}

-results := eval.Results().ToProto(&req)
+results := eval.Results().ToProto(req.Start, req.End, req.Step)

if name != "" {
err = p.queryRangeCacheSet(ctx, m, name, &tempopb.QueryRangeResponse{
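
The comments in this file describe the two-evaluator scheme: a raw evaluator shared by the WAL blocks (whose results are not cacheable) plus a non-raw overall evaluator that sums per-block, possibly cached, results. A condensed sketch of that flow with the new scalar-based signatures — error handling, block iteration, and caching are elided, and the traceql.SeriesSet return type and parameter types are assumptions, so treat it as an outline rather than the exact processor code:

```go
// Sketch as it might appear inside the localblocks package (not the exact Tempo code).
func queryRangeSketch(req *tempopb.QueryRangeRequest, timeOverlapCutoff float64, unsafe bool) (traceql.SeriesSet, error) {
	e := traceql.NewEngine()

	// Raw evaluator: shared by all WAL blocks, results are not cacheable.
	eval, err := e.CompileMetricsQueryRange(req.Start, req.End, req.Step, req.Query, int(req.Exemplars), timeOverlapCutoff, unsafe)
	if err != nil {
		return nil, err
	}

	// Non-raw evaluator: sums the per-block (possibly cached) results.
	overallEval, err := traceql.NewEngine().CompileMetricsQueryRangeNonRaw(req.Start, req.End, req.Step, req.Query, traceql.AggregateModeSum)
	if err != nil {
		return nil, err
	}

	// ... evaluate WAL blocks into eval; feed complete-block results (cached or
	// freshly computed) into overallEval via ObserveSeries (omitted) ...

	// Fold the uncacheable WAL results into the overall result set.
	overallEval.ObserveSeries(eval.Results().ToProto(req.Start, req.End, req.Step))
	return overallEval.Results(), nil
}
```
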
4 changes: 2 additions & 2 deletions modules/querier/querier_query_range.go
@@ -44,7 +44,7 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR
return nil, fmt.Errorf("error querying generators in Querier.queryRangeRecent: %w", err)
}

-c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum, false)
+c, err := traceql.QueryRangeCombinerFor(req.Start, req.End, req.Step, req.Query, traceql.AggregateModeSum, false)
if err != nil {
return nil, err
}
@@ -108,7 +108,7 @@ func (q *Querier) queryBlock(ctx context.Context, req *tempopb.QueryRangeRequest
timeOverlapCutoff = v
}

-eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, int(req.Exemplars), timeOverlapCutoff, unsafe)
+eval, err := traceql.NewEngine().CompileMetricsQueryRange(req.Start, req.End, req.Step, req.Query, int(req.Exemplars), timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}
22 changes: 11 additions & 11 deletions pkg/traceql/ast.go
@@ -22,7 +22,7 @@ type Element interface {
type metricsFirstStageElement interface {
Element
extractConditions(request *FetchSpansRequest)
-init(req *tempopb.QueryRangeRequest, mode AggregateMode)
+init(start, end, step uint64, mode AggregateMode)
observe(Span) // TODO - batching?
observeExemplar(Span)
observeSeries([]*tempopb.TimeSeries) // Re-entrant metrics on the query-frontend. Using proto version for efficiency
@@ -1095,14 +1095,14 @@ func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
}
}

-func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
+func (a *MetricsAggregate) init(start, end, step uint64, mode AggregateMode) {
switch mode {
case AggregateModeSum:
-a.initSum(q)
+a.initSum(start, end, step)
return

case AggregateModeFinal:
-a.initFinal(q)
+a.initFinal(start, end, step)
return
}

@@ -1121,7 +1121,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
}

case metricsAggregateRate:
-innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
+innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(step).Seconds()) }
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
@@ -1149,7 +1149,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
}

a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
-return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
+return NewStepAggregator(start, end, step, innerAgg)
}, a.by, byFunc, byFuncLabel)
a.exemplarFn = exemplarFn
}
@@ -1210,19 +1210,19 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
}
}

-func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
+func (a *MetricsAggregate) initSum(start, end, step uint64) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
-a.seriesAgg = NewSimpleAdditionCombiner(q)
+a.seriesAgg = NewSimpleAdditionCombiner(start, end, step)
}

-func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
+func (a *MetricsAggregate) initFinal(start, end, step uint64) {
switch a.op {
case metricsAggregateQuantileOverTime:
-a.seriesAgg = NewHistogramAggregator(q, a.floats)
+a.seriesAgg = NewHistogramAggregator(start, end, step, a.floats)
default:
// These are simple additions by series
-a.seriesAgg = NewSimpleAdditionCombiner(q)
+a.seriesAgg = NewSimpleAdditionCombiner(start, end, step)
}
}

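
One detail visible in init: the rate aggregator only ever needed Step, which it converts with time.Duration(step), i.e. the step is interpreted as nanoseconds. A small standalone check of that multiplier (this only reproduces the arithmetic from the line above, nothing Tempo-specific):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A 30s step expressed in nanoseconds, which is how time.Duration(step)
	// in the rate aggregator above interprets it.
	step := uint64(30 * time.Second)

	// The divisor used by NewRateAggregator: 1 / step-in-seconds.
	multiplier := 1.0 / time.Duration(step).Seconds()
	fmt.Println(multiplier) // 0.03333333333333333
}
```
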
24 changes: 14 additions & 10 deletions pkg/traceql/combine.go
@@ -146,7 +146,9 @@ type tsRange struct {
}

type QueryRangeCombiner struct {
-req *tempopb.QueryRangeRequest
+start uint64
+end uint64
+step uint64
eval *MetricsFrontendEvaluator
metrics *tempopb.SearchMetrics

@@ -156,8 +158,8 @@ type QueryRangeCombiner struct {
seriesUpdated map[string]tsRange
}

-func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode, trackDiffs bool) (*QueryRangeCombiner, error) {
-eval, err := NewEngine().CompileMetricsQueryRangeNonRaw(req, mode)
+func QueryRangeCombinerFor(start, end, step uint64, query string, mode AggregateMode, trackDiffs bool) (*QueryRangeCombiner, error) {
+eval, err := NewEngine().CompileMetricsQueryRangeNonRaw(start, end, step, query, mode)
if err != nil {
return nil, err
}
@@ -168,7 +170,7 @@ func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode, t
}

return &QueryRangeCombiner{
-req: req,
+start: start,
+end: end,
+step: step,
eval: eval,
metrics: &tempopb.SearchMetrics{},
seriesUpdated: seriesUpdated,
@@ -199,7 +203,7 @@ func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {

func (q *QueryRangeCombiner) Response() *tempopb.QueryRangeResponse {
return &tempopb.QueryRangeResponse{
-Series: q.eval.Results().ToProto(q.req),
+Series: q.eval.Results().ToProto(q.start, q.end, q.step),
Metrics: q.metrics,
}
}
@@ -216,7 +220,7 @@ func (q *QueryRangeCombiner) Diff() *tempopb.QueryRangeResponse {

// filter out series that haven't change
resp := &tempopb.QueryRangeResponse{
-Series: q.eval.Results().ToProtoDiff(q.req, seriesRangeFn),
+Series: q.eval.Results().ToProtoDiff(q.start, q.end, q.step, seriesRangeFn),
Metrics: q.metrics,
}

@@ -240,11 +244,11 @@ func (q *QueryRangeCombiner) markUpdatedRanges(resp *tempopb.QueryRangeResponse)
// Normalize into request alignment by converting timestamp into index and back
// TimestampMs may not match exactly when we trim things around blocks, and the generators
// This is mainly for instant queries that have large steps and few samples.
-idxMin := IntervalOfMs(series.Samples[0].TimestampMs, q.req.Start, q.req.End, q.req.Step)
-idxMax := IntervalOfMs(series.Samples[len(series.Samples)-1].TimestampMs, q.req.Start, q.req.End, q.req.Step)
+idxMin := IntervalOfMs(series.Samples[0].TimestampMs, q.start, q.end, q.step)
+idxMax := IntervalOfMs(series.Samples[len(series.Samples)-1].TimestampMs, q.start, q.end, q.step)

-nanoMin := TimestampOf(uint64(idxMin), q.req.Start, q.req.Step)
-nanoMax := TimestampOf(uint64(idxMax), q.req.Start, q.req.Step)
+nanoMin := TimestampOf(uint64(idxMin), q.start, q.step)
+nanoMax := TimestampOf(uint64(idxMax), q.start, q.step)

tsr, ok := q.seriesUpdated[series.PromLabels]
if !ok {
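
markUpdatedRanges snaps each sample's millisecond timestamp onto the request's step grid via IntervalOfMs and TimestampOf, now fed from the combiner's own start/end/step fields. The helpers below are hypothetical stand-ins written only to show that round trip (ms timestamp → step index → step-aligned nanoseconds); the real traceql functions may handle edge cases differently:

```go
package main

import "fmt"

// Hypothetical stand-ins for traceql.IntervalOfMs and traceql.TimestampOf.
func intervalOfMs(tsMs int64, start, end, step uint64) int {
	ts := uint64(tsMs) * 1_000_000 // ms -> ns
	if step == 0 || ts < start || ts > end {
		return 0
	}
	return int((ts - start) / step)
}

func timestampOf(idx, start, step uint64) uint64 {
	return start + idx*step // step-aligned nanoseconds
}

func main() {
	var (
		start = uint64(1_700_000_000_000_000_000) // request start, ns
		end   = start + 3_600_000_000_000         // +1h
		step  = uint64(60_000_000_000)            // 60s
	)
	sampleMs := int64(1_700_000_125_000) // a sample 125s after start, in ms

	idx := intervalOfMs(sampleMs, start, end, step)
	fmt.Println(idx, timestampOf(uint64(idx), start, step))
	// 2 1700000120000000000 -> the sample lands on the 60s grid at start+120s
}
```
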
10 changes: 3 additions & 7 deletions pkg/traceql/combine_test.go
@@ -359,13 +359,9 @@ func TestQueryRangeCombinerDiffs(t *testing.T) {
},
}

-req := &tempopb.QueryRangeRequest{
-Start: start,
-End: end,
-Step: step,
-Query: "{} | rate()", // simple aggregate
-}
-combiner, err := QueryRangeCombinerFor(req, AggregateModeFinal, true)
+query := "{} | rate()" // simple aggregate
+
+combiner, err := QueryRangeCombinerFor(start, end, step, query, AggregateModeFinal, true)
require.NoError(t, err)

for i, tc := range tcs {