Skip to content

Commit

Permalink
backupccl: create backup compaction iterator
Browse files Browse the repository at this point in the history
For the purposes of backup compaction, a custom iterator is required
that behaves similarly to the ReadAsOfIterator, but also surfaces live
tombstones point keys.

Epic: none

Release note: None
  • Loading branch information
kev-cao committed Dec 24, 2024
1 parent 026d669 commit c2a68e0
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"array_32bit.go",
"array_64bit.go",
"backup_compaction_iterator.go",
"ballast.go",
"batch.go",
"col_mvcc.go",
Expand Down Expand Up @@ -118,6 +119,7 @@ go_test(
name = "storage_test",
size = "medium",
srcs = [
"backup_compaction_iterator_test.go",
"ballast_test.go",
"batch_test.go",
"bench_cloud_io_test.go",
Expand Down
157 changes: 157 additions & 0 deletions pkg/storage/backup_compaction_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package storage

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the
// latest valid key of a given MVCC key, including point tombstones, at or below the
// asOfTimestamp, if set.
//
// The iterator assumes that it will not encounter any write intents and that the
// wrapped SimpleMVCCIterator *only* surfaces point keys.
type BackupCompactionIterator struct {
iter SimpleMVCCIterator

// asOf is the latest timestamp of a key surfaced by the iterator.
asOf hlc.Timestamp

// valid tracks if the current key is valid.
valid bool

// err tracks if iterating to the current key returned an error.
err error
}

var _ SimpleMVCCIterator = &BackupCompactionIterator{}

// NewBackupCompactionIterator creates a new BackupCompactionIterator. The asOf timestamp cannot be empty.
func NewBackupCompactionIterator(
iter SimpleMVCCIterator, asOf hlc.Timestamp,
) (*BackupCompactionIterator, error) {
if asOf.IsEmpty() {
return nil, errors.New("asOf timestamp cannot be empty")
}
return &BackupCompactionIterator{
iter: iter,
asOf: asOf,
}, nil
}

func (f *BackupCompactionIterator) Close() {
f.iter.Close()
}

// Next is identical to NextKey, as BackupCompactionIterator only surfaces live keys.
func (f *BackupCompactionIterator) Next() {
f.NextKey()
}

func (f *BackupCompactionIterator) NextKey() {
f.iter.NextKey()
f.advance()
}

func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) {
// See ReadAsOfIterator comment for explanation of this.
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf}
f.iter.SeekGE(synthetic)
if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) {
f.NextKey()
}
}

func (f *BackupCompactionIterator) updateValid() bool {
f.valid, f.err = f.iter.Valid()
return f.valid
}

// advance moves past keys with timestamps later than f.asOf.
func (f *BackupCompactionIterator) advance() {
for {
if ok := f.updateValid(); !ok {
return
}
if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) {
f.iter.Next()
continue
}
return
}
}

func (f *BackupCompactionIterator) UnsafeKey() MVCCKey {
return f.iter.UnsafeKey()
}

func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) {
return f.iter.UnsafeValue()
}

func (f *BackupCompactionIterator) Valid() (bool, error) {
if util.RaceEnabled && f.valid {
if err := f.assertInvariants(); err != nil {
return false, err
}
}
return f.valid, f.err
}

func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) {
return f.iter.MVCCValueLenAndIsTombstone()
}

func (f *BackupCompactionIterator) ValueLen() int {
return f.iter.ValueLen()
}

func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) {
hasPoint, hasRange := f.iter.HasPointAndRange()
if hasRange {
panic("unexpected range tombstone")
}
return hasPoint, hasRange
}

func (f *BackupCompactionIterator) RangeBounds() roachpb.Span {
return roachpb.Span{}
}

func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack {
return MVCCRangeKeyStack{}
}

func (f *BackupCompactionIterator) RangeKeyChanged() bool {
return false
}

// assertInvariants checks that the iterator is in a valid state, but first assumes that the underlying iterator
// has already been validated and is in a valid state.
func (f *BackupCompactionIterator) assertInvariants() error {
if err := assertSimpleMVCCIteratorInvariants(f); err != nil {
return err
}

if ok, err := f.iter.Valid(); !ok || err != nil {
errMsg := err.Error()
return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg)
}

key := f.UnsafeKey()
if key.Timestamp.IsEmpty() {
return errors.AssertionFailedf("emitted key %s has no timestamp", key)
}
if f.asOf.Less(key.Timestamp) {
return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf)
}

return nil
}
209 changes: 209 additions & 0 deletions pkg/storage/backup_compaction_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package storage

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestBackupCompactionIterator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

pebble, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer pebble.Close()

// The test turns each `input` into a batch for the readAsOfIterator, fully
// iterates the iterator, and puts the surfaced keys into a string in the same
// format as `input`. The test then compares the output to 'expectedNextKey'.
// The 'asOf' field represents the wall time of the hlc.Timestamp for the
// readAsOfIterator.
tests := []asOfTest{
// Ensure vanilla iteration works, surfacing latest values with no AOST.
{input: "a1b1", expectedNextKey: "a1b1", asOf: ""},
{input: "a2a1", expectedNextKey: "a2", asOf: ""},
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: ""},
{input: "a2Xa1", expectedNextKey: "a2X", asOf: ""},
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: ""},

