Skip to content

Commit

Permalink
db: avoid repeated lookups into IterStatsCollector's map
Browse files Browse the repository at this point in the history
When creating a top-level *pebble.Iterator, look up the appropriate categorized
stats struct once and propagate it to all sstable iterators constructed by the
Iterator, rather than looking up on every sstable iterator Close.

Recent experiments showed significant time spent in the statsMap Load:

```
  2950.28s    4.24hrs (flat, cum)  3.70% of Total
   110.55s    110.56s    144:func (c *CategoryStatsCollector) reportStats(
         .          .    145:	p uint64, category Category, qosLevel QoSLevel, stats CategoryStats,
         .          .    146:) {
   129.36s    1.30hrs    147:	v, ok := c.statsMap.Load(category)
    91.69s     91.72s    148:	if !ok {
         .          .    149:		c.mu.Lock()
         .          .    150:		v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{
         .          .    151:			Category: category,
         .          .    152:			QoSLevel: qosLevel,
         .          .    153:		})
         .          .    154:		c.mu.Unlock()
         .          .    155:	}
         .          .    156:
    93.55s     93.56s    157:	shardedStats := v.(*shardedCategoryStats)
  2338.74s   2339.56s    158:	s := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1)
    40.75s    1.70hrs    159:	shardedStats.shards[s].mu.Lock()
    52.07s    295.26s    160:	shardedStats.shards[s].stats.aggregate(stats)
    45.31s   1465.73s    161:	shardedStats.shards[s].mu.Unlock()
    48.26s     48.27s    162:}
```

With this change, it's limited to 2.44% across the per-Iterator acquisition of
an Accumulator and the per-sstable iterator reporting of the iterator stats:
```
Active filters:
   focus=Accumulat
Showing nodes accounting for 2.78hrs, 2.44% of 114.22hrs total
Dropped 103 nodes (cum <= 0.57hrs)
Showing top 20 nodes out of 48
      flat  flat%   sum%        cum   cum%
   0.81hrs  0.71%  0.71%    1.31hrs  1.15%  github.com/cockroachdb/pebble/sstable.(*categoryStatsWithMu).Accumulate
   0.39hrs  0.34%  1.05%    1.46hrs  1.28%  github.com/cockroachdb/pebble/sstable.(*CategoryStatsCollector).Accumulator
   0.37hrs  0.32%  1.37%    0.72hrs  0.63%  runtime.mapaccess2
   0.34hrs   0.3%  1.67%    0.36hrs  0.31%  sync.(*Mutex).Unlock (inline)
   0.20hrs  0.18%  1.84%    0.20hrs  0.18%  sync.(*entry).load (inline)
   0.10hrs 0.088%  1.93%    1.08hrs  0.94%  sync.(*Map).Load
   0.08hrs 0.069%  2.00%    0.10hrs 0.089%  sync.(*Mutex).lockSlow
   0.06hrs 0.056%  2.06%    0.07hrs 0.063%  runtime.strequal
   0.06hrs 0.056%  2.11%    0.11hrs 0.092%  runtime.typehash
   0.06hrs 0.049%  2.16%    0.17hrs  0.15%  runtime.nilinterhash
   0.05hrs 0.047%  2.21%    0.05hrs 0.048%  sync.(*Map).loadReadOnly (inline)
   0.04hrs 0.037%  2.25%    0.11hrs   0.1%  runtime.efaceeq
   0.04hrs 0.034%  2.28%    0.15hrs  0.13%  runtime.nilinterequal
   0.03hrs 0.027%  2.31%    1.35hrs  1.18%  github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).close
   0.03hrs 0.027%  2.33%    0.03hrs 0.027%  github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).init
   0.03hrs 0.025%  2.36%    0.03hrs 0.025%  aeshashbody
   0.02hrs 0.022%  2.38%    0.13hrs  0.11%  sync.(*Mutex).Lock (inline)
   0.02hrs  0.02%  2.40%    0.02hrs  0.02%  github.com/cockroachdb/pebble/sstable.(*iterStatsAccumulator).reportStats
   0.02hrs  0.02%  2.42%    0.02hrs  0.02%  github.com/cockroachdb/pebble/sstable.(*CategoryStats).aggregate (inline)
   0.02hrs 0.014%  2.44%    0.02hrs 0.014%  runtime.add (inline)
```
  • Loading branch information
