From 9bb0864bdb986aaea84042ab101f5d628f759033 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Tue, 22 Aug 2023 13:47:00 -0400 Subject: [PATCH] snapshot: Prevent indefinite wait on compact sync.Cond in EFOS Previously, the code inside maybeScheduleDelayedFlush could rotate the memtable by calling `makeRoomForWrite(nil)` but then end up not calling `maybeScheduleFlush`, resulting in an indefinite wait on the condition variable for compactions/flushes in `WaitForFileOnlySnapshot` if no other writes were happening to schedule the flush. This change addresses that by adding the missing call to `maybeScheduleFlush`. Also update the case where we call maybeScheduleFlush() in WaitForFileOnlySnapshot() to mark the right memtable as flushForced=true to prevent us from needlessly waiting on the condition variable. Also add a context.Context in WaitForFileOnlySnapshot() to allow callers to use context cancellation to stop waiting if necessary. --- compaction.go | 2 +- ingest_test.go | 4 +-- scan_internal_test.go | 2 +- snapshot.go | 77 ++++++++++++++++++++++++++++--------------- 4 files changed, 54 insertions(+), 31 deletions(-) diff --git a/compaction.go b/compaction.go index 66a69efb32..5959517c02 100644 --- a/compaction.go +++ b/compaction.go @@ -1827,8 +1827,8 @@ func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) { d.makeRoomForWrite(nil) } else { mem.flushForced = true - d.maybeScheduleFlush() } + d.maybeScheduleFlush() } }() } diff --git a/ingest_test.go b/ingest_test.go index ff59b62bd0..c7574abe0f 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1047,7 +1047,7 @@ func TestIngestShared(t *testing.T) { panic("insufficient args for file-only-snapshot command") } name := td.CmdArgs[0].Key - err := efos[name].WaitForFileOnlySnapshot(1 * time.Millisecond) + err := efos[name].WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond) if err != nil { return err.Error() } @@ -1519,7 +1519,7 @@ func TestConcurrentExcise(t *testing.T) { panic("insufficient args for 
file-only-snapshot command") } name := td.CmdArgs[0].Key - err := efos[name].WaitForFileOnlySnapshot(1 * time.Millisecond) + err := efos[name].WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond) if err != nil { return err.Error() } diff --git a/scan_internal_test.go b/scan_internal_test.go index e47994d171..b6f35f2e6e 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -351,7 +351,7 @@ func TestScanInternal(t *testing.T) { } name := td.CmdArgs[0].Key es := efos[name] - if err := es.WaitForFileOnlySnapshot(1 * time.Millisecond); err != nil { + if err := es.WaitForFileOnlySnapshot(context.TODO(), 1*time.Millisecond); err != nil { return err.Error() } return "ok" diff --git a/snapshot.go b/snapshot.go index 8f5e176d13..0a5513f287 100644 --- a/snapshot.go +++ b/snapshot.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/rangekey" ) @@ -237,10 +238,6 @@ type EventuallyFileOnlySnapshot struct { // grabbed _before_ grabbing this one. sync.Mutex - // transitioned is signalled when this EFOS transitions to being a file-only - // snapshot. - transitioned sync.Cond - // Either the {snap,readState} fields are set below, or the version is set at // any given point of time. If a snapshot is referenced, this is not a // file-only snapshot yet, and if a version is set (and ref'd) this is a @@ -289,7 +286,6 @@ func (d *DB) makeEventuallyFileOnlySnapshot( protectedRanges: keyRanges, closed: make(chan struct{}), } - es.mu.transitioned.L = &es.mu if isFileOnly { es.mu.vers = d.mu.versions.currentVersion() es.mu.vers.Ref() @@ -335,7 +331,6 @@ func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version oldReadState := es.mu.readState es.mu.snap = nil es.mu.readState = nil - es.mu.transitioned.Broadcast() es.mu.Unlock() // It's okay to close a snapshot even if iterators are already open on it. 
oldReadState.unrefLocked() @@ -360,28 +355,30 @@ func (es *EventuallyFileOnlySnapshot) releaseReadState() { } } -// WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot -// has been converted into a file-only snapshot (i.e. all memtables containing -// keys < seqNum are flushed). A duration can be passed in, and if nonzero, -// a delayed flush will be scheduled at that duration if necessary. -// -// Idempotent; can be called multiple times with no side effects. -func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(dur time.Duration) error { +// hasTransitioned returns true if this EFOS has transitioned to a file-only +// snapshot. +func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool { es.mu.Lock() - if es.mu.vers != nil { - // Fast path. - es.mu.Unlock() - return nil - } - es.mu.Unlock() + defer es.mu.Unlock() + return es.mu.vers != nil +} +// waitForFlush waits for a flush on any memtables that need to be flushed +// before this EFOS can transition to a file-only snapshot. If this EFOS is +// waiting on a flush of the mutable memtable, it forces a rotation within +// `dur` duration. For immutable memtables, it schedules a flush and waits for +// it to finish. +func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error { es.db.mu.Lock() + defer es.db.mu.Unlock() + earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked() for earliestUnflushedSeqNum < es.seqNum { select { case <-es.closed: - es.db.mu.Unlock() return ErrClosed + case <-ctx.Done(): + return ctx.Err() default: } // Check if the current mutable memtable contains keys less than seqNum. 
@@ -389,6 +386,15 @@ func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(dur time.Duration) if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 { es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur) } else { + // Find the last memtable that contains seqNums less than es.seqNum, + // and force a flush on it. + var mem *flushableEntry + for i := range es.db.mu.mem.queue { + if es.db.mu.mem.queue[i].logSeqNum < es.seqNum { + mem = es.db.mu.mem.queue[i] + } + } + mem.flushForced = true es.db.maybeScheduleFlush() } es.db.mu.compact.cond.Wait() @@ -396,17 +402,34 @@ func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(dur time.Duration) earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked() } if es.excised.Load() { - es.db.mu.Unlock() return ErrSnapshotExcised } - es.db.mu.Unlock() + return nil +} - es.mu.Lock() - defer es.mu.Unlock() +// WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot +// has been converted into a file-only snapshot (i.e. all memtables containing +// keys < seqNum are flushed). A duration can be passed in, and if nonzero, +// a delayed flush will be scheduled at that duration if necessary. +// +// Idempotent; can be called multiple times with no side effects. +func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot( + ctx context.Context, dur time.Duration, +) error { + if es.hasTransitioned() { + return nil + } + + if err := es.waitForFlush(ctx, dur); err != nil { + return err + } - // Wait for transition to file-only snapshot. - if es.mu.vers == nil { - es.mu.transitioned.Wait() + if invariants.Enabled { + // Since we aren't returning an error, we _must_ have transitioned to a + // file-only snapshot by now. + if !es.hasTransitioned() { + panic("expected EFOS to have transitioned to file-only snapshot after flush") + } } return nil }