diff --git a/db.go b/db.go index bcef08167eb..6509e7a00f0 100644 --- a/db.go +++ b/db.go @@ -1178,21 +1178,28 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator { func (d *DB) ScanInternal( ctx context.Context, lower, upper []byte, - visitPointKey func(key *InternalKey, value LazyValue) error, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, ) error { - iter := d.newInternalIter(nil /* snapshot */, &scanInternalOptions{ + scanInternalOpts := &scanInternalOptions{ + visitPointKey: visitPointKey, + visitRangeDel: visitRangeDel, + visitRangeKey: visitRangeKey, + visitSharedFile: visitSharedFile, + skipSharedLevels: visitSharedFile != nil, + includeObsoleteKeys: includeObsoleteKeys, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, UpperBound: upper, }, - skipSharedLevels: visitSharedFile != nil, - }) + } + iter := d.newInternalIter(nil /* snapshot */, scanInternalOpts) defer iter.close() - return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts) } // newInternalIter constructs and returns a new scanInternalIterator on this db. @@ -1231,6 +1238,7 @@ func (d *DB) newInternalIter(s *Snapshot, o *scanInternalOptions) *scanInternalI newIters: d.newIters, newIterRangeKey: d.tableNewRangeKeyIter, seqNum: seqNum, + mergingIter: &buf.merging, } if o != nil { dbi.opts = *o @@ -2021,7 +2029,6 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) { if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) { continue } - destTables[j] = SSTableInfo{TableInfo: m.TableInfo()} if opt.withProperties { p, err := d.tableCache.getTableProperties( @@ -2644,6 +2651,78 @@ func (d *DB) SetCreatorID(creatorID uint64) error { return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID)) } +// KeyStatistics keeps track of the number of keys that have been pinned by a +// snapshot as well as counts of the different key kinds in the lsm. +type KeyStatistics struct { + // when a compaction determines a key is obsolete, but cannot elide the key + // because it's required by an open snapshot. + snapshotPinnedKeys int + // the total number of bytes of all snapshot pinned keys. + snapshotPinnedKeysBytes uint64 + // Note: these fields are currently only populated for point keys (including range deletes). + kindsCount [InternalKeyKindMax + 1]int +} + +// LSMKeyStatistics is used by DB.ScanStatistics. +type LSMKeyStatistics struct { + accumulated KeyStatistics + levels [numLevels]KeyStatistics + bytesRead uint64 +} + +// ScanStatistics returns the count of different key kinds within the lsm for a +// key span [lower, upper) as well as the number of snapshot keys. +func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) { + stats := LSMKeyStatistics{} + var prevKey InternalKey + + err := d.ScanInternal(ctx, lower, upper, + func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error { + // If the previous key is equal to the current point key, the current key was + // pinned by a snapshot. + size := uint64(key.Size()) + kind := key.Kind() + if iterInfo.kind != IteratorLevelFlushable && d.equal(prevKey.UserKey, key.UserKey) { + if iterInfo.kind == IteratorLevelLSM { + stats.levels[iterInfo.Level].snapshotPinnedKeys++ + stats.levels[iterInfo.Level].snapshotPinnedKeysBytes += size + } + stats.accumulated.snapshotPinnedKeys++ + stats.accumulated.snapshotPinnedKeysBytes += size + } + if iterInfo.kind == IteratorLevelLSM { + stats.levels[iterInfo.Level].kindsCount[kind]++ + } + + stats.accumulated.kindsCount[kind]++ + prevKey.CopyFrom(*key) + stats.bytesRead += uint64(key.Size() + value.Len()) + return nil + }, + func(start, end []byte, seqNum uint64) error { + stats.accumulated.kindsCount[InternalKeyKindRangeDelete]++ + stats.bytesRead += uint64(len(start) + len(end)) + return nil + }, + func(start, end []byte, keys []rangekey.Key) error { + stats.bytesRead += uint64(len(start) + len(end)) + for _, key := range keys { + stats.accumulated.kindsCount[key.Kind()]++ + stats.bytesRead += uint64(len(key.Value) + len(key.Suffix)) + } + return nil + }, + nil, + true, + ) + + if err != nil { + return LSMKeyStatistics{}, err + } + + return stats, nil +} + // ObjProvider returns the objstorage.Provider for this database. Meant to be // used for internal purposes only. func (d *DB) ObjProvider() objstorage.Provider { diff --git a/ingest_test.go b/ingest_test.go index 00b2aae0f8d..2f1ef8eec7d 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -905,29 +905,34 @@ func TestIngestShared(t *testing.T) { w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts) var sharedSSTs []SharedSSTMeta - err = from.ScanInternal(context.TODO(), startKey, endKey, func(key *InternalKey, value LazyValue) error { - val, _, err := value.Value(nil) - require.NoError(t, err) - require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) - return nil - }, func(start, end []byte, seqNum uint64) error { - require.NoError(t, w.DeleteRange(start, end)) - return nil - }, func(start, end []byte, keys []keyspan.Key) error { - s := keyspan.Span{ - Start: start, - End: end, - Keys: keys, - KeysOrder: 0, - } - require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error { - return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v) - })) - return nil - }, func(sst *SharedSSTMeta) error { - sharedSSTs = append(sharedSSTs, *sst) - return nil - }) + err = from.ScanInternal(context.TODO(), startKey, endKey, + func(key *InternalKey, value LazyValue, _ IteratorLevel) error { + val, _, err := value.Value(nil) + require.NoError(t, err) + require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) + return nil + }, + func(start, end []byte, seqNum uint64) error { + require.NoError(t, w.DeleteRange(start, end)) + return nil + }, + func(start, end []byte, keys []keyspan.Key) error { + s := keyspan.Span{ + Start: start, + End: end, + Keys: keys, + KeysOrder: 0, + } + require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error { + return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v) + })) + return nil + }, + func(sst *SharedSSTMeta) error { + sharedSSTs = append(sharedSSTs, *sst) + return nil + }, + false) require.NoError(t, err) require.NoError(t, w.Close()) diff --git a/options.go b/options.go index ea89f939111..d2c3df0de5e 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage/remote" + "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" ) @@ -246,9 +247,18 @@ func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOpti type scanInternalOptions struct { IterOptions + visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error + visitRangeDel func(start, end []byte, seqNum uint64) error + visitRangeKey func(start, end []byte, keys []rangekey.Key) error + visitSharedFile func(sst *SharedSSTMeta) error + // skipSharedLevels skips levels that are shareable (level >= // sharedLevelStart). skipSharedLevels bool + + // includeObsoleteKeys specifies whether keys shadowed by newer internal keys + // are exposed. If false, only one internal key per user key is exposed. + includeObsoleteKeys bool } // RangeKeyMasking configures automatic hiding of point keys by range keys. A diff --git a/scan_internal.go b/scan_internal.go index f6d416b1199..176c86311e5 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -14,7 +14,6 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage" - "github.com/cockroachdb/pebble/rangekey" ) const ( @@ -132,6 +131,10 @@ type pointCollapsingIterator struct { fixedSeqNum uint64 } +func (p *pointCollapsingIterator) Span() *keyspan.Span { + return p.iter.Span() +} + // SeekPrefixGE implements the InternalIterator interface. func (p *pointCollapsingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, @@ -351,6 +354,27 @@ func (p *pointCollapsingIterator) String() string { var _ internalIterator = &pointCollapsingIterator{} +type IteratorLevelKind int8 + +const ( + IteratorLevelUnknown IteratorLevelKind = iota + IteratorLevelLSM + IteratorLevelFlushable +) + +// This is used with scanInternalIterator to surface additional iterator-specific info where possible. +// Note: this is struct is only provided for point keys. +type IteratorLevel struct { + kind IteratorLevelKind + // FlushableIndex indicates the position within the flushable queue of this level. + // Only valid if kind == IteratorLevelFlushable. + FlushableIndex int + // The level within the LSM. Only valid if kind == IteratorLevelLSM. + Level int + // Sublevel is only valid if kind == IteratorLevelLSM and Level == 0. + Sublevel int +} + // scanInternalIterator is an iterator that returns all internal keys, including // tombstones. For instance, an InternalKeyKindDelete would be returned as an // InternalKeyKindDelete instead of the iterator skipping over to the next key. @@ -370,13 +394,15 @@ type scanInternalIterator struct { iter internalIterator readState *readState rangeKey *iteratorRangeKeyState - pointKeyIter pointCollapsingIterator + pointKeyIter internalIterator iterKey *InternalKey iterValue LazyValue alloc *iterAlloc newIters tableNewIters newIterRangeKey keyspan.TableNewSpanIter seqNum uint64 + iterLevels []IteratorLevel + mergingIter *mergingIter // boundsBuf holds two buffers used to store the lower and upper bounds. // Whenever the InternalIterator's bounds change, the new bounds are copied @@ -557,15 +583,9 @@ func (d *DB) truncateSharedFile( } func scanInternalImpl( - ctx context.Context, - lower, upper []byte, - iter *scanInternalIterator, - visitPointKey func(key *InternalKey, value LazyValue) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, - visitRangeKey func(start, end []byte, keys []rangekey.Key) error, - visitSharedFile func(sst *SharedSSTMeta) error, + ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions, ) error { - if visitSharedFile != nil && (lower == nil || upper == nil) { + if opts.visitSharedFile != nil && (lower == nil || upper == nil) { panic("lower and upper bounds must be specified in skip-shared iteration mode") } // Before starting iteration, check if any files in levels sharedLevelsStart @@ -577,7 +597,7 @@ func scanInternalImpl( db := iter.readState.db provider := db.objProvider seqNum := iter.seqNum - if visitSharedFile != nil { + if opts.visitSharedFile != nil { if provider == nil { panic("expected non-nil Provider in skip-shared iteration mode") } @@ -605,7 +625,7 @@ func scanInternalImpl( if skip { continue } - if err = visitSharedFile(sst); err != nil { + if err = opts.visitSharedFile(sst); err != nil { return err } } @@ -614,22 +634,34 @@ func scanInternalImpl( for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() { key := iter.unsafeKey() - switch key.Kind() { case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet: - span := iter.unsafeSpan() - if err := visitRangeKey(span.Start, span.End, span.Keys); err != nil { - return err + if opts.visitRangeKey != nil { + span := iter.unsafeSpan() + if err := opts.visitRangeKey(span.Start, span.End, span.Keys); err != nil { + return err + } } case InternalKeyKindRangeDelete: - rangeDel := iter.unsafeRangeDel() - if err := visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil { - return err + if opts.visitRangeDel != nil { + rangeDel := iter.unsafeRangeDel() + if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil { + return err + } } default: - val := iter.lazyValue() - if err := visitPointKey(key, val); err != nil { - return err + if opts.visitPointKey != nil { + var info IteratorLevel + if len(iter.mergingIter.heap.items) > 0 { + mergingIterIdx := iter.mergingIter.heap.items[0].index + info = iter.iterLevels[mergingIterIdx] + } else { + info = IteratorLevel{kind: IteratorLevelUnknown} + } + val := iter.lazyValue() + if err := opts.visitPointKey(key, val, info); err != nil { + return err + } } } } @@ -650,8 +682,10 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * numLevelIters := 0 current := i.readState.current + numMergingLevels += len(current.L0SublevelFiles) numLevelIters += len(current.L0SublevelFiles) + for level := 1; level < len(current.Levels); level++ { if current.Levels[level].Empty() { continue @@ -674,19 +708,26 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels) rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters) + i.iterLevels = make([]IteratorLevel, numMergingLevels) + mlevelsIndex := 0 + // Next are the memtables. for j := len(memtables) - 1; j >= 0; j-- { mem := memtables[j] mlevels = append(mlevels, mergingIterLevel{ iter: mem.newIter(&i.opts.IterOptions), }) + i.iterLevels[mlevelsIndex] = IteratorLevel{ + kind: IteratorLevelFlushable, + FlushableIndex: j, + } + mlevelsIndex++ if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil { rangeDelIters = append(rangeDelIters, rdi) } } // Next are the file levels: L0 sub-levels followed by lower levels. - mlevelsIndex := len(mlevels) levelsIndex := len(levels) mlevels = mlevels[:numMergingLevels] levels = levels[:numLevelIters] @@ -710,12 +751,14 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * mlevelsIndex++ } - // Add level iterators for the L0 sublevels, iterating from newest to - // oldest. - for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- { - addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i)) + for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- { + i.iterLevels[mlevelsIndex] = IteratorLevel{ + kind: IteratorLevelLSM, + Level: 0, + Sublevel: j, + } + addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j)) } - // Add level iterators for the non-empty non-L0 levels. for level := 1; level < numLevels; level++ { if current.Levels[level].Empty() { @@ -724,18 +767,28 @@ func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf * if i.opts.skipSharedLevels && level >= sharedLevelsStart { continue } + i.iterLevels[mlevelsIndex] = IteratorLevel{kind: IteratorLevelLSM, Level: level} addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level)) } + buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...) buf.merging.snapshot = i.seqNum rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...) - i.pointKeyIter = pointCollapsingIterator{ - comparer: i.comparer, - merge: i.merge, - seqNum: i.seqNum, + + if i.opts.includeObsoleteKeys { + iiter := &keyspan.InterleavingIter{} + iiter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) + i.pointKeyIter = iiter + } else { + pcIter := &pointCollapsingIterator{ + comparer: i.comparer, + merge: i.merge, + seqNum: i.seqNum, + } + pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) + i.pointKeyIter = pcIter } - i.pointKeyIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, nil /* mask */, i.opts.LowerBound, i.opts.UpperBound) - i.iter = &i.pointKeyIter + i.iter = i.pointKeyIter } // constructRangeKeyIter constructs the range-key iterator stack, populating @@ -825,7 +878,10 @@ func (i *scanInternalIterator) lazyValue() LazyValue { // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns // a non-rangedel kind. func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span { - return i.pointKeyIter.iter.Span() + type spanInternalIterator interface { + Span() *keyspan.Span + } + return i.pointKeyIter.(spanInternalIterator).Span() } // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns diff --git a/scan_internal_test.go b/scan_internal_test.go index 4a8166db05d..4aee9d4fee3 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -25,15 +25,190 @@ import ( "github.com/stretchr/testify/require" ) +func TestScanStatistics(t *testing.T) { + var d *DB + type scanInternalReader interface { + ScanStatistics( + ctx context.Context, + lower, upper []byte, + ) (LSMKeyStatistics, error) + } + batches := map[string]*Batch{} + snaps := map[string]*Snapshot{} + ctx := context.TODO() + + getOpts := func() *Options { + opts := &Options{ + FS: vfs.NewMem(), + Logger: testLogger{t: t}, + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatRangeKeys, + BlockPropertyCollectors: []func() BlockPropertyCollector{ + sstable.NewTestKeysBlockPropertyCollector, + }, + } + opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ + "": remote.NewInMem(), + }) + opts.Experimental.CreateOnShared = true + opts.Experimental.CreateOnSharedLocator = "" + opts.DisableAutomaticCompactions = true + opts.EnsureDefaults() + opts.WithFSDefaults() + return opts + } + cleanup := func() (err error) { + for key, batch := range batches { + err = firstError(err, batch.Close()) + delete(batches, key) + } + for key, snap := range snaps { + err = firstError(err, snap.Close()) + delete(snaps, key) + } + if d != nil { + err = firstError(err, d.Close()) + d = nil + } + return err + } + defer cleanup() + + datadriven.RunTest(t, "testdata/scan_statistics", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + if err := cleanup(); err != nil { + t.Fatal(err) + return err.Error() + } + var err error + d, err = Open("", getOpts()) + require.NoError(t, err) + require.NoError(t, d.SetCreatorID(1)) + return "" + case "snapshot": + s := d.NewSnapshot() + var name string + td.ScanArgs(t, "name", &name) + snaps[name] = s + return "" + case "batch": + var name string + td.MaybeScanArgs(t, "name", &name) + commit := td.HasArg("commit") + b := d.NewIndexedBatch() + require.NoError(t, runBatchDefineCmd(td, b)) + var err error + if commit { + func() { + defer func() { + if r := recover(); r != nil { + err = errors.New(r.(string)) + } + }() + err = b.Commit(nil) + }() + } else if name != "" { + batches[name] = b + } + if err != nil { + return err.Error() + } + count := b.Count() + if commit { + return fmt.Sprintf("committed %d keys\n", count) + } + return fmt.Sprintf("wrote %d keys to batch %q\n", count, name) + case "compact": + if err := runCompactCmd(td, d); err != nil { + return err.Error() + } + return runLSMCmd(td, d) + case "flush": + err := d.Flush() + if err != nil { + return err.Error() + } + return "" + case "commit": + name := pluckStringCmdArg(td, "batch") + b := batches[name] + defer b.Close() + count := b.Count() + require.NoError(t, d.Apply(b, nil)) + delete(batches, name) + return fmt.Sprintf("committed %d keys\n", count) + case "scan-statistics": + var lower, upper []byte + var reader scanInternalReader = d + var b strings.Builder + var showSnapshotPinned = false + var keyKindsToDisplay []InternalKeyKind + var showLevels []string + + for _, arg := range td.CmdArgs { + switch arg.Key { + case "lower": + lower = []byte(arg.Vals[0]) + case "upper": + upper = []byte(arg.Vals[0]) + case "show-snapshot-pinned": + showSnapshotPinned = true + case "keys": + for _, key := range arg.Vals { + keyKindsToDisplay = append(keyKindsToDisplay, base.ParseKind(key)) + } + case "levels": + showLevels = append(showLevels, arg.Vals...) + default: + } + } + stats, err := reader.ScanStatistics(ctx, lower, upper) + if err != nil { + return err.Error() + } + + for _, level := range showLevels { + lvl, err := strconv.Atoi(level) + if err != nil || lvl >= numLevels { + return fmt.Sprintf("invalid level %s", level) + } + + fmt.Fprintf(&b, "Level %d:\n", lvl) + if showSnapshotPinned { + fmt.Fprintf(&b, " compaction pinned count: %d\n", stats.levels[lvl].snapshotPinnedKeys) + } + for _, kind := range keyKindsToDisplay { + fmt.Fprintf(&b, " %s key count: %d\n", kind.String(), stats.levels[lvl].kindsCount[kind]) + } + } + + fmt.Fprintf(&b, "Aggregate:\n") + if showSnapshotPinned { + fmt.Fprintf(&b, " snapshot pinned count: %d\n", stats.accumulated.snapshotPinnedKeys) + } + for _, kind := range keyKindsToDisplay { + fmt.Fprintf(&b, " %s key count: %d\n", kind.String(), stats.accumulated.kindsCount[kind]) + } + return b.String() + default: + return fmt.Sprintf("unknown command %q", td.Cmd) + } + }) +} + func TestScanInternal(t *testing.T) { var d *DB type scanInternalReader interface { ScanInternal( ctx context.Context, - lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue) error, + lower, upper []byte, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, - visitSharedFile func(sst *SharedSSTMeta) error) error + visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, + ) error } batches := map[string]*Batch{} snaps := map[string]*Snapshot{} @@ -237,18 +412,24 @@ func TestScanInternal(t *testing.T) { } } } - err := reader.ScanInternal(context.TODO(), lower, upper, func(key *InternalKey, value LazyValue) error { - v := value.InPlaceValue() - fmt.Fprintf(&b, "%s (%s)\n", key, v) - return nil - }, func(start, end []byte, seqNum uint64) error { - fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum) - return nil - }, func(start, end []byte, keys []rangekey.Key) error { - s := keyspan.Span{Start: start, End: end, Keys: keys} - fmt.Fprintf(&b, "%s\n", s.String()) - return nil - }, fileVisitor) + err := reader.ScanInternal(context.TODO(), lower, upper, + func(key *InternalKey, value LazyValue, _ IteratorLevel) error { + v := value.InPlaceValue() + fmt.Fprintf(&b, "%s (%s)\n", key, v) + return nil + }, + func(start, end []byte, seqNum uint64) error { + fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum) + return nil + }, + func(start, end []byte, keys []rangekey.Key) error { + s := keyspan.Span{Start: start, End: end, Keys: keys} + fmt.Fprintf(&b, "%s\n", s.String()) + return nil + }, + fileVisitor, + false, + ) if err != nil { return err.Error() } diff --git a/snapshot.go b/snapshot.go index 6617cce5f5e..98eccccfde0 100644 --- a/snapshot.go +++ b/snapshot.go @@ -66,25 +66,33 @@ func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) *Iter func (s *Snapshot) ScanInternal( ctx context.Context, lower, upper []byte, - visitPointKey func(key *InternalKey, value LazyValue) error, + visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, + includeObsoleteKeys bool, ) error { if s.db == nil { panic(ErrClosed) } - iter := s.db.newInternalIter(s, &scanInternalOptions{ + scanInternalOpts := &scanInternalOptions{ + visitPointKey: visitPointKey, + visitRangeDel: visitRangeDel, + visitRangeKey: visitRangeKey, + visitSharedFile: visitSharedFile, + skipSharedLevels: visitSharedFile != nil, + includeObsoleteKeys: includeObsoleteKeys, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, UpperBound: upper, }, - skipSharedLevels: visitSharedFile != nil, - }) + } + + iter := s.db.newInternalIter(s, scanInternalOpts) defer iter.close() - return scanInternalImpl(ctx, lower, upper, iter, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts) } // Close closes the snapshot, releasing its resources. Close must be called. diff --git a/testdata/scan_statistics b/testdata/scan_statistics new file mode 100644 index 00000000000..79620147dd5 --- /dev/null +++ b/testdata/scan_statistics @@ -0,0 +1,134 @@ + +reset +---- + +batch commit +set b d +set e foo +---- +committed 2 keys + +scan-statistics lower=b upper=f keys=(SET) +---- +Aggregate: + SET key count: 2 + +flush +---- + +scan-statistics lower=b upper=e keys=(SET) levels=(0) +---- +Level 0: + SET key count: 1 +Aggregate: + SET key count: 1 + +scan-statistics lower=b upper=f keys=(SET) levels=(0) +---- +Level 0: + SET key count: 2 +Aggregate: + SET key count: 2 + +scan-statistics lower=f upper=l keys=(SET) +---- +Aggregate: + SET key count: 0 + +batch commit +del b +del e +---- +committed 2 keys + +flush +---- + +scan-statistics lower=b upper=f keys=(SET, DEL) levels=(0) +---- +Level 0: + SET key count: 2 + DEL key count: 2 +Aggregate: + SET key count: 2 + DEL key count: 2 + +reset +---- + +batch commit +set b hi +---- +committed 1 keys + +flush +---- + +batch commit +set b hello +---- +committed 1 keys + +flush +---- + +compact a-z +---- +6: + 000008:[b#0,SET-b#0,SET] + +scan-statistics lower=b upper=f keys=(SET) levels=(6) +---- +Level 6: + SET key count: 1 +Aggregate: + SET key count: 1 + +batch commit +set c a +---- +committed 1 keys + +flush +---- + +scan-statistics lower=b upper=f keys=(SET) levels=(0, 6) +---- +Level 0: + SET key count: 1 +Level 6: + SET key count: 1 +Aggregate: + SET key count: 2 + +reset +---- + +batch commit +set a b +---- +committed 1 keys + +flush +---- + +snapshot name=first +---- + +batch commit +set a c +---- +committed 1 keys + +flush +---- + +compact a-z +---- +6: + 000008:[a#11,SET-a#0,SET] + +scan-statistics lower=a upper=z show-snapshot-pinned +---- +Aggregate: + snapshot pinned count: 1