Skip to content

Commit

Permalink
snapshot: Prevent indefinite wait on compact sync.Cond in EFOS
Browse files Browse the repository at this point in the history
Previously, the code inside maybeScheduleDelayedFlush could
rotate the memtable by calling `makeRoomForWrite(nil)` but
then end up not calling `maybeScheduleFlush`, resulting in
an indefinite wait on the condition variable for compactions/flushes
in `WaitForFileOnly` if no other writes were happening to schedule
the flush. This change addresses that by adding the missing call
to `maybeScheduleFlush`.

Also update the case where we call maybeScheduleFlush() in
WaitforFileOnlySnapshot() to mark the right memtable as
flushForced=true to prevent us from needlessly waiting
on the condition lock.

Also add a context.Context in WaitForFileOnly() to allow callers
to use context cancellation to avoid busy-waiting if necessary.
  • Loading branch information
itsbilal committed Aug 24, 2023
1 parent 8638640 commit 9bb0864
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 31 deletions.
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,8 +1827,8 @@ func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
d.makeRoomForWrite(nil)
} else {
mem.flushForced = true
d.maybeScheduleFlush()
}
d.maybeScheduleFlush()
}
}()
}
Expand Down
4 changes: 2 additions & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ func TestIngestShared(t *testing.T) {
panic("insufficient args for file-only-snapshot command")
}
name := td.CmdArgs[0].Key
err := efos[name].WaitForFileOnlySnapshot(1 * time.Millisecond)
err := efos[name].WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1519,7 +1519,7 @@ func TestConcurrentExcise(t *testing.T) {
panic("insufficient args for file-only-snapshot command")
}
name := td.CmdArgs[0].Key
err := efos[name].WaitForFileOnlySnapshot(1 * time.Millisecond)
err := efos[name].WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond)
if err != nil {
return err.Error()
}
Expand Down
2 changes: 1 addition & 1 deletion scan_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestScanInternal(t *testing.T) {
}
name := td.CmdArgs[0].Key
es := efos[name]
if err := es.WaitForFileOnlySnapshot(1 * time.Millisecond); err != nil {
if err := es.WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond); err != nil {
return err.Error()
}
return "ok"
Expand Down
77 changes: 50 additions & 27 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/rangekey"
)

Expand Down Expand Up @@ -237,10 +238,6 @@ type EventuallyFileOnlySnapshot struct {
// grabbed _before_ grabbing this one.
sync.Mutex

// transitioned is signalled when this EFOS transitions to being a file-only
// snapshot.
transitioned sync.Cond

// Either the {snap,readState} fields are set below, or the version is set at
// any given point of time. If a snapshot is referenced, this is not a
// file-only snapshot yet, and if a version is set (and ref'd) this is a
Expand Down Expand Up @@ -289,7 +286,6 @@ func (d *DB) makeEventuallyFileOnlySnapshot(
protectedRanges: keyRanges,
closed: make(chan struct{}),
}
es.mu.transitioned.L = &es.mu
if isFileOnly {
es.mu.vers = d.mu.versions.currentVersion()
es.mu.vers.Ref()
Expand Down Expand Up @@ -335,7 +331,6 @@ func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version
oldReadState := es.mu.readState
es.mu.snap = nil
es.mu.readState = nil
es.mu.transitioned.Broadcast()
es.mu.Unlock()
// It's okay to close a snapshot even if iterators are already open on it.
oldReadState.unrefLocked()
Expand All @@ -360,53 +355,81 @@ func (es *EventuallyFileOnlySnapshot) releaseReadState() {
}
}

// WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
// has been converted into a file-only snapshot (i.e. all memtables containing
// keys < seqNum are flushed). A duration can be passed in, and if nonzero,
// a delayed flush will be scheduled at that duration if necessary.
//
// Idempotent; can be called multiple times with no side effects.
func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(dur time.Duration) error {
// hasTransitioned returns true if this EFOS has transitioned to a file-only
// snapshot.
func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
es.mu.Lock()
if es.mu.vers != nil {
// Fast path.
es.mu.Unlock()
return nil
}
es.mu.Unlock()
defer es.mu.Unlock()
return es.mu.vers != nil
}

// waitForFlush waits for a flush on any memtables that need to be flushed
// before this EFOS can transition to a file-only snapshot. If this EFOS is
// waiting on a flush of the mutable memtable, it forces a rotation within
// `dur` duration. For immutable memtables, it schedules a flush and waits for
// it to finish.
func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
es.db.mu.Lock()
defer es.db.mu.Unlock()

earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
for earliestUnflushedSeqNum < es.seqNum {
select {
case <-es.closed:
es.db.mu.Unlock()
return ErrClosed
case <-ctx.Done():
return ctx.Err()
default:
}
// Check if the current mutable memtable contains keys less than seqNum.
// If so, rotate it.
if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
} else {
// Find the last memtable that contains seqNums less than es.seqNum,
// and force a flush on it.
var mem *flushableEntry
for i := range es.db.mu.mem.queue {
if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
mem = es.db.mu.mem.queue[i]
}
}
mem.flushForced = true
es.db.maybeScheduleFlush()
}
es.db.mu.compact.cond.Wait()

earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
}
if es.excised.Load() {
es.db.mu.Unlock()
return ErrSnapshotExcised
}
es.db.mu.Unlock()
return nil
}

es.mu.Lock()
defer es.mu.Unlock()
// WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
// has been converted into a file-only snapshot (i.e. all memtables containing
// keys < seqNum are flushed). A duration can be passed in, and if nonzero,
// a delayed flush will be scheduled at that duration if necessary.
//
// Idempotent; can be called multiple times with no side effects.
func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
ctx context.Context, dur time.Duration,
) error {
if es.hasTransitioned() {
return nil
}

if err := es.waitForFlush(ctx, dur); err != nil {
return err
}

// Wait for transition to file-only snapshot.
if es.mu.vers == nil {
es.mu.transitioned.Wait()
if invariants.Enabled {
// Since we aren't returning an error, we _must_ have transitioned to a
// file-only snapshot by now.
if !es.hasTransitioned() {
panic("expected EFOS to have transitioned to file-only snapshot after flush")
}
}
return nil
}
Expand Down

0 comments on commit 9bb0864

Please sign in to comment.