Skip to content

Commit

Permalink
sstable: Add bounds checking for virtual sstable iterators
Browse files Browse the repository at this point in the history
This pr adds bounds checking (global and block) for singleLevelIterator
when returning a value.

Fixes: #2593 Release note: None
  • Loading branch information
raggar committed Jun 9, 2023
1 parent fe84618 commit cfa91ed
Showing 1 changed file with 67 additions and 13 deletions.
80 changes: 67 additions & 13 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,31 @@ func (i *singleLevelIterator) init(
return nil
}

// helper function to check if keys returned from iterator are within block and global bounds
func (i *singleLevelIterator) MaybeCheckKeyWithinBounds(iKey *InternalKey) {
// used for virtual sstable iterators
if invariants.Enabled && i.vState != nil && iKey != nil {
key := iKey.UserKey
var withinUpper = i.cmp(key, i.upper) <= 0
var withinLower = i.cmp(key, i.lower) >= 0

if i.blockUpper != nil {
withinUpper = withinUpper && i.cmp(key, i.blockUpper) <= 0
}
if i.blockLower != nil {
withinLower = withinLower && i.cmp(key, i.blockLower) >= 0
}
if i.vState != nil {
withinUpper = withinUpper && i.cmp(key, i.vState.upper.UserKey) <= 0
withinLower = withinLower && i.cmp(key, i.vState.lower.UserKey) >= 0
}

if !withinUpper || !withinLower {
panic(fmt.Sprintf("key: %s out of bounds of singleLevelIterator", key))
}
}
}

// setupForCompaction sets up the singleLevelIterator for use with compactionIter.
// Currently, it skips readahead ramp-up. It should be called after init is called.
func (i *singleLevelIterator) setupForCompaction() {
Expand Down Expand Up @@ -932,6 +957,7 @@ func (i *singleLevelIterator) SeekPrefixGE(
}
}
k, v := i.seekPrefixGE(prefix, key, flags, i.useFilter)
i.MaybeCheckKeyWithinBounds(k)
return k, v
}

Expand Down Expand Up @@ -1143,7 +1169,9 @@ func (i *singleLevelIterator) SeekLT(
// be chosen as "compleu". The SeekGE in the index block will then point
// us to the block containing "complexion". If this happens, we want the
// last key from the previous data block.
return i.skipBackward()
ikey, val := i.skipBackward()
i.MaybeCheckKeyWithinBounds(ikey)
return ikey, val
}

// First implements internalIterator.First, as documented in the pebble
Expand All @@ -1163,7 +1191,10 @@ func (i *singleLevelIterator) First() (*InternalKey, base.LazyValue) {
}
i.positionedUsingLatestBounds = true
i.maybeFilteredKeysSingleLevel = false
return i.firstInternal()

key, val := i.firstInternal()
i.MaybeCheckKeyWithinBounds(key)
return key, val
}

// firstInternal is a helper used for absolute positioning in a single-level
Expand Down Expand Up @@ -1231,7 +1262,9 @@ func (i *singleLevelIterator) Last() (*InternalKey, base.LazyValue) {
}
i.positionedUsingLatestBounds = true
i.maybeFilteredKeysSingleLevel = false
return i.lastInternal()
key, val := i.lastInternal()
i.MaybeCheckKeyWithinBounds(key)
return key, val
}

// lastInternal is a helper used for absolute positioning in a single-level
Expand Down Expand Up @@ -1377,7 +1410,10 @@ func (i *singleLevelIterator) NextPrefix(succKey []byte) (*InternalKey, base.Laz
}
return key, val
}
return i.skipForward()

k, v := i.skipForward()
i.MaybeCheckKeyWithinBounds(k)
return k, v
}

// Prev implements internalIterator.Prev, as documented in the pebble
Expand All @@ -1401,7 +1437,9 @@ func (i *singleLevelIterator) Prev() (*InternalKey, base.LazyValue) {
}
return key, val
}
return i.skipBackward()
key, val := i.skipBackward()
i.MaybeCheckKeyWithinBounds(key)
return key, val
}

func (i *singleLevelIterator) skipForward() (*InternalKey, base.LazyValue) {
Expand Down Expand Up @@ -2066,7 +2104,9 @@ func (i *twoLevelIterator) SeekGE(
return ikey, val
}
}
return i.skipForward()
ikey, val := i.skipForward()
i.MaybeCheckKeyWithinBounds(ikey)
return ikey, val
}

// SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
Expand Down Expand Up @@ -2245,7 +2285,9 @@ func (i *twoLevelIterator) SeekPrefixGE(
}
}
// NB: skipForward checks whether exhaustedBounds is already +1.
return i.skipForward()
k, v := i.skipForward()
i.singleLevelIterator.MaybeCheckKeyWithinBounds(k)
return k, v
}

// virtualLast should only be called if i.vReader != nil and i.endKeyInclusive
Expand Down Expand Up @@ -2357,7 +2399,9 @@ func (i *twoLevelIterator) SeekLT(
}
}
// NB: skipBackward checks whether exhaustedBounds is already -1.
return i.skipBackward()
k, v := i.skipBackward()
i.singleLevelIterator.MaybeCheckKeyWithinBounds(k)
return k, v
}

// First implements internalIterator.First, as documented in the pebble
Expand Down Expand Up @@ -2410,7 +2454,9 @@ func (i *twoLevelIterator) First() (*InternalKey, base.LazyValue) {
}
}
// NB: skipForward checks whether exhaustedBounds is already +1.
return i.skipForward()
k, v := i.skipForward()
i.singleLevelIterator.MaybeCheckKeyWithinBounds(k)
return k, v
}

// Last implements internalIterator.Last, as documented in the pebble
Expand Down Expand Up @@ -2460,7 +2506,9 @@ func (i *twoLevelIterator) Last() (*InternalKey, base.LazyValue) {
}
}
// NB: skipBackward checks whether exhaustedBounds is already -1.
return i.skipBackward()
k, v := i.skipBackward()
i.singleLevelIterator.MaybeCheckKeyWithinBounds(k)
return k, v
}

// Next implements internalIterator.Next, as documented in the pebble
Expand All @@ -2477,7 +2525,9 @@ func (i *twoLevelIterator) Next() (*InternalKey, base.LazyValue) {
if key, val := i.singleLevelIterator.Next(); key != nil {
return key, val
}
return i.skipForward()
ikey, val := i.skipForward()
i.MaybeCheckKeyWithinBounds(ikey)
return ikey, val
}

// NextPrefix implements (base.InternalIterator).NextPrefix.
Expand Down Expand Up @@ -2521,7 +2571,9 @@ func (i *twoLevelIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyVa
} else if key, val := i.singleLevelIterator.SeekGE(succKey, base.SeekGEFlagsNone); key != nil {
return key, val
}
return i.skipForward()
k, v := i.skipForward()
i.MaybeCheckKeyWithinBounds(k)
return k, v
}

// Prev implements internalIterator.Prev, as documented in the pebble
Expand All @@ -2536,7 +2588,9 @@ func (i *twoLevelIterator) Prev() (*InternalKey, base.LazyValue) {
if key, val := i.singleLevelIterator.Prev(); key != nil {
return key, val
}
return i.skipBackward()
k, v := i.skipBackward()
i.singleLevelIterator.MaybeCheckKeyWithinBounds(k)
return k, v
}

func (i *twoLevelIterator) skipForward() (*InternalKey, base.LazyValue) {
Expand Down

0 comments on commit cfa91ed

Please sign in to comment.