From f6dad821cb69ddd54522ffc813343a7c9c2ad6f8 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 18:55:12 +0530 Subject: [PATCH 1/8] make the collector go fast... --- pkg/collector/distinct_value_collector.go | 101 +++++++++++------- .../distinct_value_collector_test.go | 44 ++++++++ pkg/collector/generic_collector.go | 34 ++++++ 3 files changed, 140 insertions(+), 39 deletions(-) create mode 100644 pkg/collector/generic_collector.go diff --git a/pkg/collector/distinct_value_collector.go b/pkg/collector/distinct_value_collector.go index 5b8ca29608f..f46f65fd521 100644 --- a/pkg/collector/distinct_value_collector.go +++ b/pkg/collector/distinct_value_collector.go @@ -5,57 +5,74 @@ import ( ) type DistinctValue[T comparable] struct { - values map[T]struct{} - new map[T]struct{} - len func(T) int - maxLen int - currLen int - totalLen int - mtx sync.RWMutex + values map[T]struct{} + new map[T]struct{} + len func(T) int + maxLen int + currLen int + limExceeded bool + diffEnabled bool + mtx sync.Mutex } // NewDistinctValue with the given maximum data size. This is calculated // as the total length of the recorded strings. For ease of use, maximum=0 // is interpreted as unlimited. +// Use NewDistinctValueWithDiff to enable diff support, but that one is slightly slower. func NewDistinctValue[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] { return &DistinctValue[T]{ - values: make(map[T]struct{}), - new: make(map[T]struct{}), - maxLen: maxDataSize, - len: len, + values: make(map[T]struct{}), + new: make(map[T]struct{}), + maxLen: maxDataSize, + diffEnabled: false, // disable diff to make it faster + len: len, } } +// NewDistinctValueWithDiff is like NewDistinctValue but with diff support enabled. +func NewDistinctValueWithDiff[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] { + return &DistinctValue[T]{ + values: make(map[T]struct{}), + new: make(map[T]struct{}), + maxLen: maxDataSize, + diffEnabled: true, + len: len, + } +} + +// Collect adds a new value to the distinct value collector. +// return true when it reaches the limits and can't fit more values. +// callers of return of Collect or call Exceeded to stop early. func (d *DistinctValue[T]) Collect(v T) (exceeded bool) { - d.mtx.RLock() - if _, ok := d.values[v]; ok { - d.mtx.RUnlock() - return // Already present + d.mtx.Lock() + defer d.mtx.Unlock() + + if d.limExceeded { + return true } - d.mtx.RUnlock() // Calculate length valueLen := d.len(v) - d.mtx.Lock() - defer d.mtx.Unlock() + // Can it fit? + // note: we will stop adding values slightly before the limit is reached + if d.maxLen > 0 && d.currLen+valueLen >= d.maxLen { + // No, it can't fit + exceeded = true + return true + } if _, ok := d.values[v]; ok { return // Already present } - // Record total inspected length regardless - d.totalLen += valueLen - - // Can it fit? - if d.maxLen > 0 && d.currLen+valueLen > d.maxLen { - // No - return true + if d.diffEnabled { + d.new[v] = struct{}{} } - d.new[v] = struct{}{} d.values[v] = struct{}{} d.currLen += valueLen + return false } @@ -63,8 +80,8 @@ func (d *DistinctValue[T]) Collect(v T) (exceeded bool) { func (d *DistinctValue[T]) Values() []T { ss := make([]T, 0, len(d.values)) - d.mtx.RLock() - defer d.mtx.RUnlock() + d.mtx.Lock() + defer d.mtx.Unlock() for k := range d.values { ss = append(ss, k) @@ -73,26 +90,32 @@ func (d *DistinctValue[T]) Values() []T { return ss } -// Exceeded indicates if some values were lost because the maximum size limit was met. +// Exceeded indicates that +// if we get rid of totalLen, then Exceeded won't work as expected func (d *DistinctValue[T]) Exceeded() bool { - d.mtx.RLock() - defer d.mtx.RUnlock() - return d.totalLen > d.currLen + d.mtx.Lock() + defer d.mtx.Unlock() + return d.limExceeded } -// TotalDataSize is the total size of all distinct strings encountered. -func (d *DistinctValue[T]) TotalDataSize() int { - d.mtx.RLock() - defer d.mtx.RUnlock() - return d.totalLen +// Size is the total size of all distinct items collected +func (d *DistinctValue[T]) Size() int { + d.mtx.Lock() + defer d.mtx.Unlock() + return d.currLen } // Diff returns all new strings collected since the last time diff was called +// returns nil if diff is not enabled func (d *DistinctValue[T]) Diff() []T { + if !d.diffEnabled { + return nil + } + ss := make([]T, 0, len(d.new)) - d.mtx.RLock() - defer d.mtx.RUnlock() + d.mtx.Lock() + defer d.mtx.Unlock() for k := range d.new { ss = append(ss, k) diff --git a/pkg/collector/distinct_value_collector_test.go b/pkg/collector/distinct_value_collector_test.go index 890daad0856..848f346928e 100644 --- a/pkg/collector/distinct_value_collector_test.go +++ b/pkg/collector/distinct_value_collector_test.go @@ -1,9 +1,12 @@ package collector import ( + "fmt" "sort" + "strconv" "testing" + "github.com/grafana/tempo/pkg/tempopb" "github.com/stretchr/testify/require" ) @@ -28,3 +31,44 @@ func stringsSlicesEqual(t *testing.T, a, b []string) { sort.Strings(b) require.Equal(t, a, b) } + +func BenchmarkCollect(b *testing.B) { + // simulate 100 ingesters, each returning 10_000 tag values + numIngesters := 100 + numTagValuesPerIngester := 10_000 + ingesterTagValues := make([][]tempopb.TagValue, numIngesters) + for i := 0; i < numIngesters; i++ { + tagValues := make([]tempopb.TagValue, numTagValuesPerIngester) + for j := 0; j < numTagValuesPerIngester; j++ { + tagValues[j] = tempopb.TagValue{ + Type: fmt.Sprintf("string"), + Value: fmt.Sprintf("value_%d_%d", i, j), + } + } + ingesterTagValues[i] = tagValues + } + + limits := []int{ + 0, // no limit + 100_000, // 100KB + 1_000_000, // 1MB + 10_000_000, // 10MB + } + + b.ResetTimer() // to exclude the setup time for generating tag values + for _, lim := range limits { + b.Run("limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + // NewDistinctValue is collecting tag values without diff support + distinctValues := NewDistinctValue(lim, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) }) + for _, tagValues := range ingesterTagValues { + for _, v := range tagValues { + if distinctValues.Collect(v) { + break // stop early if limit is reached + } + } + } + } + }) + } +} diff --git a/pkg/collector/generic_collector.go b/pkg/collector/generic_collector.go new file mode 100644 index 00000000000..31fd907f741 --- /dev/null +++ b/pkg/collector/generic_collector.go @@ -0,0 +1,34 @@ +package collector + +import "sync" + +// FIXME: make this collector generic using the golang generics instead of the interface{} + +type GenericCollector[T comparable] struct { + values map[T]struct{} + mu sync.Mutex +} + +// FIXME: do we need a GenericCollector for this? this is only used by SearchRecent?? +// SearchRecent is the searching for only in ingesters?? +func NewGenericCollector[T comparable]() *GenericCollector[T] { + return &GenericCollector[T]{ + values: make(map[T]struct{}), + } +} + +func (c *GenericCollector) Collect(response interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + + c.responses = append(c.responses, response) +} + +func (c *GenericCollector) Results() interface{} { + return c.responses +} + +// FIXME: impliment this method +func (c *GenericCollector) Exceeded() bool { + return false +} From 23b34ff1060e20bab60d72ce1844345029d14a62 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 19:03:56 +0530 Subject: [PATCH 2/8] fixup usage and log lines --- modules/frontend/combiner/search_tag_values.go | 4 ++-- modules/ingester/instance_search.go | 2 +- modules/querier/querier.go | 2 +- pkg/collector/distinct_value_collector.go | 6 +++--- pkg/collector/distinct_value_collector_test.go | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/frontend/combiner/search_tag_values.go b/modules/frontend/combiner/search_tag_values.go index 4a814cf946f..4cb5b8bf537 100644 --- a/modules/frontend/combiner/search_tag_values.go +++ b/modules/frontend/combiner/search_tag_values.go @@ -43,8 +43,8 @@ func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValu } func NewSearchTagValuesV2(limitBytes int) Combiner { - // Distinct collector with no limit - d := collector.NewDistinctValue(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) }) + // Distinct collector with no limit and diff enabled + d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) }) return &genericCombiner[*tempopb.SearchTagValuesV2Response]{ httpStatusCode: 200, diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 16b856db9b9..b0f7f711b1d 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -573,7 +573,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag } if valueCollector.Exceeded() { - level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", valueCollector.TotalDataSize()) + _ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size()) } resp := &tempopb.SearchTagValuesV2Response{} diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 21d09a21fcc..33d87a60963 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -671,7 +671,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV } if distinctValues.Exceeded() { - level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize()) + _ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", distinctValues.Size()) } return valuesToV2Response(distinctValues), nil diff --git a/pkg/collector/distinct_value_collector.go b/pkg/collector/distinct_value_collector.go index f46f65fd521..f216efeee84 100644 --- a/pkg/collector/distinct_value_collector.go +++ b/pkg/collector/distinct_value_collector.go @@ -58,7 +58,7 @@ func (d *DistinctValue[T]) Collect(v T) (exceeded bool) { // note: we will stop adding values slightly before the limit is reached if d.maxLen > 0 && d.currLen+valueLen >= d.maxLen { // No, it can't fit - exceeded = true + d.limExceeded = true return true } @@ -90,8 +90,8 @@ func (d *DistinctValue[T]) Values() []T { return ss } -// Exceeded indicates that -// if we get rid of totalLen, then Exceeded won't work as expected +// Exceeded indicates that we have exceeded the limit +// can be used to stop early and to avoid collecting further values func (d *DistinctValue[T]) Exceeded() bool { d.mtx.Lock() defer d.mtx.Unlock() diff --git a/pkg/collector/distinct_value_collector_test.go b/pkg/collector/distinct_value_collector_test.go index 848f346928e..a4d97b17d2a 100644 --- a/pkg/collector/distinct_value_collector_test.go +++ b/pkg/collector/distinct_value_collector_test.go @@ -11,7 +11,7 @@ import ( ) func TestDistinctValueCollectorDiff(t *testing.T) { - d := NewDistinctValue[string](0, func(s string) int { return len(s) }) + d := NewDistinctValueWithDiff[string](0, func(s string) int { return len(s) }) d.Collect("123") d.Collect("4567") From e2edd76b2b40b7710512eab89c7b47e0ba88ba51 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 19:37:56 +0530 Subject: [PATCH 3/8] exit early when we hit the limits of collector --- modules/querier/querier.go | 43 +++++++++++++++++++++- pkg/collector/distinct_string_collector.go | 2 + 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 33d87a60963..ea49bd66aed 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -504,6 +504,9 @@ func (q *Querier) SearchTagsBlocks(ctx context.Context, req *tempopb.SearchTagsB for _, t := range s.Tags { distinctValues.Collect(t) + if distinctValues.Exceeded() { + break // stop early + } } } @@ -539,9 +542,17 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest if err != nil { return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) } + + stopEarly := false for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames { distinctValues.Collect(res) + if distinctValues.Exceeded() { + stopEarly = true + } + } + if stopEarly { + break } } @@ -573,12 +584,23 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque limit := q.limits.MaxBytesPerTagValuesQuery(userID) distinctValues := collector.NewScopedDistinctString(limit) + stopEarly := false for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagsV2Response).Scopes { for _, tag := range res.Tags { distinctValues.Collect(res.Name, tag) + if distinctValues.Exceeded() { + stopEarly = true + break + } + } + if stopEarly { + break } } + if stopEarly { + break + } } if distinctValues.Exceeded() { @@ -610,6 +632,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal // Virtual tags values. Get these first. for _, v := range search.GetVirtualTagValues(req.TagName) { + // virtual tags are small so no need to stopEarly here distinctValues.Collect(v) } @@ -619,9 +642,18 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal if err != nil { return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } + + stopEarly := false for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues { distinctValues.Collect(res) + if distinctValues.Exceeded() { + stopEarly = true + break + } + } + if stopEarly { + break } } @@ -648,6 +680,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV // Virtual tags values. Get these first. virtualVals := search.GetVirtualTagValuesV2(req.TagName) for _, v := range virtualVals { + // no need to stop early here, virtual tags are small distinctValues.Collect(v) } @@ -664,9 +697,17 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV if err != nil { return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } + + stopEarly := false for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagValuesV2Response).TagValues { - distinctValues.Collect(*res) + if distinctValues.Collect(*res) { + stopEarly = true + break + } + } + if stopEarly { + break } } diff --git a/pkg/collector/distinct_string_collector.go b/pkg/collector/distinct_string_collector.go index a96fb7e5906..2df55b8bc02 100644 --- a/pkg/collector/distinct_string_collector.go +++ b/pkg/collector/distinct_string_collector.go @@ -24,6 +24,8 @@ func NewDistinctString(maxDataSize int) *DistinctString { } } +// Collect adds a new value to the distinct string collector. +// return indicates if the value was added or not. func (d *DistinctString) Collect(s string) bool { if _, ok := d.values[s]; ok { // Already present From 7fab35215faf431f90b4e8e8888c1d3fa944cc4b Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 20:35:38 +0530 Subject: [PATCH 4/8] cleanup --- pkg/collector/generic_collector.go | 34 ------------------------------ 1 file changed, 34 deletions(-) delete mode 100644 pkg/collector/generic_collector.go diff --git a/pkg/collector/generic_collector.go b/pkg/collector/generic_collector.go deleted file mode 100644 index 31fd907f741..00000000000 --- a/pkg/collector/generic_collector.go +++ /dev/null @@ -1,34 +0,0 @@ -package collector - -import "sync" - -// FIXME: make this collector generic using the golang generics instead of the interface{} - -type GenericCollector[T comparable] struct { - values map[T]struct{} - mu sync.Mutex -} - -// FIXME: do we need a GenericCollector for this? this is only used by SearchRecent?? -// SearchRecent is the searching for only in ingesters?? -func NewGenericCollector[T comparable]() *GenericCollector[T] { - return &GenericCollector[T]{ - values: make(map[T]struct{}), - } -} - -func (c *GenericCollector) Collect(response interface{}) { - c.mu.Lock() - defer c.mu.Unlock() - - c.responses = append(c.responses, response) -} - -func (c *GenericCollector) Results() interface{} { - return c.responses -} - -// FIXME: impliment this method -func (c *GenericCollector) Exceeded() bool { - return false -} From 7e74b04fb8312a5294a3c4a717175838e860d4d3 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 20:37:09 +0530 Subject: [PATCH 5/8] CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45c1542f0bb..31fc7e2eeda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## main / unreleased +* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero) From c49a064e876320ff2deb283f0e14ed13311bdc85 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:01:12 +0530 Subject: [PATCH 6/8] fix lint --- pkg/collector/distinct_value_collector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/collector/distinct_value_collector_test.go b/pkg/collector/distinct_value_collector_test.go index a4d97b17d2a..b17b1c3e680 100644 --- a/pkg/collector/distinct_value_collector_test.go +++ b/pkg/collector/distinct_value_collector_test.go @@ -41,7 +41,7 @@ func BenchmarkCollect(b *testing.B) { tagValues := make([]tempopb.TagValue, numTagValuesPerIngester) for j := 0; j < numTagValuesPerIngester; j++ { tagValues[j] = tempopb.TagValue{ - Type: fmt.Sprintf("string"), + Type: "string", Value: fmt.Sprintf("value_%d_%d", i, j), } } From fbb28d7773156c39e7180920eacc4591ca487815 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:27:09 +0530 Subject: [PATCH 7/8] break with goto --- modules/querier/querier.go | 36 +++++++++--------------------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index ea49bd66aed..30ffc90f2f9 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -543,17 +543,14 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) } - stopEarly := false +outerLoop: for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames { distinctValues.Collect(res) if distinctValues.Exceeded() { - stopEarly = true + break outerLoop // break out of all loops } } - if stopEarly { - break - } } if distinctValues.Exceeded() { @@ -584,22 +581,15 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque limit := q.limits.MaxBytesPerTagValuesQuery(userID) distinctValues := collector.NewScopedDistinctString(limit) - stopEarly := false +outerLoop: for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagsV2Response).Scopes { for _, tag := range res.Tags { distinctValues.Collect(res.Name, tag) if distinctValues.Exceeded() { - stopEarly = true - break + break outerLoop // break out of all loops } } - if stopEarly { - break - } - } - if stopEarly { - break } } @@ -632,7 +622,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal // Virtual tags values. Get these first. for _, v := range search.GetVirtualTagValues(req.TagName) { - // virtual tags are small so no need to stopEarly here + // virtual tags are small so no need to stop early here distinctValues.Collect(v) } @@ -643,18 +633,14 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } - stopEarly := false +outerLoop: for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues { distinctValues.Collect(res) if distinctValues.Exceeded() { - stopEarly = true - break + break outerLoop // break out of all loops } } - if stopEarly { - break - } } if distinctValues.Exceeded() { @@ -698,17 +684,13 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } - stopEarly := false +outerLoop: for _, resp := range lookupResults { for _, res := range resp.response.(*tempopb.SearchTagValuesV2Response).TagValues { if distinctValues.Collect(*res) { - stopEarly = true - break + break outerLoop // break out of all loops } } - if stopEarly { - break - } } if distinctValues.Exceeded() { From 9db274cc9818894a4db69206b91a48c57a67bc33 Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:33:21 +0530 Subject: [PATCH 8/8] locked and loaded --- pkg/collector/distinct_value_collector.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/collector/distinct_value_collector.go b/pkg/collector/distinct_value_collector.go index f216efeee84..930241ef796 100644 --- a/pkg/collector/distinct_value_collector.go +++ b/pkg/collector/distinct_value_collector.go @@ -78,11 +78,10 @@ func (d *DistinctValue[T]) Collect(v T) (exceeded bool) { // Values returns the final list of distinct values collected and sorted. func (d *DistinctValue[T]) Values() []T { - ss := make([]T, 0, len(d.values)) - d.mtx.Lock() defer d.mtx.Unlock() + ss := make([]T, 0, len(d.values)) for k := range d.values { ss = append(ss, k) } @@ -95,6 +94,7 @@ func (d *DistinctValue[T]) Values() []T { func (d *DistinctValue[T]) Exceeded() bool { d.mtx.Lock() defer d.mtx.Unlock() + return d.limExceeded } @@ -102,21 +102,21 @@ func (d *DistinctValue[T]) Exceeded() bool { func (d *DistinctValue[T]) Size() int { d.mtx.Lock() defer d.mtx.Unlock() + return d.currLen } // Diff returns all new strings collected since the last time diff was called // returns nil if diff is not enabled func (d *DistinctValue[T]) Diff() []T { + d.mtx.Lock() + defer d.mtx.Unlock() + if !d.diffEnabled { return nil } ss := make([]T, 0, len(d.new)) - - d.mtx.Lock() - defer d.mtx.Unlock() - for k := range d.new { ss = append(ss, k) }