Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: refactor ingest overlap checking #3580

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,6 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
c.version = d.mu.versions.currentVersion()

baseLevel := d.mu.versions.picker.getBaseLevel()
iterOpts := IterOptions{logger: d.opts.Logger}
ve := &versionEdit{}
var ingestSplitFiles []ingestSplitFile
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
Expand All @@ -1272,6 +1271,19 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
}

ctx := context.Background()
overlapChecker := &overlapChecker{
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
},
v: c.version,
}
replacedFiles := make(map[base.FileNum][]newFileEntry)
for _, file := range ingestFlushable.files {
var fileToSplit *fileMetadata
Expand All @@ -1284,11 +1296,8 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
level = 6
} else {
var err error
level, fileToSplit, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
suggestSplit,
)
level, fileToSplit, err = ingestTargetLevel(ctx, overlapChecker,
baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit)
if err != nil {
return nil, err
}
Expand Down
35 changes: 31 additions & 4 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ func (s *ingestedFlushable) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
for _, b := range bounded {
bounds := b.UserKeyBounds()
if s.anyFileOverlaps(b, &bounds) {
if s.anyFileOverlaps(b.UserKeyBounds()) {
// Some file overlaps in key boundaries. The file doesn't necessarily
// contain any keys within the key range, but we would need to perform I/O
// to know for sure. The flushable interface dictates that we're not
Expand All @@ -316,7 +315,7 @@ func (s *ingestedFlushable) computePossibleOverlaps(

// anyFileBoundsOverlap returns true if there is at least a file in s.files with
// bounds that overlap the given bounds.
func (s *ingestedFlushable) anyFileOverlaps(b bounded, bounds *base.UserKeyBounds) bool {
func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
// Note that s.files are non-overlapping and sorted.
for _, f := range s.files {
fileBounds := f.UserKeyBounds()
Expand Down Expand Up @@ -350,7 +349,11 @@ func computePossibleOverlapsGenericImpl[F flushable](
rangeDelIter := f.newRangeDelIter(nil)
rkeyIter := f.newRangeKeyIter(nil)
for _, b := range bounded {
if overlapWithIterator(iter, &rangeDelIter, rkeyIter, b.UserKeyBounds(), cmp) {
overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rkeyIter)
if invariants.Enabled && err != nil {
panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
}
if overlap {
if !fn(b) {
break
}
Expand All @@ -367,3 +370,27 @@ func computePossibleOverlapsGenericImpl[F flushable](
}
}
}

// determineOverlapAllIters checks for overlap in a point iterator, range
// deletion iterator and range key iterator.
func determineOverlapAllIters(
cmp base.Compare,
bounds base.UserKeyBounds,
pointIter base.InternalIterator,
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
) (bool, error) {
if pointIter != nil {
if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
return pointOverlap, err
}
}
if rangeDelIter != nil {
if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
return rangeDelOverlap, err
}
}
if rangeKeyIter != nil {
return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
}
return false, nil
}
179 changes: 38 additions & 141 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/private"
"github.com/cockroachdb/pebble/objstorage"
Expand Down Expand Up @@ -888,92 +886,13 @@ func ingestUpdateSeqNum(
return nil
}

// overlapWIthIterator returns true if the given iterators produce keys or spans
// overlapping the given UserKeyBounds. May return false positives (e.g. in
// error cases).
func overlapWithIterator(
iter internalIterator,
rangeDelIter *keyspan.FragmentIterator,
rkeyIter keyspan.FragmentIterator,
bounds base.UserKeyBounds,
cmp Compare,
) bool {
// Check overlap with point operations.
//
// When using levelIter, it seeks to the SST whose boundaries
// contain keyRange.smallest.UserKey(S).
// It then tries to find a point in that SST that is >= S.
// If there's no such point it means the SST ends in a tombstone in which case
// levelIter.SeekGE generates a boundary range del sentinel.
// The comparison of this boundary with keyRange.largest(L) below
// is subtle but maintains correctness.
// 1) boundary < L,
// since boundary is also > S (initial seek),
// whatever the boundary's start key may be, we're always overlapping.
// 2) boundary > L,
// overlap with boundary cannot be determined since we don't know boundary's start key.
// We require checking for overlap with rangeDelIter.
// 3) boundary == L and L is not sentinel,
// means boundary < L and hence is similar to 1).
// 4) boundary == L and L is sentinel,
// we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap.
kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
if kv != nil {
if bounds.End.IsUpperBoundForInternalKey(cmp, kv.K) {
return true
}
}
// Assume overlap if iterator errored.
if err := iter.Error(); err != nil {
return true
}

computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) (bool, error) {
// NB: The spans surfaced by the fragment iterator are non-overlapping.
span, err := rIter.SeekGE(bounds.Start)
if err != nil {
return false, err
}
for ; span != nil; span, err = rIter.Next() {
if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
// The span starts after our bounds.
return false, nil
}
if !span.Empty() {
return true, nil
}
}
return false, err
}

// rkeyIter is either a range key level iter, or a range key iterator
// over a single file.
if rkeyIter != nil {
// If an error occurs, assume overlap.
if overlap, err := computeOverlapWithSpans(rkeyIter); overlap || err != nil {
return true
}
}

// Check overlap with range deletions.
if rangeDelIter == nil || *rangeDelIter == nil {
return false
}
overlap, err := computeOverlapWithSpans(*rangeDelIter)
// If an error occurs, assume overlap.
return overlap || err != nil
}

// ingestTargetLevel returns the target level for a file being ingested.
// If suggestSplit is true, it accounts for ingest-time splitting as part of
// its target level calculation, and if a split candidate is found, that file
// is returned as the splitFile.
func ingestTargetLevel(
newIters tableNewIters,
newRangeKeyIter keyspanimpl.TableNewSpanIter,
iterOps IterOptions,
comparer *Comparer,
v *version,
ctx context.Context,
overlapChecker *overlapChecker,
baseLevel int,
compactions map[*compaction]struct{},
meta *fileMetadata,
Expand Down Expand Up @@ -1047,68 +966,29 @@ func ingestTargetLevel(

// This assertion implicitly checks that we have the current version of
// the metadata.
if v.L0Sublevels == nil {
if overlapChecker.v.L0Sublevels == nil {
return 0, nil, base.AssertionFailedf("could not read L0 sublevels")
}
iterOps.CategoryAndQoS = sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
}
// Check for overlap over the keys of L0 by iterating over the sublevels.
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
iter := newLevelIter(context.Background(),
iterOps, comparer, newIters, v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})

var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
// sets it up for the target file.
iter.initRangeDel(&rangeDelIter)
bounds := meta.UserKeyBounds()

levelIter := keyspanimpl.LevelIter{}
levelIter.Init(
keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange,
)

overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, meta.UserKeyBounds(), comparer.Compare)
err := iter.Close() // Closes range del iter as well.
err = firstError(err, levelIter.Close())
if err != nil {
return 0, nil, err
}
if overlap {
return targetLevel, nil, nil
}
// Check for overlap over the keys of L0.
if overlap, err := overlapChecker.DetermineAnyDataOverlapInLevel(ctx, bounds, 0); err != nil {
return 0, nil, err
} else if overlap {
return 0, nil, nil
}

level := baseLevel
for ; level < numLevels; level++ {
levelIter := newLevelIter(context.Background(),
iterOps, comparer, newIters, v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
// sets it up for the target file.
levelIter.initRangeDel(&rangeDelIter)

rkeyLevelIter := &keyspanimpl.LevelIter{}
rkeyLevelIter.Init(
keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange,
)

overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, meta.UserKeyBounds(), comparer.Compare)
err := levelIter.Close() // Closes range del iter as well.
err = firstError(err, rkeyLevelIter.Close())
for level := baseLevel; level < numLevels; level++ {
dataOverlap, err := overlapChecker.DetermineAnyDataOverlapInLevel(ctx, bounds, level)
if err != nil {
return 0, nil, err
}
if overlap {
} else if dataOverlap {
return targetLevel, splitFile, nil
}

// Check boundary overlap.
var candidateSplitFile *fileMetadata
boundaryOverlaps := v.Overlaps(level, meta.UserKeyBounds())
boundaryOverlaps := overlapChecker.v.Overlaps(level, bounds)
if !boundaryOverlaps.Empty() {
// We are already guaranteed to not have any data overlaps with files
// in boundaryOverlaps, otherwise we'd have returned in the above if
Expand Down Expand Up @@ -1145,8 +1025,8 @@ func ingestTargetLevel(
if c.outputLevel == nil || level != c.outputLevel.level {
continue
}
if comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
if overlapChecker.comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
overlapChecker.comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
overlaps = true
break
}
Expand Down Expand Up @@ -1513,6 +1393,7 @@ func (d *DB) ingest(
}
}
}
ctx := context.Background()
// Allocate file numbers for all of the files being ingested and mark them as
// pending in order to prevent them from being deleted. Note that this causes
// the file number ordering to be out of alignment with sequence number
Expand Down Expand Up @@ -1787,7 +1668,7 @@ func (d *DB) ingest(

// Assign the sstables to the correct level in the LSM and apply the
// version edit.
ve, err = d.ingestApply(jobID, loadResult, mut, exciseSpan, seqNum)
ve, err = d.ingestApply(ctx, jobID, loadResult, mut, exciseSpan, seqNum)
}

// Only one ingest can occur at a time because if not, one would block waiting
Expand Down Expand Up @@ -2268,7 +2149,12 @@ func (d *DB) ingestSplit(
}

func (d *DB) ingestApply(
jobID JobID, lr ingestLoadResult, mut *memTable, exciseSpan KeyRange, exciseSeqNum uint64,
ctx context.Context,
jobID JobID,
lr ingestLoadResult,
mut *memTable,
exciseSpan KeyRange,
exciseSeqNum uint64,
) (*versionEdit, error) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -2301,11 +2187,22 @@ func (d *DB) ingestApply(
}
}

current := d.mu.versions.currentVersion()
overlapChecker := &overlapChecker{
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
},
v: current,
}
shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
current := d.mu.versions.currentVersion()
baseLevel := d.mu.versions.picker.getBaseLevel()
iterOps := IterOptions{logger: d.opts.Logger}
// filesToSplit is a list where each element is a pair consisting of a file
// being ingested and a file being split to make room for an ingestion into
// that level. Each ingested file will appear at most once in this list. It
Expand Down Expand Up @@ -2370,8 +2267,8 @@ func (d *DB) ingestApply(
// not see any version applications while we're at this. The one
// complication here would be pulling out the mu.compact.inProgress
// check from ingestTargetLevel, as that requires d.mu to be held.
f.Level, splitFile, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOps, d.opts.Comparer, current, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
f.Level, splitFile, err = ingestTargetLevel(ctx, overlapChecker,
baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
}

if splitFile != nil {
Expand Down
18 changes: 14 additions & 4 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2277,10 +2277,20 @@ func TestIngestTargetLevel(t *testing.T) {
for _, target := range strings.Split(td.Input, "\n") {
meta, err := manifest.ParseFileMetadataDebug(target)
require.NoError(t, err)
level, overlapFile, err := ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, IterOptions{logger: d.opts.Logger},
d.opts.Comparer, d.mu.versions.currentVersion(), 1, d.mu.compact.inProgress, meta,
suggestSplit)
overlapChecker := &overlapChecker{
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
},
v: d.mu.versions.currentVersion(),
}
level, overlapFile, err := ingestTargetLevel(context.Background(), overlapChecker,
1, d.mu.compact.inProgress, meta, suggestSplit)
if err != nil {
return err.Error()
}
Expand Down
Loading
Loading