colblk: add PrefixBytesIter for stateful key synthesis
Replace PrefixBytes.{AppendAt,SetNextAt} with a new PrefixBytes.{SetAt,SetNext}
API that uses a PrefixBytesIter type to hold the key buffer, plus the metadata
needed to perform SetNext operations more cheaply. Subsequent work will also
extend the API to support a SetPrev method.

```
goos: darwin
goarch: arm64
pkg: github.com/cockroachdb/pebble/sstable/colblk
                                     │   old.txt   │               new.txt               │
                                     │   sec/op    │   sec/op     vs base                │
PrefixBytes/alphaLen=2/iteration-10    7.799n ± 4%   5.600n ± 1%  -28.20% (p=0.000 n=20)
PrefixBytes/alphaLen=5/iteration-10    7.741n ± 1%   5.590n ± 1%  -27.78% (p=0.000 n=20)
PrefixBytes/alphaLen=26/iteration-10   7.655n ± 0%   5.616n ± 1%  -26.63% (p=0.000 n=20)
geomean                                7.732n        5.602n       -27.54%
```
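
For orientation, the intended calling pattern (mirroring the updated benchmark in this commit) is a single SetAt to position the iterator, followed by SetNext to advance it. A minimal sketch, assuming a decoded PrefixBytes `pb` with `n` rows:

```go
var it PrefixBytesIter
for j := 0; j < n; j++ {
	if j == 0 {
		// Full synthesis: shared prefix + bundle prefix + row suffix.
		pb.SetAt(&it, j)
	} else {
		// Fast path: typically only the per-row suffix is copied.
		pb.SetNext(&it)
	}
	// The returned slice aliases the iterator's buffer and is only
	// valid until the next SetAt/SetNext call.
	key := it.UnsafeSlice()
	_ = key
}
```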
jbowens committed Aug 8, 2024
1 parent 3419a64 commit 791b374
Showing 4 changed files with 158 additions and 53 deletions.
18 changes: 17 additions & 1 deletion sstable/colblk/base.go
@@ -4,7 +4,11 @@

package colblk

import "golang.org/x/exp/constraints"
import (
"unsafe"

"golang.org/x/exp/constraints"
)

// align returns the next value greater than or equal to offset that's divisible
// by val.
@@ -42,3 +46,15 @@ const (
align32Shift = 2
align64Shift = 3
)

// TODO(jackson): A subsequent Go release will remove the ability to call these
// runtime functions. We should consider asm implementations that we maintain
// within the crlib repo.
//
// See https://github.com/golang/go/issues/67401

//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)

//go:linkname mallocgc runtime.mallocgc
func mallocgc(size uintptr, typ unsafe.Pointer, needzero bool) unsafe.Pointer
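
For intuition only, here is a rough portable stand-in for these two calls using the public unsafe API. The names memmoveFallback and mallocFallback are hypothetical (not part of this commit), and unlike the linknamed runtime versions they gain nothing in performance, since make zeroes its memory while mallocgc with needzero=false skips that:

```go
// Hypothetical portable equivalents, for illustration only.

// copy is specified to handle overlapping slices, like memmove.
func memmoveFallback(to, from unsafe.Pointer, n uintptr) {
	copy(unsafe.Slice((*byte)(to), n), unsafe.Slice((*byte)(from), n))
}

// make always zeroes, so this does strictly more work than
// mallocgc(size, nil, false), which returns unzeroed, pointer-free memory.
func mallocFallback(size uintptr) unsafe.Pointer {
	return unsafe.Pointer(unsafe.SliceData(make([]byte, size)))
}
```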
170 changes: 123 additions & 47 deletions sstable/colblk/prefix_bytes.go
Expand Up @@ -238,49 +238,119 @@ func (b PrefixBytes) At(i int) []byte {
return slices.Concat(b.SharedPrefix(), b.RowBundlePrefix(i), b.RowSuffix(i))
}

// AppendAt appends the i'th []byte slice in the PrefixBytes onto the provided
// bytes slice.
func (b *PrefixBytes) AppendAt(dst []byte, i int) []byte {
// Inline dst = append(dst, b.SharedPrefix()...)
dst = append(dst, unsafe.Slice((*byte)(b.rawBytes.data), b.rawBytes.offsets.At(0))...)
dst = append(dst, b.RowBundlePrefix(i)...)
return append(dst, b.RowSuffix(i)...)
// PrefixBytesIter is an iterator and associated buffers for PrefixBytes. It
// provides a means for efficiently iterating over the []byte slices contained
// within a PrefixBytes, avoiding unnecessary copying when portions of slices
// are shared.
type PrefixBytesIter struct {
UnsafeBuf
bundlePrefixLen uint32
offsetIndex int
nextBundleOffsetIndex int
}

// SetNextAt updates the provided buf byte slice to hold the i'th []byte slice
// in the PrefixBytes. SetNextAt requires the provided buf to currently hold the
// i-1'th []byte slice in the PrefixBytes. If the i'th slice does not fit in
// buf, SetNextAt allocates a new slice.
func (b *PrefixBytes) SetNextAt(buf []byte, i int) []byte {
if invariants.Enabled {
if x := b.At(i - 1); !bytes.Equal(buf, x) {
panic(errors.AssertionFailedf("buf (%q) does not hold the previous slice (%q)", buf, x))
}
}

// SetAt updates the provided PrefixBytesIter to hold the i'th []byte slice in
// the PrefixBytes.
func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
// Determine the offset and length of the bundle prefix.
bundleOffsetIndex := b.bundleOffsetIndexForRow(i)
rowSuffixIndex := b.rowSuffixIndex(i)
if rowSuffixIndex == bundleOffsetIndex+1 {
// The key i is the first key in a new bundle. We need to replace the
// bundle prefix in buf. We can copy the entirety of the bundle prefix
// and the row suffix because they're stored contiguously.
s := b.rawBytes.offsets.At(bundleOffsetIndex)
e := b.rawBytes.offsets.At(rowSuffixIndex + 1)
return append(buf[:b.sharedPrefixLen], b.rawBytes.slice(s, e)...)
bundleOffsetStart := b.rawBytes.offsets.At(bundleOffsetIndex)
it.bundlePrefixLen = b.rawBytes.offsets.At(bundleOffsetIndex+1) - bundleOffsetStart

// Determine the offset and length of the row's individual suffix.
it.offsetIndex = b.rowSuffixIndex(i)
// TODO(jackson): rowSuffixOffsets will recompute bundleOffsetIndexForRow in
// the case that the row is a duplicate key. Is it worth optimizing to avoid
// this recomputation? The expected case is non-duplicate keys, so it may
// not be worthwhile.
rowSuffixStart, rowSuffixEnd := b.rowSuffixOffsets(i, it.offsetIndex)

// Grow the size of the iterator's buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
it.ptr = mallocgc(uintptr(it.cap), nil, false)
}

// The key i is a new key in the same bundle.
rowSuffixStart := b.rawBytes.offsets.At(rowSuffixIndex)
rowSuffixEnd := b.rawBytes.offsets.At(rowSuffixIndex + 1)
if rowSuffixStart == rowSuffixEnd {
// The start and end offsets are equal, indicating that the key is a
// duplicate. Since it's identical to the previous key, there's nothing
// left to do; we can return buf as-is.
return buf
// Copy the shared key prefix.
memmove(it.ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
// Copy the bundle prefix.
memmove(
unsafe.Pointer(uintptr(it.ptr)+uintptr(b.sharedPrefixLen)),
unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundleOffsetStart)),
uintptr(it.bundlePrefixLen))
// Copy the per-row suffix.
memmove(
unsafe.Pointer(uintptr(it.ptr)+uintptr(b.sharedPrefixLen)+uintptr(it.bundlePrefixLen)),
unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
uintptr(rowSuffixEnd-rowSuffixStart))
// Set nextBundleOffsetIndex so that a call to SetNext can cheaply determine
// whether the next row is in the same bundle.
it.nextBundleOffsetIndex = bundleOffsetIndex + (1 << b.bundleShift) + 1
}

// SetNext updates the provided PrefixBytesIter to hold the next []byte slice in
// the PrefixBytes. SetNext requires the provided iter to currently hold a slice
// and for a subsequent slice to exist within the PrefixBytes.
func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
it.offsetIndex++
// If the next row is in the same bundle, we can take a fast path of only
// updating the per-row suffix.
if it.offsetIndex < it.nextBundleOffsetIndex {
rowSuffixStart := b.rawBytes.offsets.At(it.offsetIndex)
rowSuffixEnd := b.rawBytes.offsets.At(it.offsetIndex + 1)
if rowSuffixStart == rowSuffixEnd {
// The start and end offsets are equal, indicating that the key is a
// duplicate. Since it's identical to the previous key, there's
// nothing left to do; we can leave the buffer as-is.
return
}
// Grow the buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
prevPtr := it.ptr
it.ptr = mallocgc(uintptr(it.cap), nil, false)
memmove(it.ptr, prevPtr, uintptr(b.sharedPrefixLen)+uintptr(it.bundlePrefixLen))
}
// Copy in the per-row suffix.
memmove(
unsafe.Pointer(uintptr(it.ptr)+uintptr(b.sharedPrefixLen)+uintptr(it.bundlePrefixLen)),
unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
uintptr(rowSuffixEnd-rowSuffixStart))
return
}
bundlePrefixLen := b.rawBytes.offsets.At(bundleOffsetIndex+1) - b.rawBytes.offsets.At(bundleOffsetIndex)
return append(buf[:b.sharedPrefixLen+int(bundlePrefixLen)], b.rawBytes.slice(rowSuffixStart, rowSuffixEnd)...)

// We've reached the end of the bundle. We need to update the bundle prefix.
// The offsetIndex is currently pointing to the start of the new bundle
// prefix. Increment it to point at the start of the new row suffix.
it.offsetIndex++
rowSuffixStart := b.rawBytes.offsets.At(it.offsetIndex)
rowSuffixEnd := b.rawBytes.offsets.At(it.offsetIndex + 1)

// Read the offsets of the new bundle prefix and update the index of the
// next bundle.
bundlePrefixStart := b.rawBytes.offsets.At(it.nextBundleOffsetIndex)
it.bundlePrefixLen = rowSuffixStart - bundlePrefixStart
it.nextBundleOffsetIndex = it.offsetIndex + (1 << b.bundleShift)

// Grow the buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
it.ptr = mallocgc(uintptr(it.cap), nil, false)
memmove(it.ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
}
// Copy in the new bundle prefix.
memmove(
unsafe.Pointer(uintptr(it.ptr)+uintptr(b.sharedPrefixLen)),
unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundlePrefixStart)),
uintptr(it.bundlePrefixLen))
// Copy in the per-row suffix.
memmove(
unsafe.Pointer(uintptr(it.ptr)+uintptr(b.sharedPrefixLen)+uintptr(it.bundlePrefixLen)),
unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
uintptr(rowSuffixEnd-rowSuffixStart))
}

// SharedPrefix returns a []byte of the shared prefix that was extracted from
@@ -313,33 +383,39 @@ func (b *PrefixBytes) BundlePrefix(i int) []byte {
//
// The returned slice should not be mutated.
func (b *PrefixBytes) RowSuffix(row int) []byte {
i := b.rowSuffixIndex(row)
return b.rawBytes.slice(b.rowSuffixOffsets(row, b.rowSuffixIndex(row)))
}

// rowSuffixOffsets finds the start and end offsets of the row's suffix slice,
// accounting for duplicate keys. It takes the index of the row, and the value
// of rowSuffixIndex(row).
func (b *PrefixBytes) rowSuffixOffsets(row, i int) (low uint32, high uint32) {
// Retrieve the low and high offsets indicating the start and end of the
// row's suffix slice.
lowOff := b.rawBytes.offsets.At(i)
highOff := b.rawBytes.offsets.At(i + 1)
low = b.rawBytes.offsets.At(i)
high = b.rawBytes.offsets.At(i + 1)
// If there's a non-empty slice for the row, this row is different than its
// predecessor.
if lowOff != highOff {
return b.rawBytes.slice(lowOff, highOff)
if low != high {
return low, high
}
// Otherwise, an empty slice indicates a duplicate key. We need to find the
// first non-empty predecessor within the bundle, or if all the rows are
// empty, return nil.
// empty, return arbitrary equal low and high.
//
// Compute the index of the first row in the bundle so we know when to stop.
firstIndex := 1 + b.bundleOffsetIndexForRow(row)
for i > firstIndex {
// Step back a row, and check if the slice is non-empty.
i--
highOff = lowOff
lowOff = b.rawBytes.offsets.At(i)
if lowOff != highOff {
return b.rawBytes.slice(lowOff, highOff)
high = low
low = b.rawBytes.offsets.At(i)
if low != high {
return low, high
}
}
// All the rows in the bundle are empty.
return nil
return low, high
}

// Rows returns the count of rows whose keys are encoded within the PrefixBytes.
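To make the layout concrete, here is a toy model (assumed data, not the real columnar encoding) of how a key is reassembled from the three stored components, and how an empty suffix encodes a duplicate of the preceding row:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	// Each key is sharedPrefix + bundlePrefix + rowSuffix, as in At().
	shared := []byte("sst/")
	bundlePrefix := []byte("key-0") // shared by every row in this bundle
	// An empty suffix means the row duplicates its predecessor, which is
	// why rowSuffixOffsets walks backward to the first non-empty suffix.
	suffixes := [][]byte{[]byte("1"), []byte("2"), {}, []byte("4")}

	var prev []byte
	for _, s := range suffixes {
		if len(s) == 0 {
			fmt.Println(string(prev)) // duplicate: buffer left untouched
			continue
		}
		prev = slices.Concat(shared, bundlePrefix, s)
		fmt.Println(string(prev))
	}
	// Output: sst/key-01, sst/key-02, sst/key-02, sst/key-04
}
```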
10 changes: 5 additions & 5 deletions sstable/colblk/prefix_bytes_test.go
@@ -274,17 +274,17 @@ func BenchmarkPrefixBytes(b *testing.B) {
buf = build(n)
pb, _ := DecodePrefixBytes(buf, 0, n)
b.ResetTimer()
var k []byte
var pbi PrefixBytesIter
for i := 0; i < b.N; i++ {
j := i % n
if j == 0 {
k = pb.AppendAt(k[:0], j)
pb.SetAt(&pbi, j)
} else {
k = pb.SetNextAt(k, j)
pb.SetNext(&pbi)
}
if invariants.Enabled && !bytes.Equal(k, userKeys[j]) {
if invariants.Enabled && !bytes.Equal(pbi.UnsafeSlice(), userKeys[j]) {
b.Fatalf("Constructed key %q (%q, %q, %q) for index %d; expected %q",
k, pb.SharedPrefix(), pb.RowBundlePrefix(j), pb.RowSuffix(j), j, userKeys[j])
pbi.UnsafeSlice(), pb.SharedPrefix(), pb.RowBundlePrefix(j), pb.RowSuffix(j), j, userKeys[j])
}
}
})
13 changes: 13 additions & 0 deletions sstable/colblk/unsafe_slice.go
@@ -134,3 +134,16 @@ func (s UnsafeIntegerSlice[T]) At(i int) T {
panic("unreachable")
}
}

// UnsafeBuf provides a byte buffer without bounds checking. Every buf has a
// len and a cap.
type UnsafeBuf struct {
ptr unsafe.Pointer
len int
cap int
}

// UnsafeSlice returns the current contents of the buf.
func (b *UnsafeBuf) UnsafeSlice() []byte {
return unsafe.Slice((*byte)(b.ptr), b.len)
}
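
As a design note, both SetAt and SetNext grow this buffer by doubling (it.cap = it.len << 1 above), so reallocations are amortized across many keys. A safe-Go analogue of that growth policy (an illustrative assumption, not this commit's code) would be:

```go
// Hypothetical safe analogue of UnsafeBuf's growth policy.
type safeBuf struct {
	b []byte
}

// ensure resizes the buffer to n bytes, doubling the capacity on growth
// so that repeated key synthesis amortizes its allocations.
func (s *safeBuf) ensure(n int) {
	if n > cap(s.b) {
		nb := make([]byte, n, 2*n)
		copy(nb, s.b)
		s.b = nb
		return
	}
	s.b = s.b[:n]
}
```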
