Skip to content

Commit

Permalink
internal/keyspan: add error return value to FragmentIterator methods
Browse files Browse the repository at this point in the history
This commit refactors the keyspan.FragmentIterator interface to propagate
errors from the positioning method that encounters the error, removing the
`Error()` method from the interface. This makes it more difficult to
accidentally ignore an error value by forgetting to check `Error()`.
  • Loading branch information
jbowens committed Jan 18, 2024
1 parent 4e6dda4 commit d4e3355
Show file tree
Hide file tree
Showing 49 changed files with 1,066 additions and 904 deletions.
12 changes: 10 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,8 @@ func TestBatchRangeOps(t *testing.T) {

var buf bytes.Buffer
if fragmentIter != nil {
for s := fragmentIter.First(); s != nil; s = fragmentIter.Next() {
s, err := fragmentIter.First()
for ; s != nil; s, err = fragmentIter.Next() {
for i := range s.Keys {
s.Keys[i].Trailer = base.MakeTrailer(
s.Keys[i].SeqNum()&^base.InternalKeySeqNumBatch,
Expand All @@ -990,6 +991,9 @@ func TestBatchRangeOps(t *testing.T) {
}
fmt.Fprintln(&buf, s)
}
if err != nil {
return err.Error()
}
} else {
for k, v := internalIter.First(); k != nil; k, v = internalIter.Next() {
k.SetSeqNum(k.SeqNum() &^ InternalKeySeqNumBatch)
Expand Down Expand Up @@ -1184,9 +1188,13 @@ func scanInternalIter(w io.Writer, ii internalIterator) {
}

func scanKeyspanIterator(w io.Writer, ki keyspan.FragmentIterator) {
for s := ki.First(); s != nil; s = ki.Next() {
s, err := ki.First()
for ; s != nil; s, err = ki.Next() {
fmt.Fprintln(w, s)
}
if err != nil {
fmt.Fprintf(w, "err=%q", err.Error())
}
}

func TestFlushableBatchBytesIterated(t *testing.T) {
Expand Down
30 changes: 21 additions & 9 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)

func newFlush(
opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
) *compaction {
) (*compaction, error) {
c := &compaction{
kind: compactionKindFlush,
cmp: opts.Comparer.Compare,
Expand All @@ -895,7 +895,7 @@ func newFlush(
panic("pebble: ingestedFlushable must be flushed one at a time.")
}
c.kind = compactionKindIngestedFlushable
return c
return c, nil
}
}

Expand Down Expand Up @@ -929,35 +929,44 @@ func newFlush(
}
}

updateRangeBounds := func(iter keyspan.FragmentIterator) {
updateRangeBounds := func(iter keyspan.FragmentIterator) error {
// File bounds require s != nil && !s.Empty(). We only need to check for
// s != nil here, as the memtable's FragmentIterator would never surface
// empty spans.
if s := iter.First(); s != nil {
if s, err := iter.First(); err != nil {
return err
} else if s != nil {
if key := s.SmallestKey(); !smallestSet ||
base.InternalCompare(c.cmp, c.smallest, key) > 0 {
smallestSet = true
c.smallest = key.Clone()
}
}
if s := iter.Last(); s != nil {
if s, err := iter.Last(); err != nil {
return err
} else if s != nil {
if key := s.LargestKey(); !largestSet ||
base.InternalCompare(c.cmp, c.largest, key) < 0 {
largestSet = true
c.largest = key.Clone()
}
}
return nil
}

var flushingBytes uint64
for i := range flushing {
f := flushing[i]
updatePointBounds(f.newIter(nil))
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
updateRangeBounds(rangeDelIter)
if err := updateRangeBounds(rangeDelIter); err != nil {
return nil, err
}
}
if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
updateRangeBounds(rangeKeyIter)
if err := updateRangeBounds(rangeKeyIter); err != nil {
return nil, err
}
}
flushingBytes += f.inuseBytes()
}
Expand All @@ -971,7 +980,7 @@ func newFlush(
}

c.setupInuseKeyRanges()
return c
return c, nil
}

func (c *compaction) hasExtraLevelData() bool {
Expand Down Expand Up @@ -1909,8 +1918,11 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}
}

c := newFlush(d.opts, d.mu.versions.currentVersion(),
c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
if err != nil {
return 0, err
}
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand Down
33 changes: 26 additions & 7 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,19 +531,24 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
}

if rdi := b.newRangeDelIter(nil, math.MaxUint64); rdi != nil {
for s := rdi.First(); s != nil; s = rdi.Next() {
err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
s, err := rdi.First()
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Add(k, v)
})
if err != nil {
return err
}
}
if err != nil {
return err
}
}

if rki := b.newRangeKeyIter(nil, math.MaxUint64); rki != nil {
for s := rki.First(); s != nil; s = rki.Next() {
s, err := rki.First()
for ; s != nil; s, err = rki.Next() {
for _, k := range s.Keys {
var err error
switch k.Kind() {
Expand All @@ -561,6 +566,9 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
}
}
}
if err != nil {
return err
}
}

return w.Close()
Expand Down Expand Up @@ -617,19 +625,24 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}

if rdi := b.newRangeDelIter(nil, math.MaxUint64); rdi != nil {
for s := rdi.First(); s != nil; s = rdi.Next() {
err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
s, err := rdi.First()
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Add(k, v)
})
if err != nil {
return err
}
}
if err != nil {
return err
}
}

if rki := b.newRangeKeyIter(nil, math.MaxUint64); rki != nil {
for s := rki.First(); s != nil; s = rki.Next() {
s, err := rki.First()
for ; s != nil; s, err = rki.Next() {
for _, k := range s.Keys {
var err error
switch k.Kind() {
Expand All @@ -647,6 +660,9 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
}
}
if err != nil {
return err
}
}

return w.Close()
Expand Down Expand Up @@ -841,8 +857,11 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
flushable: mem,
flushed: make(chan struct{}),
}}
c := newFlush(d.opts, d.mu.versions.currentVersion(),
c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), toFlush, time.Now())
if err != nil {
return err
}
c.disableSpanElision = true
// NB: define allows the test to exactly specify which keys go
// into which sstables. If the test has a small target file
Expand Down
50 changes: 33 additions & 17 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3090,7 +3090,10 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
pointKey, _ := pointIter.First()
var rangeDel *keyspan.Span
if rangeDelIter != nil {
rangeDel = rangeDelIter.First()
rangeDel, err = rangeDelIter.First()
if err != nil {
panic(err)
}
}

