diff --git a/db.go b/db.go
index dfa14f10d66..44a1bfa1e1f 100644
--- a/db.go
+++ b/db.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/internal/manual"
 	"github.com/cockroachdb/pebble/objstorage"
-	"github.com/cockroachdb/pebble/rangekey"
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
@@ -1170,23 +1169,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
 // resulting in some lower level SSTs being on non-shared storage. Skip-shared
 // iteration is invalid in those cases.
 func (d *DB) ScanInternal(
-	ctx context.Context,
-	lower, upper []byte,
-	visitPointKey func(key *InternalKey, value LazyValue) error,
-	visitRangeDel func(start, end []byte, seqNum uint64) error,
-	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
-	visitSharedFile func(sst *SharedSSTMeta) error,
+	ctx context.Context, lower, upper []byte, scanInternalOps scanInternalIterOptions,
 ) error {
-	iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{
-		IterOptions: IterOptions{
-			KeyTypes:   IterKeyTypePointsAndRanges,
-			LowerBound: lower,
-			UpperBound: upper,
-		},
-		skipSharedLevels: visitSharedFile != nil,
-	})
+	scanInternalOps.skipSharedLevels = scanInternalOps.visitSharedFile != nil
+	scanInternalOps.IterOptions = IterOptions{
+		KeyTypes:   IterKeyTypePointsAndRanges,
+		LowerBound: lower,
+		UpperBound: upper,
+	}
+	iter := d.newInternalIter(nil /* snapshot */, &scanInternalOps)
 	defer iter.close()
-	return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
+	return scanInternalImpl(ctx, lower, upper, iter, scanInternalOps.visitPointKey, scanInternalOps.visitRangeDel, scanInternalOps.visitRangeKey, scanInternalOps.visitSharedFile, scanInternalOps.visitKey)
 }
 
 // newInternalIter constructs and returns a new scanInternalIterator on this db.
