Skip to content

Commit

Permalink
pebble: Collect Pebble Key Statistics
Browse files Browse the repository at this point in the history
Created a new function `ScanStatistics` that returns counts of the
different key kinds in Pebble (by level) as well as the number of
snapshot keys. Also added two new parameters to `DB.ScanInternal`, to
indicate which level to exclusively scan as well as whether or not to
include obsolete keys.

Informs: #1996
  • Loading branch information
raggar committed Jul 18, 2023
1 parent 168079a commit 6fc8b1a
Show file tree
Hide file tree
Showing 7 changed files with 551 additions and 126 deletions.
104 changes: 78 additions & 26 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -1170,23 +1169,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
// iteration is invalid in those cases.
func (d *DB) ScanInternal(
ctx context.Context,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
ctx context.Context, lower, upper []byte, scanInternalOps scanInternalIterOptions,
) error {
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
},
skipSharedLevels: visitSharedFile != nil,
})
scanInternalOps.skipSharedLevels = scanInternalOps.visitSharedFile != nil
scanInternalOps.IterOptions = IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
UpperBound: upper,
}
iter := d.newInternalIter(nil /* snapshot */, &scanInternalOps)
defer iter.close()
return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return scanInternalImpl(ctx, lower, upper, iter, scanInternalOps.visitPointKey, scanInternalOps.visitRangeDel, scanInternalOps.visitRangeKey, scanInternalOps.visitSharedFile, scanInternalOps.visitKey)
}

// newInternalIter constructs and returns a new scanInternalIterator on this db.
Expand All @@ -1196,7 +1189,7 @@ func (d *DB) ScanInternal(
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
// finishInitializingIter. Both pairs of methods should be refactored to reduce
// this duplication.
func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalIterator {
func (d *DB) newInternalIter(s *Snapshot, o *scanInternalIterOptions) *scanInternalIterator {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -1237,15 +1230,18 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI
}

func finishInitializingInternalIter(buf *iterAlloc, i *scanInternalIterator) *scanInternalIterator {
// Short-hand.
memtables := i.readState.memtables
// We only need to read from memtables which contain sequence numbers older
// than seqNum. Trim off newer memtables.
for j := len(memtables) - 1; j >= 0; j-- {
if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
break
var memtables flushableList
if !i.opts.restrictToLevel {
// Short-hand.
memtables = i.readState.memtables
// We only need to read from memtables which contain sequence numbers older
// than seqNum. Trim off newer memtables.
for j := len(memtables) - 1; j >= 0; j-- {
if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
break
}
memtables = memtables[:j]
}
memtables = memtables[:j]
}
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)

Expand Down Expand Up @@ -1953,7 +1949,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
continue
}

destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
if opt.withProperties {
p, err := d.tableCache.getTableProperties(
Expand Down Expand Up @@ -2484,6 +2479,63 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
}

// KeyStatistics keeps track of the number of keys that have been pinned by a
// compaction as well as counts of the different key kinds in the lsm.
type KeyStatistics struct {
compactionPinnedCount int
kindsCount map[string]int
}

// LsmKeyStatistics is used by DB.ScanStatistics.
type LsmKeyStatistics struct {
accumulated *KeyStatistics
levels map[int]*KeyStatistics
}

// ScanStatistics returns the count of different key kinds within the lsm for a
// key span [lower, upper) as well as the number of snapshot keys.
func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (*LsmKeyStatistics, error) {
stats := &LsmKeyStatistics{}
stats.levels = make(map[int]*KeyStatistics)

// statistics per level
for lvl := 0; lvl < numLevels; lvl++ {
stats.levels[lvl] = &KeyStatistics{}
stats.levels[lvl].kindsCount = make(map[string]int)

var prevKey *InternalKey
err := d.ScanInternal(ctx, lower, upper, scanInternalIterOptions{
visitKey: func(key *InternalKey, _ LazyValue) error {
if prevKey != nil && d.cmp(prevKey.UserKey, key.UserKey) == 0 {
stats.levels[lvl].compactionPinnedCount++
}
stats.levels[lvl].kindsCount[key.Kind().String()]++
prevKey = key
return nil
},
includeObsoleteKeys: true,
restrictToLevel: true,
level: lvl,
})

if err != nil {
return nil, err
}
}

// statistics in aggregate across all levels
stats.accumulated = &KeyStatistics{}
stats.accumulated.kindsCount = make(map[string]int)
for lvl := 0; lvl < numLevels; lvl++ {
stats.accumulated.compactionPinnedCount += stats.levels[lvl].compactionPinnedCount
for kind, count := range stats.levels[lvl].kindsCount {
stats.accumulated.kindsCount[kind] += count
}
}

return stats, nil
}

// ObjProvider returns the objstorage.Provider for this database. Meant to be
// used for internal purposes only.
func (d *DB) ObjProvider() objstorage.Provider {
Expand Down
54 changes: 31 additions & 23 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,29 +905,37 @@ func TestIngestShared(t *testing.T) {
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
}, func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
}, func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
}, func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
})
ops := scanInternalIterOptions{
visitPointKey: func(key *InternalKey, value LazyValue) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
return nil
},
visitRangeDel: func(start, end []byte, seqNum uint64) error {
require.NoError(t, w.DeleteRange(start, end))
return nil
},
visitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
KeysOrder: 0,
}
require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
}))
return nil
},
visitSharedFile: func(sst *SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
return nil
},
restrictToLevel: false,
includeObsoleteKeys: false,
}
err = from.ScanInternal(context.TODO(), startKey, endKey, ops)
require.NoError(t, err)
require.NoError(t, w.Close())

Expand Down
23 changes: 22 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage/shared"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -243,12 +244,32 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti

// scanInternalOptions is similar to IterOptions, meant for use with
// scanInternalIterator.
type scanInternalOptions struct {
type scanInternalIterOptions struct {
IterOptions

visitPointKey func(key *InternalKey, value LazyValue) error
visitRangeDel func(start, end []byte, seqNum uint64) error
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
visitSharedFile func(sst *SharedSSTMeta) error
// visitKey is called on each key iterated over.
visitKey func(key *InternalKey, value LazyValue) error

// skipSharedLevels skips levels that are shareable (level >=
// sharedLevelStart).
skipSharedLevels bool

// includeObsoleteKeys uses a keyspan.InterleavingIter instead of a
// pointCollapsingIter ensuring that obsolete keys are included during
// iteration.
includeObsoleteKeys bool

// restrictToLevel indicates whether ScanInternal should run on a single level.
// Note: memtables are also skipped when restrictToLevel is enabled.
// This setting should be used with the level option.
restrictToLevel bool

// level indicates the level which should be iterated on during Scan Internal.
level int
}

// RangeKeyMasking configures automatic hiding of point keys by range keys. A
Expand Down
Loading

0 comments on commit 6fc8b1a

Please sign in to comment.