Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow partial traces to be returned #3941

Merged
merged 24 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e4ee781
enable return partial traces
javiermolinar Aug 6, 2024
1dda114
fix bug returning an error on max size reached
javiermolinar Aug 6, 2024
9503bba
test combiner with partial trace
javiermolinar Aug 6, 2024
fa0072d
allow partial traces to be returned from the ingester
javiermolinar Aug 6, 2024
e55af7d
Merge branch 'main' into partial-trace
javiermolinar Aug 12, 2024
b53d3c0
add message to TraceByIdV2 on partial trace returned
javiermolinar Aug 12, 2024
1913f2a
fix test and move allowtraces one level down
javiermolinar Aug 12, 2024
162dcb7
allow partial traces in segment decoder
javiermolinar Aug 12, 2024
6b0f7b6
fix compilation error
javiermolinar Aug 12, 2024
2672183
use an enum to describe better the trace response
javiermolinar Aug 12, 2024
b37d485
revert segment decoder changes
javiermolinar Aug 16, 2024
a684785
Merge branch 'main' into partial-trace
javiermolinar Aug 21, 2024
ffd6285
do not check the trace size at block level
javiermolinar Aug 21, 2024
395f788
changelog
javiermolinar Aug 21, 2024
d74cd59
move short-circuit to the top of the method
javiermolinar Aug 22, 2024
87170a4
simplify partial trace logic
javiermolinar Aug 23, 2024
03dcf62
added missing changelog
javiermolinar Aug 23, 2024
27cfcff
perf improvement by not processing partial traces from queriers
javiermolinar Aug 23, 2024
78de0c8
Merge branch 'main' into partial-trace
javiermolinar Aug 23, 2024
c1ed361
do not replace the trace every time
javiermolinar Aug 23, 2024
597f90f
flag when querier returns a partial trace
javiermolinar Aug 26, 2024
5305d41
Merge branch 'main' into partial-trace
javiermolinar Aug 26, 2024
a1eb08d
Merge branch 'main' into partial-trace
javiermolinar Aug 26, 2024
d90ea83
fix proto
javiermolinar Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {
}

var (
combiner = trace.NewCombiner(0)
combiner = trace.NewCombiner(0, true)
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
)
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/combiner/trace_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type traceByIDCombiner struct {
// - encode the returned trace as either json or proto depending on the request
func NewTraceByID(maxBytes int, contentType string) Combiner {
return &traceByIDCombiner{
c: trace.NewCombiner(maxBytes),
c: trace.NewCombiner(maxBytes, false),
code: http.StatusNotFound,
contentType: contentType,
}
Expand Down
11 changes: 9 additions & 2 deletions modules/frontend/combiner/trace_by_id_v2.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package combiner

import (
"fmt"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
)

func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
combiner := trace.NewCombiner(maxBytes)
combiner := trace.NewCombiner(maxBytes, true)
gc := &genericCombiner[*tempopb.TraceByIDResponse]{
combine: func(partial *tempopb.TraceByIDResponse, _ *tempopb.TraceByIDResponse, _ PipelineResponse) error {
_, err := combiner.Consume(partial.Trace)
Expand All @@ -21,8 +23,13 @@ func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
// dedupe duplicate span ids
deduper := newDeduper()
traceResult = deduper.dedupe(traceResult)

resp.Trace = traceResult

if combiner.IsPartialTrace() {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
resp.Status = tempopb.TraceByIDResponse_PARTIAL
resp.Message = fmt.Sprintf("Trace exceeds maximun size of %d bytes, a partial trace is returned", maxBytes)
}

return resp, nil
},
new: func() *tempopb.TraceByIDResponse { return &tempopb.TraceByIDResponse{} },
Expand Down
27 changes: 27 additions & 0 deletions modules/frontend/combiner/trace_by_id_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ func (m MockResponse) RequestData() any {
return nil
}

func TestNewTraceByIdV2ReturnsAPartialTrace(t *testing.T) {
traceResponse := &tempopb.TraceByIDResponse{
Trace: test.MakeTrace(2, []byte{0x01, 0x02}),
Metrics: &tempopb.TraceByIDMetrics{},
}
resBytes, err := proto.Marshal(traceResponse)
require.NoError(t, err)
response := http.Response{
StatusCode: 200,
Header: map[string][]string{
"Content-Type": {"application/protobuf"},
},
Body: io.NopCloser(bytes.NewReader(resBytes)),
}
combiner := NewTraceByIDV2(10, api.HeaderAcceptJSON)
err = combiner.AddResponse(MockResponse{&response})
require.NoError(t, err)

res, err := combiner.HTTPFinal()
require.NoError(t, err)

actualResp := &tempopb.TraceByIDResponse{}
err = new(jsonpb.Unmarshaler).Unmarshal(res.Body, actualResp)
require.NoError(t, err)
assert.Equal(t, actualResp.Status, tempopb.TraceByIDResponse_PARTIAL)
}

func TestNewTraceByIDV2(t *testing.T) {
traceResponse := &tempopb.TraceByIDResponse{
Trace: test.MakeTrace(2, []byte{0x01, 0x02}),
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (i *Ingester) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequ
return &tempopb.TraceByIDResponse{}, nil
}

trace, err := inst.FindTraceByID(ctx, req.TraceID)
trace, err := inst.FindTraceByID(ctx, req.TraceID, req.AllowPartialTrace)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (i *instance) addTraceError(errorsByTrace []tempopb.PushErrorReason, pushEr
errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_UNKNOWN_ERROR)
return errorsByTrace

} else if pushError == nil && len(errorsByTrace) > 0 {
} else if len(errorsByTrace) > 0 {
errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_NO_ERROR)
}

Expand Down Expand Up @@ -405,7 +405,7 @@ func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error
return err
}

func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace, error) {
func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTrace bool) (*tempopb.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.FindTraceByID")
defer span.Finish()

Expand All @@ -426,7 +426,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
searchOpts := common.DefaultSearchOptionsWithMaxBytes(maxBytes)

combiner := trace.NewCombiner(maxBytes)
combiner := trace.NewCombiner(maxBytes, allowPartialTrace)
_, err = combiner.Consume(completeTrace)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
})

