Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: create backup compaction iterator #137529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"array_32bit.go",
"array_64bit.go",
"backup_compaction_iterator.go",
"ballast.go",
"batch.go",
"col_mvcc.go",
Expand Down Expand Up @@ -118,6 +119,7 @@ go_test(
name = "storage_test",
size = "medium",
srcs = [
"backup_compaction_iterator_test.go",
"ballast_test.go",
"batch_test.go",
"bench_cloud_io_test.go",
Expand Down
157 changes: 157 additions & 0 deletions pkg/storage/backup_compaction_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package storage

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the
// latest valid key of a given MVCC key, including point tombstones, at or below the
// asOfTimestamp, if set.
//
// The iterator assumes that it will not encounter any write intents and that the
// wrapped SimpleMVCCIterator *only* surfaces point keys.
type BackupCompactionIterator struct {
iter SimpleMVCCIterator

// asOf is the latest timestamp of a key surfaced by the iterator.
asOf hlc.Timestamp

// valid tracks if the current key is valid.
valid bool

// err tracks if iterating to the current key returned an error.
err error
}

var _ SimpleMVCCIterator = &BackupCompactionIterator{}

// NewBackupCompactionIterator creates a new BackupCompactionIterator. The asOf timestamp cannot be empty.
func NewBackupCompactionIterator(
iter SimpleMVCCIterator, asOf hlc.Timestamp,
) (*BackupCompactionIterator, error) {
if asOf.IsEmpty() {
return nil, errors.New("asOf timestamp cannot be empty")
}
return &BackupCompactionIterator{
iter: iter,
asOf: asOf,
}, nil
}

func (f *BackupCompactionIterator) Close() {
f.iter.Close()
}

// Next is identical to NextKey, as BackupCompactionIterator only surfaces live keys.
func (f *BackupCompactionIterator) Next() {
kev-cao marked this conversation as resolved.
Show resolved Hide resolved
f.NextKey()
}

func (f *BackupCompactionIterator) NextKey() {
f.iter.NextKey()
f.advance()
}

func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) {
// See ReadAsOfIterator comment for explanation of this.
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf}
f.iter.SeekGE(synthetic)
if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) {
f.NextKey()
kev-cao marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (f *BackupCompactionIterator) updateValid() bool {
f.valid, f.err = f.iter.Valid()
return f.valid
}

// advance moves past keys with timestamps later than f.asOf.
func (f *BackupCompactionIterator) advance() {
kev-cao marked this conversation as resolved.
Show resolved Hide resolved
for {
if ok := f.updateValid(); !ok {
return
}
if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) {
f.iter.Next()
continue
}
return
}
}

func (f *BackupCompactionIterator) UnsafeKey() MVCCKey {
return f.iter.UnsafeKey()
}

func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) {
return f.iter.UnsafeValue()
}

func (f *BackupCompactionIterator) Valid() (bool, error) {
if util.RaceEnabled && f.valid {
if err := f.assertInvariants(); err != nil {
return false, err
}
}
return f.valid, f.err
}

func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) {
return f.iter.MVCCValueLenAndIsTombstone()
}

func (f *BackupCompactionIterator) ValueLen() int {
return f.iter.ValueLen()
}

func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) {
hasPoint, hasRange := f.iter.HasPointAndRange()
if hasRange {
panic("unexpected range tombstone")
}
return hasPoint, hasRange
}

func (f *BackupCompactionIterator) RangeBounds() roachpb.Span {
return roachpb.Span{}
}

func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack {
return MVCCRangeKeyStack{}
}

func (f *BackupCompactionIterator) RangeKeyChanged() bool {
return false
}

// assertInvariants checks that the iterator is in a valid state, but first assumes that the underlying iterator
// has already been validated and is in a valid state.
func (f *BackupCompactionIterator) assertInvariants() error {
if err := assertSimpleMVCCIteratorInvariants(f); err != nil {
return err
}

if ok, err := f.iter.Valid(); !ok || err != nil {
errMsg := err.Error()
return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg)
}

key := f.UnsafeKey()
if key.Timestamp.IsEmpty() {
return errors.AssertionFailedf("emitted key %s has no timestamp", key)
}
if f.asOf.Less(key.Timestamp) {
return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf)
}

return nil
}
211 changes: 211 additions & 0 deletions pkg/storage/backup_compaction_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package storage

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestBackupCompactionIterator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

pebble, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer pebble.Close()

// The test turns each `input` into a batch for the readAsOfIterator, fully
// iterates the iterator, and puts the surfaced keys into a string in the same
// format as `input`. The test then compares the output to 'expectedNextKey'.
// The 'asOf' field represents the wall time of the hlc.Timestamp for the
// readAsOfIterator.
tests := []asOfTest{
// Ensure vanilla iteration works, surfacing latest values with no AOST.
{input: "a1b1", expectedNextKey: "a1b1", asOf: ""},
{input: "a2a1", expectedNextKey: "a2", asOf: ""},
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: ""},
{input: "a2Xa1", expectedNextKey: "a2X", asOf: ""},
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: ""},

// Ensure vanilla iterations works with provided AOST
{input: "a1b1", expectedNextKey: "a1b1", asOf: "1"},
{input: "a1b1", expectedNextKey: "a1b1", asOf: "2"},
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: "2"},
{input: "a1b1X", expectedNextKey: "a1b1X", asOf: "2"},
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: "2"},

