sstable: fix two-level monotonic bounds bug
This commit fixes an sstable iterator bug surfaced by the metamorphic test in
issue #2816.

Under the conditions of this bug, a two-level sstable iterator with
monotonically advanced bounds could reuse the wrong index block if a previous
block had been excluded by a block-property filter. An accompanying regression
unit test is included.

Additionally, this commit refactors several sstable tests so that they run
against more table formats and automatically pick up newly introduced table
formats.

While this commit is sufficient to fix the bug, in subsequent work I anticipate
refactoring and tightening invariants. This refactoring will not be suitable
for a backport, so this commit first resolves the bug in a backportable way.
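
For illustration, the following is a minimal, self-contained Go sketch of the
guard this fix introduces. The types and names here are hypothetical (they are
not Pebble's actual iterator structs); the sketch only shows the idea that the
monotonic-bounds fast path must also confirm the cached lower-level index
block was not invalidated (e.g. by a block-property filter) before reusing it.

```go
// Hypothetical sketch of the invalidation guard; not Pebble's real types.
package main

import "fmt"

// blockIterState stands in for a cached index-block iterator position.
type blockIterState struct {
	key         string
	invalidated bool // set when the block was never loaded or was filtered out
}

func (b *blockIterState) isDataInvalidated() bool { return b.invalidated }

// twoLevelState stands in for the two-level iterator's cached state.
type twoLevelState struct {
	topLevelIndex blockIterState
	index         blockIterState
	boundsCmp     int // +1 when new bounds lie wholly after the previous bounds
}

// seekGE sketches the fast-path/slow-path decision. The added term is the
// index.isDataInvalidated() check: without it, a stale index block could be
// reused after a previous block was excluded by a block-property filter.
func (i *twoLevelState) seekGE(key string) string {
	fastPath := i.boundsCmp > 0 &&
		!i.topLevelIndex.isDataInvalidated() &&
		!i.index.isDataInvalidated() && // the guard this commit adds
		key <= i.topLevelIndex.key
	if fastPath {
		return fmt.Sprintf("fast path: reuse index block positioned at %q", i.index.key)
	}
	return "slow path: reposition topLevelIndex and reload the index block"
}

func main() {
	it := twoLevelState{
		topLevelIndex: blockIterState{key: "f"},
		index:         blockIterState{invalidated: true}, // excluded by a filter earlier
		boundsCmp:     +1,
	}
	// Despite monotonically advancing bounds, the invalidated index block
	// forces the slow path instead of reusing stale state.
	fmt.Println(it.seekGE("d"))
}
```
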
jbowens committed Aug 16, 2023
1 parent 8801607 commit 97285ae
Showing 11 changed files with 306 additions and 106 deletions.
18 changes: 16 additions & 2 deletions sstable/block.go
@@ -688,8 +688,11 @@ func (i *blockIter) getFirstUserKey() []byte {
// SeekGE implements internalIterator.SeekGE, as documented in the pebble
// package.
func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base.LazyValue) {
i.clearCache()
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

i.clearCache()
// Find the index of the smallest restart point whose key is > the key
// sought; index will be numRestarts if there is no such restart point.
i.offset = 0
@@ -818,8 +821,11 @@ func (i *blockIter) SeekPrefixGE(
// SeekLT implements internalIterator.SeekLT, as documented in the pebble
// package.
func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, base.LazyValue) {
i.clearCache()
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

i.clearCache()
// Find the index of the smallest restart point whose key is >= the key
// sought; index will be numRestarts if there is no such restart point.
i.offset = 0
@@ -986,6 +992,10 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
// First implements internalIterator.First, as documented in the pebble
// package.
func (i *blockIter) First() (*InternalKey, base.LazyValue) {
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

i.offset = 0
if !i.valid() {
return nil, base.LazyValue{}
@@ -1015,6 +1025,10 @@ func decodeRestart(b []byte) int32 {

// Last implements internalIterator.Last, as documented in the pebble package.
func (i *blockIter) Last() (*InternalKey, base.LazyValue) {
if invariants.Enabled && i.isDataInvalidated() {
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

// Seek forward from the last restart point.
i.offset = decodeRestart(i.data[i.restarts+4*(i.numRestarts-1):])
if !i.valid() {
41 changes: 41 additions & 0 deletions sstable/data_test.go
@@ -394,6 +394,47 @@ func runIterCmd(
continue
case "reset-stats":
*opts.stats = base.InternalIteratorStats{}
continue
case "internal-iter-state":
fmt.Fprintf(&b, "| %T:\n", origIter)
si, _ := origIter.(*singleLevelIterator)
if twoLevelIter, ok := origIter.(*twoLevelIterator); ok {
si = &twoLevelIter.singleLevelIterator
if twoLevelIter.topLevelIndex.valid() {
fmt.Fprintf(&b, "| topLevelIndex.Key() = %q\n", twoLevelIter.topLevelIndex.Key())
v := twoLevelIter.topLevelIndex.value()
bhp, err := decodeBlockHandleWithProperties(v.InPlaceValue())
if err != nil {
fmt.Fprintf(&b, "| topLevelIndex.InPlaceValue() failed to decode as BHP: %s\n", err)
} else {
fmt.Fprintf(&b, "| topLevelIndex.InPlaceValue() = (Offset: %d, Length: %d, Props: %x)\n",
bhp.Offset, bhp.Length, bhp.Props)
}
} else {
fmt.Fprintf(&b, "| topLevelIndex iter invalid\n")
}
fmt.Fprintf(&b, "| topLevelIndex.isDataInvalidated()=%t\n", twoLevelIter.topLevelIndex.isDataInvalidated())
}
if si.index.valid() {
fmt.Fprintf(&b, "| index.Key() = %q\n", si.index.Key())
v := si.index.value()
bhp, err := decodeBlockHandleWithProperties(v.InPlaceValue())
if err != nil {
fmt.Fprintf(&b, "| index.InPlaceValue() failed to decode as BHP: %s\n", err)
} else {
fmt.Fprintf(&b, "| index.InPlaceValue() = (Offset: %d, Length: %d, Props: %x)\n",
bhp.Offset, bhp.Length, bhp.Props)
}
} else {
fmt.Fprintf(&b, "| index iter invalid\n")
}
fmt.Fprintf(&b, "| index.isDataInvalidated()=%t\n", si.index.isDataInvalidated())
fmt.Fprintf(&b, "| data.isDataInvalidated()=%t\n", si.data.isDataInvalidated())
fmt.Fprintf(&b, "| hideObsoletePoints = %t\n", si.hideObsoletePoints)
fmt.Fprintf(&b, "| dataBH = (Offset: %d, Length: %d)\n", si.dataBH.Offset, si.dataBH.Length)
fmt.Fprintf(&b, "| (boundsCmp,positionedUsingLatestBounds) = (%d,%t)\n", si.boundsCmp, si.positionedUsingLatestBounds)
fmt.Fprintf(&b, "| exhaustedBounds = %d\n", si.exhaustedBounds)

continue
}
if opts.everyOp != nil {
1 change: 1 addition & 0 deletions sstable/format.go
@@ -25,6 +25,7 @@ const (
TableFormatPebblev2 // Range keys.
TableFormatPebblev3 // Value blocks.
TableFormatPebblev4 // DELSIZED tombstones.
NumTableFormats

TableFormatMax = TableFormatPebblev4
)
12 changes: 10 additions & 2 deletions sstable/reader_iter_single_lvl.go
@@ -302,6 +302,12 @@ func disableBoundsOpt(bound []byte, ptr uintptr) bool {
return bound[len(bound)-1]&byte(1) == 0 && simpleHash == 0
}

// ensureBoundsOptDeterminism provides a facility for disabling the bounds
// optimizations performed by disableBoundsOpt for tests that require
// deterministic iterator behavior. Some unit tests examine internal iterator
// state and require this behavior to be deterministic.
var ensureBoundsOptDeterminism bool

// SetBounds implements internalIterator.SetBounds, as documented in the pebble
// package. Note that the upper field is exclusive.
func (i *singleLevelIterator) SetBounds(lower, upper []byte) {
@@ -320,12 +326,14 @@ func (i *singleLevelIterator) SetBounds(lower, upper []byte) {
if i.positionedUsingLatestBounds {
if i.upper != nil && lower != nil && i.cmp(i.upper, lower) <= 0 {
i.boundsCmp = +1
if invariants.Enabled && disableBoundsOpt(lower, uintptr(unsafe.Pointer(i))) {
if invariants.Enabled && !ensureBoundsOptDeterminism &&
disableBoundsOpt(lower, uintptr(unsafe.Pointer(i))) {
i.boundsCmp = 0
}
} else if i.lower != nil && upper != nil && i.cmp(upper, i.lower) <= 0 {
i.boundsCmp = -1
if invariants.Enabled && disableBoundsOpt(upper, uintptr(unsafe.Pointer(i))) {
if invariants.Enabled && !ensureBoundsOptDeterminism &&
disableBoundsOpt(upper, uintptr(unsafe.Pointer(i))) {
i.boundsCmp = 0
}
}
21 changes: 19 additions & 2 deletions sstable/reader_iter_two_lvl.go
@@ -32,6 +32,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult {
// Ensure the index data block iterators are invalidated even if loading of
// the index fails.
i.data.invalidate()
i.index.invalidate()
if !i.topLevelIndex.valid() {
i.index.offset = 0
i.index.restarts = 0
@@ -259,8 +260,16 @@ func (i *twoLevelIterator) SeekGE(
// the position of the two-level index iterator without remembering the
// previous value of maybeFilteredKeys.

// We fall into the slow path if i.index.isDataInvalidated() even if the
// top-level iterator is already positioned correctly and all other
// conditions are met. An alternative structure could reuse topLevelIndex's
// current position and reload the index block to which it points. Arguably,
// an index block load is expensive and the index block may still be earlier
// than the index block containing the sought key, resulting in a wasteful
// block load.

var dontSeekWithinSingleLevelIter bool
if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || err != nil ||
if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || i.index.isDataInvalidated() || err != nil ||
(i.boundsCmp <= 0 && !flags.TrySeekUsingNext()) || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 {
// Slow-path: need to position the topLevelIndex.

@@ -447,8 +456,16 @@ func (i *twoLevelIterator) SeekPrefixGE(
// not reuse the position of the two-level index iterator without
// remembering the previous value of maybeFilteredKeysTwoLevel.

// We fall into the slow path if i.index.isDataInvalidated() even if the
// top-level iterator is already positioned correctly and all other
// conditions are met. An alternative structure could reuse topLevelIndex's
// current position and reload the index block to which it points. Arguably,
// an index block load is expensive and the index block may still be earlier
// than the index block containing the sought key, resulting in a wasteful
// block load.

var dontSeekWithinSingleLevelIter bool
if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || err != nil ||
if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || i.index.isDataInvalidated() || err != nil ||
(i.boundsCmp <= 0 && !flags.TrySeekUsingNext()) || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 {
// Slow-path: need to position the topLevelIndex.

111 changes: 70 additions & 41 deletions sstable/reader_test.go
@@ -471,7 +471,7 @@ func TestReader(t *testing.T) {
"prefixFilter": "testdata/prefixreader",
}

for _, format := range []TableFormat{TableFormatPebblev2, TableFormatPebblev3, TableFormatPebblev4} {
for format := TableFormatPebblev2; format <= TableFormatMax; format++ {
for dName, blockSize := range blockSizes {
for iName, indexBlockSize := range blockSizes {
for lName, tableOpt := range writerOpts {
@@ -486,7 +486,7 @@ func TestReader(t *testing.T) {
format, oName, lName, dName, iName),
func(t *testing.T) {
runTestReader(
t, tableOpt, testDirs[oName], nil /* Reader */, 0, false, true)
t, tableOpt, testDirs[oName], nil /* Reader */, true)
})
}
}
@@ -513,7 +513,7 @@ func TestReaderHideObsolete(t *testing.T) {
t.Run(fmt.Sprintf("blockSize=%s", dName), func(t *testing.T) {
runTestReader(
t, opts, "testdata/reader_hide_obsolete",
nil /* Reader */, 0, false, true)
nil /* Reader */, true)
})
}
}
@@ -539,46 +539,76 @@ func TestHamletReader(t *testing.T) {
t.Run(
fmt.Sprintf("sst=%s", prebuiltSST),
func(t *testing.T) {
runTestReader(t, WriterOptions{}, "testdata/hamletreader", r, 0, false, false)
runTestReader(t, WriterOptions{}, "testdata/hamletreader", r, false)
},
)
}
}

func TestReaderStats(t *testing.T) {
tableOpt := WriterOptions{
BlockSize: 30,
IndexBlockSize: 30,
func forEveryTableFormat[I any](
t *testing.T, formatTable [NumTableFormats]I, runTest func(*testing.T, TableFormat, I),
) {
t.Helper()
for tf := TableFormatUnspecified + 1; tf <= TableFormatMax; tf++ {
t.Run(tf.String(), func(t *testing.T) {
runTest(t, tf, formatTable[tf])
})
}
runTestReader(t, tableOpt, "testdata/readerstats", nil, 10000, false, false)
}

func TestReaderStatsV3(t *testing.T) {
writerOpt := WriterOptions{
BlockSize: 32 << 10,
IndexBlockSize: 32 << 10,
Comparer: testkeys.Comparer,
TableFormat: TableFormatPebblev3,
}
tdFile := "testdata/readerstats_v3"
runTestReader(t, writerOpt, tdFile, nil /* Reader */, 0, true, false)
func TestReaderStats(t *testing.T) {
forEveryTableFormat[string](t,
[NumTableFormats]string{
TableFormatUnspecified: "",
TableFormatLevelDB: "testdata/readerstats_LevelDB",
TableFormatRocksDBv2: "testdata/readerstats_LevelDB",
TableFormatPebblev1: "testdata/readerstats_LevelDB",
TableFormatPebblev2: "testdata/readerstats_LevelDB",
TableFormatPebblev3: "testdata/readerstats_Pebblev3",
TableFormatPebblev4: "testdata/readerstats_Pebblev3",
}, func(t *testing.T, format TableFormat, dir string) {
if dir == "" {
t.Skip()
}
writerOpt := WriterOptions{
BlockSize: 32 << 10,
IndexBlockSize: 32 << 10,
Comparer: testkeys.Comparer,
TableFormat: format,
}
runTestReader(t, writerOpt, dir, nil /* Reader */, false /* printValue */)
})
}

func TestReaderWithBlockPropertyFilter(t *testing.T) {
for _, format := range []TableFormat{TableFormatPebblev2, TableFormatPebblev3} {
writerOpt := WriterOptions{
BlockSize: 1,
IndexBlockSize: 40,
Comparer: testkeys.Comparer,
TableFormat: format,
BlockPropertyCollectors: []func() BlockPropertyCollector{NewTestKeysBlockPropertyCollector},
}
tdFile := "testdata/reader_bpf"
if format == TableFormatPebblev3 {
tdFile = "testdata/reader_bpf_v3"
}
runTestReader(t, writerOpt, tdFile, nil /* Reader */, 0, true, false)
}
// Some of these tests examine internal iterator state, so they require
// determinism. When the invariants tag is set, disableBoundsOpt may disable
// the bounds optimization depending on the iterator pointer address. This
// can add nondeterminism to the internal iterator state. Disable this
// nondeterminism for the duration of this test.
ensureBoundsOptDeterminism = true
defer func() { ensureBoundsOptDeterminism = false }()

forEveryTableFormat[string](t,
[NumTableFormats]string{
TableFormatUnspecified: "", // Block properties unsupported
TableFormatLevelDB: "", // Block properties unsupported
TableFormatRocksDBv2: "", // Block properties unsupported
TableFormatPebblev1: "", // Block properties unsupported
TableFormatPebblev2: "testdata/reader_bpf/Pebblev2",
TableFormatPebblev3: "testdata/reader_bpf/Pebblev3",
TableFormatPebblev4: "testdata/reader_bpf/Pebblev3",
}, func(t *testing.T, format TableFormat, dir string) {
if dir == "" {
t.Skip("Block-properties unsupported")
}
writerOpt := WriterOptions{
Comparer: testkeys.Comparer,
TableFormat: format,
BlockPropertyCollectors: []func() BlockPropertyCollector{NewTestKeysBlockPropertyCollector},
}
runTestReader(t, writerOpt, dir, nil /* Reader */, false)
})
}

func TestInjectedErrors(t *testing.T) {
@@ -702,15 +732,7 @@ func indexLayoutString(t *testing.T, r *Reader) string {
return buf.String()
}

func runTestReader(
t *testing.T,
o WriterOptions,
dir string,
r *Reader,
cacheSize int,
printLayout bool,
printValue bool,
) {
func runTestReader(t *testing.T, o WriterOptions, dir string, r *Reader, printValue bool) {
datadriven.Walk(t, dir, func(t *testing.T, path string) {
defer func() {
if r != nil {
@@ -726,6 +748,13 @@ func runTestReader(
r.Close()
r = nil
}
var cacheSize int
var printLayout bool
d.MaybeScanArgs(t, "cache-size", &cacheSize)
d.MaybeScanArgs(t, "print-layout", &printLayout)
d.MaybeScanArgs(t, "block-size", &o.BlockSize)
d.MaybeScanArgs(t, "index-block-size", &o.IndexBlockSize)

var err error
_, r, err = runBuildCmd(d, &o, cacheSize)
if err != nil {
@@ -1,7 +1,7 @@
# Test case for bug https://github.com/cockroachdb/pebble/issues/2036. Build
# sstable with two-level index, with two data blocks in each lower-level index
# block.
build
build block-size=1 index-block-size=40 print-layout=true
[email protected]:cAT10
[email protected]:dAT7
[email protected]:eAT15
Expand Down