Skip to content

Commit

Permalink
db: add SetContext method to Iterator to override existing context
Browse files Browse the repository at this point in the history
This is to allow for (short-lived) Iterator reuse in CockroachDB, and
cases where the code that uses an Iterator is "far" from the code that
uses the iterator (e.g. rangefeed.registration.maybeRunCatchUpScan).

Relates to cockroachdb/cockroach#113257
and cockroachdb/cockroach#113256
  • Loading branch information
sumeerbhola committed Nov 6, 2023
1 parent 9a4379b commit a0b01b6
Show file tree
Hide file tree
Showing 19 changed files with 100 additions and 1 deletion.
4 changes: 4 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,8 @@ func (i *batchIter) SetBounds(lower, upper []byte) {
i.iter.SetBounds(lower, upper)
}

func (i *batchIter) SetContext(_ context.Context) {}

type flushableBatchEntry struct {
// offset is the byte offset of the record within the batch repr.
offset uint32
Expand Down Expand Up @@ -2161,6 +2163,8 @@ func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
i.upper = upper
}

func (i *flushableBatchIter) SetContext(_ context.Context) {}

// flushFlushableBatchIter is similar to flushableBatchIter but it keeps track
// of number of bytes iterated.
type flushFlushableBatchIter struct {
Expand Down
4 changes: 4 additions & 0 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package pebble

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
)
Expand Down Expand Up @@ -68,6 +70,8 @@ func (c *errorIter) String() string {

func (c *errorIter) SetBounds(lower, upper []byte) {}

func (c *errorIter) SetContext(_ context.Context) {}

type errorKeyspanIter struct {
err error
}
Expand Down
2 changes: 2 additions & 0 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ func (s *simpleLevelIter) SetBounds(lower, upper []byte) {
s.resetFilteredIters()
}

func (s *simpleLevelIter) SetContext(_ context.Context) {}

func (s *simpleLevelIter) String() string {
if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
return "simpleLevelIter: current=<nil>"
Expand Down
2 changes: 2 additions & 0 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,5 @@ func (g *getIter) Close() error {
func (g *getIter) SetBounds(lower, upper []byte) {
panic("pebble: SetBounds unimplemented")
}

func (g *getIter) SetContext(_ context.Context) {}
4 changes: 4 additions & 0 deletions internal/arenaskl/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package arenaskl

import (
"context"
"sync"

"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -238,6 +239,9 @@ func (it *Iterator) SetBounds(lower, upper []byte) {
it.upper = upper
}

// SetContext implements base.InternalIterator.
func (it *Iterator) SetContext(_ context.Context) {}

func (it *Iterator) decodeKey() {
it.key.UserKey = it.list.arena.getBytes(it.nd.keyOffset, it.nd.keySize)
it.key.Trailer = it.nd.keyTrailer
Expand Down
5 changes: 5 additions & 0 deletions internal/base/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package base

import (
"context"
"fmt"
"time"
)
Expand Down Expand Up @@ -204,6 +205,10 @@ type InternalIterator interface {
// optimizations.
SetBounds(lower, upper []byte)

// SetContext replaces the context provided at iterator creation, or the
// last one provided by SetContext.
SetContext(ctx context.Context)

fmt.Stringer
}

Expand Down
6 changes: 6 additions & 0 deletions internal/invalidating/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package invalidating

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/fastrand"
"github.com/cockroachdb/pebble/internal/invariants"
Expand Down Expand Up @@ -157,6 +159,10 @@ func (i *iter) SetBounds(lower, upper []byte) {
i.iter.SetBounds(lower, upper)
}

func (i *iter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

func (i *iter) String() string {
return i.iter.String()
}
6 changes: 6 additions & 0 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package keyspan

import (
"context"
"fmt"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -1050,6 +1051,11 @@ func (i *InterleavingIter) SetBounds(lower, upper []byte) {
i.Invalidate()
}

// SetContext implements (base.InternalIterator).SetContext.
func (i *InterleavingIter) SetContext(ctx context.Context) {
i.pointIter.SetContext(ctx)
}

// Invalidate invalidates the interleaving iterator's current position, clearing
// its state. This prevents optimizations such as reusing the current span on
// seek.
Expand Down
2 changes: 2 additions & 0 deletions internal/keyspan/interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package keyspan

import (
"bytes"
"context"
"fmt"
"io"
"sort"
Expand Down Expand Up @@ -287,3 +288,4 @@ func (i *pointIterator) String() string { return "test-point-iterator" }
func (i *pointIterator) SetBounds(lower, upper []byte) {
i.lower, i.upper = lower, upper
}
func (i *pointIterator) SetContext(_ context.Context) {}
9 changes: 8 additions & 1 deletion internal/keyspan/internal_iter_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

package keyspan

import "github.com/cockroachdb/pebble/internal/base"
import (
"context"

"github.com/cockroachdb/pebble/internal/base"
)

// InternalIteratorShim is a temporary iterator type used as a shim between
// keyspan.MergingIter and base.InternalIterator. It's used temporarily for
Expand Down Expand Up @@ -112,6 +116,9 @@ func (i *InternalIteratorShim) Close() error {
func (i *InternalIteratorShim) SetBounds(lower, upper []byte) {
}

// SetContext implements (base.InternalIterator).SetContext.
func (i *InternalIteratorShim) SetContext(_ context.Context) {}

// String implements fmt.Stringer.
func (i *InternalIteratorShim) String() string {
return i.miter.String()
Expand Down
16 changes: 16 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,22 @@ func (i *Iterator) SetBounds(lower, upper []byte) {
i.invalidate()
}

// SetContext replaces the context provided at iterator creation, or the last
// one provided by SetContext. Even though iterators are expected to be
// short-lived, there are some cases where either (a) iterators are used far
// from the code that created them, (b) iterators are reused (while being
// short-lived) for processing different requests. For such scenarios, we
// allow the caller to replace the context.
func (i *Iterator) SetContext(ctx context.Context) {
i.ctx = ctx
i.iter.SetContext(ctx)
// If the iterator has an open point iterator that's not currently being
// used, propagate the new context to it.
if i.pointIter != nil && !i.opts.pointKeys() {
i.pointIter.SetContext(i.ctx)
}
}

// Initialization and changing of the bounds must call processBounds.
// processBounds saves the bounds and computes derived state from those
// bounds.
Expand Down
2 changes: 2 additions & 0 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ func (f *fakeIter) SetBounds(lower, upper []byte) {
f.upper = upper
}

func (f *fakeIter) SetContext(_ context.Context) {}

// testIterator tests creating a combined iterator from a number of sub-
// iterators. newFunc is a constructor function. splitFunc returns a random
// split of the testKeyValuePairs slice such that walking a combined iterator
Expand Down
7 changes: 7 additions & 0 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,13 @@ func (l *levelIter) SetBounds(lower, upper []byte) {
l.iter.SetBounds(l.tableOpts.LowerBound, l.tableOpts.UpperBound)
}

func (l *levelIter) SetContext(ctx context.Context) {
l.ctx = ctx
// TODO(sumeer): this is losing the ctx = objiotracing.WithLevel(ctx,
// manifest.LevelToInt(opts.level)) that happens in table_cache.go.
l.iter.SetContext(ctx)
}

func (l *levelIter) String() string {
if l.iterFile != nil {
return fmt.Sprintf("%s: fileNum=%s", l.level, l.iter.String())
Expand Down
7 changes: 7 additions & 0 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pebble

import (
"bytes"
"context"
"fmt"
"runtime/debug"
"unsafe"
Expand Down Expand Up @@ -1358,6 +1359,12 @@ func (m *mergingIter) SetBounds(lower, upper []byte) {
m.heap.clear()
}

func (m *mergingIter) SetContext(ctx context.Context) {
for i := range m.levels {
m.levels[i].iter.SetContext(ctx)
}
}

func (m *mergingIter) DebugString() string {
var buf bytes.Buffer
sep := ""
Expand Down
10 changes: 10 additions & 0 deletions range_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package pebble

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
Expand Down Expand Up @@ -693,6 +695,14 @@ func (i *lazyCombinedIter) SetBounds(lower, upper []byte) {
i.pointIter.SetBounds(lower, upper)
}

func (i *lazyCombinedIter) SetContext(ctx context.Context) {
if i.combinedIterState.initialized {
i.parent.rangeKey.iiter.SetContext(ctx)
return
}
i.pointIter.SetContext(ctx)
}

func (i *lazyCombinedIter) String() string {
if i.combinedIterState.initialized {
return i.parent.rangeKey.iiter.String()
Expand Down
4 changes: 4 additions & 0 deletions scan_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
p.iter.SetBounds(lower, upper)
}

func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
p.iter.SetContext(ctx)
}

// String implements the InternalIterator interface.
func (p *pointCollapsingIterator) String() string {
return p.iter.String()
Expand Down
3 changes: 3 additions & 0 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"context"
"encoding/binary"
"unsafe"

Expand Down Expand Up @@ -1540,6 +1541,8 @@ func (i *blockIter) SetBounds(lower, upper []byte) {
panic("pebble: SetBounds unimplemented")
}

func (i *blockIter) SetContext(_ context.Context) {}

func (i *blockIter) valid() bool {
return i.offset >= 0 && i.offset < i.restarts
}
Expand Down
4 changes: 4 additions & 0 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ func (i *singleLevelIterator) SetBounds(lower, upper []byte) {
i.blockUpper = nil
}

func (i *singleLevelIterator) SetContext(ctx context.Context) {
i.ctx = ctx
}

// loadBlock loads the block at the current index position and leaves i.data
// unpositioned. If unsuccessful, it sets i.err to any error encountered, which
// may be nil if we have simply exhausted the entire table.
Expand Down
4 changes: 4 additions & 0 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (i *iterAdapter) SetBounds(lower, upper []byte) {
i.key = nil
}

func (i *iterAdapter) SetContext(ctx context.Context) {
i.Iterator.SetContext(ctx)
}

func TestVirtualReader(t *testing.T) {
// A faux filenum used to create fake filemetadata for testing.
var fileNum int = 1
Expand Down

0 comments on commit a0b01b6

Please sign in to comment.