// Ensure vanilla iterations works with provided AOST
{input: "a1b1", expectedNextKey: "a1b1", asOf: "1"},
{input: "a1b1", expectedNextKey: "a1b1", asOf: "2"},
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: "2"},
{input: "a1b1X", expectedNextKey: "a1b1X", asOf: "2"},
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: "2"},

// Skipping keys with AOST.
{input: "a2a1", expectedNextKey: "a1", asOf: "1"},
{input: "a1b2b1", expectedNextKey: "a1b1", asOf: "1"},

// Double skip within keys.
{input: "b3b2b1", expectedNextKey: "b1", asOf: "1"},

// Double skip across keys.
{input: "b2c2c1", expectedNextKey: "c1", asOf: "1"},

// Skipping tombstones with AOST.
{input: "a1b2Xb1", expectedNextKey: "a1b1", asOf: "1"},
{input: "a2Xa1b2Xb1c2Xc1", expectedNextKey: "a1b1c1", asOf: "1"},

// Skipping under tombstone to land on another tombstone
{input: "a2Xa1b2b1X", expectedNextKey: "a1b1X", asOf: "1"},

// Ensure next key captures at most one mvcc key per key after an asOf skip.
{input: "b3c2c1", expectedNextKey: "c2", asOf: "2"},

// Ensure clean iteration over double tombstone.
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""},
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"},

// Ensure key before delete tombstone gets read if under AOST.
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""},
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"},
}

for i, test := range tests {
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf)
t.Run(name, func(t *testing.T) {
batch := pebble.NewBatch()
defer batch.Close()
populateBatch(t, batch, test.input)
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
require.NoError(t, err)
defer iter.Close()

subtests := []iterSubtest{
{"Next", test.expectedNextKey, SimpleMVCCIterator.Next},
{"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey},
}
for _, subtest := range subtests {
t.Run(subtest.name, func(t *testing.T) {
asOf := hlc.Timestamp{}
if test.asOf != "" {
asOf.WallTime = int64(test.asOf[0])
} else {
asOf.Logical = 1
}
it, err := NewBackupCompactionIterator(iter, asOf)
require.NoError(t, err)
iterateSimpleMVCCIterator(t, it, subtest)
})
}
})
}
}

func TestBackupCompactionIteratorSeek(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

pebble, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer pebble.Close()

tests := []struct {
input string
seekKey string
expected string
asOf string
}{
// Ensure vanilla seek works.
{"a1b1", "a1", "a1", ""},

// Ensure seek always returns the latest key AOST of an MVCC key.
{"a2a1b1", "a1", "b1", ""},
{"a2a1b1", "a1", "b1", "2"},
{"a2a1b1", "a1", "a1", "1"},

// Seeking above all keys will return the latest key AOST of an MVCC key.
{"a2a1b3", "a8", "a2", ""},
{"a2a1b3", "a8", "a1", "1"},
{"a2a1b3X", "b8", "b3X", ""},
{"a2a1b3Xb2", "b8", "b2", "2"},

// Ensure out of bounds seek fails gracefully.
{"a1", "b1", "notOK", ""},

// Ensure the asOf timestamp moves the iterator during a seek.
{"a2a1", "a2", "a1", "1"},
{"a2b1", "a2", "b1", "1"},

// Ensure seek will return on a tombstone if it is the latest key.
{"a3Xa1b1", "a3", "a3X", ""},
{"a3Xa1c2Xc1", "b1", "c2X", ""},

// Ensure seek will only return the latest key AOST
{"a3Xa2a1b1", "a2", "b1", ""},
{"a3Xa2a1b1", "a2", "b1", "3"},
{"a3a2Xa1b1", "a1", "b1", ""},
{"a3a2Xa1b2Xb1c1", "a1", "b2X", ""},

// Ensure we can seek to a key right before a tombstone.
{"a2Xa1b2b1Xc1", "a1", "b2", ""},

// Ensure seek on a key above the AOST returns the correct key AOST.
{"a3a2Xa1b3Xb2b1", "b3", "b2", "2"},
{"a3a2Xa1b3Xb2b1", "b3", "b1", "1"},

// Ensure seeking on a key on AOST returns that key
{"a3a2Xa1b3Xb2b1", "a2", "a2X", "2"},
{"a3a2Xa1b3Xb2b1", "b2", "b2", "2"},
}
for i, test := range tests {
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf)
t.Run(name, func(t *testing.T) {
batch := pebble.NewBatch()
defer batch.Close()
populateBatch(t, batch, test.input)
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
require.NoError(t, err)
defer iter.Close()

asOf := hlc.Timestamp{}
if test.asOf != "" {
asOf.WallTime = int64(test.asOf[0])
}
it, err := NewBackupCompactionIterator(iter, asOf)
require.NoError(t, err)
var output bytes.Buffer

seekKey := MVCCKey{
Key: []byte{test.seekKey[0]},
Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])},
}
it.SeekGE(seekKey)
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
if test.expected == "notOK" {
return
}
require.NoError(t, err, "seek not ok")
}
output.Write(it.UnsafeKey().Key)
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime))
v, err := DecodeMVCCValueAndErr(it.UnsafeValue())
require.NoError(t, err)
if v.IsTombstone() {
output.WriteRune('X')
}
require.Equal(t, test.expected, output.String())
})
}
}

0 comments on commit c2a68e0

Please sign in to comment.