@@ -1196,7 +1189,7 @@ func (d *DB) ScanInternal(
 // TODO(bilal): This method has a lot of similarities with db.newIter as well as
 // finishInitializingIter. Both pairs of methods should be refactored to reduce
 // this duplication.
-func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalIterator {
+func (d *DB) newInternalIter(s *Snapshot, o *scanInternalIterOptions) *scanInternalIterator {
 	if err := d.closed.Load(); err != nil {
 		panic(err)
 	}
@@ -1237,15 +1230,18 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI
 }
 
 func finishInitializingInternalIter(buf *iterAlloc, i *scanInternalIterator) *scanInternalIterator {
-	// Short-hand.
-	memtables := i.readState.memtables
-	// We only need to read from memtables which contain sequence numbers older
-	// than seqNum. Trim off newer memtables.
-	for j := len(memtables) - 1; j >= 0; j-- {
-		if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
-			break
+	var memtables flushableList
+	if !i.opts.restrictToLevel {
+		// Short-hand.
+		memtables = i.readState.memtables
+		// We only need to read from memtables which contain sequence numbers older
+		// than seqNum. Trim off newer memtables.
+		for j := len(memtables) - 1; j >= 0; j-- {
+			if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
+				break
+			}
+			memtables = memtables[:j]
 		}
-		memtables = memtables[:j]
 	}
 
 	i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
@@ -1953,7 +1949,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
 		if opt.start != nil && opt.end != nil &&
 			!m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
 			continue
 		}
-		destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
 		if opt.withProperties {
 			p, err := d.tableCache.getTableProperties(
@@ -2484,6 +2479,63 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
 	return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
 }
 
+// KeyStatistics keeps track of the number of keys that have been pinned by a
+// compaction, as well as counts of the different key kinds in the LSM.
+type KeyStatistics struct {
+	compactionPinnedCount int
+	kindsCount            map[string]int
+}
+
+// LsmKeyStatistics is used by DB.ScanStatistics.
+type LsmKeyStatistics struct {
+	accumulated *KeyStatistics
+	levels      map[int]*KeyStatistics
+}
+
+// ScanStatistics returns the count of each key kind within the LSM for a key
+// span [lower, upper), as well as the number of compaction-pinned keys.
+func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (*LsmKeyStatistics, error) {
+	stats := &LsmKeyStatistics{}
+	stats.levels = make(map[int]*KeyStatistics)
+
+	// Statistics per level.
+	for lvl := 0; lvl < numLevels; lvl++ {
+		stats.levels[lvl] = &KeyStatistics{}
+		stats.levels[lvl].kindsCount = make(map[string]int)
+
+		var prevKey *InternalKey
+		err := d.ScanInternal(ctx, lower, upper, scanInternalIterOptions{
+			visitKey: func(key *InternalKey, _ LazyValue) error {
+				if prevKey != nil && d.cmp(prevKey.UserKey, key.UserKey) == 0 {
+					stats.levels[lvl].compactionPinnedCount++
+				}
+				stats.levels[lvl].kindsCount[key.Kind().String()]++
+				prevKey = key
+				return nil
+			},
+			includeObsoleteKeys: true,
+			restrictToLevel:     true,
+			level:               lvl,
+		})
+
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Statistics in aggregate across all levels.
+	stats.accumulated = &KeyStatistics{}
+	stats.accumulated.kindsCount = make(map[string]int)
+	for lvl := 0; lvl < numLevels; lvl++ {
+		stats.accumulated.compactionPinnedCount += stats.levels[lvl].compactionPinnedCount
+		for kind, count := range stats.levels[lvl].kindsCount {
+			stats.accumulated.kindsCount[kind] += count
+		}
+	}
+
+	return stats, nil
+}
+
 // ObjProvider returns the objstorage.Provider for this database. Meant to be
 // used for internal purposes only.
 func (d *DB) ObjProvider() objstorage.Provider {
diff --git a/ingest_test.go b/ingest_test.go
index 8374ab5cbb0..0e98f41378d 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -905,29 +905,37 @@ func TestIngestShared(t *testing.T) {
 	w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
 
 	var sharedSSTs []SharedSSTMeta
-	err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error {
-		val, _, err := value.Value(nil)
-		require.NoError(t, err)
-		require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
-		return nil
-	}, func(start, end []byte, seqNum uint64) error {
-		require.NoError(t, w.DeleteRange(start, end))
-		return nil
-	}, func(start, end []byte, keys []keyspan.Key) error {
-		s := keyspan.Span{
-			Start:     start,
-			End:       end,
-			Keys:      keys,
-			KeysOrder: 0,
-		}
-		require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
-			return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
-		}))
-		return nil
-	}, func(sst *SharedSSTMeta) error {
-		sharedSSTs = append(sharedSSTs, *sst)
-		return nil
-	})
+	ops := scanInternalIterOptions{
+		visitPointKey: func(key *InternalKey, value LazyValue) error {
+			val, _, err := value.Value(nil)
+			require.NoError(t, err)
+			require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
+			return nil
+		},
+		visitRangeDel: func(start, end []byte, seqNum uint64) error {
+			require.NoError(t, w.DeleteRange(start, end))
+			return nil
+		},
+		visitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
+			s := keyspan.Span{
+				Start:     start,
+				End:       end,
+				Keys:      keys,
+				KeysOrder: 0,
+			}
+			require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
+				return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
+			}))
+			return nil
+		},
+		visitSharedFile: func(sst *SharedSSTMeta) error {
+			sharedSSTs = append(sharedSSTs, *sst)
+			return nil
+		},
+		restrictToLevel:     false,
+		includeObsoleteKeys: false,
+	}
+	err = from.ScanInternal(context.TODO(), startKey, endKey, ops)
 	require.NoError(t, err)
 	require.NoError(t, w.Close())
diff --git a/options.go b/options.go
index 1ad8bc61771..1f4895b3218 100644
--- a/options.go
+++ b/options.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/objstorage/shared"
+	"github.com/cockroachdb/pebble/rangekey"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
 )
@@ -243,12 +244,32 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti
 
-// scanInternalOptions is similar to IterOptions, meant for use with
+// scanInternalIterOptions is similar to IterOptions, meant for use with
 // scanInternalIterator.
