Skip to content

Commit

Permalink
db: add TestCommitPipelineLogDataSeqNum
Browse files Browse the repository at this point in the history
Add a unit test covering the sequence number publishing of a batch containing
only LogData. A batch containing only a LogData may adopt the sequence number
that will be assigned to the next KV committed, but it must not ratchet the
sequence number high enough so as to make the sequence number visible.
  • Loading branch information
jbowens committed Feb 7, 2024
1 parent 15f6f13 commit 291a41d
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -284,6 +285,75 @@ func TestCommitPipelineWALClose(t *testing.T) {
}
}

// TestCommitPipelineLogDataSeqNum ensures committing a KV and a LogData
// concurrently never publishes the KV's sequence number before it's been fully
// applied to the memtable (which would violate the consistency of iterators
// to which that sequence number is visible).
//
// A LogData batch reads the 'next sequence number' without incrementing it,
// effectively sharing the sequence number with the next key committed. It may
// finish applying to the memtable before the KV that shares its sequence
// number. However, sequence number publishing ratchets the visible sequence
// number to the batch's first seqnum + number of batch entries ..., so for e.g.
// with first seqnum = 5 and number of entries = 3, it will ratchet to 8. This
// means all seqnums strictly less than 8 are visible. So a LogData batch which
// also grabbed the first seqnum = 5 before this batch, will ratchet to 5 + 0,
// which is a noop.
func TestCommitPipelineLogDataSeqNum(t *testing.T) {
var testEnv commitEnv
testEnv = commitEnv{
logSeqNum: new(atomic.Uint64),
visibleSeqNum: new(atomic.Uint64),
apply: func(b *Batch, mem *memTable) error {
// Jitter a delay in memtable application to get test coverage of
// varying interleavings of which batch completes memtable
// application first.
time.Sleep(time.Duration(rand.Float64() * 20.0 * float64(time.Millisecond)))
// Ensure that our sequence number is not published before we've
// returned from apply.
//
// If b is the Set("foo","bar") batch, the LogData batch sharing the
// sequence number may have already entered commitPipeline.publish,
// but the sequence number it publishes should not be high enough to
// make this batch's KV visible.
//
// It may set visibleSeqNum = b.SeqNum(), but seqnum X is not
// considered visible until the visibleSeqNum is >X.
require.False(t, base.Visible(
b.SeqNum(), // Seqnum of the first KV in the batch b
testEnv.visibleSeqNum.Load(), // Snapshot seqnum
InternalKeySeqNumMax, // Indexed batch "seqnum" (unused here)
))
return nil
},
write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
if syncWG != nil {
syncWG.Done()
}
return nil, nil
},
}
testEnv.logSeqNum.Store(base.SeqNumStart)
testEnv.visibleSeqNum.Store(base.SeqNumStart)
p := newCommitPipeline(testEnv)

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
b := &Batch{}
require.NoError(t, b.Set([]byte("foo"), []byte("bar"), nil))
require.NoError(t, p.Commit(b, false /* sync */, false))
}()
go func() {
defer wg.Done()
b := &Batch{}
require.NoError(t, b.LogData([]byte("foo"), nil))
require.NoError(t, p.Commit(b, false /* sync */, false))
}()
wg.Wait()
}

func BenchmarkCommitPipeline(b *testing.B) {
for _, noSyncWait := range []bool{false, true} {
for _, parallelism := range []int{1, 2, 4, 8, 16, 32, 64, 128} {
Expand Down

0 comments on commit 291a41d

Please sign in to comment.