Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speedup DistinctValue collector and exit early for ingesters #4104

Merged
merged 8 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/combiner/search_tag_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValu
}

func NewSearchTagValuesV2(limitBytes int) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctValue(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
// Distinct collector with no limit and diff enabled
d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })

return &genericCombiner[*tempopb.SearchTagValuesV2Response]{
httpStatusCode: 200,
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
}

if valueCollector.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", valueCollector.TotalDataSize())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size())
}

resp := &tempopb.SearchTagValuesV2Response{}
Expand Down
45 changes: 43 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,9 @@ func (q *Querier) SearchTagsBlocks(ctx context.Context, req *tempopb.SearchTagsB

for _, t := range s.Tags {
distinctValues.Collect(t)
if distinctValues.Exceeded() {
break // stop early
}
}
}

Expand Down Expand Up @@ -539,9 +542,17 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err)
}

stopEarly := false
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames {
distinctValues.Collect(res)
if distinctValues.Exceeded() {
stopEarly = true
electron0zero marked this conversation as resolved.
Show resolved Hide resolved
}
}
if stopEarly {
break
}
}

Expand Down Expand Up @@ -573,12 +584,23 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque
limit := q.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := collector.NewScopedDistinctString(limit)

stopEarly := false
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagsV2Response).Scopes {
for _, tag := range res.Tags {
distinctValues.Collect(res.Name, tag)
if distinctValues.Exceeded() {
stopEarly = true
electron0zero marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if stopEarly {
break
}
}
if stopEarly {
break
}
}

if distinctValues.Exceeded() {
Expand Down Expand Up @@ -610,6 +632,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal

// Virtual tags values. Get these first.
for _, v := range search.GetVirtualTagValues(req.TagName) {
// virtual tags are small so no need to stopEarly here
distinctValues.Collect(v)
}

Expand All @@ -619,9 +642,18 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}

stopEarly := false
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues {
distinctValues.Collect(res)
if distinctValues.Exceeded() {
stopEarly = true
electron0zero marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if stopEarly {
break
}
}

Expand All @@ -648,6 +680,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
// Virtual tags values. Get these first.
virtualVals := search.GetVirtualTagValuesV2(req.TagName)
for _, v := range virtualVals {
// no need to stop early here, virtual tags are small
distinctValues.Collect(v)
}

Expand All @@ -664,14 +697,22 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}

stopEarly := false
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagValuesV2Response).TagValues {
distinctValues.Collect(*res)
if distinctValues.Collect(*res) {
stopEarly = true
break
}
}
if stopEarly {
break
}
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
}

return valuesToV2Response(distinctValues), nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/distinct_string_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func NewDistinctString(maxDataSize int) *DistinctString {
}
}

