Skip to content

Commit

Permalink
block: move block handle encoding and decoding
Browse files Browse the repository at this point in the history
Move block.HandleWithProperties and the routines for encoding and decoding
block handles in their variable-width format into the block package.
  • Loading branch information
jbowens committed Aug 16, 2024
1 parent 0e8a11a commit 1a5295f
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 104 deletions.
55 changes: 55 additions & 0 deletions sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,61 @@ type Handle struct {
Offset, Length uint64
}

// EncodeVarints writes the handle's Offset followed by its Length into dst,
// each as an unsigned varint, and reports the total number of bytes written.
// dst must be large enough to hold both varints; binary.PutUvarint panics when
// the buffer is too small.
func (h Handle) EncodeVarints(dst []byte) int {
	written := binary.PutUvarint(dst, h.Offset)
	// The length is laid down immediately after the offset.
	written += binary.PutUvarint(dst[written:], h.Length)
	return written
}

// HandleWithProperties is used for data blocks and first/lower level index
// blocks, since they can be annotated using BlockPropertyCollectors.
type HandleWithProperties struct {
// Handle is the embedded block handle (offset and length).
Handle
// Props holds the raw property bytes; in the encoded form they follow
// directly after the varint-encoded handle.
Props []byte
}

// EncodeVarints encodes the block handle into the start of dst using a
// variable-width encoding, appends the raw property bytes immediately after
// it, and returns the resulting slice (the encoded handle followed by Props).
//
// Note that the return value is the encoded slice, not a byte count. The
// returned slice shares dst's backing array only when dst has enough capacity
// for the properties; otherwise append allocates a new array, so callers must
// use the returned slice rather than dst.
func (h HandleWithProperties) EncodeVarints(dst []byte) []byte {
	n := h.Handle.EncodeVarints(dst)
	// Truncate dst to the encoded handle so the properties land right after it.
	return append(dst[:n], h.Props...)
}

// DecodeHandle returns the block handle encoded in a variable-width encoding at
// the start of src, as well as the number of bytes it occupies. It returns a
// zero Handle and a zero byte count if given invalid input: src is empty or
// truncated, or either varint overflows 64 bits.
//
// A block handle for a data block or a first/lower level index block should
// not be decoded using DecodeHandle since the caller may validate that the
// number of bytes decoded is equal to the length of src, which will be false
// if the properties are not decoded. In those cases the caller should use
// DecodeHandleWithProperties.
func DecodeHandle(src []byte) (Handle, int) {
	offset, n := binary.Uvarint(src)
	// binary.Uvarint reports n == 0 for a short buffer and n < 0 on overflow.
	// Validate before slicing: a negative n would make src[n:] panic.
	if n <= 0 {
		return Handle{}, 0
	}
	length, m := binary.Uvarint(src[n:])
	// A negative m must be rejected too, otherwise n+m would be returned as a
	// bogus (possibly negative) byte count.
	if m <= 0 {
		return Handle{}, 0
	}
	return Handle{Offset: offset, Length: length}, n + m
}

// DecodeHandleWithProperties returns the block handle and properties encoded in
// a variable-width encoding at the start of src. src needs to be exactly the
// length that was encoded, because every byte after the handle is treated as
// properties. This method must be used for data block and first/lower level
// index blocks.
//
// The properties in the returned handle point to the bytes in src; they are
// not copied. An error is returned if the handle at the start of src cannot be
// decoded.
func DecodeHandleWithProperties(src []byte) (HandleWithProperties, error) {
	bh, n := DecodeHandle(src)
	// Treat any non-positive count as a decode failure so that a negative
	// value can never be used as a slice index below.
	if n <= 0 {
		return HandleWithProperties{}, errors.New("invalid block.Handle")
	}
	return HandleWithProperties{
		Handle: bh,
		Props:  src[n:],
	}, nil
}

// TrailerLen is the length of the trailer at the end of a block.
// NOTE(review): presumably measured in bytes — confirm against the trailer
// encoding, which is not visible in this hunk.
const TrailerLen = 5

