Skip to content

Commit

Permalink
colblk: allow finishing data, index blocks without last row
Browse files Browse the repository at this point in the history
Adapt the DataBlockWriter and IndexBlockWriter interfaces to allow a user to
finish the block without the most recently added row. This will be used during
sstable building when deciding whether to flush a block with or without an
additional KV. The sstable writer will add a new KV to the writer, query the
new size, and then decide whether to flush without the new KV (eg, if flushing
without the KV reduces memory fragmentation by more closely fitting a
configured size class).
  • Loading branch information
jbowens committed Sep 10, 2024
1 parent 2cc212b commit 117aa7d
Show file tree
Hide file tree
Showing 15 changed files with 879 additions and 47 deletions.
24 changes: 24 additions & 0 deletions internal/crdbtest/crdbtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ func EncodeMVCCKey(dst []byte, key []byte, walltime uint64, logical uint32) []by
return EncodeTimestamp(dst, walltime, logical)
}

// AppendTimestamp appends an encoded MVCC timestamp onto key, returning the new
// key. The provided key should already have the 0x00 sentinel byte (i.e., key
// should be a proper prefix from the perspective of Pebble).
func AppendTimestamp(key []byte, walltime uint64, logical uint32) []byte {
if key[len(key)-1] != 0 {
panic(errors.AssertionFailedf("key does not end with 0x00 sentinel byte: %x", key))
}
if logical == 0 {
if walltime == 0 {
return key
}
key = append(key, make([]byte, 9)...)
binary.BigEndian.PutUint64(key[len(key)-9:], walltime)
key[len(key)-1] = 9 // Version length byte
return key
}
key = append(key, make([]byte, 13)...)
binary.BigEndian.PutUint64(key[len(key)-13:], walltime)
binary.BigEndian.PutUint32(key[len(key)-5:], logical)
key[len(key)-1] = 13 // Version length byte
return key

}

// EncodeTimestamp encodes a MVCC timestamp into a key, returning the new key.
// The key's capacity must be sufficiently large to hold the encoded timestamp.
func EncodeTimestamp(key []byte, walltime uint64, logical uint32) []byte {
Expand Down
16 changes: 12 additions & 4 deletions sstable/colblk/cockroach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type cockroachKeyWriter struct {
}

func (kw *cockroachKeyWriter) ComparePrev(key []byte) KeyComparison {
lp := kw.prefixes.LastKey()
lp := kw.prefixes.UnsafeGet(kw.prefixes.Rows() - 1)
var cmpv KeyComparison
cmpv.PrefixLen = int32(crdbtest.Split(key)) // TODO(jackson): Inline
cmpv.CommonPrefixLen = int32(crbytes.CommonPrefix(lp, key[:cmpv.PrefixLen]))
Expand Down Expand Up @@ -100,6 +100,14 @@ func (kw *cockroachKeyWriter) WriteKey(
kw.untypedSuffixes.Put(untypedSuffix)
}

func (kw *cockroachKeyWriter) MaterializeKey(dst []byte, i int) []byte {
dst = append(dst, kw.prefixes.UnsafeGet(i)...)
if untypedSuffixed := kw.untypedSuffixes.UnsafeGet(i); len(untypedSuffixed) > 0 {
return append(dst, untypedSuffixed...)
}
return crdbtest.AppendTimestamp(dst, kw.wallTimes.Get(i), uint32(kw.logicalTimes.Get(i)))
}

func (kw *cockroachKeyWriter) Reset() {
kw.prefixes.Reset()
kw.wallTimes.Reset()
Expand Down Expand Up @@ -334,7 +342,7 @@ func TestCockroachDataBlock(t *testing.T) {
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp)
count++
}
serializedBlock := w.Finish()
serializedBlock, _ := w.Finish(w.Rows(), w.Size())
var reader DataBlockReader
var it DataBlockIter
reader.Init(cockroachKeySchema, serializedBlock)
Expand Down Expand Up @@ -410,7 +418,7 @@ func benchmarkCockroachDataBlockWriter(b *testing.B, keyConfig crdbtest.KeyConfi
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp)
count++
}
_ = w.Finish()
_, _ = w.Finish(w.Rows(), w.Size())
}
}