// Collect adds a new value to the distinct string collector.
// return indicates if the value was added or not.
func (d *DistinctString) Collect(s string) bool {
if _, ok := d.values[s]; ok {
// Already present
Expand Down
101 changes: 62 additions & 39 deletions pkg/collector/distinct_value_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,83 @@ import (
)

type DistinctValue[T comparable] struct {
values map[T]struct{}
new map[T]struct{}
len func(T) int
maxLen int
currLen int
totalLen int
mtx sync.RWMutex
values map[T]struct{}
new map[T]struct{}
len func(T) int
maxLen int
currLen int
limExceeded bool
diffEnabled bool
mtx sync.Mutex
}

// NewDistinctValue with the given maximum data size. This is calculated
// as the total length of the recorded strings. For ease of use, maximum=0
// is interpreted as unlimited.
// Use NewDistinctValueWithDiff to enable diff support, but that one is slightly slower.
func NewDistinctValue[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] {
return &DistinctValue[T]{
values: make(map[T]struct{}),
new: make(map[T]struct{}),
maxLen: maxDataSize,
len: len,
values: make(map[T]struct{}),
new: make(map[T]struct{}),
maxLen: maxDataSize,
diffEnabled: false, // disable diff to make it faster
len: len,
}
}

// NewDistinctValueWithDiff builds a DistinctValue collector with diff
// tracking switched on. maxDataSize is stored as the collector's size limit
// and len is the function used to measure each collected value.
func NewDistinctValueWithDiff[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] {
	d := new(DistinctValue[T])
	d.values = make(map[T]struct{})
	d.new = make(map[T]struct{})
	d.maxLen = maxDataSize
	d.diffEnabled = true
	d.len = len
	return d
}

// Collect adds a new value to the distinct value collector.
// It returns true once the limit is reached and no more values can fit.
// Callers can check the return value of Collect, or call Exceeded, to stop early.
func (d *DistinctValue[T]) Collect(v T) (exceeded bool) {
d.mtx.RLock()
if _, ok := d.values[v]; ok {
d.mtx.RUnlock()
return // Already present
d.mtx.Lock()
defer d.mtx.Unlock()

if d.limExceeded {
return true
}
d.mtx.RUnlock()

// Calculate length
valueLen := d.len(v)

d.mtx.Lock()
defer d.mtx.Unlock()
// Can it fit?
// note: we will stop adding values slightly before the limit is reached
if d.maxLen > 0 && d.currLen+valueLen >= d.maxLen {
// No, it can't fit
d.limExceeded = true
return true
}

if _, ok := d.values[v]; ok {
return // Already present
}

// Record total inspected length regardless
d.totalLen += valueLen

// Can it fit?
if d.maxLen > 0 && d.currLen+valueLen > d.maxLen {
// No
return true
if d.diffEnabled {
d.new[v] = struct{}{}
}

d.new[v] = struct{}{}
d.values[v] = struct{}{}
d.currLen += valueLen

return false
}

// Values returns the final list of distinct values collected and sorted.
func (d *DistinctValue[T]) Values() []T {
ss := make([]T, 0, len(d.values))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joe-elliott I think we have the same subtle bug here as well, that we had in Diff.

We are checking the len of a member var without locking. I think we got lucky here because we usually call Values after we are done collecting.

locking this as well.


d.mtx.RLock()
defer d.mtx.RUnlock()
d.mtx.Lock()
defer d.mtx.Unlock()

for k := range d.values {
ss = append(ss, k)
Expand All @@ -73,26 +90,32 @@ func (d *DistinctValue[T]) Values() []T {
return ss
}

// Exceeded indicates if some values were lost because the maximum size limit was met.
// Exceeded indicates that we have exceeded the limit
// can be used to stop early and to avoid collecting further values
func (d *DistinctValue[T]) Exceeded() bool {
d.mtx.RLock()
defer d.mtx.RUnlock()
return d.totalLen > d.currLen
d.mtx.Lock()
defer d.mtx.Unlock()
return d.limExceeded
}

// TotalDataSize is the total size of all distinct strings encountered.
func (d *DistinctValue[T]) TotalDataSize() int {
d.mtx.RLock()
defer d.mtx.RUnlock()
return d.totalLen
// Size is the total size of all distinct items collected
func (d *DistinctValue[T]) Size() int {
d.mtx.Lock()
defer d.mtx.Unlock()
return d.currLen
}

// Diff returns all new strings collected since the last time diff was called
// returns nil if diff is not enabled
func (d *DistinctValue[T]) Diff() []T {
if !d.diffEnabled {
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

ss := make([]T, 0, len(d.new))
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved

d.mtx.RLock()
defer d.mtx.RUnlock()
d.mtx.Lock()
defer d.mtx.Unlock()

for k := range d.new {
ss = append(ss, k)
Expand Down
46 changes: 45 additions & 1 deletion pkg/collector/distinct_value_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package collector

import (
"fmt"
"sort"
"strconv"
"testing"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
)

func TestDistinctValueCollectorDiff(t *testing.T) {
d := NewDistinctValue[string](0, func(s string) int { return len(s) })
d := NewDistinctValueWithDiff[string](0, func(s string) int { return len(s) })

d.Collect("123")
d.Collect("4567")
Expand All @@ -28,3 +31,44 @@ func stringsSlicesEqual(t *testing.T, a, b []string) {
sort.Strings(b)
require.Equal(t, a, b)
}

// BenchmarkCollect measures DistinctValue.Collect (created via
// NewDistinctValue, i.e. without diff support) while merging the tag values
// of a simulated set of ingesters. One sub-benchmark is run per size limit.
func BenchmarkCollect(b *testing.B) {
	// simulate 100 ingesters, each returning 10_000 tag values
	numIngesters := 100
	numTagValuesPerIngester := 10_000
	ingesterTagValues := make([][]tempopb.TagValue, numIngesters)
	for i := 0; i < numIngesters; i++ {
		tagValues := make([]tempopb.TagValue, numTagValuesPerIngester)
		for j := 0; j < numTagValuesPerIngester; j++ {
			tagValues[j] = tempopb.TagValue{
				Type:  "string",
				Value: fmt.Sprintf("value_%d_%d", i, j),
			}
		}
		ingesterTagValues[i] = tagValues
	}

	limits := []int{
		0,          // no limit
		100_000,    // 100KB
		1_000_000,  // 1MB
		10_000_000, // 10MB
	}

	b.ResetTimer() // to exclude the setup time for generating tag values
	for _, lim := range limits {
		b.Run("limit:"+strconv.Itoa(lim), func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				// NewDistinctValue is collecting tag values without diff support
				distinctValues := NewDistinctValue(lim, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })

				// Leave BOTH loops once the limit is reached. A plain break
				// would only exit the inner loop and keep visiting every
				// remaining ingester, which is not what "stop early" means.
			collect:
				for _, tagValues := range ingesterTagValues {
					for _, v := range tagValues {
						if distinctValues.Collect(v) {
							break collect // stop early if limit is reached
						}
					}
				}
			}
		})
	}
}