go concurrent(func() {
_, err := i.FindTraceByID(context.Background(), []byte{0x01})
_, err := i.FindTraceByID(context.Background(), []byte{0x01}, false)
assert.NoError(t, err, "error finding trace by id")
})

Expand Down
18 changes: 9 additions & 9 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func pushTracesToInstance(t *testing.T, i *instance, numTraces int) ([]*tempopb.

func queryAll(t *testing.T, i *instance, ids [][]byte, traces []*tempopb.Trace) {
for j, id := range ids {
trace, err := i.FindTraceByID(context.Background(), id)
trace, err := i.FindTraceByID(context.Background(), id, false)
require.NoError(t, err)
require.Equal(t, traces[j], trace)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
})

go concurrent(func() {
_, err := i.FindTraceByID(context.Background(), []byte{0x01})
_, err := i.FindTraceByID(context.Background(), []byte{0x01}, false)
require.NoError(t, err, "error finding trace by id")
})

Expand Down Expand Up @@ -625,25 +625,25 @@ func TestInstancePartialSuccess(t *testing.T) {
assert.Equal(t, true, traceTooLargeCount > 0)

// check that the two good ones actually made it
result, err := i.FindTraceByID(ctx, ids[0])
result, err := i.FindTraceByID(ctx, ids[0], false)
require.NoError(t, err, "error finding trace by id")
assert.Equal(t, 1, len(result.ResourceSpans))

result, err = i.FindTraceByID(ctx, ids[3])
result, err = i.FindTraceByID(ctx, ids[3], false)
require.NoError(t, err, "error finding trace by id")
assert.Equal(t, 1, len(result.ResourceSpans))

// check that the three traces that had errors did not actually make it
var expected *tempopb.Trace
result, err = i.FindTraceByID(ctx, ids[1])
result, err = i.FindTraceByID(ctx, ids[1], false)
require.NoError(t, err, "error finding trace by id")
assert.Equal(t, expected, result)

result, err = i.FindTraceByID(ctx, ids[2])
result, err = i.FindTraceByID(ctx, ids[2], false)
require.NoError(t, err, "error finding trace by id")
assert.Equal(t, expected, result)

result, err = i.FindTraceByID(ctx, ids[4])
result, err = i.FindTraceByID(ctx, ids[4], false)
require.NoError(t, err, "error finding trace by id")
assert.Equal(t, expected, result)
}
Expand Down Expand Up @@ -745,7 +745,7 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
trace, err := instance.FindTraceByID(context.Background(), traceID)
trace, err := instance.FindTraceByID(context.Background(), traceID, false)
require.NotNil(b, trace)
require.NoError(b, err)
}
Expand Down Expand Up @@ -899,7 +899,7 @@ func BenchmarkInstanceContention(t *testing.B) {
})

