metamorphic: add external files ingest operation
This commit adds two new operations:
 - `newExternalObjOp` transforms a batch into an external object.
 - `ingestExternalFilesOp` ingests random parts of random external
   objects. If external storage is not enabled, this operation is
   emulated by creating SST files in the local FS and ingesting those.

Support for external storage is a setting independent of the shared
storage setting, allowing us to test more combinations. External
storage uses the `"external"` locator, whereas shared storage uses the
empty locator.
RaduBerinde committed Feb 15, 2024
1 parent 7c60d90 commit 6685c5a
Showing 18 changed files with 823 additions and 268 deletions.
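
As a rough illustration of the locator split described in the commit message, a test could wire up both storage kinds along these lines (a minimal sketch, assuming pebble's remote.MakeSimpleFactory and remote.NewInMemory; makeTestOpts is a hypothetical helper, not code from this commit):

import (
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/objstorage/remote"
)

// makeTestOpts registers an in-memory backend for both the shared locator
// (empty) and the external locator ("external"), so shared and external
// storage can be enabled independently.
func makeTestOpts() *pebble.Options {
	opts := &pebble.Options{}
	opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
		"":         remote.NewInMemory(), // shared storage: empty locator
		"external": remote.NewInMemory(), // external storage: "external" locator
	})
	return opts
}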
4 changes: 4 additions & 0 deletions ingest.go
@@ -181,6 +181,10 @@ func ingestLoad1External(
if !e.HasRangeKey && !e.HasPointKey {
return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
}
// #3287: range keys don't yet work correctly when the range key bounds are not tight.
if e.HasRangeKey {
return nil, errors.New("pebble: range keys not supported in external files")
}
// Don't load table stats. Doing a round trip to shared storage, one SST
// at a time is not worth it as it slows down ingestion.
meta := &fileMetadata{
2 changes: 1 addition & 1 deletion level_iter.go
@@ -1200,7 +1200,7 @@ func (l *levelIter) SetContext(ctx context.Context) {

func (l *levelIter) String() string {
if l.iterFile != nil {
return fmt.Sprintf("%s: fileNum=%s", l.level, l.iter.String())
return fmt.Sprintf("%s: fileNum=%s", l.level, l.iterFile.FileNum.String())
}
return fmt.Sprintf("%s: fileNum=<nil>", l.level)
}
119 changes: 109 additions & 10 deletions metamorphic/build.go
@@ -5,7 +5,9 @@
package metamorphic

import (
"context"
"fmt"
"slices"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/internal/base"
@@ -32,7 +34,6 @@ func writeSSTForIngestion(
rangeKeyIter keyspan.FragmentIterator,
writable objstorage.Writable,
targetFMV pebble.FormatMajorVersion,
bounds pebble.KeyRange,
) (*sstable.WriterMetadata, error) {
writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
if t.testOpts.disableValueBlocksForIngestSSTables {
@@ -62,7 +63,11 @@
value = pebble.LazyValue{}
key.SetKind(pebble.InternalKeyKindDelete)
}
if err := w.Add(*key, value.InPlaceValue()); err != nil {
valBytes, _, err := value.Value(nil)
if err != nil {
return nil, err
}
if err := w.Add(*key, valBytes); err != nil {
return nil, err
}
}
@@ -73,9 +78,7 @@
if rangeDelIter != nil {
span, err := rangeDelIter.First()
for ; span != nil; span, err = rangeDelIter.Next() {
startCopy := append([]byte(nil), span.Start...)
endCopy := append([]byte(nil), span.End...)
if err := w.DeleteRange(startCopy, endCopy); err != nil {
if err := w.DeleteRange(slices.Clone(span.Start), slices.Clone(span.End)); err != nil {
return nil, err
}
}
@@ -99,11 +102,9 @@
// containing keys that both set and unset the same suffix at the
// same sequence number is nonsensical, so we "coalesce" or collapse
// the keys.
startCopy := append([]byte(nil), span.Start...)
endCopy := append([]byte(nil), span.End...)
collapsed := keyspan.Span{
Start: startCopy,
End: endCopy,
Start: slices.Clone(span.Start),
End: slices.Clone(span.End),
Keys: make([]keyspan.Key, 0, len(span.Keys)),
}
rangekey.Coalesce(
@@ -151,7 +152,105 @@ func buildForIngest(
iter, rangeDelIter, rangeKeyIter,
writable,
db.FormatMajorVersion(),
pebble.KeyRange{},
)
return path, meta, err
}

// buildForIngestExternalEmulation builds a local SST file containing the keys
// in the given external object (truncated to the given bounds) and returns its
// path and metadata.
func buildForIngestExternalEmulation(
t *Test, dbID objID, externalObjID objID, bounds pebble.KeyRange, i int,
) (path string, _ *sstable.WriterMetadata) {
path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
f, err := t.opts.FS.Create(path)
panicIfErr(err)

reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds)
defer reader.Close()

writable := objstorageprovider.NewFileWritable(f)
meta, err := writeSSTForIngestion(
t,
pointIter, rangeDelIter, rangeKeyIter,
writable,
t.minFMV(),
)
if err != nil {
panic(err)
}
return path, meta
}

func openExternalObj(
t *Test, externalObjID objID, bounds pebble.KeyRange,
) (
reader *sstable.Reader,
pointIter base.InternalIterator,
rangeDelIter keyspan.FragmentIterator,
rangeKeyIter keyspan.FragmentIterator,
) {
objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
panicIfErr(err)
opts := sstable.ReaderOptions{
Comparer: t.opts.Comparer,
}
reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
panicIfErr(err)

pointIter, err = reader.NewIter(bounds.Start, bounds.End)
panicIfErr(err)

rangeDelIter, err = reader.NewRawRangeDelIter()
panicIfErr(err)
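// Note: keyspan.Truncate (used below) clamps each span to
// [bounds.Start, bounds.End); e.g. a span [a, z) with bounds [c, f) comes out
// as [c, f), and spans entirely outside the bounds are dropped.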
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
t.opts.Comparer.Compare,
rangeDelIter,
bounds.Start, bounds.End,
nil /* start */, nil /* end */, false, /* panicOnUpperTruncate */
)
}

rangeKeyIter, err = reader.NewRawRangeKeyIter()
panicIfErr(err)
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
t.opts.Comparer.Compare,
rangeKeyIter,
bounds.Start, bounds.End,
nil /* start */, nil /* end */, false, /* panicOnUpperTruncate */
)
}
return reader, pointIter, rangeDelIter, rangeKeyIter
}

// externalObjIsEmpty returns true if the given external object has no point or
// range keys within the given bounds.
func externalObjIsEmpty(t *Test, externalObjID objID, bounds pebble.KeyRange) bool {
reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds)
defer reader.Close()
defer closeIters(pointIter, rangeDelIter, rangeKeyIter)

key, _ := pointIter.First()
panicIfErr(pointIter.Error())
if key != nil {
return false
}
for _, it := range []keyspan.FragmentIterator{rangeDelIter, rangeKeyIter} {
if it != nil {
span, err := it.First()
panicIfErr(err)
if span != nil {
return false
}
}
}
return true
}

func panicIfErr(err error) {
if err != nil {
panic(err)
}
}
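
For orientation, the new ingest op's emulation path (implemented elsewhere in this commit) uses the helpers above roughly as follows (a hypothetical sketch, not the actual op code; objs, dbID, t, and db are assumed to come from the op being executed):

// Hypothetical sketch: with external storage disabled, rebuild each requested
// slice of each external object as a local SST, then ingest the results
// through the regular local-file path.
var paths []string
for i, o := range objs {
	if externalObjIsEmpty(t, o.externalObjID, o.bounds) {
		continue // ingesting an empty SST would be an error
	}
	path, _ := buildForIngestExternalEmulation(t, dbID, o.externalObjID, o.bounds, i)
	paths = append(paths, path)
}
err := db.Ingest(paths)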
11 changes: 10 additions & 1 deletion metamorphic/config.go
@@ -41,6 +41,7 @@ const (
OpNewIter
OpNewIterUsingClone
OpNewSnapshot
OpNewExternalObj
OpReaderGet
OpReplicate
OpSnapshotClose
@@ -49,6 +50,7 @@
OpWriterDeleteRange
OpWriterIngest
OpWriterIngestAndExcise
OpWriterIngestExternalFiles
OpWriterLogData
OpWriterMerge
OpWriterRangeKeyDelete
@@ -179,6 +181,8 @@ func DefaultOpConfig() OpConfig {
OpWriterRangeKeyDelete: 5,
OpWriterSet: 100,
OpWriterSingleDelete: 50,
OpNewExternalObj: 2,
OpWriterIngestExternalFiles: 20,
},
// Use a new prefix 75% of the time (and 25% of the time use an existing
// prefix with an alternative suffix).
@@ -309,9 +313,14 @@ func multiInstanceConfig() OpConfig {
cfg.ops[OpReplicate] = 5
cfg.ops[OpWriterIngestAndExcise] = 50
// Single deletes and merges are disabled in multi-instance mode, as
// replicateOp doesn't support them.
// replicateOp and ingestAndExciseOp don't support them.
cfg.ops[OpWriterSingleDelete] = 0
cfg.ops[OpWriterMerge] = 0

// TODO(radu): external file ingest doesn't yet work with OpReplicate ("cannot
// use skip-shared iteration due to non-shareable files in lower levels").
cfg.ops[OpNewExternalObj] = 0
cfg.ops[OpWriterIngestExternalFiles] = 0
return cfg
}

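The values in these maps are relative weights: an op with weight 100 is drawn roughly ten times as often as one with weight 10. A generic sketch of weighted selection (the generator's actual implementation differs; the pick helper is an assumption for illustration):

import "math/rand"

// pick returns an op type with probability proportional to its weight.
func pick(rng *rand.Rand, weights map[OpType]int) OpType {
	total := 0
	for _, w := range weights {
		total += w
	}
	n := rng.Intn(total) // uniform in [0, total)
	for op, w := range weights {
		if n < w {
			return op
		}
		n -= w
	}
	panic("unreachable") // total > 0 guarantees we return inside the loop
}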
114 changes: 114 additions & 0 deletions metamorphic/generator.go
@@ -9,6 +9,7 @@ import (
"fmt"
"math"
"os"
"slices"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/internal/randvar"
@@ -83,6 +84,8 @@ type generator struct {
liveSnapshots objIDSlice
// liveWriters contains the DB, and any live batches. The DB is always at index 0.
liveWriters objIDSlice
// externalObjects contains the external objects created.
externalObjects objIDSlice

// Maps used to find associated objects during generation. These maps are not
// needed during test execution.
@@ -176,6 +179,7 @@ func generate(rng *rand.Rand, count uint64, cfg OpConfig, km *keyManager) []op {
OpNewIter: g.newIter,
OpNewIterUsingClone: g.newIterUsingClone,
OpNewSnapshot: g.newSnapshot,
OpNewExternalObj: g.newExternalObj,
OpReaderGet: g.readerGet,
OpReplicate: g.replicate,
OpSnapshotClose: g.snapshotClose,
@@ -184,6 +188,7 @@ func generate(rng *rand.Rand, count uint64, cfg OpConfig, km *keyManager) []op {
OpWriterDeleteRange: g.writerDeleteRange,
OpWriterIngest: g.writerIngest,
OpWriterIngestAndExcise: g.writerIngestAndExcise,
OpWriterIngestExternalFiles: g.writerIngestExternalFiles,
OpWriterLogData: g.writerLogData,
OpWriterMerge: g.writerMerge,
OpWriterRangeKeyDelete: g.writerRangeKeyDelete,
@@ -1031,6 +1036,24 @@ func (g *generator) snapshotClose() {
g.add(&closeOp{objID: snapID})
}

func (g *generator) newExternalObj() {
if len(g.liveBatches) == 0 {
return
}
batchID := g.liveBatches.rand(g.rng)
if g.keyManager.objKeyMeta(batchID).bounds.IsUnset() {
return
}
g.removeBatchFromGenerator(batchID)
objID := makeObjID(externalObjTag, g.init.externalObjSlots)
g.init.externalObjSlots++
g.externalObjects = append(g.externalObjects, objID)
g.add(&newExternalObjOp{
batchID: batchID,
externalObjID: objID,
})
}

func (g *generator) writerApply() {
if len(g.liveBatches) == 0 {
return
@@ -1250,6 +1273,97 @@ func (g *generator) writerIngestAndExcise() {
})
}

func (g *generator) writerIngestExternalFiles() {
if len(g.externalObjects) == 0 {
return
}
dbID := g.dbs.rand(g.rng)
numFiles := 1 + g.expRandInt(1)
objs := make([]externalObjWithBounds, numFiles)
for i := range objs {
// We allow the same object to be selected multiple times.
id := g.externalObjects.rand(g.rng)

b := g.keyManager.objKeyMeta(id).bounds
// For now, set the bounds to the bounds of the entire object.
kr := pebble.KeyRange{
Start: b.smallest,
End: b.largest,
}
if !b.largestExcl {
// Move up the end key a bit by appending a few letters to the prefix.
kr.End = append(g.keyGenerator.prefix(kr.End), randBytes(g.rng, 1, 3)...)
}

objs[i] = externalObjWithBounds{
externalObjID: id,
bounds: kr,
}
}

// Sort the objects by their start bound.
sorted := make([]*externalObjWithBounds, numFiles)
for i := range sorted {
sorted[i] = &objs[i]
}
slices.SortFunc(sorted, func(a, b *externalObjWithBounds) int {
return g.cmp(a.bounds.Start, b.bounds.Start)
})

// We take the overall bounds of all the objects and generate 2*numFiles keys
// in that range to create the per-object bounds. In many cases, there won't
// be overlap between the object and its corresponding bounds but that's ok
// (we verify that when running the op).
overall := sorted[0].bounds
for i := 1; i < len(sorted); i++ {
if g.cmp(overall.End, sorted[i].bounds.End) < 0 {
overall.End = sorted[i].bounds.End
}
}
if g.cmp(overall.Start, overall.End) >= 0 {
panic(fmt.Sprintf("invalid bounds [%q, %q)", overall.Start, overall.End))
}
// Generate 2*numFiles distinct keys and sort them. These will form the ingest
// bounds for each file.
keys := g.keyGenerator.UniqueKeys(2*numFiles, func() []byte {
return g.keyGenerator.RandKeyInRange(0.01, overall)
})

for i, o := range sorted {
if i > 0 && g.rng.Intn(4) == 0 {
// Sometimes use the previous end key as the start key; this will be common in practice.
o.bounds.Start = sorted[i-1].bounds.End
} else {
o.bounds.Start = keys[2*i]
}
o.bounds.End = keys[2*i+1]
}
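// For example (hypothetical keys), with numFiles = 2 we draw four unique
// sorted keys, say b < f < m < t: file 0 gets bounds [b, f) and file 1 gets
// [m, t); a quarter of the time file 1 instead starts at file 0's end key,
// giving [f, t).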
// The batches we're ingesting may contain single delete tombstones that
// when applied to the writer result in nondeterminism in the deleted key.
// If that's the case, we can restore determinism by first deleting the keys
// from the writer.
//
// Generating additional operations here is not ideal, but it simplifies
// single delete invariants significantly.
for _, o := range objs {
singleDeleteConflicts := g.keyManager.checkForSingleDelConflicts(o.externalObjID, dbID, true /* collapsed */)
for _, key := range singleDeleteConflicts {
if g.cmp(key, o.bounds.Start) >= 0 && g.cmp(key, o.bounds.End) < 0 {
g.add(&deleteOp{
writerID: dbID,
key: key,
derivedDBID: dbID,
})
}
}
}

g.add(&ingestExternalFilesOp{
dbID: dbID,
objs: objs,
})
}

func (g *generator) writerLogData() {
if len(g.liveWriters) == 0 {
return
2 changes: 1 addition & 1 deletion metamorphic/key_generator.go
@@ -227,7 +227,7 @@ func (kg *keyGenerator) randKey(newKeyProbability float64, bounds *pebble.KeyRan
key = append(key, testkeys.Suffix(suffix)...)
}
if kg.cmp(key, bounds.Start) < 0 || kg.cmp(key, bounds.End) >= 0 {
panic(fmt.Sprintf("invalid randKey %q bounds: [%q, %q) %v %v", key, bounds.Start, bounds.End, kg.cmp(key, bounds.Start), kg.cmp(key, bounds.End)))
panic(fmt.Sprintf("invalid randKey %q; bounds: [%q, %q) %v %v", key, bounds.Start, bounds.End, kg.cmp(key, bounds.Start), kg.cmp(key, bounds.End)))
}
// We might (rarely) produce an existing key here, that's ok.
kg.keyManager.addNewKey(key)