// Skipping keys with AOST.
{input: "a2a1", expectedNextKey: "a1", asOf: "1"},
{input: "a1b2b1", expectedNextKey: "a1b1", asOf: "1"},

// Double skip within keys.
{input: "b3b2b1", expectedNextKey: "b1", asOf: "1"},

// Double skip across keys.
{input: "b2c2c1", expectedNextKey: "c1", asOf: "1"},

// Skipping tombstones with AOST.
{input: "a1b2Xb1", expectedNextKey: "a1b1", asOf: "1"},
{input: "a2Xa1b2Xb1c2Xc1", expectedNextKey: "a1b1c1", asOf: "1"},

// Skipping under tombstone to land on another tombstone
{input: "a2Xa1b2b1X", expectedNextKey: "a1b1X", asOf: "1"},

// Ensure next key captures at most one mvcc key per key after an asOf skip.
{input: "b3c2c1", expectedNextKey: "c2", asOf: "2"},

// Ensure clean iteration over double tombstone.
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""},
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"},

// Ensure key before delete tombstone gets read if under AOST.
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""},
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"},
}

for i, test := range tests {
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf)
t.Run(name, func(t *testing.T) {
batch := pebble.NewBatch()
defer batch.Close()
populateBatch(t, batch, test.input)
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
require.NoError(t, err)
defer iter.Close()

subtests := []iterSubtest{
{"Next", test.expectedNextKey, SimpleMVCCIterator.Next},
{"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey},
kev-cao marked this conversation as resolved.
Show resolved Hide resolved
}
for _, subtest := range subtests {
t.Run(subtest.name, func(t *testing.T) {
asOf := hlc.Timestamp{}
if test.asOf != "" {
asOf.WallTime = int64(test.asOf[0])
} else {
asOf = hlc.MaxTimestamp
}
it, err := NewBackupCompactionIterator(iter, asOf)
require.NoError(t, err)
iterateSimpleMVCCIterator(t, it, subtest)
})
}
})
}
}

func TestBackupCompactionIteratorSeek(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

pebble, err := Open(context.Background(), InMemory(),
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */))
require.NoError(t, err)
defer pebble.Close()

tests := []struct {
input string
seekKey string
expected string
asOf string
}{
// Ensure vanilla seek works.
{"a1b1", "a1", "a1", ""},

// Ensure seek always returns the latest key AOST of an MVCC key.
{"a2a1b1", "a1", "b1", ""},
{"a2a1b1", "a1", "b1", "2"},
{"a2a1b1", "a1", "a1", "1"},

// Seeking above all keys will return the latest key AOST of an MVCC key.
{"a2a1b3", "a8", "a2", ""},
{"a2a1b3", "a8", "a1", "1"},
{"a2a1b3X", "b8", "b3X", ""},
{"a2a1b3Xb2", "b8", "b2", "2"},

// Ensure out of bounds seek fails gracefully.
{"a1", "b1", "notOK", ""},

// Ensure the asOf timestamp moves the iterator during a seek.
{"a2a1", "a2", "a1", "1"},
{"a2b1", "a2", "b1", "1"},

// Ensure seek will return on a tombstone if it is the latest key.
{"a3Xa1b1", "a3", "a3X", ""},
{"a3Xa1c2Xc1", "b1", "c2X", ""},

// Ensure seek will only return the latest key AOST
{"a3Xa2a1b1", "a2", "b1", ""},
{"a3Xa2a1b1", "a2", "b1", "3"},
kev-cao marked this conversation as resolved.
Show resolved Hide resolved
{"a3a2Xa1b1", "a1", "b1", ""},
{"a3a2Xa1b2Xb1c1", "a1", "b2X", ""},

// Ensure we can seek to a key right before a tombstone.
{"a2Xa1b2b1Xc1", "a1", "b2", ""},

// Ensure seek on a key above the AOST returns the correct key AOST.
{"a3a2Xa1b3Xb2b1", "b3", "b2", "2"},
{"a3a2Xa1b3Xb2b1", "b3", "b1", "1"},

// Ensure seeking on a key on AOST returns that key
{"a3a2Xa1b3Xb2b1", "a2", "a2X", "2"},
{"a3a2Xa1b3Xb2b1", "b2", "b2", "2"},
}
for i, test := range tests {
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf)
t.Run(name, func(t *testing.T) {
batch := pebble.NewBatch()
defer batch.Close()
populateBatch(t, batch, test.input)
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
require.NoError(t, err)
defer iter.Close()

asOf := hlc.Timestamp{}
if test.asOf != "" {
asOf.WallTime = int64(test.asOf[0])
} else {
asOf = hlc.MaxTimestamp
}
it, err := NewBackupCompactionIterator(iter, asOf)
require.NoError(t, err)
var output bytes.Buffer

seekKey := MVCCKey{
Key: []byte{test.seekKey[0]},
Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])},
}
it.SeekGE(seekKey)
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
if test.expected == "notOK" {
return
}
require.NoError(t, err, "seek not ok")
}
output.Write(it.UnsafeKey().Key)
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime))
v, err := DecodeMVCCValueAndErr(it.UnsafeValue())
require.NoError(t, err)
if v.IsTombstone() {
output.WriteRune('X')
}
require.Equal(t, test.expected, output.String())
})
}
}
Loading