Expand Down Expand Up @@ -454,7 +462,7 @@ func benchmarkCockroachDataBlockIter(b *testing.B, keyConfig crdbtest.KeyConfig,
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp)
count++
}
serializedBlock := w.Finish()
serializedBlock, _ := w.Finish(w.Rows(), w.Size())
var reader DataBlockReader
var it DataBlockIter
reader.Init(cockroachKeySchema, serializedBlock)
Expand Down
63 changes: 45 additions & 18 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type KeyWriter interface {
// WriteKey is guaranteed to be called sequentially with increasing row
// indexes, beginning at zero.
WriteKey(row int, key []byte, keyPrefixLen, keyPrefixLenSharedWithPrev int32)
// MaterializeKey appends the zero-indexed row'th key written to dst,
// returning the result.
MaterializeKey(dst []byte, row int) []byte
}

// KeyComparison holds information about a key and its comparison to another a
Expand Down Expand Up @@ -162,7 +165,7 @@ type defaultKeyWriter struct {
}

func (w *defaultKeyWriter) ComparePrev(key []byte) KeyComparison {
lp := w.prefixes.LastKey()
lp := w.prefixes.UnsafeGet(w.prefixes.nKeys - 1)

var cmpv KeyComparison
cmpv.PrefixLen = int32(w.comparer.Split(key))
Expand Down Expand Up @@ -225,6 +228,12 @@ func (w *defaultKeyWriter) WriteKey(
w.suffixes.Put(key[keyPrefixLen:])
}

func (w *defaultKeyWriter) MaterializeKey(dst []byte, row int) []byte {
dst = append(dst, w.prefixes.UnsafeGet(row)...)
dst = append(dst, w.suffixes.UnsafeGet(row)...)
return dst
}

func (w *defaultKeyWriter) NumColumns() int {
return 2
}
Expand Down Expand Up @@ -367,6 +376,7 @@ type DataBlockWriter struct {
rows int
maximumKeyLength int
valuePrefixTmp [1]byte
lastUserKeyTmp []byte
}

// TODO(jackson): Add an isObsolete bitmap column.
Expand Down Expand Up @@ -395,6 +405,8 @@ func (w *DataBlockWriter) Init(schema KeySchema) {
w.isValueExternal.Reset()
w.rows = 0
w.maximumKeyLength = 0
w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
w.enc.reset()
}

// Reset resets the data block writer to its initial state, retaining buffers.
Expand All @@ -406,6 +418,7 @@ func (w *DataBlockWriter) Reset() {
w.isValueExternal.Reset()
w.rows = 0
w.maximumKeyLength = 0
w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
w.enc.reset()
}

Expand Down Expand Up @@ -484,36 +497,50 @@ func (w *DataBlockWriter) Size() int {
return int(off)
}

// Finish serializes the pending data block.
func (w *DataBlockWriter) Finish() []byte {
// Finish serializes the pending data block, including the first [rows] rows.
// The value of [rows] must be Rows() or Rows()-1. The provided size must be the
// size of the data block with the provided row count (i.e., the return value of
// [Size] when DataBlockWriter.Rows() = [rows]).
//
// Finish the returns the serialized, uncompressed data block and the
// InternalKey of the last key contained within the data block. The memory of
// the lastKey's UserKey is owned by the DataBlockWriter. The caller must
// copy it if they require it to outlive a Reset of the writer.
func (w *DataBlockWriter) Finish(rows, size int) (finished []byte, lastKey base.InternalKey) {
if invariants.Enabled && rows != w.rows && rows != w.rows-1 {
panic(errors.AssertionFailedf("data block has %d rows; asked to finish %d", w.rows, rows))
}

cols := len(w.Schema.ColumnTypes) + dataBlockColumnMax
h := Header{
Version: Version1,
Columns: uint16(cols),
Rows: uint32(w.rows),
Rows: uint32(rows),
}

// Invert the prefix-same bitmap before writing it out, because we want it
// to represent when the prefix changes.
w.prefixSame.Invert(w.rows)
w.prefixSame.Invert(rows)

w.enc.init(w.Size(), h, dataBlockCustomHeaderSize)
w.enc.init(size, h, dataBlockCustomHeaderSize)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))

// Write the user-defined key columns.
w.enc.encode(w.rows, w.KeyWriter)

// Write the internal key trailers.
w.enc.encode(w.rows, &w.trailers)

w.enc.encode(w.rows, &w.prefixSame)

// Write the value columns.
w.enc.encode(w.rows, &w.values)
w.enc.encode(w.rows, &w.isValueExternal)
return w.enc.finish()
w.enc.encode(rows, w.KeyWriter)
w.enc.encode(rows, &w.trailers)
w.enc.encode(rows, &w.prefixSame)
w.enc.encode(rows, &w.values)
w.enc.encode(rows, &w.isValueExternal)
finished = w.enc.finish()

w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
w.lastUserKeyTmp = w.KeyWriter.MaterializeKey(w.lastUserKeyTmp[:0], rows-1)
lastKey = base.InternalKey{
UserKey: w.lastUserKeyTmp,
Trailer: base.InternalKeyTrailer(w.trailers.Get(rows - 1)),
}
return finished, lastKey
}