jbowens committed Oct 24, 2024
1 parent cd6ba1c commit d2480d7
Show file tree
Hide file tree
Showing 19 changed files with 147 additions and 121 deletions.
38 changes: 26 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/arenaskl"
Expand Down Expand Up @@ -1037,20 +1038,20 @@ type newIterOpts struct {
// newIter constructs a new iterator, merging in batch iterators as an extra
// level.
func (d *DB) newIter(
ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions,
) *Iterator {
if internalOpts.batch.batchOnly {
if newIterOpts.batch.batchOnly {
if batch == nil {
panic("batchOnly is true, but batch is nil")
}
if internalOpts.snapshot.vers != nil {
if newIterOpts.snapshot.vers != nil {
panic("batchOnly is true, but snapshotIterOpts is initialized")
}
}
if err := d.closed.Load(); err != nil {
panic(err)
}
seqNum := internalOpts.snapshot.seqNum
seqNum := newIterOpts.snapshot.seqNum
if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
}
Expand All @@ -1064,21 +1065,21 @@ func (d *DB) newIter(
var readState *readState
var newIters tableNewIters
var newIterRangeKey keyspanimpl.TableNewSpanIter
if !internalOpts.batch.batchOnly {
if !newIterOpts.batch.batchOnly {
// Grab and reference the current readState. This prevents the underlying
// files in the associated version from being deleted if there is a current
// compaction. The readState is unref'd by Iterator.Close().
if internalOpts.snapshot.vers == nil {
if internalOpts.snapshot.readState != nil {
readState = internalOpts.snapshot.readState
if newIterOpts.snapshot.vers == nil {
if newIterOpts.snapshot.readState != nil {
readState = newIterOpts.snapshot.readState
readState.ref()
} else {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
}
} else {
// vers != nil
internalOpts.snapshot.vers.Ref()
newIterOpts.snapshot.vers.Ref()
}

// Determine the seqnum to read at after grabbing the read state (current and
Expand All @@ -1100,15 +1101,16 @@ func (d *DB) newIter(
merge: d.merge,
comparer: *d.opts.Comparer,
readState: readState,
version: internalOpts.snapshot.vers,
version: newIterOpts.snapshot.vers,
keyBuf: buf.keyBuf,
prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
boundsBuf: buf.boundsBuf,
batch: batch,
tc: d.tableCache,
newIters: newIters,
newIterRangeKey: newIterRangeKey,
seqNum: seqNum,
batchOnlyIter: internalOpts.batch.batchOnly,
batchOnlyIter: newIterOpts.batch.batchOnly,
}
if o != nil {
dbi.opts = *o
Expand Down Expand Up @@ -1399,7 +1401,19 @@ func (i *Iterator) constructPointIter(
// Already have one.
return
}
internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
internalOpts := internalIterOpts{
stats: &i.stats.InternalStats,
}
// If the table cache has a sstable stats collector, ask it for an
// accumulator for this iterator's configured category and QoS. All SSTable
// iterators created by this Iterator will accumulate their stats to it as
// they Close during iteration.
if collector := i.tc.dbOpts.sstStatsCollector; collector != nil {
internalOpts.iterStatsAccumulator = collector.Accumulator(
uint64(uintptr(unsafe.Pointer(i))),
i.opts.CategoryAndQoS,
)
}
if i.opts.RangeKeyMasking.Filter != nil {
internalOpts.boundLimitedFilter = &i.rangeKeyMasking
}
Expand Down
7 changes: 6 additions & 1 deletion external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
}
mlevels := it.alloc.mlevels[:0]

// TODO(jackson): External iterators never provide categorized iterator
// stats today because they exist outside the context of a *DB. If the
// sstables being read are on the physical filesystem, we may still want to
// thread a CategoryStatsCollector through so that we collect their stats.

if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
}
Expand All @@ -159,7 +164,7 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
pointIter, err = r.NewPointIter(
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
sstable.NeverUseFilterBlock,
&it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
&it.stats.InternalStats, nil,
sstable.MakeTrivialReaderProvider(r))
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/crdbtest/crdb_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func benchmarkRandSeekInSST(
rp := sstable.MakeTrivialReaderProvider(reader)
iter, err := reader.NewPointIter(
ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock,
&stats, sstable.CategoryAndQoS{}, nil, rp)
&stats, nil, rp)
require.NoError(b, err)
n := 0
for kv := iter.First(); kv != nil; kv = iter.Next() {
Expand All @@ -133,7 +133,7 @@ func benchmarkRandSeekInSST(
key := queryKeys[i%numQueryKeys]
iter, err := reader.NewPointIter(
ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock,
&stats, sstable.CategoryAndQoS{}, nil, rp)
&stats, nil, rp)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type Iterator struct {
// and SetOptions or when re-fragmenting a batch's range keys/range dels.
// Non-nil if this Iterator includes a Batch.
batch *Batch
tc *tableCacheContainer
newIters tableNewIters
newIterRangeKey keyspanimpl.TableNewSpanIter
lazyCombinedIter lazyCombinedIter
Expand Down Expand Up @@ -2866,6 +2867,7 @@ func (i *Iterator) CloneWithContext(ctx context.Context, opts CloneOptions) (*It
boundsBuf: buf.boundsBuf,
batch: i.batch,
batchSeqNum: i.batchSeqNum,
tc: i.tc,
newIters: i.newIters,
newIterRangeKey: i.newIterRangeKey,
seqNum: i.seqNum,
Expand Down
9 changes: 5 additions & 4 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type internalIterOpts struct {
// if compaction is set, sstable-level iterators will be created using
// NewCompactionIter; these iterators have a more constrained interface
// and are optimized for the sequential scan of a compaction.
compaction bool
bufferPool *sstable.BufferPool
stats *base.InternalIteratorStats
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
compaction bool
bufferPool *sstable.BufferPool
stats *base.InternalIteratorStats
iterStatsAccumulator sstable.IterStatsAccumulator
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
}

// levelIter provides a merged view of the sstables in a level.
Expand Down
2 changes: 1 addition & 1 deletion level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (lt *levelIterTest) newIters(
if kinds.Point() {
iter, err := lt.readers[file.FileNum].NewPointIter(
ctx, transforms,
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats, sstable.CategoryAndQoS{},
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats,
nil, sstable.MakeTrivialReaderProvider(lt.readers[file.FileNum]))
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
Expand Down
2 changes: 1 addition & 1 deletion merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestMergingIterDataDriven(t *testing.T) {
context.Background(),
sstable.NoTransforms,
opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, iio.stats,
sstable.CategoryAndQoS{}, nil, sstable.MakeTrivialReaderProvider(r))
nil, sstable.MakeTrivialReaderProvider(r))
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down
4 changes: 2 additions & 2 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func TestBlockProperties(t *testing.T) {
iter, err := r.NewPointIter(
context.Background(),
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r))
nil, MakeTrivialReaderProvider(r))
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) {
iter, err := r.NewPointIter(
context.Background(),
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
CategoryAndQoS{}, nil, MakeTrivialReaderProvider(r))
nil, MakeTrivialReaderProvider(r))
if err != nil {
return err.Error()
}
Expand Down
71 changes: 38 additions & 33 deletions sstable/category_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ type categoryStatsWithMu struct {
stats CategoryStats
}

// Accumulate implements the IterStatsAccumulator interface.
func (c *categoryStatsWithMu) Accumulate(stats CategoryStats) {
c.mu.Lock()
c.stats.aggregate(stats)
c.mu.Unlock()
}

// CategoryStatsCollector collects and aggregates the stats per category.
type CategoryStatsCollector struct {
// mu protects additions to statsMap.
Expand Down Expand Up @@ -141,24 +148,23 @@ func (s *shardedCategoryStats) getStats() CategoryStatsAggregate {
return agg
}

func (c *CategoryStatsCollector) reportStats(
p uint64, category Category, qosLevel QoSLevel, stats CategoryStats,
) {
v, ok := c.statsMap.Load(category)
// Accumulator returns a stats accumulator for the given category. The provided
// p is used to determine which shard to write stats to.
func (c *CategoryStatsCollector) Accumulator(p uint64, caq CategoryAndQoS) IterStatsAccumulator {
v, ok := c.statsMap.Load(caq.Category)
if !ok {
c.mu.Lock()
v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{
Category: category,
QoSLevel: qosLevel,
v, _ = c.statsMap.LoadOrStore(caq.Category, &shardedCategoryStats{
Category: caq.Category,
QoSLevel: caq.QoSLevel,
})
c.mu.Unlock()
}

shardedStats := v.(*shardedCategoryStats)
s := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1)
shardedStats.shards[s].mu.Lock()
shardedStats.shards[s].stats.aggregate(stats)
shardedStats.shards[s].mu.Unlock()
s := v.(*shardedCategoryStats)
// This equation is taken from:
// https://en.wikipedia.org/wiki/Linear_congruential_generator#Parameters_in_common_use
shard := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1)
return &s.shards[shard].categoryStatsWithMu
}

// GetStats returns the aggregated stats.
Expand All @@ -178,34 +184,33 @@ func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate {
return stats
}

// IterStatsAccumulator accumulates sstable-iterator stats. Implementations
// aggregate the stats provided to Accumulate, typically reporting them to a
// CategoryStatsCollector.
type IterStatsAccumulator interface {
// Accumulate accumulates the provided stats.
Accumulate(cas CategoryStats)
}

// iterStatsAccumulator is a helper for a sstable iterator to accumulate stats,
// which are reported to the CategoryStatsCollector when the accumulator is
// closed.
type iterStatsAccumulator struct {
Category
QoSLevel
stats CategoryStats
collector *CategoryStatsCollector
stats CategoryStats
parent IterStatsAccumulator
}

func (accum *iterStatsAccumulator) init(
categoryAndQoS CategoryAndQoS, collector *CategoryStatsCollector,
) {
accum.Category = categoryAndQoS.Category
accum.QoSLevel = categoryAndQoS.QoSLevel
accum.collector = collector
func (a *iterStatsAccumulator) init(parent IterStatsAccumulator) {
a.parent = parent
}

func (accum *iterStatsAccumulator) reportStats(
func (a *iterStatsAccumulator) reportStats(
blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration,
) {
accum.stats.BlockBytes += blockBytes
accum.stats.BlockBytesInCache += blockBytesInCache
accum.stats.BlockReadDuration += blockReadDuration
a.stats.BlockBytes += blockBytes
a.stats.BlockBytesInCache += blockBytesInCache
a.stats.BlockReadDuration += blockReadDuration
}

func (accum *iterStatsAccumulator) close() {
if accum.collector != nil {
accum.collector.reportStats(uint64(uintptr(unsafe.Pointer(accum))), accum.Category, accum.QoSLevel, accum.stats)
func (a *iterStatsAccumulator) close() {
if a.parent != nil {
a.parent.Accumulate(a.stats)
}
}
3 changes: 1 addition & 2 deletions sstable/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ func runErrorInjectionTest(t *testing.T, seed int64) {
filterer,
filterBlockSizeLimit,
&stats,
CategoryAndQoS{},
nil, /* CategoryStatsCollector */
nil, /* IterStatsAccumulator */
MakeTrivialReaderProvider(r),
)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d2480d7

Please sign in to comment.