-type scanInternalOptions struct {
+type scanInternalIterOptions struct {
 	IterOptions
 
+	visitPointKey   func(key *InternalKey, value LazyValue) error
+	visitRangeDel   func(start, end []byte, seqNum uint64) error
+	visitRangeKey   func(start, end []byte, keys []rangekey.Key) error
+	visitSharedFile func(sst *SharedSSTMeta) error
+	// visitKey is called on each key iterated over.
+	visitKey func(key *InternalKey, value LazyValue) error
+
 	// skipSharedLevels skips levels that are shareable (level >=
 	// sharedLevelStart).
 	skipSharedLevels bool
+
+	// includeObsoleteKeys uses a keyspan.InterleavingIter instead of a
+	// pointCollapsingIterator, ensuring that obsolete keys are included during
+	// iteration.
+	includeObsoleteKeys bool
+
+	// restrictToLevel indicates whether ScanInternal should run on a single level.
+	// Note: memtables are also skipped when restrictToLevel is enabled.
+	// This setting should be used together with the level option.
+	restrictToLevel bool
+
+	// level indicates the level which should be iterated over during ScanInternal.
+	level int
 }
 
 // RangeKeyMasking configures automatic hiding of point keys by range keys. A
diff --git a/scan_internal.go b/scan_internal.go
index 28fbb2be2af..8410e057fa9 100644
--- a/scan_internal.go
+++ b/scan_internal.go
@@ -684,13 +684,13 @@ var _ internalIterator = &pointCollapsingIterator{}
 // *must* return the range delete as well as the range key unset/delete that did
 // the shadowing.
 type scanInternalIterator struct {
-	opts         scanInternalOptions
+	opts         scanInternalIterOptions
 	comparer     *base.Comparer
 	merge        Merge
 	iter         internalIterator
 	readState    *readState
 	rangeKey     *iteratorRangeKeyState
-	pointKeyIter pointCollapsingIterator
+	pointKeyIter internalIterator
 	iterKey      *InternalKey
 	iterValue    LazyValue
 	alloc        *iterAlloc
@@ -884,6 +884,7 @@ func scanInternalImpl(
 	visitRangeDel func(start, end []byte, seqNum uint64) error,
 	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
 	visitSharedFile func(sst *SharedSSTMeta) error,
+	visitKey func(key *InternalKey, value LazyValue) error,
 ) error {
 	if visitSharedFile != nil && (lower == nil || upper == nil) {
 		panic("lower and upper bounds must be specified in skip-shared iteration mode")
 	}
@@ -934,21 +935,31 @@ func scanInternalImpl(
 
 	for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
 		key := iter.unsafeKey()
-
 		switch key.Kind() {
 		case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
-			span := iter.unsafeSpan()
-			if err := visitRangeKey(span.Start, span.End, span.Keys); err != nil {
-				return err
+			if visitRangeKey != nil {
+				span := iter.unsafeSpan()
+				if err := visitRangeKey(span.Start, span.End, span.Keys); err != nil {
+					return err
+				}
 			}
 		case InternalKeyKindRangeDelete:
-			rangeDel := iter.unsafeRangeDel()
-			if err := visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
-				return err
+			if visitRangeDel != nil {
+				rangeDel := iter.unsafeRangeDel()
+				if err := visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
+					return err
+				}
 			}
 		default:
-			val := iter.lazyValue()
-			if err := visitPointKey(key, val); err != nil {
+			if visitPointKey != nil {
+				val := iter.lazyValue()
+				if err := visitPointKey(key, val); err != nil {
+					return err
+				}
+			}
+		}
+		if visitKey != nil {
+			if err := visitKey(key, iter.lazyValue()); err != nil {
 				return err
 			}
 		}
@@ -970,8 +981,12 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *
 	numLevelIters := 0
 
 	current := i.readState.current
-	numMergingLevels += len(current.L0SublevelFiles)
-	numLevelIters += len(current.L0SublevelFiles)
+
+	if !i.opts.restrictToLevel || (i.opts.restrictToLevel && i.opts.level == 0) {
+		numMergingLevels += len(current.L0SublevelFiles)
+		numLevelIters += len(current.L0SublevelFiles)
+	}
+
 	for level := 1; level < len(current.Levels); level++ {
 		if current.Levels[level].Empty() {
 			continue
 		}
 		if i.opts.skipSharedLevels && level >= sharedLevelsStart {
 			continue
 		}
+		if i.opts.restrictToLevel && level != i.opts.level {
+			continue
+		}
 		numMergingLevels++
 		numLevelIters++
 	}
@@ -994,7 +1012,7 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *
 	rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
 	rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters)
 
-	// Next are the memtables.
+	// Next are the memtables (skipped when i.opts.restrictToLevel is set).
 	for j := len(memtables) - 1; j >= 0; j-- {
 		mem := memtables[j]
 		mlevels = append(mlevels, mergingIterLevel{
@@ -1030,12 +1048,11 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *
 		mlevelsIndex++
 	}
 
-	// Add level iterators for the L0 sublevels, iterating from newest to
-	// oldest.
-	for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
-		addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
+	if !i.opts.restrictToLevel || (i.opts.restrictToLevel && i.opts.level == 0) {
+		for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
+			addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
+		}
 	}
-
 	// Add level iterators for the non-empty non-L0 levels.
 	for level := 1; level < numLevels; level++ {
 		if current.Levels[level].Empty() {
@@ -1044,18 +1061,30 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *
 		if i.opts.skipSharedLevels && level >= sharedLevelsStart {
 			continue
 		}
+		if i.opts.restrictToLevel && level != i.opts.level {
+			continue
+		}
 		addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
 	}
+
 	buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
 	buf.merging.snapshot = i.seqNum
 	rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...)
-	i.pointKeyIter = pointCollapsingIterator{
-		comparer: i.comparer,
-		merge:    i.merge,
-		seqNum:   i.seqNum,
+
+	if i.opts.includeObsoleteKeys {
+		iiter := &keyspan.InterleavingIter{}
+		iiter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil, i.opts.LowerBound, i.opts.UpperBound)
+		i.pointKeyIter = iiter
+	} else {
+		pcIter := &pointCollapsingIterator{
+			comparer: i.comparer,
+			merge:    i.merge,
+			seqNum:   i.seqNum,
+		}
+		pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound)
+		i.pointKeyIter = pcIter
 	}
-	i.pointKeyIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound)
-	i.iter = &i.pointKeyIter
+	i.iter = i.pointKeyIter
 }
 
 // constructRangeKeyIter constructs the range-key iterator stack, populating
@@ -1070,16 +1099,18 @@ func (i *scanInternalIterator) constructRangeKeyIter() {
 		nil /* hasPrefix */, nil /* prefix */, false, /* onlySets */
 		&i.rangeKey.rangeKeyBuffers.internal)
 
-	// Next are the flushables: memtables and large batches.
-	for j := len(i.readState.memtables) - 1; j >= 0; j-- {
-		mem := i.readState.memtables[j]
-		// We only need to read from memtables which contain sequence numbers older
-		// than seqNum.
-		if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
-			continue
-		}
-		if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
-			i.rangeKey.iterConfig.AddLevel(rki)
+	if !i.opts.restrictToLevel {
+		// Next are the flushables: memtables and large batches.
+		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
+			mem := i.readState.memtables[j]
+			// We only need to read from memtables which contain sequence numbers older
+			// than seqNum.
+			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
+				continue
+			}
+			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
+				i.rangeKey.iterConfig.AddLevel(rki)
+			}
 		}
 	}
 