// Check that the lower bound is tight.
Expand All @@ -3102,7 +3105,10 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
pointKey, _ = pointIter.Last()
rangeDel = nil
if rangeDelIter != nil {
rangeDel = rangeDelIter.Last()
rangeDel, err = rangeDelIter.Last()
if err != nil {
panic(err)
}
}

// Check that the upper bound is tight.
Expand All @@ -3119,15 +3125,18 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
}

if rangeDelIter != nil {
for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
s, err := rangeDelIter.First()
for ; s != nil; s, err = rangeDelIter.Next() {
if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
}

if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
}
}
if err != nil {
panic(err)
}
}
}

Expand All @@ -3136,28 +3145,35 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
}

rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
defer rangeKeyIter.Close()

if err != nil {
panic(errors.Wrap(err, "pebble: error creating range key iterator"))
}
defer rangeKeyIter.Close()

// Check that the lower bound is tight.
if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
if s, err := rangeKeyIter.First(); err != nil {
panic(err)
} else if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
}

// Check that upper bound is tight.
if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
if s, err := rangeKeyIter.Last(); err != nil {
panic(err)
} else if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
}

for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
s, err := rangeKeyIter.First()
for ; s != nil; s, err = rangeKeyIter.Next() {
if d.cmp(s.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
}
if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
if d.cmp(s.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
}
}
if err != nil {
panic(err)
}
}
19 changes: 9 additions & 10 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ type errorKeyspanIter struct {
// errorKeyspanIter implements the keyspan.FragmentIterator interface.
var _ keyspan.FragmentIterator = (*errorKeyspanIter)(nil)

func (*errorKeyspanIter) SeekGE(key []byte) *keyspan.Span { return nil }
func (*errorKeyspanIter) SeekLT(key []byte) *keyspan.Span { return nil }
func (*errorKeyspanIter) First() *keyspan.Span { return nil }
func (*errorKeyspanIter) Last() *keyspan.Span { return nil }
func (*errorKeyspanIter) Next() *keyspan.Span { return nil }
func (*errorKeyspanIter) Prev() *keyspan.Span { return nil }
func (i *errorKeyspanIter) Error() error { return i.err }
func (i *errorKeyspanIter) Close() error { return i.err }
func (*errorKeyspanIter) String() string { return "error" }
func (*errorKeyspanIter) WrapChildren(wrap keyspan.WrapFn) {}
func (i *errorKeyspanIter) SeekGE(key []byte) (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) SeekLT(key []byte) (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) First() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Last() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Next() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Prev() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Close() error { return i.err }
func (*errorKeyspanIter) String() string { return "error" }
func (*errorKeyspanIter) WrapChildren(wrap keyspan.WrapFn) {}
16 changes: 12 additions & 4 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,30 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
iter := flushable.newRangeKeyIter(nil)
var buf bytes.Buffer
if iter != nil {
for span := iter.First(); span != nil; span = iter.Next() {
span, err := iter.First()
for ; span != nil; span, err = iter.Next() {
buf.WriteString(span.String())
buf.WriteString("\n")
}
iter.Close()
err = firstError(err, iter.Close())
if err != nil {
fmt.Fprintf(&buf, "err=%q", err.Error())
}
}
return buf.String()
case "rangedelIter":
iter := flushable.newRangeDelIter(nil)
var buf bytes.Buffer
if iter != nil {
for span := iter.First(); span != nil; span = iter.Next() {
span, err := iter.First()
for ; span != nil; span, err = iter.Next() {
buf.WriteString(span.String())
buf.WriteString("\n")
}
iter.Close()
err = firstError(err, iter.Close())
if err != nil {
fmt.Fprintf(&buf, "err=%q", err.Error())
}
}
return buf.String()
case "readyForFlush":
Expand Down
Loading

0 comments on commit d4e3355

Please sign in to comment.