Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

manifest: virtual iterator bounds checking virtual sstable metadata #2698

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2649,3 +2649,98 @@ func (d *DB) SetCreatorID(creatorID uint64) error {
func (d *DB) ObjProvider() objstorage.Provider {
return d.objProvider
}

func (d *DB) checkVirtualBounds(m *fileMetadata, iterOpts *IterOptions) {
if !invariants.Enabled {
return
}

if m.HasPointKeys {
pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, iterOpts, internalIterOpts{})
if err != nil {
panic(errors.Wrap(err, "pebble: error creating point iterator"))
}

defer pointIter.Close()
if rangeDelIter != nil {
defer rangeDelIter.Close()
}

pointKey, _ := pointIter.First()
var rangeDel *keyspan.Span
if rangeDelIter != nil {
rangeDel = rangeDelIter.First()
}

// Check that the lower bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
}

pointKey, _ = pointIter.Last()
rangeDel = nil
if rangeDelIter != nil {
rangeDel = rangeDelIter.Last()
}

// Check that the upper bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
}

// Check that iterator keys are within bounds.
for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
}
}

if rangeDelIter != nil {
for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
}

if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
}
}
}
}

if !m.HasRangeKeys {
return
}

spanIterOpts := keyspan.SpanIterOptions{}
if iterOpts != nil {
spanIterOpts.Level = iterOpts.level
}
rangeKeyIter, err := d.tableNewRangeKeyIter(m, spanIterOpts)
defer rangeKeyIter.Close()

if err != nil {
panic(errors.Wrap(err, "pebble: error creating range key iterator"))
}

// Check that the lower bound is tight.
if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
}

// Check that upper bound is tight.
if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
}

for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
}
if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
}
}
}
8 changes: 8 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,10 @@ func (d *DB) ingest(
if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil {
return IngestOperationStats{}, err
}

for i, sharedMeta := range loadResult.sharedMeta {
d.checkVirtualBounds(sharedMeta, &IterOptions{level: manifest.Level(loadResult.sharedLevels[i])})
}
// Make the new tables durable. We need to do this at some point before we
// update the MANIFEST (via logAndApply), otherwise a crash can have the
// tables referenced in the MANIFEST, but not present in the provider.
Expand Down Expand Up @@ -1490,6 +1494,8 @@ func (d *DB) excise(
if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
return nil, err
}
leftFile.ValidateVirtual(m)
d.checkVirtualBounds(leftFile, &IterOptions{level: manifest.Level(level)} /* iterOptions */)
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
ve.CreatedBackingTables = append(ve.CreatedBackingTables, leftFile.FileBacking)
backingTableCreated = true
Expand Down Expand Up @@ -1597,6 +1603,8 @@ func (d *DB) excise(
// for it here.
rightFile.Size = 1
}
rightFile.ValidateVirtual(m)
d.checkVirtualBounds(rightFile, &IterOptions{level: manifest.Level(level)} /* iterOptions */)
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
if !backingTableCreated {
ve.CreatedBackingTables = append(ve.CreatedBackingTables, rightFile.FileBacking)
Expand Down
2 changes: 2 additions & 0 deletions table_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ func TestVirtualReadsWiring(t *testing.T) {
v2.SmallestPointKey = v2.Smallest

v1.ValidateVirtual(parentFile)
d.checkVirtualBounds(v1, nil /* iterOptions */)
v2.ValidateVirtual(parentFile)
d.checkVirtualBounds(v2, nil /* iterOptions */)

// Write the version edit.
fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
Expand Down
4 changes: 4 additions & 0 deletions version_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func TestLatestRefCounting(t *testing.T) {
m2.SmallestPointKey = m2.Smallest

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1, nil /* iterOptions */)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2, nil /* iterOptions */)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down Expand Up @@ -282,7 +284,9 @@ func TestVirtualSSTableManifestReplay(t *testing.T) {
m2.Stats.NumEntries = 1

m1.ValidateVirtual(f)
d.checkVirtualBounds(m1, nil /* iterOptions */)
m2.ValidateVirtual(f)
d.checkVirtualBounds(m2, nil /* iterOptions */)

fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics {
metrics := newFileMetrics(ve.NewFiles)
Expand Down
Loading