db: add Batch.NewBatchOnlyIter that reads only from the Batch
sumeerbhola committed Oct 27, 2023
1 parent f522e2f commit ed45a77
Showing 7 changed files with 335 additions and 133 deletions.
15 changes: 14 additions & 1 deletion batch.go
@@ -1097,7 +1097,20 @@ func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterat
if b.index == nil {
return &Iterator{err: ErrNotIndexed}, nil
}
return b.db.newIter(ctx, b, snapshotIterOpts{}, o), nil
return b.db.newIter(ctx, b, newIterOpts{}, o), nil
}

// NewBatchOnlyIter constructs an iterator that only reads the contents of the
// batch, and does not overlay the batch mutations on top of the DB state.
//
// The returned Iterator observes all of the Batch's existing mutations, but
// no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
// SetOptions().
func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
if b.index == nil {
return &Iterator{err: ErrNotIndexed}, nil
}
return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
}

// newInternalIter creates a new internalIterator that iterates over the
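For context, a minimal usage sketch of the NewBatchOnlyIter API added above (not part of the commit): it assumes a hypothetical store directory "demo-db" and shows a batch-only iterator reading an indexed batch's own mutations.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Open a throwaway store; the path is illustrative.
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Stage mutations in an indexed batch; only indexed batches support
	// iteration.
	b := db.NewIndexedBatch()
	_ = b.Set([]byte("a"), []byte("1"), nil)
	_ = b.Set([]byte("b"), []byte("2"), nil)

	// The batch-only iterator observes the batch's mutations and nothing
	// from the underlying DB state.
	it, err := b.NewBatchOnlyIter(context.Background(), nil)
	if err != nil {
		log.Fatal(err)
	}
	for it.First(); it.Valid(); it.Next() {
		fmt.Printf("%s => %s\n", it.Key(), it.Value())
	}
	if err := it.Close(); err != nil {
		log.Fatal(err)
	}
	_ = b.Close()
}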
29 changes: 25 additions & 4 deletions batch_test.go
@@ -6,6 +6,7 @@ package pebble

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
@@ -307,6 +308,12 @@ func testBatchEmpty(t *testing.T, size int) {
require.True(t, err != nil)
require.False(t, iter2.First())
require.NoError(t, iter2.Close())
iter3, err := ib.NewBatchOnlyIter(context.Background(), nil)
require.NoError(t, err)
require.False(t, iter3.First())
_, err = iter3.Clone(CloneOptions{})
require.Error(t, err)
require.NoError(t, iter3.Close())
}

func TestBatchApplyNoSyncWait(t *testing.T) {
@@ -451,13 +458,17 @@ func TestIndexedBatchReset(t *testing.T) {
iter2, err := iter.Clone(CloneOptions{})
require.NoError(t, err)
defer iter2.Close()
var count [2]int
for i, it := range []*Iterator{iter, iter2} {
iter3, err := ib.NewBatchOnlyIter(context.Background(), nil)
require.NoError(t, err)
defer iter3.Close()
var count [3]int
for i, it := range []*Iterator{iter, iter2, iter3} {
for it.First(); it.Valid(); it.Next() {
count[i]++
}
}
require.Equal(t, count[0], count[1])
require.Equal(t, count[0], count[2])
return count[0]
}
contains := func(ib *Batch, key, value string) bool {
@@ -466,8 +477,11 @@ func TestIndexedBatchReset(t *testing.T) {
iter2, err := iter.Clone(CloneOptions{})
require.NoError(t, err)
defer iter2.Close()
var found [2]bool
for i, it := range []*Iterator{iter, iter2} {
iter3, err := ib.NewBatchOnlyIter(context.Background(), nil)
require.NoError(t, err)
defer iter3.Close()
var found [3]bool
for i, it := range []*Iterator{iter, iter2, iter3} {
for it.First(); it.Valid(); it.Next() {
if string(it.Key()) == key &&
string(it.Value()) == value {
@@ -476,6 +490,7 @@ func TestIndexedBatchReset(t *testing.T) {
}
}
require.Equal(t, found[0], found[1])
require.Equal(t, found[0], found[2])
return found[0]
}
// Set a key and check whether the key-value pair is visible.
@@ -529,6 +544,12 @@ func TestIndexedBatchMutation(t *testing.T) {
KeyTypes: IterKeyTypePointsAndRanges,
})
return ""
case "new-batch-only-iter":
name := td.CmdArgs[0].String()
iters[name], _ = b.NewBatchOnlyIter(context.Background(), &IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
})
return ""
case "new-db-iter":
name := td.CmdArgs[0].String()
iters[name], _ = d.NewIter(&IterOptions{
163 changes: 96 additions & 67 deletions db.go
@@ -996,15 +996,31 @@ type snapshotIterOpts struct {
vers *version
}

type batchIterOpts struct {
batchOnly bool
}
type newIterOpts struct {
snapshot snapshotIterOpts
batch batchIterOpts
}

// newIter constructs a new iterator, merging in batch iterators as an extra
// level.
func (d *DB) newIter(
ctx context.Context, batch *Batch, sOpts snapshotIterOpts, o *IterOptions,
ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
) *Iterator {
if internalOpts.batch.batchOnly {
if batch == nil {
panic("batchOnly is true, but batch is nil")
}
if internalOpts.snapshot.vers != nil {
panic("batchOnly is true, but snapshotIterOpts is initialized")
}
}
if err := d.closed.Load(); err != nil {
panic(err)
}
seqNum := sOpts.seqNum
seqNum := internalOpts.snapshot.seqNum
if o.rangeKeys() {
if d.FormatMajorVersion() < FormatRangeKeys {
panic(fmt.Sprintf(
@@ -1023,22 +1039,28 @@ func (d *DB) newIter(
// DB.mem.queue[0].logSeqNum.
panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
}
// Grab and reference the current readState. This prevents the underlying
// files in the associated version from being deleted if there is a current
// compaction. The readState is unref'd by Iterator.Close().
var readState *readState
if sOpts.vers == nil {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
} else {
// s.vers != nil
sOpts.vers.Ref()
}
var newIters tableNewIters
var newIterRangeKey keyspan.TableNewSpanIter
if !internalOpts.batch.batchOnly {
// Grab and reference the current readState. This prevents the underlying
// files in the associated version from being deleted if there is a current
// compaction. The readState is unref'd by Iterator.Close().
if internalOpts.snapshot.vers == nil {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
} else {
// vers != nil
internalOpts.snapshot.vers.Ref()
}

// Determine the seqnum to read at after grabbing the read state (current and
// memtables) above.
if seqNum == 0 {
seqNum = d.mu.versions.visibleSeqNum.Load()
// Determine the seqnum to read at after grabbing the read state (current and
// memtables) above.
if seqNum == 0 {
seqNum = d.mu.versions.visibleSeqNum.Load()
}
newIters = d.newIters
newIterRangeKey = d.tableNewRangeKeyIter
}

// Bundle various structures under a single umbrella in order to allocate
Expand All @@ -1051,14 +1073,15 @@ func (d *DB) newIter(
merge: d.merge,
comparer: *d.opts.Comparer,
readState: readState,
version: sOpts.vers,
version: internalOpts.snapshot.vers,
keyBuf: buf.keyBuf,
prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
boundsBuf: buf.boundsBuf,
batch: batch,
newIters: d.newIters,
newIterRangeKey: d.tableNewRangeKeyIter,
newIters: newIters,
newIterRangeKey: newIterRangeKey,
seqNum: seqNum,
batchOnlyIter: internalOpts.batch.batchOnly,
}
if o != nil {
dbi.opts = *o
@@ -1348,20 +1371,24 @@ func (i *Iterator) constructPointIter(
if i.batch != nil {
numMergingLevels++
}
numMergingLevels += len(memtables)

current := i.version
if current == nil {
current = i.readState.current
}
numMergingLevels += len(current.L0SublevelFiles)
numLevelIters += len(current.L0SublevelFiles)
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
var current *version
if !i.batchOnlyIter {
numMergingLevels += len(memtables)

current = i.version
if current == nil {
current = i.readState.current
}
numMergingLevels += len(current.L0SublevelFiles)
numLevelIters += len(current.L0SublevelFiles)
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
}
numMergingLevels++
numLevelIters++
}
numMergingLevels++
numLevelIters++
}

if numMergingLevels > cap(mlevels) {
@@ -1399,47 +1426,49 @@ func (i *Iterator) constructPointIter(
}
}

// Next are the memtables.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts),
rangeDelIter: mem.newRangeDelIter(&i.opts),
})
}
if !i.batchOnlyIter {
// Next are the memtables.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts),
rangeDelIter: mem.newRangeDelIter(&i.opts),
})
}

// Next are the file levels: L0 sub-levels followed by lower levels.
mlevelsIndex := len(mlevels)
levelsIndex := len(levels)
mlevels = mlevels[:numMergingLevels]
levels = levels[:numLevelIters]
i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
li := &levels[levelsIndex]
// Next are the file levels: L0 sub-levels followed by lower levels.
mlevelsIndex := len(mlevels)
levelsIndex := len(levels)
mlevels = mlevels[:numMergingLevels]
levels = levels[:numLevelIters]
i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
li := &levels[levelsIndex]

li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
mlevels[mlevelsIndex].levelIter = li
mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
mlevels[mlevelsIndex].levelIter = li
mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)

levelsIndex++
mlevelsIndex++
}
levelsIndex++
mlevelsIndex++
}

// Add level iterators for the L0 sublevels, iterating from newest to
// oldest.
for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
}
// Add level iterators for the L0 sublevels, iterating from newest to
// oldest.
for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
}

// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
}
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}
buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
if len(mlevels) <= cap(buf.levelsPositioned) {
@@ -1494,7 +1523,7 @@ func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
// NewIterWithContext is like NewIter, and additionally accepts a context for
// tracing.
func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
return d.newIter(ctx, nil /* batch */, snapshotIterOpts{}, o), nil
return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
}

// NewSnapshot returns a point-in-time view of the current DB state. Iterators
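To illustrate the behavior the db.go plumbing above produces, here is a hedged sketch (not from the commit) contrasting the overlaid view of Batch.NewIter with the batch-only view; the helper name compareViews, the package name, and the example keys are assumptions.

package pebbledemo

import (
	"context"
	"fmt"

	"github.com/cockroachdb/pebble"
)

// compareViews prints the keys visible through a regular indexed-batch
// iterator (batch mutations overlaid on committed DB state) and through the
// new batch-only iterator (batch mutations alone).
func compareViews(db *pebble.DB) error {
	b := db.NewIndexedBatch()
	defer b.Close()
	if err := b.Set([]byte("batch-key"), []byte("v"), nil); err != nil {
		return err
	}

	// Overlaid view: sees keys committed to the DB plus "batch-key".
	full, err := b.NewIter(nil)
	if err != nil {
		return err
	}
	// Batch-only view: sees just "batch-key", regardless of DB contents.
	only, err := b.NewBatchOnlyIter(context.Background(), nil)
	if err != nil {
		return err
	}

	for _, it := range []*pebble.Iterator{full, only} {
		for it.First(); it.Valid(); it.Next() {
			fmt.Printf("%s\n", it.Key())
		}
		fmt.Println("---")
		if err := it.Close(); err != nil {
			return err
		}
	}
	return nil
}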
6 changes: 5 additions & 1 deletion iterator.go
@@ -302,6 +302,8 @@ type Iterator struct {
// Used for an optimization in external iterators to reduce the number of
// merging levels.
forwardOnly bool
// batchOnlyIter is set to true for Batch.NewBatchOnlyIter.
batchOnlyIter bool
// closePointIterOnce is set to true if this point iter can only be Close()d
// once, _and_ closing i.iter and then i.pointIter would close i.pointIter
// twice. This is necessary to track if the point iter is an internal iterator
@@ -2708,7 +2710,9 @@ func (i *Iterator) CloneWithContext(ctx context.Context, opts CloneOptions) (*It
if opts.IterOptions == nil {
opts.IterOptions = &i.opts
}

if i.batchOnlyIter {
return nil, errors.Errorf("cannot Clone a batch-only Iterator")
}
readState := i.readState
vers := i.version
if readState == nil && vers == nil {
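A short sketch (an illustration, not part of the commit) of the CloneWithContext guard above: cloning a batch-only iterator is rejected, matching the new testBatchEmpty expectation. The helper name showCloneRejection and the package name are assumptions.

package pebbledemo

import (
	"context"
	"fmt"

	"github.com/cockroachdb/pebble"
)

// showCloneRejection demonstrates that a batch-only iterator cannot be
// cloned; Clone returns an error instead of a second iterator.
func showCloneRejection(b *pebble.Batch) error {
	it, err := b.NewBatchOnlyIter(context.Background(), nil)
	if err != nil {
		return err
	}
	defer it.Close()

	cl, err := it.Clone(pebble.CloneOptions{})
	if err != nil {
		// Expected path: "cannot Clone a batch-only Iterator".
		fmt.Println("clone rejected:", err)
		return nil
	}
	// Not expected for a batch-only iterator, but close the clone if one
	// was returned.
	return cl.Close()
}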