From 9c30e588b436329d7e4009323b7cfb0c25f6e732 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Mon, 16 Dec 2024 11:16:32 -0500 Subject: [PATCH] backupccl: create backup compaction iterator For the purposes of backup compaction, a custom iterator is required that behaves similarly to the ReadAsOfIterator, but also surfaces live tombstones point keys. Epic: none Release note: None --- pkg/storage/backup_compaction_iterator.go | 163 +++++++++++++++ .../backup_compaction_iterator_test.go | 197 ++++++++++++++++++ 2 files changed, 360 insertions(+) create mode 100644 pkg/storage/backup_compaction_iterator.go create mode 100644 pkg/storage/backup_compaction_iterator_test.go diff --git a/pkg/storage/backup_compaction_iterator.go b/pkg/storage/backup_compaction_iterator.go new file mode 100644 index 000000000000..90c81114aa51 --- /dev/null +++ b/pkg/storage/backup_compaction_iterator.go @@ -0,0 +1,163 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the +// latest valid key of a given MVCC key that is also below the asOfTimestamp, if +// set. The iterator does surface point tombstones, but not any MVCC keys +// shadowed by tombstones below the asOfTimestamp, if set. The iterator assumes +// that it will not encounter any write intents. +type BackupCompactionIterator struct { + iter SimpleMVCCIterator + + // asOf is the latest timestamp of a key surfaced by the iterator. + asOf hlc.Timestamp + + // valid tracks if the current key is valid + valid bool + + // err tracks if iterating to the current key returned an error + err error + + // newestRangeTombstone contains the timestamp of the latest range + // tombstone below asOf at the current position, if any. + newestRangeTombstone hlc.Timestamp +} + +var _ SimpleMVCCIterator = &BackupCompactionIterator{} + +func NewBackupCompactionIterator( + iter SimpleMVCCIterator, asOf hlc.Timestamp, +) *BackupCompactionIterator { + if asOf.IsEmpty() { + asOf = hlc.MaxTimestamp + } + return &BackupCompactionIterator{ + iter: iter, + asOf: asOf, + } +} + +func (f *BackupCompactionIterator) Close() { + f.iter.Close() +} + +func (f *BackupCompactionIterator) Next() { + f.NextKey() +} + +func (f *BackupCompactionIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) { + synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} + f.iter.SeekGE(synthetic) + if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) { + f.NextKey() + } +} + +func (f *BackupCompactionIterator) updateValid() bool { + f.valid, f.err = f.iter.Valid() + return f.valid +} + +func (f *BackupCompactionIterator) advance() { + for { + if ok := f.updateValid(); !ok { + return + } + if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) { + f.iter.Next() + continue + } + return + } +} + +func (f *BackupCompactionIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) { + return f.iter.UnsafeValue() +} + +func (f *BackupCompactionIterator) Valid() (bool, error) { + if util.RaceEnabled && f.valid { + if err := f.assertInvariants(); err != nil { + return false, err + } + } + return f.valid, f.err +} + +func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) { + return f.iter.MVCCValueLenAndIsTombstone() +} + +func (f *BackupCompactionIterator) ValueLen() int { + return f.iter.ValueLen() +} + +func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) { + hasPoint, hasRange := f.iter.HasPointAndRange() + if hasRange { + panic("unexpected range tombstone") + } + return hasPoint, hasRange +} + +func (f *BackupCompactionIterator) RangeBounds() roachpb.Span { + panic("BackupCompactionIterator does not operate on range keys") +} + +func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack { + panic("BackupCompactionIterator does not operate on range keys") +} + +func (f *BackupCompactionIterator) RangeKeyChanged() bool { + panic("BackupCompactionIterator does not operate on range keys") +} + +func (f *BackupCompactionIterator) assertInvariants() error { + if err := assertSimpleMVCCIteratorInvariants(f); err != nil { + return err + } + + if f.asOf.IsEmpty() { + return errors.AssertionFailedf("f.asOf is empty") + } + + if ok, err := f.iter.Valid(); !ok || err != nil { + errMsg := err.Error() + return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg) + } + + key := f.UnsafeKey() + if key.Timestamp.IsEmpty() { + return errors.AssertionFailedf("emitted key %s has no timestamp", key) + } + if f.asOf.Less(key.Timestamp) { + return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf) + } + + return nil +} diff --git a/pkg/storage/backup_compaction_iterator_test.go b/pkg/storage/backup_compaction_iterator_test.go new file mode 100644 index 000000000000..2951afc54ddc --- /dev/null +++ b/pkg/storage/backup_compaction_iterator_test.go @@ -0,0 +1,197 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestBackupCompactionIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), + cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + // The test turns each `input` into a batch for the readAsOfIterator, fully + // iterates the iterator, and puts the surfaced keys into a string in the same + // format as `input`. The test then compares the output to 'expectedNextKey'. + // The 'asOf' field represents the wall time of the hlc.Timestamp for the + // readAsOfIterator. + tests := []asOfTest{ + // Ensure nextkey works as expected. + {input: "b1c1", expectedNextKey: "b1c1", asOf: ""}, + {input: "b2b1", expectedNextKey: "b2", asOf: ""}, + + // Ensure AOST is an inclusive upper bound. + {input: "b1", expectedNextKey: "b1", asOf: "1"}, + {input: "b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip within keys. + {input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip across keys. + {input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, + + // Ensure next key captures at most one mvcc key per key after an asOf skip. + {input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, + + // Ensure an AOST 'next' takes precedence over a tombstone 'nextkey'. + {input: "b2Xb1c1", expectedNextKey: "b2Xc1", asOf: ""}, + {input: "b2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, + + // Ensure clean iteration over double tombstone. + {input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""}, + {input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"}, + + // Ensure tombstone is skipped after an AOST skip. + {input: "b3c2Xc1d1", expectedNextKey: "c1d1", asOf: "1"}, + + // Ensure key before delete tombstone gets read if under AOST. + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"}, + } + + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + require.NoError(t, err) + defer iter.Close() + + subtests := []iterSubtest{ + {"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey}, + } + for _, subtest := range subtests { + t.Run(subtest.name, func(t *testing.T) { + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewBackupCompactionIterator(iter, asOf) + iterateSimpleMVCCIterator(t, it, subtest) + }) + } + }) + } +} + +func TestBackupCompactionIteratorSeek(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), + cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + tests := []struct { + input string + seekKey string + expected string + asOf string + }{ + // Ensure vanilla seek works. + {"a1b1", "a1", "a1", ""}, + + // Ensure seek always returns the latest key of an MVCC key. + {"a2a1b1", "a1", "b1", ""}, + {"a2a1b1", "a1", "b1", "2"}, + {"a2a1b1", "a1", "a1", "1"}, + + // Ensure out of bounds seek fails gracefully. + {"a1", "b1", "notOK", ""}, + + // Ensure the asOf timestamp moves the iterator during a seek. + {"a2a1", "a2", "a1", "1"}, + {"a2b1", "a2", "b1", "1"}, + + // Ensure seek will return on a tombstone if it is the latest key. + {"a3Xa1b1", "a3", "a3X", ""}, + {"a3Xa1c2Xc1", "b1", "c2X", ""}, + + // Ensure seek does not return on a key shadowed by a tombstone. + {"a3Xa2a1b1", "a2", "b1", ""}, + {"a3Xa2a1b1", "a2", "b1", "3"}, + {"a3a2Xa1b1", "a1", "b1", ""}, + {"a3a2Xa1b2Xb1c1", "a1", "b2X", ""}, + + // Ensure we can seek to a key right before a tombstone. + {"a2Xa1b2b1Xc1", "a1", "b2", ""}, + + // Ensure seeking on a tombstone still lands on the tombstone if + // within AOST + {"a2Xa1b2", "a2X", "a2X", ""}, + + // Ensure seeking on a tombstone moves past it if above AOST + {"a3Xa1b2", "a2X", "a1", "2"}, + + // Ensure AOST 'next' takes precendence over tombstone 'nextkey'. + {"a4a3Xa1b1", "a3", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a2", "2"}, + } + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + require.NoError(t, err) + defer iter.Close() + + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewBackupCompactionIterator(iter, asOf) + var output bytes.Buffer + + seekKey := MVCCKey{ + Key: []byte{test.seekKey[0]}, + Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, + } + it.SeekGE(seekKey) + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + if test.expected == "notOK" { + return + } + require.NoError(t, err, "seek not ok") + } + output.Write(it.UnsafeKey().Key) + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + v, err := DecodeMVCCValueAndErr(it.UnsafeValue()) + require.NoError(t, err) + if v.IsTombstone() { + output.WriteRune('X') + } + require.Equal(t, test.expected, output.String()) + }) + } +}