From 5db333dfa8da64363677c514d520a667d52e285d Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 20 Jul 2023 17:24:29 -0400 Subject: [PATCH] record: allow unbounded queued blocks in LogWriter Previously, a LogWriter would allocate up to 16 blocks of 32 KiB for buffering WAL writes. If all 16 blocks had been allocated and no free blocks were available, a batch writing to the WAL would queue until the flushing goroutine freed blocks. In testing of write-heavy workloads, especially with larger value sizes, we've seen queueing at the LogWriter. This queueing blocks the commit pipeline, preventing any batches from committing regardless of priority and whether they require waiting for fsync. This commit modifies LogWriter to allow the queueing of an unbounded number of blocks. In practice, for the current WAL, the memtable size serves as an upper bound. With a 64 MiB memtable, at most 64 MiB / 32 KiB = 2,048 blocks may queue. This is not an unreasonable of additional memory overhead for a write-heavy workload. Beyond improving throughput for write-heavy workloads, removing this hard bound improves tolerance of momentary fsync stalls. Informs cockroachdb/cockroach#88699. --- batch.go | 3 -- batch_test.go | 32 ---------------- commit_test.go | 4 +- db.go | 4 +- record/log_writer.go | 81 +++++++++++---------------------------- record/log_writer_test.go | 16 ++++---- 6 files changed, 35 insertions(+), 105 deletions(-) diff --git a/batch.go b/batch.go index 20dd566003..f08e3ce659 100644 --- a/batch.go +++ b/batch.go @@ -318,9 +318,6 @@ type BatchCommitStats struct { // SemaphoreWaitDuration is the wait time for semaphores in // commitPipeline.Commit. SemaphoreWaitDuration time.Duration - // WALQueueWaitDuration is the wait time for allocating memory blocks in the - // LogWriter (due to the LogWriter not writing fast enough). - WALQueueWaitDuration time.Duration // MemTableWriteStallDuration is the wait caused by a write stall due to too // many memtables (due to not flushing fast enough). MemTableWriteStallDuration time.Duration diff --git a/batch_test.go b/batch_test.go index 5a0d73dc7d..c0b1fe0030 100644 --- a/batch_test.go +++ b/batch_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/testkeys" - "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -1233,28 +1232,6 @@ func TestBatchCommitStats(t *testing.T) { } } - // WAL queue stall funcs. - // - // The LogWriter gets changed when stalling/unstalling the memtable, so we - // need to use a hook to tell us about the latest LogWriter. - var unstallWALQueue func() - stallWALQueue := func() { - var unstallLatestWALQueue func() - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = func(w *record.LogWriter) { - // db.mu will be held when this is called. - unstallLatestWALQueue = w.ReserveAllFreeBlocksForTesting() - } - db.mu.log.registerLogWriterForTesting(db.mu.log.LogWriter) - unstallWALQueue = func() { - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = nil - unstallLatestWALQueue() - } - } - // Commit wait stall funcs. var unstallCommitWait func() stallCommitWait := func() { @@ -1268,12 +1245,9 @@ func TestBatchCommitStats(t *testing.T) { stallCommitSemaphore() stallMemtable() stallL0ReadAmp() - stallWALQueue() stallCommitWait() // Exceed initialMemTableSize -- this is needed to make stallMemtable work. - // It also exceeds record.blockSize, requiring a new block to be allocated, - // which is what we need for stallWALQueue to work. require.NoError(t, b.Set(make([]byte, initialMemTableSize), nil, nil)) var commitWG sync.WaitGroup @@ -1291,8 +1265,6 @@ func TestBatchCommitStats(t *testing.T) { time.Sleep(sleepDuration) unstallL0ReadAmp() time.Sleep(sleepDuration) - unstallWALQueue() - time.Sleep(sleepDuration) unstallCommitWait() // Wait for Apply to return. @@ -1303,10 +1275,6 @@ func TestBatchCommitStats(t *testing.T) { return errors.Errorf("SemaphoreWaitDuration %s is too low", stats.SemaphoreWaitDuration.String()) } - if expectedDuration > stats.WALQueueWaitDuration { - return errors.Errorf("WALQueueWaitDuration %s is too low", - stats.WALQueueWaitDuration.String()) - } if expectedDuration > stats.MemTableWriteStallDuration { return errors.Errorf("MemTableWriteStallDuration %s is too low", stats.MemTableWriteStallDuration.String()) diff --git a/commit_test.go b/commit_test.go index 0e1ad4ac3c..ee627a113f 100644 --- a/commit_test.go +++ b/commit_test.go @@ -245,7 +245,7 @@ func TestCommitPipelineWALClose(t *testing.T) { return nil }, write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) { - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return nil, err }, } @@ -316,7 +316,7 @@ func BenchmarkCommitPipeline(b *testing.B) { break } - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return mem, err }, } diff --git a/db.go b/db.go index 4b4a6aa3c4..dd9c4ed8fa 100644 --- a/db.go +++ b/db.go @@ -909,7 +909,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem b.flushable.setSeqNum(b.SeqNum()) if !d.opts.DisableWAL { var err error - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } @@ -947,7 +947,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem } if b.flushable == nil { - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } diff --git a/record/log_writer.go b/record/log_writer.go index 90f8c2ef34..7cc7a83063 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -266,10 +266,7 @@ type LogWriter struct { block *block free struct { sync.Mutex - // Condition variable used to signal a block is freed. - cond sync.Cond - blocks []*block - allocated int + blocks []*block } flusher struct { @@ -313,9 +310,10 @@ type LogWriterConfig struct { QueueSemChan chan struct{} } -// CapAllocatedBlocks is the maximum number of blocks allocated by the -// LogWriter. -const CapAllocatedBlocks = 16 +// initialAllocatedBlocksCap is the initial capacity of the various slices +// intended to hold LogWriter blocks. The LogWriter may allocate more blocks +// than this threshold allows. +const initialAllocatedBlocksCap = 32 // NewLogWriter returns a new LogWriter. func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter { @@ -335,9 +333,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon }, queueSemChan: logWriterConfig.QueueSemChan, } - r.free.cond.L = &r.free.Mutex - r.free.blocks = make([]*block, 0, CapAllocatedBlocks) - r.free.allocated = 1 + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = &block{} r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) r.flusher.closed = make(chan struct{}) @@ -405,8 +401,12 @@ func (w *LogWriter) flushLoop(context.Context) { // the flush work (w.block.written.Load()). // The list of full blocks that need to be written. This is copied from - // f.pending on every loop iteration, though the number of elements is small - // (usually 1, max 16). + // f.pending on every loop iteration, though the number of elements is + // usually small (most frequently 1). In the case of the WAL LogWriter, the + // number of blocks is bounded by the size of the WAL's corresponding + // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables, + // this works out to at most 2048 elements if the entirety of the memtable's + // contents are queued. pending := make([]*block, 0, cap(f.pending)) for { for { @@ -432,8 +432,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Found work to do, so no longer idle. workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) - pending = pending[:len(f.pending)] - copy(pending, f.pending) + pending = append(pending[:0], f.pending...) f.pending = f.pending[:0] f.metrics.PendingBufferLen.AddSample(int64(len(pending))) @@ -556,28 +555,18 @@ func (w *LogWriter) flushBlock(b *block) error { b.flushed = 0 w.free.Lock() w.free.blocks = append(w.free.blocks, b) - w.free.cond.Signal() w.free.Unlock() return nil } // queueBlock queues the current block for writing to the underlying writer, // allocates a new block and reserves space for the next header. -func (w *LogWriter) queueBlock() (waitDuration time.Duration) { +func (w *LogWriter) queueBlock() { // Allocate a new block, blocking until one is available. We do this first // because w.block is protected by w.flusher.Mutex. w.free.Lock() if len(w.free.blocks) == 0 { - if w.free.allocated < cap(w.free.blocks) { - w.free.allocated++ - w.free.blocks = append(w.free.blocks, &block{}) - } else { - now := time.Now() - for len(w.free.blocks) == 0 { - w.free.cond.Wait() - } - waitDuration = time.Since(now) - } + w.free.blocks = append(w.free.blocks, &block{}) } nextBlock := w.free.blocks[len(w.free.blocks)-1] w.free.blocks = w.free.blocks[:len(w.free.blocks)-1] @@ -592,28 +581,6 @@ func (w *LogWriter) queueBlock() (waitDuration time.Duration) { f.Unlock() w.blockNum++ - return waitDuration -} - -// ReserveAllFreeBlocksForTesting is used to only for testing. -func (w *LogWriter) ReserveAllFreeBlocksForTesting() (releaseFunc func()) { - w.free.Lock() - defer w.free.Unlock() - free := w.free.blocks - w.free.blocks = nil - return func() { - w.free.Lock() - defer w.free.Unlock() - // It is possible that someone has pushed a free block and w.free.blocks - // is no longer nil. That is harmless. Also, the waiter loops on the - // condition len(w.free.blocks) == 0, so to actually unblock it we need to - // give it a free block. - if len(free) == 0 { - free = append(free, &block{}) - } - w.free.blocks = free - w.free.cond.Broadcast() - } } // Close flushes and syncs any unwritten data and closes the writer. @@ -665,7 +632,7 @@ func (w *LogWriter) Close() error { // of the record. // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) WriteRecord(p []byte) (int64, error) { - logSize, _, err := w.SyncRecord(p, nil, nil) + logSize, err := w.SyncRecord(p, nil, nil) return logSize, err } @@ -676,9 +643,9 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, -) (logSize int64, waitDuration time.Duration, err2 error) { +) (logSize int64, err2 error) { if w.err != nil { - return -1, 0, w.err + return -1, w.err } // The `i == 0` condition ensures we handle empty records. Such records can @@ -686,9 +653,7 @@ func (w *LogWriter) SyncRecord( // MANIFEST is currently written using Writer, it is good to support the same // semantics with LogWriter. for i := 0; i == 0 || len(p) > 0; i++ { - var wd time.Duration - p, wd = w.emitFragment(i, p) - waitDuration += wd + p = w.emitFragment(i, p) } if wg != nil { @@ -707,7 +672,7 @@ func (w *LogWriter) SyncRecord( // race with our read. That's ok because the only error we could be seeing is // one to syncing for which the caller can receive notification of by passing // in a non-nil err argument. - return offset, waitDuration, nil + return offset, nil } // Size returns the current size of the file. @@ -728,7 +693,7 @@ func (w *LogWriter) emitEOFTrailer() { b.written.Store(i + int32(recyclableHeaderSize)) } -func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDuration time.Duration) { +func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) { b := w.block i := b.written.Load() first := n == 0 @@ -762,9 +727,9 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDurati for i := b.written.Load(); i < blockSize; i++ { b.buf[i] = 0 } - waitDuration = w.queueBlock() + w.queueBlock() } - return p[r:], waitDuration + return p[r:] } // Metrics must be called after Close. The callee will no longer modify the diff --git a/record/log_writer_test.go b/record/log_writer_test.go index e17577b74e..09a9c03349 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -148,7 +148,7 @@ func TestSyncError(t *testing.T) { var syncErr error var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() if injectedErr != syncErr { @@ -186,7 +186,7 @@ func TestSyncRecord(t *testing.T) { for i := 0; i < 100000; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - offset, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + offset, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -214,7 +214,7 @@ func TestSyncRecordWithSignalChan(t *testing.T) { for i := 0; i < 5; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -273,7 +273,7 @@ func TestMinSyncInterval(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -344,7 +344,7 @@ func TestMinSyncIntervalClose(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -379,7 +379,7 @@ func TestMetricsWithoutSync(t *testing.T) { f := &syncFileWithWait{} f.writeWG.Add(1) w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) - offset, _, err := w.SyncRecord([]byte("hello"), nil, nil) + offset, err := w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) const recordSize = 16 require.EqualValues(t, recordSize, offset) @@ -388,7 +388,7 @@ func TestMetricsWithoutSync(t *testing.T) { // constitutes ~14 blocks (each 32KB). const numRecords = 28 << 10 for i := 0; i < numRecords; i++ { - _, _, err = w.SyncRecord([]byte("hello"), nil, nil) + _, err = w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) } // Unblock the flush loop. It will run once or twice to write these blocks, @@ -430,7 +430,7 @@ func TestMetricsWithSync(t *testing.T) { wg.Add(100) for i := 0; i < 100; i++ { var syncErr error - _, _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) require.NoError(t, err) } // Unblock the flush loop. It may have run once or twice for these writes,