diff --git a/batch.go b/batch.go index a54d1d388f..b7bf4110bf 100644 --- a/batch.go +++ b/batch.go @@ -1702,6 +1702,8 @@ func (i *batchIter) SetBounds(lower, upper []byte) { i.iter.SetBounds(lower, upper) } +func (i *batchIter) SetContext(_ context.Context) {} + type flushableBatchEntry struct { // offset is the byte offset of the record within the batch repr. offset uint32 @@ -2161,6 +2163,8 @@ func (i *flushableBatchIter) SetBounds(lower, upper []byte) { i.upper = upper } +func (i *flushableBatchIter) SetContext(_ context.Context) {} + // flushFlushableBatchIter is similar to flushableBatchIter but it keeps track // of number of bytes iterated. type flushFlushableBatchIter struct { diff --git a/error_iter.go b/error_iter.go index de0ed35118..9b23904c9e 100644 --- a/error_iter.go +++ b/error_iter.go @@ -5,6 +5,8 @@ package pebble import ( + "context" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" ) @@ -68,6 +70,8 @@ func (c *errorIter) String() string { func (c *errorIter) SetBounds(lower, upper []byte) {} +func (c *errorIter) SetContext(_ context.Context) {} + type errorKeyspanIter struct { err error } diff --git a/external_iterator.go b/external_iterator.go index 7c70bd240c..3dc1fb1719 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -545,6 +545,8 @@ func (s *simpleLevelIter) SetBounds(lower, upper []byte) { s.resetFilteredIters() } +func (s *simpleLevelIter) SetContext(_ context.Context) {} + func (s *simpleLevelIter) String() string { if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) { return "simpleLevelIter: current=" diff --git a/get_iter.go b/get_iter.go index 99c5d7c52c..cbafafd1b6 100644 --- a/get_iter.go +++ b/get_iter.go @@ -241,3 +241,5 @@ func (g *getIter) Close() error { func (g *getIter) SetBounds(lower, upper []byte) { panic("pebble: SetBounds unimplemented") } + +func (g *getIter) SetContext(_ context.Context) {} diff --git a/internal/arenaskl/iterator.go b/internal/arenaskl/iterator.go index bad49099f6..a41dd7e747 100644 --- a/internal/arenaskl/iterator.go +++ b/internal/arenaskl/iterator.go @@ -18,6 +18,7 @@ package arenaskl import ( + "context" "sync" "github.com/cockroachdb/pebble/internal/base" @@ -238,6 +239,9 @@ func (it *Iterator) SetBounds(lower, upper []byte) { it.upper = upper } +// SetContext implements base.InternalIterator. +func (it *Iterator) SetContext(_ context.Context) {} + func (it *Iterator) decodeKey() { it.key.UserKey = it.list.arena.getBytes(it.nd.keyOffset, it.nd.keySize) it.key.Trailer = it.nd.keyTrailer diff --git a/internal/base/iterator.go b/internal/base/iterator.go index 8ab10a6cf1..1b72432f68 100644 --- a/internal/base/iterator.go +++ b/internal/base/iterator.go @@ -5,6 +5,7 @@ package base import ( + "context" "fmt" "time" ) @@ -204,6 +205,10 @@ type InternalIterator interface { // optimizations. SetBounds(lower, upper []byte) + // SetContext replaces the context provided at iterator creation, or the + // last one provided by SetContext. + SetContext(ctx context.Context) + fmt.Stringer } diff --git a/internal/invalidating/iter.go b/internal/invalidating/iter.go index 48909ec936..e27db5897d 100644 --- a/internal/invalidating/iter.go +++ b/internal/invalidating/iter.go @@ -5,6 +5,8 @@ package invalidating import ( + "context" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/fastrand" "github.com/cockroachdb/pebble/internal/invariants" @@ -157,6 +159,10 @@ func (i *iter) SetBounds(lower, upper []byte) { i.iter.SetBounds(lower, upper) } +func (i *iter) SetContext(ctx context.Context) { + i.iter.SetContext(ctx) +} + func (i *iter) String() string { return i.iter.String() } diff --git a/internal/keyspan/interleaving_iter.go b/internal/keyspan/interleaving_iter.go index f993803a91..8679dad39c 100644 --- a/internal/keyspan/interleaving_iter.go +++ b/internal/keyspan/interleaving_iter.go @@ -5,6 +5,7 @@ package keyspan import ( + "context" "fmt" "github.com/cockroachdb/errors" @@ -1050,6 +1051,11 @@ func (i *InterleavingIter) SetBounds(lower, upper []byte) { i.Invalidate() } +// SetContext implements (base.InternalIterator).SetContext. +func (i *InterleavingIter) SetContext(ctx context.Context) { + i.pointIter.SetContext(ctx) +} + // Invalidate invalidates the interleaving iterator's current position, clearing // its state. This prevents optimizations such as reusing the current span on // seek. diff --git a/internal/keyspan/interleaving_iter_test.go b/internal/keyspan/interleaving_iter_test.go index 564857b30d..116f037614 100644 --- a/internal/keyspan/interleaving_iter_test.go +++ b/internal/keyspan/interleaving_iter_test.go @@ -6,6 +6,7 @@ package keyspan import ( "bytes" + "context" "fmt" "io" "sort" @@ -287,3 +288,4 @@ func (i *pointIterator) String() string { return "test-point-iterator" } func (i *pointIterator) SetBounds(lower, upper []byte) { i.lower, i.upper = lower, upper } +func (i *pointIterator) SetContext(_ context.Context) {} diff --git a/internal/keyspan/internal_iter_shim.go b/internal/keyspan/internal_iter_shim.go index cf2adc3ff2..bb9e37bdf9 100644 --- a/internal/keyspan/internal_iter_shim.go +++ b/internal/keyspan/internal_iter_shim.go @@ -4,7 +4,11 @@ package keyspan -import "github.com/cockroachdb/pebble/internal/base" +import ( + "context" + + "github.com/cockroachdb/pebble/internal/base" +) // InternalIteratorShim is a temporary iterator type used as a shim between // keyspan.MergingIter and base.InternalIterator. It's used temporarily for @@ -112,6 +116,9 @@ func (i *InternalIteratorShim) Close() error { func (i *InternalIteratorShim) SetBounds(lower, upper []byte) { } +// SetContext implements (base.InternalIterator).SetContext. +func (i *InternalIteratorShim) SetContext(_ context.Context) {} + // String implements fmt.Stringer. func (i *InternalIteratorShim) String() string { return i.miter.String() diff --git a/iterator.go b/iterator.go index 821fa45e4b..831e31512e 100644 --- a/iterator.go +++ b/iterator.go @@ -2392,6 +2392,22 @@ func (i *Iterator) SetBounds(lower, upper []byte) { i.invalidate() } +// SetContext replaces the context provided at iterator creation, or the last +// one provided by SetContext. Even though iterators are expected to be +// short-lived, there are some cases where either (a) iterators are used far +// from the code that created them, (b) iterators are reused (while being +// short-lived) for processing different requests. For such scenarios, we +// allow the caller to replace the context. +func (i *Iterator) SetContext(ctx context.Context) { + i.ctx = ctx + i.iter.SetContext(ctx) + // If the iterator has an open point iterator that's not currently being + // used, propagate the new context to it. + if i.pointIter != nil && !i.opts.pointKeys() { + i.pointIter.SetContext(i.ctx) + } +} + // Initialization and changing of the bounds must call processBounds. // processBounds saves the bounds and computes derived state from those // bounds. diff --git a/iterator_test.go b/iterator_test.go index 4ce2f811a6..0be563dc71 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -225,6 +225,8 @@ func (f *fakeIter) SetBounds(lower, upper []byte) { f.upper = upper } +func (f *fakeIter) SetContext(_ context.Context) {} + // testIterator tests creating a combined iterator from a number of sub- // iterators. newFunc is a constructor function. splitFunc returns a random // split of the testKeyValuePairs slice such that walking a combined iterator diff --git a/level_iter.go b/level_iter.go index ae6b045341..424e71b8b1 100644 --- a/level_iter.go +++ b/level_iter.go @@ -1232,6 +1232,13 @@ func (l *levelIter) SetBounds(lower, upper []byte) { l.iter.SetBounds(l.tableOpts.LowerBound, l.tableOpts.UpperBound) } +func (l *levelIter) SetContext(ctx context.Context) { + l.ctx = ctx + // TODO(sumeer): this is losing the ctx = objiotracing.WithLevel(ctx, + // manifest.LevelToInt(opts.level)) that happens in table_cache.go. + l.iter.SetContext(ctx) +} + func (l *levelIter) String() string { if l.iterFile != nil { return fmt.Sprintf("%s: fileNum=%s", l.level, l.iter.String()) diff --git a/merging_iter.go b/merging_iter.go index a07668692a..a4341085c5 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -6,6 +6,7 @@ package pebble import ( "bytes" + "context" "fmt" "runtime/debug" "unsafe" @@ -1358,6 +1359,12 @@ func (m *mergingIter) SetBounds(lower, upper []byte) { m.heap.clear() } +func (m *mergingIter) SetContext(ctx context.Context) { + for i := range m.levels { + m.levels[i].iter.SetContext(ctx) + } +} + func (m *mergingIter) DebugString() string { var buf bytes.Buffer sep := "" diff --git a/range_keys.go b/range_keys.go index 15c3ad7e16..e46b2bedfe 100644 --- a/range_keys.go +++ b/range_keys.go @@ -5,6 +5,8 @@ package pebble import ( + "context" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" @@ -693,6 +695,14 @@ func (i *lazyCombinedIter) SetBounds(lower, upper []byte) { i.pointIter.SetBounds(lower, upper) } +func (i *lazyCombinedIter) SetContext(ctx context.Context) { + if i.combinedIterState.initialized { + i.parent.rangeKey.iiter.SetContext(ctx) + return + } + i.pointIter.SetContext(ctx) +} + func (i *lazyCombinedIter) String() string { if i.combinedIterState.initialized { return i.parent.rangeKey.iiter.String() diff --git a/scan_internal.go b/scan_internal.go index d36a6e98c4..653c987d93 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -354,6 +354,10 @@ func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) { p.iter.SetBounds(lower, upper) } +func (p *pointCollapsingIterator) SetContext(ctx context.Context) { + p.iter.SetContext(ctx) +} + // String implements the InternalIterator interface. func (p *pointCollapsingIterator) String() string { return p.iter.String() diff --git a/sstable/block.go b/sstable/block.go index c6345ea00e..9634d2d9d2 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -5,6 +5,7 @@ package sstable import ( + "context" "encoding/binary" "unsafe" @@ -1540,6 +1541,8 @@ func (i *blockIter) SetBounds(lower, upper []byte) { panic("pebble: SetBounds unimplemented") } +func (i *blockIter) SetContext(_ context.Context) {} + func (i *blockIter) valid() bool { return i.offset >= 0 && i.offset < i.restarts } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index 8b094cd983..f2ca2154ac 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -347,6 +347,10 @@ func (i *singleLevelIterator) SetBounds(lower, upper []byte) { i.blockUpper = nil } +func (i *singleLevelIterator) SetContext(ctx context.Context) { + i.ctx = ctx +} + // loadBlock loads the block at the current index position and leaves i.data // unpositioned. If unsuccessful, it sets i.err to any error encountered, which // may be nil if we have simply exhausted the entire table. diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 320ce3b4e9..b5aae3affe 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -172,6 +172,10 @@ func (i *iterAdapter) SetBounds(lower, upper []byte) { i.key = nil } +func (i *iterAdapter) SetContext(ctx context.Context) { + i.Iterator.SetContext(ctx) +} + func TestVirtualReader(t *testing.T) { // A faux filenum used to create fake filemetadata for testing. var fileNum int = 1