// DataBlockReaderSize is the size of a DataBlockReader struct. If allocating
Expand Down
13 changes: 9 additions & 4 deletions sstable/colblk/data_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestDataBlock(t *testing.T) {
var w DataBlockWriter
var r DataBlockReader
var it DataBlockIter
var sizes []int
datadriven.Walk(t, "testdata/data_block", func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
buf.Reset()
Expand All @@ -40,6 +41,7 @@ func TestDataBlock(t *testing.T) {
w.Init(testKeysSchema)
}
fmt.Fprint(&buf, &w)
sizes = sizes[:0]
return buf.String()
case "write":
for _, line := range strings.Split(td.Input, "\n") {
Expand All @@ -54,16 +56,19 @@ func TestDataBlock(t *testing.T) {
}
v := []byte(line[j+1:])
w.Add(ik, v, vp, kcmp)
sizes = append(sizes, w.Size())
}
fmt.Fprint(&buf, &w)
return buf.String()
case "finish":
block := w.Finish()
rows := w.Rows()
td.MaybeScanArgs(t, "rows", &rows)
block, lastKey := w.Finish(rows, sizes[rows-1])
r.Init(testKeysSchema, block)
f := binfmt.New(r.r.data).LineWidth(20)
r.Describe(f)

return f.String()
fmt.Fprintf(&buf, "LastKey: %s\n%s", lastKey.Pretty(testkeys.Comparer.FormatKey), f.String())
return buf.String()
case "iter":
it.Init(&r, testKeysSchema.NewKeySeeker(), func([]byte) base.LazyValue {
return base.LazyValue{ValueOrHandle: []byte("mock external value")}
Expand Down Expand Up @@ -106,7 +111,7 @@ func benchmarkDataBlockWriter(b *testing.B, prefixSize, valueSize int) {
w.Add(ik, values[j], vp, kcmp)
j++
}
w.Finish()
w.Finish(w.Rows(), w.Size())
}
}

Expand Down
35 changes: 23 additions & 12 deletions sstable/colblk/index_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package colblk
import (
"bytes"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/sstable/block"
)

Expand Down Expand Up @@ -97,26 +99,35 @@ func (w *IndexBlockWriter) UnsafeSeparator(i int) []byte {

// Size returns the size of the pending index block.
func (w *IndexBlockWriter) Size() int {
return w.size(w.rows)
}

func (w *IndexBlockWriter) size(rows int) int {
off := blockHeaderSize(indexBlockColumnCount, indexBlockCustomHeaderSize)
off = w.separators.Size(w.rows, off)
off = w.offsets.Size(w.rows, off)
off = w.lengths.Size(w.rows, off)
off = w.blockProperties.Size(w.rows, off)
off = w.separators.Size(rows, off)
off = w.offsets.Size(rows, off)
off = w.lengths.Size(rows, off)
off = w.blockProperties.Size(rows, off)
off++
return int(off)
}

// Finish serializes the pending index block.
func (w *IndexBlockWriter) Finish() []byte {
w.enc.init(w.Size(), Header{
// Finish serializes the pending index block, including the first [rows] rows.
// The value of [rows] must be Rows() or Rows()-1.
func (w *IndexBlockWriter) Finish(rows int) []byte {
if invariants.Enabled && rows != w.rows && rows != w.rows-1 {
panic(errors.AssertionFailedf("index block has %d rows; asked to finish %d", w.rows, rows))
}

w.enc.init(w.size(rows), Header{
Version: Version1,
Columns: indexBlockColumnCount,
Rows: uint32(w.rows),
Rows: uint32(rows),
}, indexBlockCustomHeaderSize)
w.enc.encode(w.rows, &w.separators)
w.enc.encode(w.rows, &w.offsets)
w.enc.encode(w.rows, &w.lengths)
w.enc.encode(w.rows, &w.blockProperties)
w.enc.encode(rows, &w.separators)
w.enc.encode(rows, &w.offsets)
w.enc.encode(rows, &w.lengths)
w.enc.encode(rows, &w.blockProperties)
return w.enc.finish()
}

Expand Down
7 changes: 5 additions & 2 deletions sstable/colblk/index_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func TestIndexBlock(t *testing.T) {
}
w.AddBlockHandle([]byte(fields[0]), h, bp)
}
fmt.Fprintf(&buf, "UnsafeSeparator(Rows()-1) = %q\n", w.UnsafeSeparator(w.Rows()-1))
data := w.Finish()

rows := w.Rows()
d.MaybeScanArgs(t, "rows", &rows)
data := w.Finish(rows)
fmt.Fprintf(&buf, "UnsafeSeparator(%d) = %q\n", rows-1, w.UnsafeSeparator(rows-1))
r.Init(data)
fmt.Fprint(&buf, r.DebugString())
return buf.String()
Expand Down
28 changes: 24 additions & 4 deletions sstable/colblk/prefix_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,9 @@ func (b *PrefixBytesBuilder) Reset() {
}
}

// Rows returns the number of keys added to the builder.
func (b *PrefixBytesBuilder) Rows() int { return b.nKeys }

// prefixBytesSizing maintains metadata about the size of the accumulated data
// and its encoded size. Every key addition computes a new prefixBytesSizing
// struct. The PrefixBytesBuilder maintains two prefixBytesSizing structs, one
Expand Down Expand Up @@ -861,10 +864,27 @@ func (b *PrefixBytesBuilder) Put(key []byte, bytesSharedWithPrev int) {
}
}

// LastKey returns the last key added to the builder through Put. The key is
// guaranteed to be stable until Finish or Reset is called.
func (b *PrefixBytesBuilder) LastKey() []byte {
return b.data[len(b.data)-b.sizings[(b.nKeys+1)&1].lastKeyLen:]
// UnsafeGet returns the zero-indexed i'th key added to the builder through Put.
// UnsafeGet may only be used to retrieve the Rows()-1'th or Rows()-2'th keys.
// If called with a different i value, UnsafeGet panics. The keys returned by
// UnsafeGet are guaranteed to be stable until Finish or Reset is called. The
// caller must not mutate the returned slice.
func (b *PrefixBytesBuilder) UnsafeGet(i int) []byte {
switch i {
case b.nKeys - 1:
// The last key is the [lastKeyLen] bytes.
return b.data[len(b.data)-b.sizings[i&1].lastKeyLen:]
case b.nKeys - 2:
// Check if the very last key is a duplicate of the second-to-last key.
lastKeyLen := b.sizings[(i+1)&1].lastKeyLen
if b.offsets.elems.At(b.rowSuffixIndex(i+1)) == b.offsets.elems.At(b.rowSuffixIndex(i+2)) {
return b.data[len(b.data)-b.sizings[i&1].lastKeyLen:]
}
lastLastKeyLen := b.sizings[i&1].lastKeyLen
return b.data[len(b.data)-lastKeyLen-lastLastKeyLen : len(b.data)-lastKeyLen]
default:
panic(errors.AssertionFailedf("UnsafeGet(%d) called on PrefixBytes with %d keys", i, b.nKeys))
}
}

// addOffset adds an offset to the offsets table. If necessary, addOffset will
Expand Down
9 changes: 8 additions & 1 deletion sstable/colblk/prefix_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPrefixBytes(t *testing.T) {
for _, k := range inputKeys {
keyPrefixLenSharedWithPrev := len(k)
if builder.nKeys > 0 {
keyPrefixLenSharedWithPrev = crbytes.CommonPrefix(builder.LastKey(), k)
keyPrefixLenSharedWithPrev = crbytes.CommonPrefix(builder.UnsafeGet(builder.nKeys-1), k)
}
p := []byte(k)
builder.Put(p, keyPrefixLenSharedWithPrev)
Expand All @@ -56,6 +56,13 @@ func TestPrefixBytes(t *testing.T) {
}
fmt.Fprint(&out, builder.debugString(0))
return out.String()
case "unsafe-get":
var indices []int
td.ScanArgs(t, "i", &indices)
for _, i := range indices {
fmt.Fprintf(&out, "UnsafeGet(%d) = %s\n", i, builder.UnsafeGet(i))
}
return out.String()
case "finish":
var rows int
td.ScanArgs(t, "rows", &rows)
Expand Down
Loading

0 comments on commit 117aa7d

Please sign in to comment.