@@ -1096,14 +1127,16 @@ func (i *scanInternalIterator) constructRangeKeyIter() {
 	// LargestSeqNum ascending, and we need to add them to the merging iterator
 	// in LargestSeqNum descending to preserve the merging iterator's invariants
 	// around Key Trailer order.
-	iter := current.RangeKeyLevels[0].Iter()
-	for f := iter.Last(); f != nil; f = iter.Prev() {
-		spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0)))
-		if err != nil {
-			i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
-			continue
+	if !i.opts.restrictToLevel || (i.opts.restrictToLevel && i.opts.level == 0) {
+		iter := current.RangeKeyLevels[0].Iter()
+		for f := iter.Last(); f != nil; f = iter.Prev() {
+			spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0)))
+			if err != nil {
+				i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
+				continue
+			}
+			i.rangeKey.iterConfig.AddLevel(spanIter)
 		}
-		i.rangeKey.iterConfig.AddLevel(spanIter)
 	}
 
 	// Add level iterators for the non-empty non-L0 levels.
@@ -1114,6 +1147,9 @@ func (i *scanInternalIterator) constructRangeKeyIter() {
 		if i.opts.skipSharedLevels && level >= sharedLevelsStart {
 			continue
 		}
+		if i.opts.restrictToLevel && level != i.opts.level {
+			continue
+		}
 		li := i.rangeKey.iterConfig.NewLevelIter()
 		spanIterOpts := i.opts.SpanIterOptions(manifest.Level(level))
 		li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(),
@@ -1145,7 +1181,10 @@ func (i *scanInternalIterator) lazyValue() LazyValue {
 // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
 // a non-rangedel kind.
 func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
-	return i.pointKeyIter.iter.Span()
+	if !i.opts.includeObsoleteKeys {
+		return i.pointKeyIter.(*pointCollapsingIterator).iter.Span()
+	}
+	return nil
 }
 
 // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
diff --git a/scan_internal_test.go b/scan_internal_test.go
index 367d9c7044a..01844daba9d 100644
--- a/scan_internal_test.go
+++ b/scan_internal_test.go
@@ -25,15 +25,185 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func TestScanStatistics(t *testing.T) {
+	var d *DB
+	type scanInternalReader interface {
+		ScanStatistics(
+			ctx context.Context,
+			lower, upper []byte,
+		) (*LsmKeyStatistics, error)
+	}
+	batches := map[string]*Batch{}
+	snaps := map[string]*Snapshot{}
+	ctx := context.TODO()
+
+	getOpts := func() *Options {
+		opts := &Options{
+			FS:                 vfs.NewMem(),
+			Logger:             testLogger{t: t},
+			Comparer:           testkeys.Comparer,
+			FormatMajorVersion: FormatRangeKeys,
+			BlockPropertyCollectors: []func() BlockPropertyCollector{
+				sstable.NewTestKeysBlockPropertyCollector,
+			},
+		}
+		opts.Experimental.SharedStorage = shared.MakeSimpleFactory(map[shared.Locator]shared.Storage{
+			"": shared.NewInMem(),
+		})
+		opts.Experimental.CreateOnShared = true
+		opts.Experimental.CreateOnSharedLocator = ""
+		opts.DisableAutomaticCompactions = true
+		opts.EnsureDefaults()
+		opts.WithFSDefaults()
+		return opts
+	}
+	cleanup := func() (err error) {
+		for key, batch := range batches {
+			err = firstError(err, batch.Close())
+			delete(batches, key)
+		}
+		for key, snap := range snaps {
+			err = firstError(err, snap.Close())
+			delete(snaps, key)
+		}
+		if d != nil {
+			err = firstError(err, d.Close())
+			d = nil
+		}
+		return err
+	}
+	defer cleanup()
+
+	datadriven.RunTest(t, "testdata/scan_statistics", func(t *testing.T, td *datadriven.TestData) string {
+		switch td.Cmd {
+		case "reset":
+			if err := cleanup(); err != nil {
+				t.Fatal(err)
+				return err.Error()
+			}
+			var err error
+			d, err = Open("", getOpts())
+			require.NoError(t, err)
+			require.NoError(t, d.SetCreatorID(1))
+			return ""
+		case "snapshot":
+			s := d.NewSnapshot()
+			var name string
+			td.ScanArgs(t, "name", &name)
+			snaps[name] = s
+			return ""
+		case "batch":
+			var name string
+			td.MaybeScanArgs(t, "name", &name)
+			commit := td.HasArg("commit")
+			b := d.NewIndexedBatch()
+			require.NoError(t, runBatchDefineCmd(td, b))
+			var err error
+			if commit {
+				func() {
+					defer func() {
+						if r := recover(); r != nil {
+							err = errors.New(r.(string))
+						}
+					}()
+					err = b.Commit(nil)
+				}()
+			} else if name != "" {
+				batches[name] = b
+			}
+			if err != nil {
+				return err.Error()
+			}
+			count := b.Count()
+			if commit {
+				return fmt.Sprintf("committed %d keys\n", count)
+			}
+			return fmt.Sprintf("wrote %d keys to batch %q\n", count, name)
+		case "compact":
+			if err := runCompactCmd(td, d); err != nil {
+				return err.Error()
+			}
+			return runLSMCmd(td, d)
+		case "flush":
+			err := d.Flush()
+			if err != nil {
+				return err.Error()
+			}
+			return ""
+		case "commit":
+			name := pluckStringCmdArg(td, "batch")
+			b := batches[name]
+			defer b.Close()
+			count := b.Count()
+			require.NoError(t, d.Apply(b, nil))
+			delete(batches, name)
+			return fmt.Sprintf("committed %d keys\n", count)
+		case "scan-statistics":
+			var lower, upper []byte
+			var reader scanInternalReader = d
+			var b strings.Builder
+			var showCompactionPinned = false
+			var showKeyKinds []string
+			var showLevels []string
+
+			for _, arg := range td.CmdArgs {
+				switch arg.Key {
+				case "lower":
+					lower = []byte(arg.Vals[0])
+				case "upper":
+					upper = []byte(arg.Vals[0])
+				case "show-compaction-pinned":
+					showCompactionPinned = true
+				case "keys":
+					showKeyKinds = append(showKeyKinds, arg.Vals...)
+				case "levels":
+					showLevels = append(showLevels, arg.Vals...)
+				default:
+					showKeyKinds = append(showKeyKinds, arg.Key)
+				}
+			}
+			stats, err := reader.ScanStatistics(ctx, lower, upper)
+			if err != nil {
+				return err.Error()
+			}
+
+			for _, level := range showLevels {
+				lvl, err := strconv.Atoi(level)
+				if err != nil || lvl >= numLevels {
+					return fmt.Sprintf("invalid level %s", level)
+				}
+
+				fmt.Fprintf(&b, "Level %d:\n", lvl)
+				if showCompactionPinned {
+					fmt.Fprintf(&b, "  compaction pinned count: %d\n", stats.levels[lvl].compactionPinnedCount)
+				}
+				for _, keyKindToShow := range showKeyKinds {
+					fmt.Fprintf(&b, "  %s key count: %d\n", keyKindToShow, stats.levels[lvl].kindsCount[keyKindToShow])
+				}
+			}
+
+			fmt.Fprintf(&b, "Aggregate:\n")
+			if showCompactionPinned {
+				fmt.Fprintf(&b, "  compaction pinned count: %d\n", stats.accumulated.compactionPinnedCount)
+			}
+			for _, keyKindToShow := range showKeyKinds {
+				fmt.Fprintf(&b, "  %s key count: %d\n", keyKindToShow, stats.accumulated.kindsCount[keyKindToShow])
+			}
+			return b.String()
+		default:
+			return fmt.Sprintf("unknown command %q", td.Cmd)
+		}
+	})
+}
+
 func TestScanInternal(t *testing.T) {
 	var d *DB
 	type scanInternalReader interface {
 		ScanInternal(
 			ctx context.Context,
-			lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue) error,
-			visitRangeDel func(start, end []byte, seqNum uint64) error,
-			visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
-			visitSharedFile func(sst *SharedSSTMeta) error) error
+			lower, upper []byte,
+			iterOptions scanInternalIterOptions,
+		) error
 	}
 	batches := map[string]*Batch{}
 	snaps := map[string]*Snapshot{}
