From 6e9db73c28dd4db04c7bf7b960ecbd9b7435127e Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 10 Jan 2025 00:33:49 -0800 Subject: [PATCH] Share Config for Expanded Postings Cache Size Across All Tenants Signed-off-by: alanprot --- pkg/storage/tsdb/expanded_postings_cache.go | 25 +++++++++++-------- .../tsdb/expanded_postings_cache_test.go | 15 +++++------ 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 3ea8da709e..7f1496cf65 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/oklog/ulid" @@ -100,6 +101,9 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f type ExpandedPostingsCacheFactory struct { seedByHash *seedByHash cfg TSDBPostingsCacheConfig + + headCachedBytes atomic.Int64 + blocksCachedBytes atomic.Int64 } func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory { @@ -118,7 +122,7 @@ func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPosti } func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { - return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash) + return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash, &f.headCachedBytes, &f.blocksCachedBytes) } type ExpandedPostingsCache interface { @@ -138,7 +142,7 @@ type blocksPostingsForMatchersCache struct { seedByHash *seedByHash } -func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { +func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash, headCachedBytes, blocksCachedBytes *atomic.Int64) ExpandedPostingsCache { if cfg.PostingsForMatchers == nil { cfg.PostingsForMatchers = tsdb.PostingsForMatchers } @@ -148,8 +152,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi } return &blocksPostingsForMatchersCache{ - headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), - blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), + headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, headCachedBytes, "head", metrics, cfg.timeNow), + blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, blocksCachedBytes, "block", metrics, cfg.timeNow), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, @@ -333,10 +337,10 @@ type fifoCache[V any] struct { // Fields from here should be locked cachedMtx sync.RWMutex cached *list.List - cachedBytes int64 + cachedBytes *atomic.Int64 } -func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { +func newFifoCache[V any](cfg PostingsCacheConfig, cachedBytes *atomic.Int64, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { return &fifoCache[V]{ cachedValues: new(sync.Map), cached: list.New(), @@ -344,6 +348,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded timeNow: timeNow, name: name, metrics: *metrics, + cachedBytes: cachedBytes, } } @@ -417,7 +422,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) { return "", false } - if c.cachedBytes > c.cfg.MaxBytes { + if c.cachedBytes.Load() > c.cfg.MaxBytes { return "full", true } @@ -437,7 +442,7 @@ func (c *fifoCache[V]) evictHead() { c.cached.Remove(front) oldestKey := front.Value.(string) if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded { - c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes + c.cachedBytes.Add(-oldest.(*cacheEntryPromise[V]).sizeBytes) } } @@ -449,7 +454,7 @@ func (c *fifoCache[V]) created(key string, sizeBytes int64) { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() c.cached.PushBack(key) - c.cachedBytes += sizeBytes + c.cachedBytes.Add(sizeBytes) } func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { @@ -459,7 +464,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - c.cachedBytes += newSizeBytes - oldSize + c.cachedBytes.Add(newSizeBytes - oldSize) } type cacheEntryPromise[V any] struct { diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 19272c00e7..7fd46e3b5d 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "testing" "time" @@ -13,7 +14,6 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) func TestCacheKey(t *testing.T) { @@ -52,14 +52,14 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { MaxBytes: 10 << 20, } m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) - cache := newFifoCache[int](cfg, "test", m, time.Now) + cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, time.Now) calls := atomic.Int64{} concurrency := 100 wg := sync.WaitGroup{} wg.Add(concurrency) fetchFunc := func() (int, int64, error) { - calls.Inc() + calls.Add(1) time.Sleep(100 * time.Millisecond) return 0, 0, nil } @@ -81,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) { cfg.Enabled = false m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now - cache := newFifoCache[int](cfg, "test", m, timeNow) + cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, timeNow) old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil }) @@ -124,7 +124,8 @@ func TestFifoCacheExpire(t *testing.T) { r := prometheus.NewPedanticRegistry() m := NewPostingCacheMetrics(r) timeNow := time.Now - cache := newFifoCache[int](c.cfg, "test", m, timeNow) + cachedBytes := atomic.Int64{} + cache := newFifoCache[int](c.cfg, &cachedBytes, "test", m, timeNow) for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) @@ -169,7 +170,7 @@ func TestFifoCacheExpire(t *testing.T) { for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) - originalSize := cache.cachedBytes + originalSize := cache.cachedBytes.Load() p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { return 2, 18, nil }) @@ -177,7 +178,7 @@ func TestFifoCacheExpire(t *testing.T) { // New value require.Equal(t, 2, p.v) // Total Size Updated - require.Equal(t, originalSize+10, cache.cachedBytes) + require.Equal(t, originalSize+10, cache.cachedBytes.Load()) } err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`