From d2480d711e8eea68cd306e96db1cb0a7dd2b5f4b Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 22 Oct 2024 17:20:01 -0400 Subject: [PATCH] db: avoid repeated lookups into IterStatsCollector's map When creating a top-level *pebble.Iterator, look up the appropriate categorized stats struct once and propagate it to all sstable iterators constructed by the Iterator, rather than looking up on every sstable iterator Close. Recent experiments showed significant time spent in the statsMap Load: ``` 2950.28s 4.24hrs (flat, cum) 3.70% of Total 110.55s 110.56s 144:func (c *CategoryStatsCollector) reportStats( . . 145: p uint64, category Category, qosLevel QoSLevel, stats CategoryStats, . . 146:) { 129.36s 1.30hrs 147: v, ok := c.statsMap.Load(category) 91.69s 91.72s 148: if !ok { . . 149: c.mu.Lock() . . 150: v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{ . . 151: Category: category, . . 152: QoSLevel: qosLevel, . . 153: }) . . 154: c.mu.Unlock() . . 155: } . . 156: 93.55s 93.56s 157: shardedStats := v.(*shardedCategoryStats) 2338.74s 2339.56s 158: s := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1) 40.75s 1.70hrs 159: shardedStats.shards[s].mu.Lock() 52.07s 295.26s 160: shardedStats.shards[s].stats.aggregate(stats) 45.31s 1465.73s 161: shardedStats.shards[s].mu.Unlock() 48.26s 48.27s 162:} ``` With this change, it's limited to 2.44% across the per-Iterator acquisition of an Accumulator and the per-sstable iterator reporting of the iterator stats: ``` Active filters: focus=Accumulat Showing nodes accounting for 2.78hrs, 2.44% of 114.22hrs total Dropped 103 nodes (cum <= 0.57hrs) Showing top 20 nodes out of 48 flat flat% sum% cum cum% 0.81hrs 0.71% 0.71% 1.31hrs 1.15% github.com/cockroachdb/pebble/sstable.(*categoryStatsWithMu).Accumulate 0.39hrs 0.34% 1.05% 1.46hrs 1.28% github.com/cockroachdb/pebble/sstable.(*CategoryStatsCollector).Accumulator 0.37hrs 0.32% 1.37% 0.72hrs 0.63% runtime.mapaccess2 0.34hrs 0.3% 1.67% 0.36hrs 0.31% 
sync.(*Mutex).Unlock (inline) 0.20hrs 0.18% 1.84% 0.20hrs 0.18% sync.(*entry).load (inline) 0.10hrs 0.088% 1.93% 1.08hrs 0.94% sync.(*Map).Load 0.08hrs 0.069% 2.00% 0.10hrs 0.089% sync.(*Mutex).lockSlow 0.06hrs 0.056% 2.06% 0.07hrs 0.063% runtime.strequal 0.06hrs 0.056% 2.11% 0.11hrs 0.092% runtime.typehash 0.06hrs 0.049% 2.16% 0.17hrs 0.15% runtime.nilinterhash 0.05hrs 0.047% 2.21% 0.05hrs 0.048% sync.(*Map).loadReadOnly (inline) 0.04hrs 0.037% 2.25% 0.11hrs 0.1% runtime.efaceeq 0.04hrs 0.034% 2.28% 0.15hrs 0.13% runtime.nilinterequal 0.03hrs 0.027% 2.31% 1.35hrs 1.18% github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).close 0.03hrs 0.027% 2.33% 0.03hrs 0.027% github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).init 0.03hrs 0.025% 2.36% 0.03hrs 0.025% aeshashbody 0.02hrs 0.022% 2.38% 0.13hrs 0.11% sync.(*Mutex).Lock (inline) 0.02hrs 0.02% 2.40% 0.02hrs 0.02% github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).reportStats 0.02hrs 0.02% 2.42% 0.02hrs 0.02% github.com/cockroachdb/pebble/sstable.(*CategoryStats).aggregate (inline) 0.02hrs 0.014% 2.44% 0.02hrs 0.014% runtime.add (inline) ``` --- db.go | 38 ++++++++++----- external_iterator.go | 7 ++- internal/crdbtest/crdb_bench_test.go | 4 +- iterator.go | 2 + level_iter.go | 9 ++-- level_iter_test.go | 2 +- merging_iter_test.go | 2 +- sstable/block_property_test.go | 4 +- sstable/category_stats.go | 71 +++++++++++++++------------- sstable/random_test.go | 3 +- sstable/reader.go | 34 ++++++------- sstable/reader_common.go | 6 +-- sstable/reader_iter_single_lvl.go | 16 +++---- sstable/reader_iter_test.go | 6 +-- sstable/reader_iter_two_lvl.go | 10 ++-- sstable/reader_test.go | 13 +++-- sstable/reader_virtual.go | 10 ++-- table_cache.go | 13 +++-- testdata/metrics | 18 +++++++ 19 files changed, 147 insertions(+), 121 deletions(-) diff --git a/db.go b/db.go index dc9dd3c396..3adb2d5e07 100644 --- a/db.go +++ b/db.go @@ -12,6 +12,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" 
"github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/arenaskl" @@ -1037,20 +1038,20 @@ type newIterOpts struct { // newIter constructs a new iterator, merging in batch iterators as an extra // level. func (d *DB) newIter( - ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions, + ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions, ) *Iterator { - if internalOpts.batch.batchOnly { + if newIterOpts.batch.batchOnly { if batch == nil { panic("batchOnly is true, but batch is nil") } - if internalOpts.snapshot.vers != nil { + if newIterOpts.snapshot.vers != nil { panic("batchOnly is true, but snapshotIterOpts is initialized") } } if err := d.closed.Load(); err != nil { panic(err) } - seqNum := internalOpts.snapshot.seqNum + seqNum := newIterOpts.snapshot.seqNum if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges { panic("pebble: range key masking requires IterKeyTypePointsAndRanges") } @@ -1064,13 +1065,13 @@ func (d *DB) newIter( var readState *readState var newIters tableNewIters var newIterRangeKey keyspanimpl.TableNewSpanIter - if !internalOpts.batch.batchOnly { + if !newIterOpts.batch.batchOnly { // Grab and reference the current readState. This prevents the underlying // files in the associated version from being deleted if there is a current // compaction. The readState is unref'd by Iterator.Close(). - if internalOpts.snapshot.vers == nil { - if internalOpts.snapshot.readState != nil { - readState = internalOpts.snapshot.readState + if newIterOpts.snapshot.vers == nil { + if newIterOpts.snapshot.readState != nil { + readState = newIterOpts.snapshot.readState readState.ref() } else { // NB: loadReadState() calls readState.ref(). 
@@ -1078,7 +1079,7 @@ func (d *DB) newIter( } } else { // vers != nil - internalOpts.snapshot.vers.Ref() + newIterOpts.snapshot.vers.Ref() } // Determine the seqnum to read at after grabbing the read state (current and @@ -1100,15 +1101,16 @@ func (d *DB) newIter( merge: d.merge, comparer: *d.opts.Comparer, readState: readState, - version: internalOpts.snapshot.vers, + version: newIterOpts.snapshot.vers, keyBuf: buf.keyBuf, prefixOrFullSeekKey: buf.prefixOrFullSeekKey, boundsBuf: buf.boundsBuf, batch: batch, + tc: d.tableCache, newIters: newIters, newIterRangeKey: newIterRangeKey, seqNum: seqNum, - batchOnlyIter: internalOpts.batch.batchOnly, + batchOnlyIter: newIterOpts.batch.batchOnly, } if o != nil { dbi.opts = *o @@ -1399,7 +1401,19 @@ func (i *Iterator) constructPointIter( // Already have one. return } - internalOpts := internalIterOpts{stats: &i.stats.InternalStats} + internalOpts := internalIterOpts{ + stats: &i.stats.InternalStats, + } + // If the table cache has a sstable stats collector, ask it for an + // accumulator for this iterator's configured category and QoS. All SSTable + // iterators created by this Iterator will accumulate their stats to it as + // they Close during iteration. + if collector := i.tc.dbOpts.sstStatsCollector; collector != nil { + internalOpts.iterStatsAccumulator = collector.Accumulator( + uint64(uintptr(unsafe.Pointer(i))), + i.opts.CategoryAndQoS, + ) + } if i.opts.RangeKeyMasking.Filter != nil { internalOpts.boundLimitedFilter = &i.rangeKeyMasking } diff --git a/external_iterator.go b/external_iterator.go index 72c7a14797..03dca9782c 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -134,6 +134,11 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato } mlevels := it.alloc.mlevels[:0] + // TODO(jackson): External iterators never provide categorized iterator + // stats today because they exist outside the context of a *DB. 
If the + // sstables being read are on the physical filesystem, we may still want to + // thread a CategoryStatsCollector through so that we collect their stats. + if len(it.externalReaders) > cap(mlevels) { mlevels = make([]mergingIterLevel, 0, len(it.externalReaders)) } @@ -159,7 +164,7 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato pointIter, err = r.NewPointIter( ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */ sstable.NeverUseFilterBlock, - &it.stats.InternalStats, it.opts.CategoryAndQoS, nil, + &it.stats.InternalStats, nil, sstable.MakeTrivialReaderProvider(r)) if err != nil { return nil, err diff --git a/internal/crdbtest/crdb_bench_test.go b/internal/crdbtest/crdb_bench_test.go index cac71892c3..d20b2fb357 100644 --- a/internal/crdbtest/crdb_bench_test.go +++ b/internal/crdbtest/crdb_bench_test.go @@ -117,7 +117,7 @@ func benchmarkRandSeekInSST( rp := sstable.MakeTrivialReaderProvider(reader) iter, err := reader.NewPointIter( ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock, - &stats, sstable.CategoryAndQoS{}, nil, rp) + &stats, nil, rp) require.NoError(b, err) n := 0 for kv := iter.First(); kv != nil; kv = iter.Next() { @@ -133,7 +133,7 @@ func benchmarkRandSeekInSST( key := queryKeys[i%numQueryKeys] iter, err := reader.NewPointIter( ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock, - &stats, sstable.CategoryAndQoS{}, nil, rp) + &stats, nil, rp) if err != nil { b.Fatal(err) } diff --git a/iterator.go b/iterator.go index 4b21b9367e..a2aa77e4cf 100644 --- a/iterator.go +++ b/iterator.go @@ -245,6 +245,7 @@ type Iterator struct { // and SetOptions or when re-fragmenting a batch's range keys/range dels. // Non-nil if this Iterator includes a Batch. 
batch *Batch + tc *tableCacheContainer newIters tableNewIters newIterRangeKey keyspanimpl.TableNewSpanIter lazyCombinedIter lazyCombinedIter @@ -2866,6 +2867,7 @@ func (i *Iterator) CloneWithContext(ctx context.Context, opts CloneOptions) (*It boundsBuf: buf.boundsBuf, batch: i.batch, batchSeqNum: i.batchSeqNum, + tc: i.tc, newIters: i.newIters, newIterRangeKey: i.newIterRangeKey, seqNum: i.seqNum, diff --git a/level_iter.go b/level_iter.go index 925a7c95b2..7f4b6f853c 100644 --- a/level_iter.go +++ b/level_iter.go @@ -22,10 +22,11 @@ type internalIterOpts struct { // if compaction is set, sstable-level iterators will be created using // NewCompactionIter; these iterators have a more constrained interface // and are optimized for the sequential scan of a compaction. - compaction bool - bufferPool *sstable.BufferPool - stats *base.InternalIteratorStats - boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter + compaction bool + bufferPool *sstable.BufferPool + stats *base.InternalIteratorStats + iterStatsAccumulator sstable.IterStatsAccumulator + boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter } // levelIter provides a merged view of the sstables in a level. 
diff --git a/level_iter_test.go b/level_iter_test.go index c52efaa818..8a7f51d7e4 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -184,7 +184,7 @@ func (lt *levelIterTest) newIters( if kinds.Point() { iter, err := lt.readers[file.FileNum].NewPointIter( ctx, transforms, - opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats, sstable.CategoryAndQoS{}, + opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats, nil, sstable.MakeTrivialReaderProvider(lt.readers[file.FileNum])) if err != nil { return iterSet{}, errors.CombineErrors(err, set.CloseAll()) diff --git a/merging_iter_test.go b/merging_iter_test.go index 2e719a39d7..d8378ee555 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -179,7 +179,7 @@ func TestMergingIterDataDriven(t *testing.T) { context.Background(), sstable.NoTransforms, opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, iio.stats, - sstable.CategoryAndQoS{}, nil, sstable.MakeTrivialReaderProvider(r)) + nil, sstable.MakeTrivialReaderProvider(r)) if err != nil { return iterSet{}, errors.CombineErrors(err, set.CloseAll()) } diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 17ca33b16b..5abada5cc6 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -988,7 +988,7 @@ func TestBlockProperties(t *testing.T) { iter, err := r.NewPointIter( context.Background(), NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats, - CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r)) + nil, MakeTrivialReaderProvider(r)) if err != nil { return err.Error() } @@ -1072,7 +1072,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) { iter, err := r.NewPointIter( context.Background(), NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats, - CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r)) + nil, MakeTrivialReaderProvider(r)) if err != nil { return err.Error() } diff --git 
a/sstable/category_stats.go b/sstable/category_stats.go index 9daf404183..2db32ca61e 100644 --- a/sstable/category_stats.go +++ b/sstable/category_stats.go @@ -105,6 +105,13 @@ type categoryStatsWithMu struct { stats CategoryStats } +// Accumulate implements the IterStatsAccumulator interface. +func (c *categoryStatsWithMu) Accumulate(stats CategoryStats) { + c.mu.Lock() + c.stats.aggregate(stats) + c.mu.Unlock() +} + // CategoryStatsCollector collects and aggregates the stats per category. type CategoryStatsCollector struct { // mu protects additions to statsMap. @@ -141,24 +148,23 @@ func (s *shardedCategoryStats) getStats() CategoryStatsAggregate { return agg } -func (c *CategoryStatsCollector) reportStats( - p uint64, category Category, qosLevel QoSLevel, stats CategoryStats, -) { - v, ok := c.statsMap.Load(category) +// Accumulator returns a stats accumulator for the given category. The provided +// p is used to determine which shard to write stats to. +func (c *CategoryStatsCollector) Accumulator(p uint64, caq CategoryAndQoS) IterStatsAccumulator { + v, ok := c.statsMap.Load(caq.Category) if !ok { c.mu.Lock() - v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{ - Category: category, - QoSLevel: qosLevel, + v, _ = c.statsMap.LoadOrStore(caq.Category, &shardedCategoryStats{ + Category: caq.Category, + QoSLevel: caq.QoSLevel, }) c.mu.Unlock() } - - shardedStats := v.(*shardedCategoryStats) - s := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1) - shardedStats.shards[s].mu.Lock() - shardedStats.shards[s].stats.aggregate(stats) - shardedStats.shards[s].mu.Unlock() + s := v.(*shardedCategoryStats) + // This equation is taken from: + // https://en.wikipedia.org/wiki/Linear_congruential_generator#Parameters_in_common_use + shard := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1) + return &s.shards[shard].categoryStatsWithMu } // GetStats returns the aggregated stats. 
@@ -178,34 +184,33 @@ func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate { return stats } -// iterStatsAccumulator is a helper for a sstable iterator to accumulate -// stats, which are reported to the CategoryStatsCollector when the -// accumulator is closed. +type IterStatsAccumulator interface { + // Accumulate accumulates the provided stats. + Accumulate(cas CategoryStats) +} + +// iterStatsAccumulator is a helper for a sstable iterator to accumulate stats, +// which are reported to the CategoryStatsCollector when the accumulator is +// closed. type iterStatsAccumulator struct { - Category - QoSLevel - stats CategoryStats - collector *CategoryStatsCollector + stats CategoryStats + parent IterStatsAccumulator } -func (accum *iterStatsAccumulator) init( - categoryAndQoS CategoryAndQoS, collector *CategoryStatsCollector, -) { - accum.Category = categoryAndQoS.Category - accum.QoSLevel = categoryAndQoS.QoSLevel - accum.collector = collector +func (a *iterStatsAccumulator) init(parent IterStatsAccumulator) { + a.parent = parent } -func (accum *iterStatsAccumulator) reportStats( +func (a *iterStatsAccumulator) reportStats( blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration, ) { - accum.stats.BlockBytes += blockBytes - accum.stats.BlockBytesInCache += blockBytesInCache - accum.stats.BlockReadDuration += blockReadDuration + a.stats.BlockBytes += blockBytes + a.stats.BlockBytesInCache += blockBytesInCache + a.stats.BlockReadDuration += blockReadDuration } -func (accum *iterStatsAccumulator) close() { - if accum.collector != nil { - accum.collector.reportStats(uint64(uintptr(unsafe.Pointer(accum))), accum.Category, accum.QoSLevel, accum.stats) +func (a *iterStatsAccumulator) close() { + if a.parent != nil { + a.parent.Accumulate(a.stats) } } diff --git a/sstable/random_test.go b/sstable/random_test.go index fd983d4e11..36bc9ae0ed 100644 --- a/sstable/random_test.go +++ b/sstable/random_test.go @@ -111,8 +111,7 @@ func 
runErrorInjectionTest(t *testing.T, seed int64) { filterer, filterBlockSizeLimit, &stats, - CategoryAndQoS{}, - nil, /* CategoryStatsCollector */ + nil, /* IterStatsAccumulator */ MakeTrivialReaderProvider(r), ) require.NoError(t, err) diff --git a/sstable/reader.go b/sstable/reader.go index 261d753653..9bb0942180 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -120,13 +120,12 @@ func (r *Reader) NewPointIter( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, ) (Iterator, error) { return r.newPointIter( ctx, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, categoryAndQoS, statsCollector, rp, nil) + stats, statsAccum, rp, nil) } // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called @@ -152,8 +151,7 @@ func (r *Reader) newPointIter( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, vState *virtualState, ) (Iterator, error) { @@ -166,21 +164,21 @@ func (r *Reader) newPointIter( if r.tableFormat.BlockColumnar() { res, err = newColumnBlockTwoLevelIterator( ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */) + stats, statsAccum, rp, nil /* bufferPool */) } else { res, err = newRowBlockTwoLevelIterator( ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */) + stats, statsAccum, rp, nil /* bufferPool */) } } else { if r.tableFormat.BlockColumnar() { res, err = newColumnBlockSingleLevelIterator( ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, 
categoryAndQoS, statsCollector, rp, nil /* bufferPool */) + stats, statsAccum, rp, nil /* bufferPool */) } else { res, err = newRowBlockSingleLevelIterator( ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */) + stats, statsAccum, rp, nil /* bufferPool */) } } if err != nil { @@ -202,7 +200,7 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat // likely isn't a cache set up. return r.NewPointIter( context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock, - nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, MakeTrivialReaderProvider(r)) + nil /* stats */, nil /* statsAccum */, MakeTrivialReaderProvider(r)) } // NewCompactionIter returns an iterator similar to NewIter but it also increments @@ -210,18 +208,16 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat // after itself and returns a nil iterator. func (r *Reader) NewCompactionIter( transforms IterTransforms, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) { - return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool) + return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool) } func (r *Reader) newCompactionIter( transforms IterTransforms, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, vState *virtualState, bufferPool *block.BufferPool, @@ -234,7 +230,7 @@ func (r *Reader) newCompactionIter( i, err := newRowBlockTwoLevelIterator( context.Background(), r, vState, transforms, nil /* lower */, nil /* upper */, nil, - NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool) + NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool) if err != nil { 
return nil, err } @@ -244,7 +240,7 @@ func (r *Reader) newCompactionIter( i, err := newColumnBlockTwoLevelIterator( context.Background(), r, vState, transforms, nil /* lower */, nil /* upper */, nil, - NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool) + NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool) if err != nil { return nil, err } @@ -254,7 +250,7 @@ func (r *Reader) newCompactionIter( if !r.tableFormat.BlockColumnar() { i, err := newRowBlockSingleLevelIterator( context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */ - nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool) + nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool) if err != nil { return nil, err } @@ -263,7 +259,7 @@ func (r *Reader) newCompactionIter( } i, err := newColumnBlockSingleLevelIterator( context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */ - nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool) + nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool) if err != nil { return nil, err } diff --git a/sstable/reader_common.go b/sstable/reader_common.go index 20cc3015de..f8db26278c 100644 --- a/sstable/reader_common.go +++ b/sstable/reader_common.go @@ -32,15 +32,13 @@ type CommonReader interface { filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, ) (Iterator, error) NewCompactionIter( transforms IterTransforms, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index 2cad9c0ac0..5dcbbd744a 
100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -210,8 +210,7 @@ func newColumnBlockSingleLevelIterator( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (*singleLevelIteratorColumnBlocks, error) { @@ -225,7 +224,7 @@ func newColumnBlockSingleLevelIterator( useFilterBlock := shouldUseFilterBlock(r, filterBlockSizeLimit) i.init( ctx, r, v, transforms, lower, upper, filterer, useFilterBlock, - stats, categoryAndQoS, statsCollector, bufferPool, + stats, statsAccum, bufferPool, ) var getLazyValuer block.GetLazyValueForPrefixAndValueHandler if r.Properties.NumValueBlocks > 0 { @@ -275,8 +274,7 @@ func newRowBlockSingleLevelIterator( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (*singleLevelIteratorRowBlocks, error) { @@ -290,7 +288,7 @@ func newRowBlockSingleLevelIterator( useFilterBlock := shouldUseFilterBlock(r, filterBlockSizeLimit) i.init( ctx, r, v, transforms, lower, upper, filterer, useFilterBlock, - stats, categoryAndQoS, statsCollector, bufferPool, + stats, statsAccum, bufferPool, ) if r.tableFormat >= TableFormatPebblev3 { if r.Properties.NumValueBlocks > 0 { @@ -336,8 +334,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) init( filterer *BlockPropertiesFilterer, useFilterBlock bool, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, bufferPool *block.BufferPool, ) { i.inPool = false @@ -353,8 +350,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) init( i.vState = v 
i.endKeyInclusive, i.lower, i.upper = v.constrainBounds(lower, upper, false /* endInclusive */) } - - i.iterStats.init(categoryAndQoS, statsCollector) + i.iterStats.init(statsAccum) i.readBlockEnv = readBlockEnv{ Stats: stats, IterStats: &i.iterStats, diff --git a/sstable/reader_iter_test.go b/sstable/reader_iter_test.go index a16793ceb3..28fae7ebc4 100644 --- a/sstable/reader_iter_test.go +++ b/sstable/reader_iter_test.go @@ -64,8 +64,7 @@ func TestIteratorErrorOnInit(t *testing.T) { nil /* lower */, nil, /* upper */ nil /* filterer */, NeverUseFilterBlock, &stats, - CategoryAndQoS{}, - nil, /* statsCollector */ + nil, /* statsAccum */ MakeTrivialReaderProvider(r), &pool, ) @@ -79,8 +78,7 @@ func TestIteratorErrorOnInit(t *testing.T) { nil /* lower */, nil, /* upper */ nil /* filterer */, NeverUseFilterBlock, &stats, - CategoryAndQoS{}, - nil, /* statsCollector */ + nil, /* statsAccum */ MakeTrivialReaderProvider(r), &pool, ) diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 605ec63606..31e38f4367 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -161,8 +161,7 @@ func newColumnBlockTwoLevelIterator( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (*twoLevelIteratorColumnBlocks, error) { @@ -175,7 +174,7 @@ func newColumnBlockTwoLevelIterator( i := twoLevelIterColumnBlockPool.Get().(*twoLevelIteratorColumnBlocks) i.secondLevel.init(ctx, r, v, transforms, lower, upper, filterer, false, // Disable the use of the filter block in the second level. 
- stats, categoryAndQoS, statsCollector, bufferPool) + stats, statsAccum, bufferPool) var getLazyValuer block.GetLazyValueForPrefixAndValueHandler if r.Properties.NumValueBlocks > 0 { // NB: we cannot avoid this ~248 byte allocation, since valueBlockReader @@ -225,8 +224,7 @@ func newRowBlockTwoLevelIterator( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (*twoLevelIteratorRowBlocks, error) { @@ -239,7 +237,7 @@ func newRowBlockTwoLevelIterator( i := twoLevelIterRowBlockPool.Get().(*twoLevelIteratorRowBlocks) i.secondLevel.init(ctx, r, v, transforms, lower, upper, filterer, false, // Disable the use of the filter block in the second level. - stats, categoryAndQoS, statsCollector, bufferPool) + stats, statsAccum, bufferPool) if r.tableFormat >= TableFormatPebblev3 { if r.Properties.NumValueBlocks > 0 { // NB: we cannot avoid this ~248 byte allocation, since valueBlockReader diff --git a/sstable/reader_test.go b/sstable/reader_test.go index f05f482843..c244a83383 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -234,7 +234,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i transforms := IterTransforms{ SyntheticPrefixAndSuffix: block.MakeSyntheticPrefixAndSuffix(nil, syntheticSuffix), } - iter, err := v.NewCompactionIter(transforms, CategoryAndQoS{}, nil, rp, &bp) + iter, err := v.NewCompactionIter(transforms, nil, rp, &bp) if err != nil { return err.Error() } @@ -361,7 +361,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i } iter, err := v.NewPointIter( context.Background(), transforms, lower, upper, filterer, NeverUseFilterBlock, - &stats, CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r)) + &stats, nil, MakeTrivialReaderProvider(r)) if err != nil { 
return err.Error() } @@ -755,7 +755,6 @@ func runTestReader(t *testing.T, o WriterOptions, dir string, r *Reader, printVa filterer, AlwaysUseFilterBlock, &stats, - CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r), ) @@ -899,7 +898,7 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { var pool block.BufferPool pool.Init(5) citer, err := r.NewCompactionIter( - NoTransforms, CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r), &pool) + NoTransforms, nil, MakeTrivialReaderProvider(r), &pool) require.NoError(t, err) switch i := citer.(type) { case *singleLevelIteratorRowBlocks: @@ -957,7 +956,7 @@ func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) { pool.Init(5) defer pool.Release() citer, err := r.NewCompactionIter( - NoTransforms, CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r), &pool) + NoTransforms, nil, MakeTrivialReaderProvider(r), &pool) require.NoError(t, err) defer citer.Close() i := citer.(*singleLevelIteratorRowBlocks) @@ -1254,7 +1253,7 @@ func TestRandomizedPrefixSuffixRewriter(t *testing.T) { SyntheticPrefixAndSuffix: block.MakeSyntheticPrefixAndSuffix(syntheticPrefix, syntheticSuffix), }, nil, nil, nil, - AlwaysUseFilterBlock, nil, CategoryAndQoS{}, nil, + AlwaysUseFilterBlock, nil, nil, MakeTrivialReaderProvider(eReader), &virtualState{ lower: base.MakeInternalKey([]byte("_"), base.SeqNumMax, base.InternalKeyKindSet), upper: base.MakeRangeDeleteSentinelKey([]byte("~~~~~~~~~~~~~~~~")), @@ -2401,7 +2400,7 @@ func BenchmarkIteratorScanObsolete(b *testing.B) { transforms := IterTransforms{HideObsoletePoints: hideObsoletePoints} iter, err := r.NewPointIter( context.Background(), transforms, nil, nil, filterer, - AlwaysUseFilterBlock, nil, CategoryAndQoS{}, nil, + AlwaysUseFilterBlock, nil, nil, MakeTrivialReaderProvider(r)) require.NoError(b, err) b.ResetTimer() diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index dfa84d2adf..c37c8963a1 100644 --- a/sstable/reader_virtual.go +++ 
b/sstable/reader_virtual.go @@ -94,13 +94,12 @@ func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader { // NewCompactionIter is the compaction iterator function for virtual readers. func (v *VirtualReader) NewCompactionIter( transforms IterTransforms, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) { return v.reader.newCompactionIter( - transforms, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) + transforms, statsAccum, rp, &v.vState, bufferPool) } // NewPointIter returns an iterator for the point keys in the table. @@ -119,13 +118,12 @@ func (v *VirtualReader) NewPointIter( filterer *BlockPropertiesFilterer, filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, - categoryAndQoS CategoryAndQoS, - statsCollector *CategoryStatsCollector, + statsAccum IterStatsAccumulator, rp ReaderProvider, ) (Iterator, error) { return v.reader.newPointIter( ctx, transforms, lower, upper, filterer, filterBlockSizeLimit, - stats, categoryAndQoS, statsCollector, rp, &v.vState) + stats, statsAccum, rp, &v.vState) } // ValidateBlockChecksumsOnBacking will call ValidateBlockChecksumsOnBacking on the underlying reader. 
diff --git a/table_cache.go b/table_cache.go index c95203d878..d0d4e361d9 100644 --- a/table_cache.go +++ b/table_cache.go @@ -571,18 +571,17 @@ func (c *tableCacheShard) newPointIter( } transforms := file.IterTransforms() transforms.HideObsoletePoints = hideObsoletePoints - var categoryAndQoS sstable.CategoryAndQoS - if opts != nil { - categoryAndQoS = opts.CategoryAndQoS + iterStatsAccum := internalOpts.iterStatsAccumulator + if iterStatsAccum == nil && opts != nil && dbOpts.sstStatsCollector != nil { + iterStatsAccum = dbOpts.sstStatsCollector.Accumulator( + uint64(uintptr(unsafe.Pointer(v.reader))), opts.CategoryAndQoS) } if internalOpts.compaction { - iter, err = cr.NewCompactionIter( - transforms, categoryAndQoS, dbOpts.sstStatsCollector, rp, - internalOpts.bufferPool) + iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, rp, internalOpts.bufferPool) } else { iter, err = cr.NewPointIter( ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit, - internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp) + internalOpts.stats, iterStatsAccum, rp) } if err != nil { return nil, err diff --git a/testdata/metrics b/testdata/metrics index cd66bfe2f7..360a8a56d6 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -83,6 +83,9 @@ Table iters: 1 Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B +Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + b, latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} disk-usage ---- @@ -141,6 +144,9 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + b, latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + c, 
non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:88 BlockBytesInCache:44 BlockReadDuration:10ms} disk-usage @@ -185,6 +191,9 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + b, latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + c, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:88 BlockBytesInCache:44 BlockReadDuration:10ms} # Closing iter c will release one of the zombie sstables. The other @@ -226,6 +235,8 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} + b, latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:88 BlockBytesInCache:44 BlockReadDuration:10ms} @@ -271,6 +282,7 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:88 BlockBytesInCache:44 BlockReadDuration:10ms} @@ -343,6 +355,7 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 
BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:88 BlockBytesInCache:44 BlockReadDuration:10ms} @@ -399,6 +412,7 @@ Filter utility: 0.0% Ingestions: 0 as flushable: 0 (0B in 0 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:301 BlockBytesInCache:44 BlockReadDuration:60ms} @@ -504,6 +518,7 @@ Filter utility: 0.0% Ingestions: 2 as flushable: 2 (1.7KB in 3 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:301 BlockBytesInCache:44 BlockReadDuration:60ms} @@ -568,6 +583,7 @@ Filter utility: 0.0% Ingestions: 2 as flushable: 2 (1.7KB in 3 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:301 BlockBytesInCache:44 BlockReadDuration:60ms} @@ -646,6 +662,7 @@ Filter utility: 0.0% Ingestions: 3 as flushable: 2 (1.7KB in 3 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: 
{BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:301 BlockBytesInCache:44 BlockReadDuration:60ms} @@ -749,6 +766,7 @@ Filter utility: 0.0% Ingestions: 4 as flushable: 2 (1.7KB in 3 tables) Cgo memory usage: 0B block cache: 0B (data: 0B, maps: 0B, entries: 0B) memtables: 0B Iter category stats: + a, non-latency: {BlockBytes:0 BlockBytesInCache:0 BlockReadDuration:0s} b, latency: {BlockBytes:44 BlockBytesInCache:0 BlockReadDuration:10ms} c, non-latency: {BlockBytes:44 BlockBytesInCache:44 BlockReadDuration:0s} pebble-compaction, non-latency: {BlockBytes:677 BlockBytesInCache:376 BlockReadDuration:70ms}