-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
backupccl: create backup compaction iterator
For the purposes of backup compaction, a custom iterator is required that behaves similarly to the ReadAsOfIterator, but also surfaces live tombstones point keys. Epic: none Release note: None
- Loading branch information
Showing
2 changed files
with
360 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
// Copyright 2024 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package storage | ||
|
||
import ( | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/util" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/errors" | ||
) | ||
|
||
// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the | ||
// latest valid key of a given MVCC key that is also below the asOfTimestamp, if | ||
// set. The iterator does surface point tombstones, but not any MVCC keys | ||
// shadowed by tombstones below the asOfTimestamp, if set. The iterator assumes | ||
// that it will not encounter any write intents. | ||
type BackupCompactionIterator struct { | ||
iter SimpleMVCCIterator | ||
|
||
// asOf is the latest timestamp of a key surfaced by the iterator. | ||
asOf hlc.Timestamp | ||
|
||
// valid tracks if the current key is valid | ||
valid bool | ||
|
||
// err tracks if iterating to the current key returned an error | ||
err error | ||
|
||
// newestRangeTombstone contains the timestamp of the latest range | ||
// tombstone below asOf at the current position, if any. | ||
newestRangeTombstone hlc.Timestamp | ||
} | ||
|
||
var _ SimpleMVCCIterator = &BackupCompactionIterator{} | ||
|
||
func NewBackupCompactionIterator( | ||
iter SimpleMVCCIterator, asOf hlc.Timestamp, | ||
) *BackupCompactionIterator { | ||
if asOf.IsEmpty() { | ||
asOf = hlc.MaxTimestamp | ||
} | ||
return &BackupCompactionIterator{ | ||
iter: iter, | ||
asOf: asOf, | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) Close() { | ||
f.iter.Close() | ||
} | ||
|
||
func (f *BackupCompactionIterator) Next() { | ||
f.NextKey() | ||
} | ||
|
||
func (f *BackupCompactionIterator) NextKey() { | ||
f.iter.NextKey() | ||
f.advance() | ||
} | ||
|
||
func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) { | ||
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} | ||
f.iter.SeekGE(synthetic) | ||
if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) { | ||
f.NextKey() | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) updateValid() bool { | ||
f.valid, f.err = f.iter.Valid() | ||
return f.valid | ||
} | ||
|
||
func (f *BackupCompactionIterator) advance() { | ||
for { | ||
if ok := f.updateValid(); !ok { | ||
return | ||
} | ||
if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) { | ||
f.iter.Next() | ||
continue | ||
} | ||
return | ||
} | ||
} | ||
|
||
func (f *BackupCompactionIterator) UnsafeKey() MVCCKey { | ||
return f.iter.UnsafeKey() | ||
} | ||
|
||
func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) { | ||
return f.iter.UnsafeValue() | ||
} | ||
|
||
func (f *BackupCompactionIterator) Valid() (bool, error) { | ||
if util.RaceEnabled && f.valid { | ||
if err := f.assertInvariants(); err != nil { | ||
return false, err | ||
} | ||
} | ||
return f.valid, f.err | ||
} | ||
|
||
func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) { | ||
return f.iter.MVCCValueLenAndIsTombstone() | ||
} | ||
|
||
func (f *BackupCompactionIterator) ValueLen() int { | ||
return f.iter.ValueLen() | ||
} | ||
|
||
func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) { | ||
hasPoint, hasRange := f.iter.HasPointAndRange() | ||
if hasRange { | ||
panic("unexpected range tombstone") | ||
} | ||
return hasPoint, hasRange | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeBounds() roachpb.Span { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) RangeKeyChanged() bool { | ||
panic("BackupCompactionIterator does not operate on range keys") | ||
} | ||
|
||
func (f *BackupCompactionIterator) assertInvariants() error { | ||
if err := assertSimpleMVCCIteratorInvariants(f); err != nil { | ||
return err | ||
} | ||
|
||
if f.asOf.IsEmpty() { | ||
return errors.AssertionFailedf("f.asOf is empty") | ||
} | ||
|
||
if ok, err := f.iter.Valid(); !ok || err != nil { | ||
errMsg := err.Error() | ||
return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg) | ||
} | ||
|
||
key := f.UnsafeKey() | ||
if key.Timestamp.IsEmpty() { | ||
return errors.AssertionFailedf("emitted key %s has no timestamp", key) | ||
} | ||
if f.asOf.Less(key.Timestamp) { | ||
return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
// Copyright 2024 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package storage | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/settings/cluster" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBackupCompactionIterator(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
pebble, err := Open(context.Background(), InMemory(), | ||
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) | ||
require.NoError(t, err) | ||
defer pebble.Close() | ||
|
||
// The test turns each `input` into a batch for the readAsOfIterator, fully | ||
// iterates the iterator, and puts the surfaced keys into a string in the same | ||
// format as `input`. The test then compares the output to 'expectedNextKey'. | ||
// The 'asOf' field represents the wall time of the hlc.Timestamp for the | ||
// readAsOfIterator. | ||
tests := []asOfTest{ | ||
// Ensure nextkey works as expected. | ||
{input: "b1c1", expectedNextKey: "b1c1", asOf: ""}, | ||
{input: "b2b1", expectedNextKey: "b2", asOf: ""}, | ||
|
||
// Ensure AOST is an inclusive upper bound. | ||
{input: "b1", expectedNextKey: "b1", asOf: "1"}, | ||
{input: "b2b1", expectedNextKey: "b1", asOf: "1"}, | ||
|
||
// Double skip within keys. | ||
{input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, | ||
|
||
// Double skip across keys. | ||
{input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, | ||
|
||
// Ensure next key captures at most one mvcc key per key after an asOf skip. | ||
{input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, | ||
|
||
// Ensure an AOST 'next' takes precedence over a tombstone 'nextkey'. | ||
{input: "b2Xb1c1", expectedNextKey: "b2Xc1", asOf: ""}, | ||
{input: "b2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, | ||
|
||
// Ensure clean iteration over double tombstone. | ||
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""}, | ||
{input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"}, | ||
|
||
// Ensure tombstone is skipped after an AOST skip. | ||
{input: "b3c2Xc1d1", expectedNextKey: "c1d1", asOf: "1"}, | ||
|
||
// Ensure key before delete tombstone gets read if under AOST. | ||
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, | ||
{input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"}, | ||
} | ||
|
||
for i, test := range tests { | ||
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) | ||
t.Run(name, func(t *testing.T) { | ||
batch := pebble.NewBatch() | ||
defer batch.Close() | ||
populateBatch(t, batch, test.input) | ||
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) | ||
require.NoError(t, err) | ||
defer iter.Close() | ||
|
||
subtests := []iterSubtest{ | ||
{"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey}, | ||
} | ||
for _, subtest := range subtests { | ||
t.Run(subtest.name, func(t *testing.T) { | ||
asOf := hlc.Timestamp{} | ||
if test.asOf != "" { | ||
asOf.WallTime = int64(test.asOf[0]) | ||
} | ||
it := NewBackupCompactionIterator(iter, asOf) | ||
iterateSimpleMVCCIterator(t, it, subtest) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestBackupCompactionIteratorSeek(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
pebble, err := Open(context.Background(), InMemory(), | ||
cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) | ||
require.NoError(t, err) | ||
defer pebble.Close() | ||
|
||
tests := []struct { | ||
input string | ||
seekKey string | ||
expected string | ||
asOf string | ||
}{ | ||
// Ensure vanilla seek works. | ||
{"a1b1", "a1", "a1", ""}, | ||
|
||
// Ensure seek always returns the latest key of an MVCC key. | ||
{"a2a1b1", "a1", "b1", ""}, | ||
{"a2a1b1", "a1", "b1", "2"}, | ||
{"a2a1b1", "a1", "a1", "1"}, | ||
|
||
// Ensure out of bounds seek fails gracefully. | ||
{"a1", "b1", "notOK", ""}, | ||
|
||
// Ensure the asOf timestamp moves the iterator during a seek. | ||
{"a2a1", "a2", "a1", "1"}, | ||
{"a2b1", "a2", "b1", "1"}, | ||
|
||
// Ensure seek will return on a tombstone if it is the latest key. | ||
{"a3Xa1b1", "a3", "a3X", ""}, | ||
{"a3Xa1c2Xc1", "b1", "c2X", ""}, | ||
|
||
// Ensure seek does not return on a key shadowed by a tombstone. | ||
{"a3Xa2a1b1", "a2", "b1", ""}, | ||
{"a3Xa2a1b1", "a2", "b1", "3"}, | ||
{"a3a2Xa1b1", "a1", "b1", ""}, | ||
{"a3a2Xa1b2Xb1c1", "a1", "b2X", ""}, | ||
|
||
// Ensure we can seek to a key right before a tombstone. | ||
{"a2Xa1b2b1Xc1", "a1", "b2", ""}, | ||
|
||
// Ensure seeking on a tombstone still lands on the tombstone if | ||
// within AOST | ||
{"a2Xa1b2", "a2X", "a2X", ""}, | ||
|
||
// Ensure seeking on a tombstone moves past it if above AOST | ||
{"a3Xa1b2", "a2X", "a1", "2"}, | ||
|
||
// Ensure AOST 'next' takes precendence over tombstone 'nextkey'. | ||
{"a4a3Xa1b1", "a3", "a1", "1"}, | ||
{"a4a3Xa2a1b1", "a2", "a1", "1"}, | ||
{"a4a3Xa2a1b1", "a2", "a2", "2"}, | ||
} | ||
for i, test := range tests { | ||
name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) | ||
t.Run(name, func(t *testing.T) { | ||
batch := pebble.NewBatch() | ||
defer batch.Close() | ||
populateBatch(t, batch, test.input) | ||
iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) | ||
require.NoError(t, err) | ||
defer iter.Close() | ||
|
||
asOf := hlc.Timestamp{} | ||
if test.asOf != "" { | ||
asOf.WallTime = int64(test.asOf[0]) | ||
} | ||
it := NewBackupCompactionIterator(iter, asOf) | ||
var output bytes.Buffer | ||
|
||
seekKey := MVCCKey{ | ||
Key: []byte{test.seekKey[0]}, | ||
Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, | ||
} | ||
it.SeekGE(seekKey) | ||
ok, err := it.Valid() | ||
require.NoError(t, err) | ||
if !ok { | ||
if test.expected == "notOK" { | ||
return | ||
} | ||
require.NoError(t, err, "seek not ok") | ||
} | ||
output.Write(it.UnsafeKey().Key) | ||
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) | ||
v, err := DecodeMVCCValueAndErr(it.UnsafeValue()) | ||
require.NoError(t, err) | ||
if v.IsTombstone() { | ||
output.WriteRune('X') | ||
} | ||
require.Equal(t, test.expected, output.String()) | ||
}) | ||
} | ||
} |