Skip to content

Commit

Permalink
manifest: Added Virtual SSTable Metadata Bounds Checking
Browse files Browse the repository at this point in the history
Added a check inside of `FileMetadata.ValidateVirtual()` to ensure that
the keys returned from a virtual sstable are within the bounds of the
Smallest and Largest metadata keys.

Informs: #2593
  • Loading branch information
raggar committed Jul 13, 2023
1 parent 168079a commit 7f872b3
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 0 deletions.
28 changes: 28 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2489,3 +2489,31 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
func (d *DB) ObjProvider() objstorage.Provider {
return d.objProvider
}

func (d *DB) checkVirtualBounds(m *fileMetadata) {
if !invariants.Enabled {
return
}

it, _, err := d.newIters(context.TODO(), m, nil, internalIterOpts{})
if err != nil {
panic(fmt.Sprintf("pebble: error creating iterator :%s", err.Error()))
}

// Check that virtual sstable bounds are tight from both ends.
first, _ := it.First()
if d.cmp(first.UserKey, m.Smallest.UserKey) != 0 {
panic(fmt.Sprintf("pebble: virtual sstable %s lower bound is not tight: %s != %s", m.FileNum, m.Smallest.UserKey, first.UserKey))
}
last, _ := it.Last()
if d.cmp(last.UserKey, m.Largest.UserKey) != 0 {
panic(fmt.Sprintf("pebble: virtual sstable %s upper bound is not tight: %s != %s", m.FileNum, m.Largest.UserKey, last.UserKey))
}

// Check that iterator keys are within bounds.
for key, _ := it.First(); key != nil; key, _ = it.Next() {
if d.cmp(key.UserKey, m.Smallest.UserKey) < 0 || d.cmp(key.UserKey, m.Largest.UserKey) > 0 {
panic("pebble: virtual sstable key is not within bounds")
}
}
}
5 changes: 5 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,9 @@ func (d *DB) excise(
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
}
leftFile.ValidateVirtual(m)
d.checkVirtualBounds(leftFile)

if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.SmallestPointKey) {
// This file will contain point keys
smallestPointKey := m.SmallestPointKey
Expand Down Expand Up @@ -1511,6 +1514,8 @@ func (d *DB) excise(
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
}
rightFile.ValidateVirtual(m)
d.checkVirtualBounds(rightFile)
if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.LargestPointKey) {
// This file will contain point keys
largestPointKey := m.LargestPointKey
Expand Down
2 changes: 2 additions & 0 deletions table_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ func TestVirtualReadsWiring(t *testing.T) {
v2.SmallestPointKey = v2.Smallest

v1.ValidateVirtual(parentFile)
d.checkVirtualBounds(v1)
v2.ValidateVirtual(parentFile)
d.checkVirtualBounds(v2)

// Write the version edit.
fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
Expand Down
4 changes: 4 additions & 0 deletions version_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func TestLatestRefCounting(t *testing.T) {
m2.SmallestPointKey = m2.Smallest

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down Expand Up @@ -282,7 +284,9 @@ func TestVirtualSSTableManifestReplay(t *testing.T) {
m2.Stats.NumEntries = 1

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down

0 comments on commit 7f872b3

Please sign in to comment.