Skip to content

Commit

Permalink
pebble: Collect Pebble Key Statistics
Browse files Browse the repository at this point in the history
Created a new function `ScanStatistics` that returns counts of the
different key kinds in Pebble (by level) as well as the number of
snapshot keys. Also modified `ScanInternal` to surface the level of each
key within each visitor function.

Informs: #1996
  • Loading branch information
raggar committed Aug 1, 2023
1 parent 9c48749 commit f3b488f
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 83 deletions.
91 changes: 85 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,21 +1178,28 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
func (d *DB) ScanInternal(
ctx context.Context,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
includeObsoleteKeys bool,
) error {
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
scanInternalOpts := &scanInternalOptions{
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
visitSharedFile: visitSharedFile,
skipSharedLevels: visitSharedFile != nil,
includeObsoleteKeys: includeObsoleteKeys,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
},
skipSharedLevels: visitSharedFile != nil,
})
}
iter := d.newInternalIter(nil /* snapshot */, scanInternalOpts)
defer iter.close()
return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
}

// newInternalIter constructs and returns a new scanInternalIterator on this db.
Expand Down Expand Up @@ -1231,6 +1238,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI
newIters: d.newIters,
newIterRangeKey: d.tableNewRangeKeyIter,
seqNum: seqNum,
mergingIter: &buf.merging,
}
if o != nil {
dbi.opts = *o
Expand Down Expand Up @@ -2021,7 +2029,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
continue
}

destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
if opt.withProperties {
p, err := d.tableCache.getTableProperties(
Expand Down Expand Up @@ -2644,6 +2651,78 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
}

// KeyStatistics keeps track of the number of keys that have been pinned by a
// snapshot as well as counts of the different key kinds in the lsm.
type KeyStatistics struct {
// when a compaction determines a key is obsolete, but cannot elide the key
// because it's required by an open snapshot.
snapshotPinnedKeys int
// the total number of bytes of all snapshot pinned keys.
snapshotPinnedKeysBytes uint64
// Note: these fields are currently only populated for point keys (including range deletes).
kindsCount [InternalKeyKindMax + 1]int
}

// LSMKeyStatistics is used by DB.ScanStatistics.
type LSMKeyStatistics struct {
accumulated KeyStatistics
levels [numLevels]KeyStatistics
bytesRead uint64
}

// ScanStatistics returns the count of different key kinds within the lsm for a
// key span [lower, upper) as well as the number of snapshot keys.
func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) {
stats := LSMKeyStatistics{}
var prevKey InternalKey

err := d.ScanInternal(ctx, lower, upper,
func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
// If the previous key is equal to the current point key, the current key was
// pinned by a snapshot.
size := uint64(key.Size())
kind := key.Kind()
if iterInfo.kind != IteratorLevelFlushable && d.equal(prevKey.UserKey, key.UserKey) {
if iterInfo.kind == IteratorLevelLSM {
stats.levels[iterInfo.Level].snapshotPinnedKeys++
stats.levels[iterInfo.Level].snapshotPinnedKeysBytes += size
}
stats.accumulated.snapshotPinnedKeys++
stats.accumulated.snapshotPinnedKeysBytes += size
}
if iterInfo.kind == IteratorLevelLSM {
stats.levels[iterInfo.Level].kindsCount[kind]++
}

stats.accumulated.kindsCount[kind]++
prevKey.CopyFrom(*key)
stats.bytesRead += uint64(key.Size() + value.Len())
return nil
},
func(start, end []byte, seqNum uint64) error {
stats.accumulated.kindsCount[InternalKeyKindRangeDelete]++
stats.bytesRead += uint64(len(start) + len(end))
return nil
},
func(start, end []byte, keys []rangekey.Key) error {
stats.bytesRead += uint64(len(start) + len(end))
for _, key := range keys {
stats.accumulated.kindsCount[key.Kind()]++
stats.bytesRead += uint64(len(key.Value) + len(key.Suffix))
}
return nil
},
nil,
true,
)

if err != nil {
return LSMKeyStatistics{}, err
}

return stats, nil
}

// ObjProvider returns the objstorage.Provider for this database. Meant to be
// used for internal purposes only.
func (d *DB) ObjProvider() objstorage.Provider {
Expand Down
51 changes: 28 additions & 23 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,29 +905,34 @@ func TestIngestShared(t *testing.T) {
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
}, func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
}, func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
}, func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
})
err = from.ScanInternal(context.TODO(), startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
},
func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
},
func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
},
func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
},
false)
require.NoError(t, err)
require.NoError(t, w.Close())

Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -246,9 +247,18 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti
type scanInternalOptions struct {
IterOptions

visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
visitRangeDel func(start, end []byte, seqNum uint64) error
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
visitSharedFile func(sst *SharedSSTMeta) error

// skipSharedLevels skips levels that are shareable (level >=
// sharedLevelStart).
skipSharedLevels bool

// includeObsoleteKeys specifies whether keys shadowed by newer internal keys
// are exposed. If false, only one internal key per user key is exposed.
includeObsoleteKeys bool
}

// RangeKeyMasking configures automatic hiding of point keys by range keys. A
Expand Down
Loading

0 comments on commit f3b488f

Please sign in to comment.