Skip to content

Commit

Permalink
pebble: Collect Pebble Key Statistics
Browse files Browse the repository at this point in the history
Created a new function `ScanStatistics` that returns counts of the
different key kinds in Pebble (by level) as well as the number of
snapshot keys. Also modified `ScanInternal` to surface the level of each
key within each visitor function.

Informs: #1996
  • Loading branch information
raggar committed Jul 21, 2023
1 parent fb76a24 commit 6f123ec
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 102 deletions.
100 changes: 83 additions & 17 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,23 +1171,21 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
// iteration is invalid in those cases.
func (d *DB) ScanInternal(
ctx context.Context,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
ctx context.Context, lower, upper []byte, scanInternalOpts scanInternalIterOptions,
) error {
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
},
skipSharedLevels: visitSharedFile != nil,
})
scanInternalOpts.skipSharedLevels = scanInternalOpts.visitSharedFile != nil
scanInternalOpts.IterOptions = IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
}
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOpts)
defer iter.close()
return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return scanInternalImpl(ctx, lower, upper, iter,
scanInternalOpts.visitPointKey,
scanInternalOpts.visitRangeDel,
scanInternalOpts.visitRangeKey,
scanInternalOpts.visitSharedFile)
}

// newInternalIter constructs and returns a new scanInternalIterator on this db.
Expand All @@ -1197,7 +1195,7 @@ func (d *DB) ScanInternal(
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
// finishInitializingIter. Both pairs of methods should be refactored to reduce
// this duplication.
func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalIterator {
func (d *DB) newInternalIter(s *Snapshot, o *scanInternalIterOptions) *scanInternalIterator {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -1226,6 +1224,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI
newIters: d.newIters,
newIterRangeKey: d.tableNewRangeKeyIter,
seqNum: seqNum,
mergingIter: &buf.merging,
}
if o != nil {
dbi.opts = *o
Expand Down Expand Up @@ -1988,7 +1987,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
continue
}

destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
if opt.withProperties {
p, err := d.tableCache.getTableProperties(
Expand Down Expand Up @@ -2571,6 +2569,74 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
}

// KeyStatistics keeps track of the number of keys that have been pinned by a
// snapshot as well as counts of the different key kinds in the lsm.
type KeyStatistics struct {
compactionPinnedCount int
kindsCount [InternalKeyKindMax + 1]int
}

// LSMKeyStatistics is used by DB.ScanStatistics.
type LSMKeyStatistics struct {
accumulated KeyStatistics
levels map[int]*KeyStatistics
}

// ScanStatistics returns the count of different key kinds within the lsm for a
// key span [lower, upper) as well as the number of snapshot keys.
func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (*LSMKeyStatistics, error) {
stats := &LSMKeyStatistics{}
stats.accumulated = KeyStatistics{}
stats.levels = make(map[int]*KeyStatistics)
for lvl := 0; lvl < numLevels; lvl++ {
stats.levels[lvl] = &KeyStatistics{}
}
var prevKey *InternalKey

err := d.ScanInternal(ctx, lower, upper, scanInternalIterOptions{
visitPointKey: func(key *InternalKey, value LazyValue, iterInfo *iterInfo) error {
if iterInfo == nil || iterInfo.level == -1 {
return nil
}
// If the previous key is equal to the current point key, the current key was
// pinned by a snapshot.
if prevKey != nil && d.cmp(prevKey.UserKey, key.UserKey) == 0 {
stats.levels[iterInfo.level].compactionPinnedCount++
stats.accumulated.compactionPinnedCount++
}
stats.levels[iterInfo.level].kindsCount[key.Kind()]++
stats.accumulated.kindsCount[key.Kind()]++
prevKey = key
return nil
},
visitRangeDel: func(start, end []byte, seqNum uint64, iterInfo *iterInfo) error {
if iterInfo == nil || iterInfo.level == -1 {
return nil
}
stats.levels[iterInfo.level].kindsCount[InternalKeyKindRangeDelete]++
stats.accumulated.kindsCount[InternalKeyKindRangeDelete]++
return nil
},
visitRangeKey: func(start, end []byte, keys []rangekey.Key, iterInfo *iterInfo) error {
if iterInfo == nil || iterInfo.level == -1 {
return nil
}
for _, key := range keys {
stats.levels[iterInfo.level].kindsCount[key.Kind()]++
stats.accumulated.kindsCount[key.Kind()]++
}
return nil
},
includeObsoleteKeys: true,
})

if err != nil {
return nil, err
}

return stats, nil
}

// ObjProvider returns the objstorage.Provider for this database. Meant to be
// used for internal purposes only.
func (d *DB) ObjProvider() objstorage.Provider {
Expand Down
53 changes: 30 additions & 23 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,29 +905,36 @@ func TestIngestShared(t *testing.T) {
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
}, func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
}, func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
}, func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
})
ops := scanInternalIterOptions{
visitPointKey: func(key *InternalKey, value LazyValue, _ *iterInfo) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
},
visitRangeDel: func(start, end []byte, seqNum uint64, _ *iterInfo) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
},
visitRangeKey: func(start, end []byte, keys []keyspan.Key, _ *iterInfo) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
},
visitSharedFile: func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
},
includeObsoleteKeys: false,
}
err = from.ScanInternal(context.TODO(), startKey, endKey, ops)
require.NoError(t, err)
require.NoError(t, w.Close())

Expand Down
12 changes: 11 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage/shared"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -243,12 +244,21 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti

// scanInternalOptions is similar to IterOptions, meant for use with
// scanInternalIterator.
type scanInternalOptions struct {
type scanInternalIterOptions struct {
IterOptions

visitPointKey func(key *InternalKey, value LazyValue, iterInfo *iterInfo) error
visitRangeDel func(start, end []byte, seqNum uint64, iterInfo *iterInfo) error
visitRangeKey func(start, end []byte, keys []rangekey.Key, iterInfo *iterInfo) error
visitSharedFile func(sst *SharedSSTMeta) error

// skipSharedLevels skips levels that are shareable (level >=
// sharedLevelStart).
skipSharedLevels bool

// includeObsoleteKeys when False indicates that a pointCollapsingIter will be
// used.
includeObsoleteKeys bool
}

// RangeKeyMasking configures automatic hiding of point keys by range keys. A
Expand Down
Loading

0 comments on commit 6f123ec

Please sign in to comment.