record: allow unbounded queued blocks in LogWriter
Previously, a LogWriter would allocate up to 16 blocks of 32 KiB for buffering
WAL writes. If all 16 blocks had been allocated and no free blocks were
available, a batch writing to the WAL would queue until the flushing goroutine
freed blocks. In testing of write-heavy workloads, especially with larger value
sizes, we've seen queueing at the LogWriter. This queueing blocks the commit
pipeline, preventing any batches from committing, regardless of their priority
or whether they require waiting for fsync.

This commit modifies LogWriter to allow the queueing of an unbounded number of
blocks. In practice, for the current WAL, the memtable size serves as an upper
bound. With a 64 MiB memtable, at most 64 MiB / 32 KiB = 2,048 blocks may
queue. This is not an unreasonable amount of additional memory overhead for a
write-heavy workload.
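
As a back-of-the-envelope check of that bound, here is a minimal standalone Go
sketch (illustration only; the constant names are made up, and the sizes are the
defaults cited above):

package main

import "fmt"

func main() {
	const (
		memtableSize = 64 << 20 // 64 MiB default memtable
		blockSize    = 32 << 10 // 32 KiB WAL block
	)
	// The WAL backing a memtable holds at most memtableSize bytes of
	// records, so at most this many blocks can be queued at once.
	maxQueuedBlocks := memtableSize / blockSize
	fmt.Println(maxQueuedBlocks)                     // 2048
	fmt.Println((maxQueuedBlocks * blockSize) >> 20) // 64 (MiB of buffered WAL data)
}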

Beyond improving throughput for write-heavy workloads, removing this hard bound
improves tolerance of momentary fsync stalls.

Informs cockroachdb/cockroach#88699.
jbowens committed Jul 20, 2023
1 parent fb76a24 commit 5db333d
Showing 6 changed files with 35 additions and 105 deletions.
3 changes: 0 additions & 3 deletions batch.go
@@ -318,9 +318,6 @@ type BatchCommitStats struct {
// SemaphoreWaitDuration is the wait time for semaphores in
// commitPipeline.Commit.
SemaphoreWaitDuration time.Duration
// WALQueueWaitDuration is the wait time for allocating memory blocks in the
// LogWriter (due to the LogWriter not writing fast enough).
WALQueueWaitDuration time.Duration
// MemTableWriteStallDuration is the wait caused by a write stall due to too
// many memtables (due to not flushing fast enough).
MemTableWriteStallDuration time.Duration
32 changes: 0 additions & 32 deletions batch_test.go
@@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/pebble/internal/batchskl"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
@@ -1233,28 +1232,6 @@ func TestBatchCommitStats(t *testing.T) {
}
}

// WAL queue stall funcs.
//
// The LogWriter gets changed when stalling/unstalling the memtable, so we
// need to use a hook to tell us about the latest LogWriter.
var unstallWALQueue func()
stallWALQueue := func() {
var unstallLatestWALQueue func()
db.mu.Lock()
defer db.mu.Unlock()
db.mu.log.registerLogWriterForTesting = func(w *record.LogWriter) {
// db.mu will be held when this is called.
unstallLatestWALQueue = w.ReserveAllFreeBlocksForTesting()
}
db.mu.log.registerLogWriterForTesting(db.mu.log.LogWriter)
unstallWALQueue = func() {
db.mu.Lock()
defer db.mu.Unlock()
db.mu.log.registerLogWriterForTesting = nil
unstallLatestWALQueue()
}
}

// Commit wait stall funcs.
var unstallCommitWait func()
stallCommitWait := func() {
@@ -1268,12 +1245,9 @@ func TestBatchCommitStats(t *testing.T) {
stallCommitSemaphore()
stallMemtable()
stallL0ReadAmp()
stallWALQueue()
stallCommitWait()

// Exceed initialMemTableSize -- this is needed to make stallMemtable work.
// It also exceeds record.blockSize, requiring a new block to be allocated,
// which is what we need for stallWALQueue to work.
require.NoError(t, b.Set(make([]byte, initialMemTableSize), nil, nil))

var commitWG sync.WaitGroup
@@ -1291,8 +1265,6 @@ func TestBatchCommitStats(t *testing.T) {
time.Sleep(sleepDuration)
unstallL0ReadAmp()
time.Sleep(sleepDuration)
unstallWALQueue()
time.Sleep(sleepDuration)
unstallCommitWait()

// Wait for Apply to return.
@@ -1303,10 +1275,6 @@ func TestBatchCommitStats(t *testing.T) {
return errors.Errorf("SemaphoreWaitDuration %s is too low",
stats.SemaphoreWaitDuration.String())
}
if expectedDuration > stats.WALQueueWaitDuration {
return errors.Errorf("WALQueueWaitDuration %s is too low",
stats.WALQueueWaitDuration.String())
}
if expectedDuration > stats.MemTableWriteStallDuration {
return errors.Errorf("MemTableWriteStallDuration %s is too low",
stats.MemTableWriteStallDuration.String())
4 changes: 2 additions & 2 deletions commit_test.go
@@ -245,7 +245,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
return nil
},
write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
_, _, err := wal.SyncRecord(b.data, syncWG, syncErr)
_, err := wal.SyncRecord(b.data, syncWG, syncErr)
return nil, err
},
}
@@ -316,7 +316,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
break
}

_, _, err := wal.SyncRecord(b.data, syncWG, syncErr)
_, err := wal.SyncRecord(b.data, syncWG, syncErr)
return mem, err
},
}
4 changes: 2 additions & 2 deletions db.go
@@ -909,7 +909,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem
b.flushable.setSeqNum(b.SeqNum())
if !d.opts.DisableWAL {
var err error
size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
if err != nil {
panic(err)
}
@@ -947,7 +947,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem
}

if b.flushable == nil {
size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
if err != nil {
panic(err)
}
81 changes: 23 additions & 58 deletions record/log_writer.go
@@ -266,10 +266,7 @@ type LogWriter struct {
block *block
free struct {
sync.Mutex
// Condition variable used to signal a block is freed.
cond sync.Cond
blocks []*block
allocated int
blocks []*block
}

flusher struct {
@@ -313,9 +310,10 @@ type LogWriterConfig struct {
QueueSemChan chan struct{}
}

// CapAllocatedBlocks is the maximum number of blocks allocated by the
// LogWriter.
const CapAllocatedBlocks = 16
// initialAllocatedBlocksCap is the initial capacity of the various slices
// intended to hold LogWriter blocks. The LogWriter may allocate more blocks
// than this threshold allows.
const initialAllocatedBlocksCap = 32

// NewLogWriter returns a new LogWriter.
func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter {
Expand All @@ -335,9 +333,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon
},
queueSemChan: logWriterConfig.QueueSemChan,
}
r.free.cond.L = &r.free.Mutex
r.free.blocks = make([]*block, 0, CapAllocatedBlocks)
r.free.allocated = 1
r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
r.block = &block{}
r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ)
r.flusher.closed = make(chan struct{})
@@ -405,8 +401,12 @@ func (w *LogWriter) flushLoop(context.Context) {
// the flush work (w.block.written.Load()).

// The list of full blocks that need to be written. This is copied from
// f.pending on every loop iteration, though the number of elements is small
// (usually 1, max 16).
// f.pending on every loop iteration, though the number of elements is
// usually small (most frequently 1). In the case of the WAL LogWriter, the
// number of blocks is bounded by the size of the WAL's corresponding
// memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
// this works out to at most 2048 elements if the entirety of the memtable's
// contents are queued.
pending := make([]*block, 0, cap(f.pending))
for {
for {
Expand All @@ -432,8 +432,7 @@ func (w *LogWriter) flushLoop(context.Context) {
// Found work to do, so no longer idle.
workStartTime := time.Now()
idleDuration := workStartTime.Sub(idleStartTime)
pending = pending[:len(f.pending)]
copy(pending, f.pending)
pending = append(pending[:0], f.pending...)
f.pending = f.pending[:0]
f.metrics.PendingBufferLen.AddSample(int64(len(pending)))

@@ -556,28 +555,18 @@ func (w *LogWriter) flushBlock(b *block) error {
b.flushed = 0
w.free.Lock()
w.free.blocks = append(w.free.blocks, b)
w.free.cond.Signal()
w.free.Unlock()
return nil
}

// queueBlock queues the current block for writing to the underlying writer,
// allocates a new block and reserves space for the next header.
func (w *LogWriter) queueBlock() (waitDuration time.Duration) {
func (w *LogWriter) queueBlock() {
// Allocate a new block, blocking until one is available. We do this first
// because w.block is protected by w.flusher.Mutex.
w.free.Lock()
if len(w.free.blocks) == 0 {
if w.free.allocated < cap(w.free.blocks) {
w.free.allocated++
w.free.blocks = append(w.free.blocks, &block{})
} else {
now := time.Now()
for len(w.free.blocks) == 0 {
w.free.cond.Wait()
}
waitDuration = time.Since(now)
}
w.free.blocks = append(w.free.blocks, &block{})
}
nextBlock := w.free.blocks[len(w.free.blocks)-1]
w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
@@ -592,28 +581,6 @@ func (w *LogWriter) queueBlock() (waitDuration time.Duration) {
f.Unlock()

w.blockNum++
return waitDuration
}

// ReserveAllFreeBlocksForTesting is used to only for testing.
func (w *LogWriter) ReserveAllFreeBlocksForTesting() (releaseFunc func()) {
w.free.Lock()
defer w.free.Unlock()
free := w.free.blocks
w.free.blocks = nil
return func() {
w.free.Lock()
defer w.free.Unlock()
// It is possible that someone has pushed a free block and w.free.blocks
// is no longer nil. That is harmless. Also, the waiter loops on the
// condition len(w.free.blocks) == 0, so to actually unblock it we need to
// give it a free block.
if len(free) == 0 {
free = append(free, &block{})
}
w.free.blocks = free
w.free.cond.Broadcast()
}
}

// Close flushes and syncs any unwritten data and closes the writer.
@@ -665,7 +632,7 @@ func (w *LogWriter) Close() error {
// of the record.
// External synchronisation provided by commitPipeline.mu.
func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
logSize, _, err := w.SyncRecord(p, nil, nil)
logSize, err := w.SyncRecord(p, nil, nil)
return logSize, err
}

@@ -676,19 +643,17 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
// External synchronisation provided by commitPipeline.mu.
func (w *LogWriter) SyncRecord(
p []byte, wg *sync.WaitGroup, err *error,
) (logSize int64, waitDuration time.Duration, err2 error) {
) (logSize int64, err2 error) {
if w.err != nil {
return -1, 0, w.err
return -1, w.err
}

// The `i == 0` condition ensures we handle empty records. Such records can
// possibly be generated for VersionEdits stored in the MANIFEST. While the
// MANIFEST is currently written using Writer, it is good to support the same
// semantics with LogWriter.
for i := 0; i == 0 || len(p) > 0; i++ {
var wd time.Duration
p, wd = w.emitFragment(i, p)
waitDuration += wd
p = w.emitFragment(i, p)
}

if wg != nil {
@@ -707,7 +672,7 @@ func (w *LogWriter) SyncRecord(
// race with our read. That's ok because the only error we could be seeing is
// one to syncing for which the caller can receive notification of by passing
// in a non-nil err argument.
return offset, waitDuration, nil
return offset, nil
}

// Size returns the current size of the file.
@@ -728,7 +693,7 @@ func (w *LogWriter) emitEOFTrailer() {
b.written.Store(i + int32(recyclableHeaderSize))
}

func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDuration time.Duration) {
func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
b := w.block
i := b.written.Load()
first := n == 0
@@ -762,9 +727,9 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDurati
for i := b.written.Load(); i < blockSize; i++ {
b.buf[i] = 0
}
waitDuration = w.queueBlock()
w.queueBlock()
}
return p[r:], waitDuration
return p[r:]
}

// Metrics must be called after Close. The callee will no longer modify the
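
To summarize the core change in queueBlock above: when the free list is empty,
the LogWriter now allocates a fresh block rather than waiting on a condition
variable for the flush loop to recycle one. Below is a simplified, standalone
sketch of that free-list pattern; the type and method names (freeList,
getBlock, putBlock) are illustrative stand-ins, not the actual identifiers in
record/log_writer.go.

package main

import "sync"

// block mirrors the fixed 32 KiB buffer the LogWriter uses for WAL writes.
type block struct {
	buf [32 << 10]byte
}

// freeList is a trimmed-down stand-in for LogWriter.free: a mutex-protected
// stack of recycled blocks.
type freeList struct {
	sync.Mutex
	blocks []*block
}

// getBlock pops a recycled block if one is available and otherwise allocates
// a fresh one. Unlike the pre-change code, it never waits on a condition
// variable for the flush loop to return a block, so the caller (ultimately
// the commit pipeline) is never blocked here.
func (f *freeList) getBlock() *block {
	f.Lock()
	defer f.Unlock()
	if len(f.blocks) == 0 {
		return &block{}
	}
	b := f.blocks[len(f.blocks)-1]
	f.blocks = f.blocks[:len(f.blocks)-1]
	return b
}

// putBlock recycles a block after the flush loop has written it out.
func (f *freeList) putBlock(b *block) {
	f.Lock()
	defer f.Unlock()
	f.blocks = append(f.blocks, b)
}

func main() {
	var fl freeList
	b := fl.getBlock() // free list empty: a new block is allocated
	fl.putBlock(b)     // the flusher recycles the block after writing it
	_ = fl.getBlock()  // the recycled block is reused
}

The design trade is deliberate: a momentary flush or fsync stall now costs
additional buffered memory (bounded in practice by the memtable size, as argued
in the commit message) instead of stalling the commit pipeline.
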
16 changes: 8 additions & 8 deletions record/log_writer_test.go
@@ -148,7 +148,7 @@ func TestSyncError(t *testing.T) {
var syncErr error
var syncWG sync.WaitGroup
syncWG.Add(1)
_, _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
_, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
require.NoError(t, err)
syncWG.Wait()
if injectedErr != syncErr {
@@ -186,7 +186,7 @@ func TestSyncRecord(t *testing.T) {
for i := 0; i < 100000; i++ {
var syncWG sync.WaitGroup
syncWG.Add(1)
offset, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
offset, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
require.NoError(t, err)
syncWG.Wait()
require.NoError(t, syncErr)
@@ -214,7 +214,7 @@ func TestSyncRecordWithSignalChan(t *testing.T) {
for i := 0; i < 5; i++ {
var syncWG sync.WaitGroup
syncWG.Add(1)
_, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
require.NoError(t, err)
syncWG.Wait()
require.NoError(t, syncErr)
@@ -273,7 +273,7 @@ func TestMinSyncInterval(t *testing.T) {
syncRecord := func(n int) *sync.WaitGroup {
wg := &sync.WaitGroup{}
wg.Add(1)
_, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error))
_, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error))
require.NoError(t, err)
return wg
}
@@ -344,7 +344,7 @@ func TestMinSyncIntervalClose(t *testing.T) {
syncRecord := func(n int) *sync.WaitGroup {
wg := &sync.WaitGroup{}
wg.Add(1)
_, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error))
_, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error))
require.NoError(t, err)
return wg
}
@@ -379,7 +379,7 @@ func TestMetricsWithoutSync(t *testing.T) {
f := &syncFileWithWait{}
f.writeWG.Add(1)
w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})})
offset, _, err := w.SyncRecord([]byte("hello"), nil, nil)
offset, err := w.SyncRecord([]byte("hello"), nil, nil)
require.NoError(t, err)
const recordSize = 16
require.EqualValues(t, recordSize, offset)
@@ -388,7 +388,7 @@ func TestMetricsWithoutSync(t *testing.T) {
// constitutes ~14 blocks (each 32KB).
const numRecords = 28 << 10
for i := 0; i < numRecords; i++ {
_, _, err = w.SyncRecord([]byte("hello"), nil, nil)
_, err = w.SyncRecord([]byte("hello"), nil, nil)
require.NoError(t, err)
}
// Unblock the flush loop. It will run once or twice to write these blocks,
@@ -430,7 +430,7 @@ func TestMetricsWithSync(t *testing.T) {
wg.Add(100)
for i := 0; i < 100; i++ {
var syncErr error
_, _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr)
_, err := w.SyncRecord([]byte("hello"), &wg, &syncErr)
require.NoError(t, err)
}
// Unblock the flush loop. It may have run once or twice for these writes,