-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
backupccl: create backup compaction iterator
For the purposes of backup compaction, a custom iterator is required that behaves similarly to the ReadAsOfIterator, but also surfaces live tombstones point keys. Epic: none Release note: None
- Loading branch information
Showing
3 changed files
with
365 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the CockroachDB Software License | ||
// included in the /LICENSE file. | ||
|
||
package storage | ||
|
||
import ( | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/util" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/errors" | ||
) | ||
|
||
// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the | ||
// latest valid key of a given MVCC key, including point tombstones, at or below the | ||
// asOfTimestamp, if set. | ||
// | ||
// The iterator assumes that it will not encounter any write intents and that the | ||
// wrapped SimpleMVCCIterator *only* surfaces point keys. | ||
type BackupCompactionIterator struct { | ||
iter SimpleMVCCIterator | ||
|
||
// asOf is the latest timestamp of a key surfaced by the iterator. | ||
asOf hlc.Timestamp | ||
|
||
// valid tracks if the current key is valid | ||
valid bool | ||
|
||
// err tracks if iterating to the current key returned an error | ||
err error | ||
} | ||
|
||
var _ SimpleMVCCIterator = &BackupCompactionIterator{} | ||
|
||
func NewBackupCompactionIterator( | ||
iter SimpleMVCCIterator, asOf hlc.Timestamp, | ||
) *BackupCompactionIterator { | ||
if asOf.IsEmpty() { | ||
asOf = hlc.MaxTimestamp | ||
} | ||
return &BackupCompactionIterator{ | ||
iter: iter, | ||
asOf: asOf, | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) Close() { | ||
f.iter.Close() | ||
} | ||
|
||
// Next is identical to NextKey, as BackupCompactionIterator only surfaces live keys. | ||
func (f *BackupCompactionIterator) Next() { | ||
f.NextKey() | ||
} | ||
|
||
func (f *BackupCompactionIterator) NextKey() { | ||
f.iter.NextKey() | ||
f.advance() | ||
} | ||
|
||
func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) { | ||
// See ReadAsOfIterator comment for explanation of this | ||
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} | ||
f.iter.SeekGE(synthetic) | ||
if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) { | ||
f.NextKey() | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) updateValid() bool { | ||
f.valid, f.err = f.iter.Valid() | ||
return f.valid | ||
} | ||
|
||
// advance moves past keys with timestamps later than f.asOf | ||
func (f *BackupCompactionIterator) advance() { | ||
for { | ||
if ok := f.updateValid(); !ok { | ||
return | ||
} | ||
if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) { | ||
f.iter.Next() | ||
continue | ||
} | ||
return | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) UnsafeKey() MVCCKey { | ||
return f.iter.UnsafeKey() | ||
} | ||
|
||
func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) { | ||
return f.iter.UnsafeValue() | ||
} | ||
|
||
func (f *BackupCompactionIterator) Valid() (bool, error) { | ||
if util.RaceEnabled && f.valid { | ||
if err := f.assertInvariants(); err != nil { | ||
return false, err | ||
} | ||
} | ||
return f.valid, f.err | ||
} | ||
|
||
func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) { | ||
return f.iter.MVCCValueLenAndIsTombstone() | ||
} | ||
|
||
func (f *BackupCompactionIterator) ValueLen() int { | ||
return f.iter.ValueLen() | ||
} | ||
|
||
func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) { | ||
hasPoint, hasRange := f.iter.HasPointAndRange() | ||
if hasRange { | ||
panic("unexpected range tombstone") | ||
} | ||
return hasPoint, hasRange | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeBounds() roachpb.Span { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeKeyChanged() bool { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) assertInvariants() error { | ||
if err := assertSimpleMVCCIteratorInvariants(f); err != nil { | ||
return err | ||
} | ||
|
||
if f.asOf.IsEmpty() { | ||
return errors.AssertionFailedf("f.asOf is empty") | ||
} | ||
|
||
if ok, err := f.iter.Valid(); !ok || err != nil { | ||
errMsg := err.Error() | ||
return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg) | ||
} | ||
|
||
key := f.UnsafeKey() | ||
if key.Timestamp.IsEmpty() { | ||
return errors.AssertionFailedf("emitted key %s has no timestamp", key) | ||
} | ||
if f.asOf.Less(key.Timestamp) { | ||
return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the CockroachDB Software License | ||
// included in the /LICENSE file. | ||
|
||
package storage | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/settings/cluster" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBackupCompactionIterator(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
pebble, err := Open(context.Background(), InMemory(), | ||
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) | ||
require.NoError(t, err) | ||
defer pebble.Close() | ||
|
||
// The test turns each `input` into a batch for the readAsOfIterator, fully | ||
// iterates the iterator, and puts the surfaced keys into a string in the same | ||
// format as `input`. The test then compares the output to 'expectedNextKey'. | ||
// The 'asOf' field represents the wall time of the hlc.Timestamp for the | ||
// readAsOfIterator. | ||
tests := []asOfTest{ | ||
// Ensure vanilla iteration works, surfacing latest values with no AOST. | ||
{input: "a1b1", expectedNextKey: "a1b1", asOf: ""}, | ||
{input: "a2a1", expectedNextKey: "a2", asOf: ""}, | ||
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: ""}, | ||
{input: "a2Xa1", expectedNextKey: "a2X", asOf: ""}, | ||
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: ""}, | ||
|
||
// Ensure vanilla iterations works with provided AOST | ||
{input: "a1b1", expectedNextKey: "a1b1", asOf: "1"}, | ||
{input: "a1b1", expectedNextKey: "a1b1", asOf: "2"}, | ||
{input: "a1b2b1", expectedNextKey: "a1b2", asOf: "2"}, | ||
{input: "a1b1X", expectedNextKey: "a1b1X", asOf: "2"}, | ||
{input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: "2"}, | ||
|
||
// Skipping keys with AOST. | ||
{input: "a2a1", expectedNextKey: "a1", asOf: "1"}, | ||
{input: "a1b2b1", expectedNextKey: "a1b1", asOf: "1"}, | ||
|
||
// Double skip within keys. | ||
{input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, | ||
|
||
// Double skip across keys. | ||
{input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, | ||
|
||
// Skipping tombstones with AOST. | ||
{input: "a1b2Xb1", expectedNextKey: "a1b1", asOf: "1"}, | ||
{input: "a2Xa1b2Xb1c2Xc1", expectedNextKey: "a1b1c1", asOf: "1"}, | ||
|
||
// Skipping under tombstone to land on another tombstone | ||
{input: "a2Xa1b2b1X", expectedNextKey: "a1b1X", asOf: "1"}, | ||
|
||
// Ensure next key captures at most one mvcc key per key after an asOf skip. | ||
{input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, | ||
|
||
// Ensure clean iteration over double tombstone. | ||
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""}, | ||
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"}, | ||
|
||
// Ensure key before delete tombstone gets read if under AOST. | ||
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, | ||
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"}, | ||
} | ||
|
||
for i, test := range tests { | ||
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) | ||
t.Run(name, func(t *testing.T) { | ||
batch := pebble.NewBatch() | ||
defer batch.Close() | ||
populateBatch(t, batch, test.input) | ||
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) | ||
require.NoError(t, err) | ||
defer iter.Close() | ||
|
||
subtests := []iterSubtest{ | ||
{"Next", test.expectedNextKey, SimpleMVCCIterator.Next}, | ||
{"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey}, | ||
} | ||
for _, subtest := range subtests { | ||
t.Run(subtest.name, func(t *testing.T) { | ||
asOf := hlc.Timestamp{} | ||
if test.asOf != "" { | ||
asOf.WallTime = int64(test.asOf[0]) | ||
} | ||
it := NewBackupCompactionIterator(iter, asOf) | ||
iterateSimpleMVCCIterator(t, it, subtest) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestBackupCompactionIteratorSeek(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
pebble, err := Open(context.Background(), InMemory(), | ||
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) | ||
require.NoError(t, err) | ||
defer pebble.Close() | ||
|
||
tests := []struct { | ||
input string | ||
seekKey string | ||
expected string | ||
asOf string | ||
}{ | ||
// Ensure vanilla seek works. | ||
{"a1b1", "a1", "a1", ""}, | ||
|
||
// Ensure seek always returns the latest key AOST of an MVCC key. | ||
{"a2a1b1", "a1", "b1", ""}, | ||
{"a2a1b1", "a1", "b1", "2"}, | ||
{"a2a1b1", "a1", "a1", "1"}, | ||
|
||
// Seeking above all keys will return the latest key AOST of an MVCC key. | ||
{"a2a1b3", "a8", "a2", ""}, | ||
{"a2a1b3", "a8", "a1", "1"}, | ||
{"a2a1b3X", "b8", "b3X", ""}, | ||
{"a2a1b3Xb2", "b8", "b2", "2"}, | ||
|
||
// Ensure out of bounds seek fails gracefully. | ||
{"a1", "b1", "notOK", ""}, | ||
|
||
// Ensure the asOf timestamp moves the iterator during a seek. | ||
{"a2a1", "a2", "a1", "1"}, | ||
{"a2b1", "a2", "b1", "1"}, | ||
|
||
// Ensure seek will return on a tombstone if it is the latest key. | ||
{"a3Xa1b1", "a3", "a3X", ""}, | ||
{"a3Xa1c2Xc1", "b1", "c2X", ""}, | ||
|
||
// Ensure seek will only return the latest key AOST | ||
{"a3Xa2a1b1", "a2", "b1", ""}, | ||
{"a3Xa2a1b1", "a2", "b1", "3"}, | ||
{"a3a2Xa1b1", "a1", "b1", ""}, | ||
{"a3a2Xa1b2Xb1c1", "a1", "b2X", ""}, | ||
|
||
// Ensure we can seek to a key right before a tombstone. | ||
{"a2Xa1b2b1Xc1", "a1", "b2", ""}, | ||
|
||
// Ensure seek on a key above the AOST returns the correct key AOST. | ||
{"a3a2Xa1b3Xb2b1", "b3", "b2", "2"}, | ||
{"a3a2Xa1b3Xb2b1", "b8", "b3X", "3"}, | ||
|
||
// Ensure seeking on a key on AOST returns that key | ||
{"a3a2Xa1b3Xb2b1", "a2", "a2X", "2"}, | ||
{"a3a2Xa1b3Xb2b1", "b2", "b2", "2"}, | ||
} | ||
for i, test := range tests { | ||
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) | ||
t.Run(name, func(t *testing.T) { | ||
batch := pebble.NewBatch() | ||
defer batch.Close() | ||
populateBatch(t, batch, test.input) | ||
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) | ||
require.NoError(t, err) | ||
defer iter.Close() | ||
|
||
asOf := hlc.Timestamp{} | ||
if test.asOf != "" { | ||
asOf.WallTime = int64(test.asOf[0]) | ||
} | ||
it := NewBackupCompactionIterator(iter, asOf) | ||
var output bytes.Buffer | ||
|
||
seekKey := MVCCKey{ | ||
Key: []byte{test.seekKey[0]}, | ||
Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, | ||
} | ||
it.SeekGE(seekKey) | ||
ok, err := it.Valid() | ||
require.NoError(t, err) | ||
if !ok { | ||
if test.expected == "notOK" { | ||
return | ||
} | ||
require.NoError(t, err, "seek not ok") | ||
} | ||
output.Write(it.UnsafeKey().Key) | ||
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) | ||
v, err := DecodeMVCCValueAndErr(it.UnsafeValue()) | ||
require.NoError(t, err) | ||
if v.IsTombstone() { | ||
output.WriteRune('X') | ||
} | ||
require.Equal(t, test.expected, output.String()) | ||
}) | ||
} | ||
} |