@@ -237,18 +407,26 @@ func TestScanInternal(t *testing.T) {
 				}
 			}
 		}
-		err := reader.ScanInternal(context.TODO(), lower, upper, func(key *InternalKey, value LazyValue) error {
-			v := value.InPlaceValue()
-			fmt.Fprintf(&b, "%s (%s)\n", key, v)
-			return nil
-		}, func(start, end []byte, seqNum uint64) error {
-			fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum)
-			return nil
-		}, func(start, end []byte, keys []rangekey.Key) error {
-			s := keyspan.Span{Start: start, End: end, Keys: keys}
-			fmt.Fprintf(&b, "%s\n", s.String())
-			return nil
-		}, fileVisitor)
+		ops := scanInternalIterOptions{
+			visitPointKey: func(key *InternalKey, value LazyValue) error {
+				v := value.InPlaceValue()
+				fmt.Fprintf(&b, "%s (%s)\n", key, v)
+				return nil
+			},
+			visitRangeDel: func(start, end []byte, seqNum uint64) error {
+				fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum)
+				return nil
+			},
+			visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
+				s := keyspan.Span{Start: start, End: end, Keys: keys}
+				fmt.Fprintf(&b, "%s\n", s.String())
+				return nil
+			},
+			visitSharedFile:     fileVisitor,
+			restrictToLevel:     false,
+			includeObsoleteKeys: false,
+		}
+		err := reader.ScanInternal(context.TODO(), lower, upper, ops)
 		if err != nil {
 			return err.Error()
 		}
