Skip to content

Commit

Permalink
pebble: Collect Pebble Key Statistics
Browse files Browse the repository at this point in the history
Created a new function `ScanStatistics` that returns counts of the
different key kinds in Pebble (by level) as well as the number of
snapshot keys. Also modified `ScanInternal` to surface the level of each
key within each visitor function.

Informs: #1996
  • Loading branch information
raggar committed Jul 27, 2023
1 parent 7ef7553 commit 94afe72
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 83 deletions.
81 changes: 75 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,21 +1173,28 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
func (d *DB) ScanInternal(
ctx context.Context,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
includeObsoleteKeys bool,
) error {
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
scanInternalOpts := &scanInternalOptions{
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
visitSharedFile: visitSharedFile,
skipSharedLevels: visitSharedFile != nil,
includeObsoleteKeys: includeObsoleteKeys,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
},
skipSharedLevels: visitSharedFile != nil,
})
}
iter := d.newInternalIter(nil /* snapshot */, scanInternalOpts)
defer iter.close()
return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
}

// newInternalIter constructs and returns a new scanInternalIterator on this db.
Expand Down Expand Up @@ -1226,6 +1233,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI
newIters: d.newIters,
newIterRangeKey: d.tableNewRangeKeyIter,
seqNum: seqNum,
mergingIter: &buf.merging,
}
if o != nil {
dbi.opts = *o
Expand Down Expand Up @@ -1988,7 +1996,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
continue
}

destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
if opt.withProperties {
p, err := d.tableCache.getTableProperties(
Expand Down Expand Up @@ -2573,6 +2580,68 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
}

// KeyStatistics keeps track of the number of keys that have been pinned by a
// snapshot as well as counts of the different key kinds in the lsm.
type KeyStatistics struct {
// snapshotPinnedKeys represents the number of duplicate keys per sstable.
// This occurs when a compaction tries to compact ta key but can't due to it being pinned by an open snapshot.
snapshotPinnedKeys int
// the total number of bytes of all snapshot pinned keys.
snapshotPinnedKeysBytes uint64
kindsCount [InternalKeyKindMax + 1]int
}

// LSMKeyStatistics is used by DB.ScanStatistics.
type LSMKeyStatistics struct {
accumulated KeyStatistics
levels [numLevels]KeyStatistics
}

// ScanStatistics returns the count of different key kinds within the lsm for a
// key span [lower, upper) as well as the number of snapshot keys.
func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) {
stats := LSMKeyStatistics{}
var prevKey *InternalKey

err := d.ScanInternal(ctx, lower, upper,
func(key *InternalKey, value LazyValue, iterInfo iterInfo) error {
if iterInfo.level == -1 {
return nil
}
// If the previous key is equal to the current point key, the current key was
// pinned by a snapshot.
if prevKey != nil && d.cmp(prevKey.UserKey, key.UserKey) == 0 {
stats.levels[iterInfo.level].snapshotPinnedKeys++
stats.levels[iterInfo.level].snapshotPinnedKeysBytes += uint64(key.Size())
stats.accumulated.snapshotPinnedKeys++
stats.accumulated.snapshotPinnedKeysBytes += uint64(key.Size())
}
stats.levels[iterInfo.level].kindsCount[key.Kind()]++
stats.accumulated.kindsCount[key.Kind()]++
prevKey = key
return nil
},
func(start, end []byte, seqNum uint64) error {
stats.accumulated.kindsCount[InternalKeyKindRangeDelete]++
return nil
},
func(start, end []byte, keys []rangekey.Key) error {
for _, key := range keys {
stats.accumulated.kindsCount[key.Kind()]++
}
return nil
},
nil,
true,
)

if err != nil {
return LSMKeyStatistics{}, err
}

return stats, nil
}

// ObjProvider returns the objstorage.Provider for this database. Meant to be
// used for internal purposes only.
func (d *DB) ObjProvider() objstorage.Provider {
Expand Down
51 changes: 28 additions & 23 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,29 +905,34 @@ func TestIngestShared(t *testing.T) {
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
}, func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
}, func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
}, func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
})
err = from.ScanInternal(context.TODO(), startKey, endKey,
func(key *InternalKey, value LazyValue, _ iterInfo) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
},
func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
},
func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
},
func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
},
false)
require.NoError(t, err)
require.NoError(t, w.Close())

Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -246,9 +247,18 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti
type scanInternalOptions struct {
IterOptions

visitPointKey func(key *InternalKey, value LazyValue, iterInfo iterInfo) error
visitRangeDel func(start, end []byte, seqNum uint64) error
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
visitSharedFile func(sst *SharedSSTMeta) error

// skipSharedLevels skips levels that are shareable (level >=
// sharedLevelStart).
skipSharedLevels bool

// includeObsoleteKeys specifies whether keys shadowed by newer internal keys
// are exposed. If false, only one internal key per user key is exposed.
includeObsoleteKeys bool
}

// RangeKeyMasking configures automatic hiding of point keys by range keys. A
Expand Down
Loading

0 comments on commit 94afe72

Please sign in to comment.