go concurrent(func() {
_, err := i.FindTraceByID(ctx, []byte{0x01})
_, err := i.FindTraceByID(ctx, []byte{0x01}, false)
require.NoError(t, err, "error finding trace by id")
finds++
})
Expand Down
4 changes: 2 additions & 2 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
BlockStart: blockStart,
BlockEnd: blockEnd,
QueryMode: queryMode,
}, timeStart, timeEnd)
}, timeStart, timeEnd, false)
if err != nil {
handleError(w, err)
return
Expand Down Expand Up @@ -112,7 +112,7 @@ func (q *Querier) TraceByIDHandlerV2(w http.ResponseWriter, r *http.Request) {
BlockStart: blockStart,
BlockEnd: blockEnd,
QueryMode: queryMode,
}, timeStart, timeEnd)
}, timeStart, timeEnd, true)
if err != nil {
handleError(w, err)
return
Expand Down
17 changes: 12 additions & 5 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (q *Querier) stopping(_ error) error {
}

// FindTraceByID implements tempopb.Querier.
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64) (*tempopb.TraceByIDResponse, error) {
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64, allowPartialTrace bool) (*tempopb.TraceByIDResponse, error) {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
if !validation.ValidTraceID(req.TraceID) {
return nil, errors.New("invalid trace id")
}
Expand All @@ -237,7 +237,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
span.SetTag("queryMode", req.QueryMode)

maxBytes := q.limits.MaxBytesPerTrace(userID)
combiner := trace.NewCombiner(maxBytes)
combiner := trace.NewCombiner(maxBytes, allowPartialTrace)

var spanCount, spanCountTotal, traceCountTotal int
if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
Expand All @@ -252,6 +252,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
// get responses from all ingesters in parallel
span.LogFields(ot_log.String("msg", "searching ingesters"))
responses, err := q.forIngesterRings(ctx, userID, getRSFn, func(funcCtx context.Context, client tempopb.QuerierClient) (interface{}, error) {
req.AllowPartialTrace = allowPartialTrace
return client.FindTraceByID(funcCtx, req)
})
if err != nil {
Expand Down Expand Up @@ -309,11 +310,17 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}

completeTrace, _ := combiner.Result()

return &tempopb.TraceByIDResponse{
resp := &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{},
}, nil
}

if combiner.IsPartialTrace() {
resp.Status = tempopb.TraceByIDResponse_PARTIAL
resp.Message = fmt.Sprintf("Trace exceeds maximun size of %d bytes, a partial trace is returned", maxBytes)
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
}

return resp, nil
}

type (
Expand Down
51 changes: 38 additions & 13 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ var ErrTraceTooLarge = fmt.Errorf("trace exceeds max size")
// * Only sort the final result once and if needed.
// * Don't scan/hash the spans for the last input (final=true).
type Combiner struct {
result *tempopb.Trace
spans map[token]struct{}
combined bool
maxSizeBytes int
result *tempopb.Trace
spans map[token]struct{}
combined bool
maxSizeBytes int
allowPartialTrace bool
maxTraceSizeReached bool
}

func NewCombiner(maxSizeBytes int) *Combiner {
// It creates a new Trace combiner. If maxSizeBytes is 0, the final trace size is not checked
// when allowPartialTrace is set to true a partial trace that exceed the max size may be returned
func NewCombiner(maxSizeBytes int, allowPartialTrace bool) *Combiner {
return &Combiner{
maxSizeBytes: maxSizeBytes,
maxSizeBytes: maxSizeBytes,
allowPartialTrace: allowPartialTrace,
}
}

Expand All @@ -63,7 +68,7 @@ func (c *Combiner) Consume(tr *tempopb.Trace) (int, error) {
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error) {
var spanCount int
if tr == nil {
return spanCount, c.sizeError()
return spanCount, nil
}

h := newHash()
Expand All @@ -90,9 +95,17 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
}
}
}
return spanCount, c.sizeError()
moveForward, maxSizeErr := c.canContinue()
if moveForward {
return spanCount, nil
}
return spanCount, maxSizeErr
}

// Do not combine more spans for now
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
if c.maxTraceSizeReached && c.allowPartialTrace {
return spanCount, nil
}
// loop through every span and copy spans in B that don't exist to A
for _, b := range tr.ResourceSpans {
notFoundILS := b.ScopeSpans[:0]
Expand Down Expand Up @@ -129,19 +142,26 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
}

c.combined = true
return spanCount, c.sizeError()
moveForward, maxSizeErr := c.canContinue()
if moveForward {
return spanCount, nil
}

return spanCount, maxSizeErr
}

func (c *Combiner) sizeError() error {
func (c *Combiner) canContinue() (bool, error) {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
if c.result == nil || c.maxSizeBytes <= 0 {
return nil
return true, nil
}

if c.result.Size() > c.maxSizeBytes {
return fmt.Errorf("%w (max bytes: %d)", ErrTraceTooLarge, c.maxSizeBytes)
// To avoid recalculing the size
c.maxTraceSizeReached = true
return c.allowPartialTrace, fmt.Errorf("%w (max bytes: %d)", ErrTraceTooLarge, c.maxSizeBytes)
}

return nil
return true, nil
}

// Result returns the final trace and span count.
Expand All @@ -156,3 +176,8 @@ func (c *Combiner) Result() (*tempopb.Trace, int) {

return c.result, spanCount
}

// Returns true if the comnbined trace is a partial one
func (c *Combiner) IsPartialTrace() bool {
return c.maxTraceSizeReached && c.allowPartialTrace
}
Loading
Loading