diff --git a/snapshot.go b/snapshot.go
index 6617cce5f5e..100d311d6c2 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -8,8 +8,6 @@ import (
 	"context"
 	"io"
 	"math"
-
-	"github.com/cockroachdb/pebble/rangekey"
 )
 
 // Snapshot provides a read-only point-in-time view of the DB state.
@@ -64,27 +62,22 @@ func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) *Iter
 
 // See comment on db.ScanInternal for the behaviour that can be expected of
 // point keys deleted by range dels and keys masked by range keys.
 func (s *Snapshot) ScanInternal(
-	ctx context.Context,
-	lower, upper []byte,
-	visitPointKey func(key *InternalKey, value LazyValue) error,
-	visitRangeDel func(start, end []byte, seqNum uint64) error,
-	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
-	visitSharedFile func(sst *SharedSSTMeta) error,
+	ctx context.Context, lower, upper []byte, scanInternalOps scanInternalIterOptions,
 ) error {
 	if s.db == nil {
 		panic(ErrClosed)
 	}
-	iter := s.db.newInternalIter(s, &scanInternalOptions{
-		IterOptions: IterOptions{
-			KeyTypes:   IterKeyTypePointsAndRanges,
-			LowerBound: lower,
-			UpperBound: upper,
-		},
-		skipSharedLevels: visitSharedFile != nil,
-	})
+	scanInternalOps.skipSharedLevels = scanInternalOps.visitSharedFile != nil
+	scanInternalOps.IterOptions = IterOptions{
+		KeyTypes:   IterKeyTypePointsAndRanges,
+		LowerBound: lower,
+		UpperBound: upper,
+	}
+
+	iter := s.db.newInternalIter(s, &scanInternalOps)
 	defer iter.close()
-	return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
+	return scanInternalImpl(ctx, lower, upper, iter, scanInternalOps.visitPointKey, scanInternalOps.visitRangeDel, scanInternalOps.visitRangeKey, scanInternalOps.visitSharedFile, scanInternalOps.visitKey)
 }
 
 // Close closes the snapshot, releasing its resources. Close must be called.
