Share Config for Expanded Postings Cache Size Across All Tenants
Signed-off-by: alanprot <[email protected]>
alanprot committed Jan 10, 2025
1 parent 1369b45 commit 6e9db73
Showing 2 changed files with 23 additions and 17 deletions.
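
What changes here, in short: each per-tenant fifoCache used to track its own cachedBytes int64, so the expanded postings cache MaxBytes limit was enforced tenant by tenant. After this commit the byte accounting lives in ExpandedPostingsCacheFactory as two atomic.Int64 counters (one shared by all head caches, one by all block caches), and every cache created by the factory receives a pointer to the matching counter, so a single size budget applies across all tenants. Below is a minimal sketch of that pattern; the names (miniCache, sharedBytes) are illustrative only, not part of the Cortex code, and eviction is simplified to a size-only FIFO.

package main

import (
	"container/list"
	"fmt"
	"sync/atomic"
)

// miniCache stands in for a per-tenant fifoCache: it owns its own FIFO of
// entries, but charges their sizes against a counter owned by the factory.
type miniCache struct {
	sharedBytes *atomic.Int64 // shared by every tenant's cache
	maxBytes    int64
	entries     *list.List // oldest entry at the front
}

func (c *miniCache) put(sizeBytes int64) {
	c.entries.PushBack(sizeBytes)
	c.sharedBytes.Add(sizeBytes)
	// The "cache is full" check sees the combined usage of all tenants,
	// but each cache only evicts entries from its own list.
	for c.sharedBytes.Load() > c.maxBytes && c.entries.Len() > 0 {
		front := c.entries.Front()
		c.entries.Remove(front)
		c.sharedBytes.Add(-front.Value.(int64))
	}
}

func main() {
	var shared atomic.Int64
	a := &miniCache{sharedBytes: &shared, maxBytes: 100, entries: list.New()}
	b := &miniCache{sharedBytes: &shared, maxBytes: 100, entries: list.New()}

	a.put(70)
	b.put(70) // the shared total hits 140, so b evicts its own oldest entry
	fmt.Println(shared.Load(), a.entries.Len(), b.entries.Len()) // 70 1 0
}
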
pkg/storage/tsdb/expanded_postings_cache.go (15 additions, 10 deletions)
@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/oklog/ulid"
@@ -100,6 +101,9 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f
 type ExpandedPostingsCacheFactory struct {
 	seedByHash *seedByHash
 	cfg TSDBPostingsCacheConfig
+
+	headCachedBytes atomic.Int64
+	blocksCachedBytes atomic.Int64
 }
 
 func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory {
@@ -118,7 +122,7 @@ func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPosti
 }
 
 func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache {
-	return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash)
+	return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash, &f.headCachedBytes, &f.blocksCachedBytes)
 }
 
 type ExpandedPostingsCache interface {
@@ -138,7 +142,7 @@ type blocksPostingsForMatchersCache struct {
 	seedByHash *seedByHash
 }
 
-func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
+func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash, headCachedBytes, blocksCachedBytes *atomic.Int64) ExpandedPostingsCache {
 	if cfg.PostingsForMatchers == nil {
 		cfg.PostingsForMatchers = tsdb.PostingsForMatchers
 	}
@@ -148,8 +152,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
 	}
 
 	return &blocksPostingsForMatchersCache{
-		headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
-		blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
+		headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, headCachedBytes, "head", metrics, cfg.timeNow),
+		blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, blocksCachedBytes, "block", metrics, cfg.timeNow),
 		postingsForMatchersFunc: cfg.PostingsForMatchers,
 		timeNow: cfg.timeNow,
 		metrics: metrics,
@@ -333,17 +337,18 @@ type fifoCache[V any] struct {
 	// Fields from here should be locked
 	cachedMtx sync.RWMutex
 	cached *list.List
-	cachedBytes int64
+	cachedBytes *atomic.Int64
 }
 
-func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
+func newFifoCache[V any](cfg PostingsCacheConfig, cachedBytes *atomic.Int64, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
 	return &fifoCache[V]{
 		cachedValues: new(sync.Map),
 		cached: list.New(),
 		cfg: cfg,
 		timeNow: timeNow,
 		name: name,
 		metrics: *metrics,
+		cachedBytes: cachedBytes,
 	}
 }
 
Expand Down Expand Up @@ -417,7 +422,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
return "", false
}

if c.cachedBytes > c.cfg.MaxBytes {
if c.cachedBytes.Load() > c.cfg.MaxBytes {
return "full", true
}

@@ -437,7 +442,7 @@ func (c *fifoCache[V]) evictHead() {
 	c.cached.Remove(front)
 	oldestKey := front.Value.(string)
 	if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded {
-		c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes
+		c.cachedBytes.Add(-oldest.(*cacheEntryPromise[V]).sizeBytes)
 	}
 }
 
@@ -449,7 +454,7 @@ func (c *fifoCache[V]) created(key string, sizeBytes int64) {
 	c.cachedMtx.Lock()
 	defer c.cachedMtx.Unlock()
 	c.cached.PushBack(key)
-	c.cachedBytes += sizeBytes
+	c.cachedBytes.Add(sizeBytes)
 }
 
 func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
@@ -459,7 +464,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
 
 	c.cachedMtx.Lock()
 	defer c.cachedMtx.Unlock()
-	c.cachedBytes += newSizeBytes - oldSize
+	c.cachedBytes.Add(newSizeBytes - oldSize)
 }
 
 type cacheEntryPromise[V any] struct {
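
One consequence visible in the hunks above: cachedBytes is now written by every tenant's cache, while each fifoCache still only holds its own cachedMtx, so a plain int64 guarded by the per-cache mutex would no longer be safe. Switching the field to a shared *atomic.Int64, with Load/Add in shouldEvictHead, evictHead, created and updateSize, keeps the cross-tenant counter consistent without introducing a global lock.
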
pkg/storage/tsdb/expanded_postings_cache_test.go (8 additions, 7 deletions)
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -13,7 +14,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/atomic"
 )
 
 func TestCacheKey(t *testing.T) {
@@ -52,14 +52,14 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
 		MaxBytes: 10 << 20,
 	}
 	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
-	cache := newFifoCache[int](cfg, "test", m, time.Now)
+	cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, time.Now)
 	calls := atomic.Int64{}
 	concurrency := 100
 	wg := sync.WaitGroup{}
 	wg.Add(concurrency)
 
 	fetchFunc := func() (int, int64, error) {
-		calls.Inc()
+		calls.Add(1)
 		time.Sleep(100 * time.Millisecond)
 		return 0, 0, nil
 	}
@@ -81,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) {
 	cfg.Enabled = false
 	m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
 	timeNow := time.Now
-	cache := newFifoCache[int](cfg, "test", m, timeNow)
+	cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, timeNow)
 	old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
 		return 1, 0, nil
 	})
@@ -124,7 +124,8 @@ func TestFifoCacheExpire(t *testing.T) {
 			r := prometheus.NewPedanticRegistry()
 			m := NewPostingCacheMetrics(r)
 			timeNow := time.Now
-			cache := newFifoCache[int](c.cfg, "test", m, timeNow)
+			cachedBytes := atomic.Int64{}
+			cache := newFifoCache[int](c.cfg, &cachedBytes, "test", m, timeNow)
 
 			for i := 0; i < numberOfKeys; i++ {
 				key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -169,15 +170,15 @@ func TestFifoCacheExpire(t *testing.T) {
 
 			for i := 0; i < numberOfKeys; i++ {
 				key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
-				originalSize := cache.cachedBytes
+				originalSize := cache.cachedBytes.Load()
 				p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) {
 					return 2, 18, nil
 				})
 				require.False(t, loaded)
 				// New value
 				require.Equal(t, 2, p.v)
 				// Total Size Updated
-				require.Equal(t, originalSize+10, cache.cachedBytes)
+				require.Equal(t, originalSize+10, cache.cachedBytes.Load())
 			}
 
 			err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`
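A side note on the test changes: the test file drops go.uber.org/atomic in favor of the standard library's sync/atomic, whose atomic.Int64 type has been available since Go 1.19. The uber type's Inc() has no identically named stdlib counterpart, hence the switch to Add(1); a trivial example of the equivalent call:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var calls atomic.Int64
	calls.Add(1)              // stand-in for go.uber.org/atomic's calls.Inc()
	fmt.Println(calls.Load()) // 1
}
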