Expand Down
7 changes: 4 additions & 3 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/rowblk"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -913,7 +914,7 @@ func TestBlockProperties(t *testing.T) {
var i int
iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
for kv := iter.First(); kv != nil; kv = iter.Next() {
bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1304,7 +1305,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {

for kv := i.First(); kv != nil; kv = i.Next() {
sb.WriteString(fmt.Sprintf("%s:\n", kv.K))
bhp, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
bhp, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if err != nil {
return err.Error()
}
Expand All @@ -1326,7 +1327,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
}
for kv := subiter.First(); kv != nil; kv = subiter.Next() {
sb.WriteString(fmt.Sprintf(" %s:\n", kv.K))
dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
dataBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if err != nil {
return err.Error()
}
Expand Down
6 changes: 3 additions & 3 deletions sstable/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ var ErrEmptySpan = errors.New("cannot copy empty span")
// decoded block handle value.
type indexEntry struct {
sep InternalKey
bh BlockHandleWithProperties
bh block.HandleWithProperties
}

// intersectingIndexEntries returns the entries from the index with separator
Expand All @@ -262,7 +262,7 @@ func intersectingIndexEntries(
var alloc bytealloc.A
res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if err != nil {
return nil, err
}
Expand All @@ -285,7 +285,7 @@ func intersectingIndexEntries(
defer sub.Close() // in-loop, but it is a short loop.

for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/rowblk"
"github.com/cockroachdb/pebble/vfs"
)
Expand Down Expand Up @@ -434,7 +435,7 @@ func runIterCmd(
if twoLevelIter.topLevelIndex.Valid() {
fmt.Fprintf(&b, "| topLevelIndex.Key() = %q\n", twoLevelIter.topLevelIndex.Key())
v := twoLevelIter.topLevelIndex.Value()
bhp, err := decodeBlockHandleWithProperties(v.InPlaceValue())
bhp, err := block.DecodeHandleWithProperties(v.InPlaceValue())
if err != nil {
fmt.Fprintf(&b, "| topLevelIndex.InPlaceValue() failed to decode as BHP: %s\n", err)
} else {
Expand All @@ -449,7 +450,7 @@ func runIterCmd(
if si.index.Valid() {
fmt.Fprintf(&b, "| index.Key() = %q\n", si.index.Key())
v := si.index.Value()
bhp, err := decodeBlockHandleWithProperties(v.InPlaceValue())
bhp, err := block.DecodeHandleWithProperties(v.InPlaceValue())
if err != nil {
fmt.Fprintf(&b, "| index.InPlaceValue() failed to decode as BHP: %s\n", err)
} else {
Expand Down
13 changes: 1 addition & 12 deletions sstable/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@

package sstable

import (
"sync/atomic"

"github.com/cockroachdb/pebble/sstable/block"
)
import "sync/atomic"

// FilterMetrics holds metrics for the filter policy.
type FilterMetrics struct {
Expand Down Expand Up @@ -40,13 +36,6 @@ func (m *FilterMetricsTracker) Load() FilterMetrics {
}
}

// BlockHandleWithProperties is used for data blocks and first/lower level
// index blocks, since they can be annotated using BlockPropertyCollectors.
type BlockHandleWithProperties struct {
block.Handle
Props []byte
}

type filterWriter interface {
addKey(key []byte)
finish() ([]byte, error)
Expand Down
8 changes: 4 additions & 4 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Layout struct {
// ValidateBlockChecksums, which validates a static list of BlockHandles
// referenced in this struct.

Data []BlockHandleWithProperties
Data []block.HandleWithProperties
Index []block.Handle
TopIndex block.Handle
Filter block.Handle
Expand Down Expand Up @@ -222,7 +222,7 @@ func (l *Layout) Describe(
case "index", "top-index":
iter, _ := rowblk.NewIter(r.Compare, r.Split, h.Get(), NoTransforms)
iter.Describe(w, b.Offset, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
bh, err := decodeBlockHandleWithProperties(value)
bh, err := block.DecodeHandleWithProperties(value)
if err != nil {
fmt.Fprintf(w, "%10d [err: %s]\n", b.Offset+uint64(enc.Offset), err)
return
Expand Down Expand Up @@ -256,7 +256,7 @@ func (l *Layout) Describe(
bh = vbih.h
isValueBlocksIndexHandle = true
} else {
bh, n = decodeBlockHandle(value)
bh, n = block.DecodeHandle(value)
}
if n == 0 || n != len(value) {
fmt.Fprintf(w, "%10d [err: %s]\n", enc.Offset, err)
Expand Down Expand Up @@ -480,7 +480,7 @@ func (w *layoutWriter) clearFromCache(offset uint64) {
}

func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
n := encodeBlockHandle(w.tmp[:], h)
n := h.EncodeVarints(w.tmp[:])
w.recordToMetaindexRaw(key, w.tmp[:n])
}

Expand Down
21 changes: 9 additions & 12 deletions sstable/raw_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,12 +1283,12 @@ func (w *RawWriter) finishDataBlockProps(buf *dataBlockBuf) error {
// with the Writer client.
func (w *RawWriter) maybeAddBlockPropertiesToBlockHandle(
bh block.Handle,
) (BlockHandleWithProperties, error) {
) (block.HandleWithProperties, error) {
err := w.finishDataBlockProps(w.dataBlockBuf)
if err != nil {
return BlockHandleWithProperties{}, err
return block.HandleWithProperties{}, err
}
return BlockHandleWithProperties{Handle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
return block.HandleWithProperties{Handle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
}

func (w *RawWriter) indexEntrySep(
Expand Down Expand Up @@ -1321,7 +1321,7 @@ func (w *RawWriter) indexEntrySep(
// indexBlockBufs.
func (w *RawWriter) addIndexEntry(
sep InternalKey,
bhp BlockHandleWithProperties,
bhp block.HandleWithProperties,
tmp []byte,
flushIndexBuf *indexBlockBuf,
writeTo *indexBlockBuf,
Expand All @@ -1334,8 +1334,7 @@ func (w *RawWriter) addIndexEntry(
return nil
}

encoded := encodeBlockHandleWithProperties(tmp, bhp)

encoded := bhp.EncodeVarints(tmp)
if flushIndexBuf != nil {
if cap(w.indexPartitions) == 0 {
w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
Expand Down Expand Up @@ -1369,13 +1368,13 @@ func (w *RawWriter) addPrevDataBlockToIndexBlockProps() {
// TODO: Improve coverage of this method. e.g. tests passed without the line
// `w.twoLevelIndex = true` previously.
func (w *RawWriter) addIndexEntrySync(
prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
prevKey, key InternalKey, bhp block.HandleWithProperties, tmp []byte,
) error {
return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
}

func (w *RawWriter) addIndexEntrySep(
sep InternalKey, bhp BlockHandleWithProperties, tmp []byte,
sep InternalKey, bhp block.HandleWithProperties, tmp []byte,
) error {
shouldFlush := supportsTwoLevelIndex(
w.tableFormat) && w.indexBlock.shouldFlush(
Expand Down Expand Up @@ -1592,12 +1591,10 @@ func (w *RawWriter) writeTwoLevelIndex() (block.Handle, error) {
if err != nil {
return block.Handle{}, err
}
bhp := BlockHandleWithProperties{
w.topLevelIndexBlock.Add(b.sep, block.HandleWithProperties{
Handle: bh,
Props: b.properties,
}
encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
w.topLevelIndexBlock.Add(b.sep, encoded)
}.EncodeVarints(w.blockBuf.tmp[:]))
}

// NB: RocksDB includes the block trailer length in the index size
Expand Down
Loading

0 comments on commit 1a5295f

Please sign in to comment.