diff --git a/testdata/scan_statistics b/testdata/scan_statistics
new file mode 100644
index 00000000000..641d1b80335
--- /dev/null
+++ b/testdata/scan_statistics
@@ -0,0 +1,134 @@
+
+reset
+----
+
+batch commit
+set b d
+set e foo
+----
+committed 2 keys
+
+scan-statistics lower=b upper=e keys=(SET)
+----
+Aggregate:
+  SET key count: 0
+
+flush
+----
+
+scan-statistics lower=b upper=e keys=(SET) levels=(0)
+----
+Level 0:
+  SET key count: 1
+Aggregate:
+  SET key count: 1
+
+scan-statistics lower=b upper=f keys=(SET) levels=(0)
+----
+Level 0:
+  SET key count: 2
+Aggregate:
+  SET key count: 2
+
+scan-statistics lower=f upper=l keys=(SET)
+----
+Aggregate:
+  SET key count: 0
+
+batch commit
+del b
+del e
+----
+committed 2 keys
+
+flush
+----
+
+scan-statistics lower=b upper=f keys=(SET, DEL) levels=(0)
+----
+Level 0:
+  SET key count: 2
+  DEL key count: 2
+Aggregate:
+  SET key count: 2
+  DEL key count: 2
+
+reset
+----
+
+batch commit
+set b hi
+----
+committed 1 keys
+
+flush
+----
+
+batch commit
+set b hello
+----
+committed 1 keys
+
+flush
+----
+
+compact a-z
+----
+6:
+  000008:[b#0,SET-b#0,SET]
+
+scan-statistics lower=b upper=f keys=(SET) levels=(6)
+----
+Level 6:
+  SET key count: 1
+Aggregate:
+  SET key count: 1
+
+batch commit
+set c a
+----
+committed 1 keys
+
+flush
+----
+
+scan-statistics lower=b upper=f keys=(SET) levels=(0, 6)
+----
+Level 0:
+  SET key count: 1
+Level 6:
+  SET key count: 1
+Aggregate:
+  SET key count: 2
+
+reset
+----
+
+batch commit
+set a b
+----
+committed 1 keys
+
+flush
+----
+
+snapshot name=first
+----
+
+batch commit
+set a c
+----
+committed 1 keys
+
+flush
+----
+
+compact a-z
+----
+6:
+  000008:[a#11,SET-a#0,SET]
+
+scan-statistics lower=a upper=z show-compaction-pinned
+----
+Aggregate:
+  compaction pinned count: 1