Skip to content

Commit

Permalink
metamorphic: test shared storage including the secondary cache
Browse files Browse the repository at this point in the history
This commit adds a standard option to the metamorphic tests that tests
shared storage including the secondary cache.

This commit fixes a buglet in the vfs stack involving incorrect OpenReadWrite
implementations, which affect the secondary cache and were caught by the
changes to the metamorphic tests.

To test the test changes, I introduced a bug in the secondary cache like so:

+++ b/objstorage/objstorageprovider/sharedcache/shared_cache.go
@@ -127,6 +127,7 @@ func (c *Cache) ReadAt(
                // Note this. The below code does not need the original ofs, as with the earlier
                // reading from the cache done, the relevant offset is ofs + int64(n). Same with p.
                ofs += int64(n)
+               ofs += 3
                p = p[n:]
---

With this chnage, the test failed with the following error:

backing file 000006 error: pebble/table: invalid table (bad magic number: ...)
  • Loading branch information
joshimhoff committed Jul 10, 2023
1 parent fa85ec4 commit 168079a
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 2 deletions.
2 changes: 1 addition & 1 deletion internal/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (fs *FS) OpenReadWrite(name string, opts ...vfs.OpenOption) (vfs.File, erro
if err := fs.inj.MaybeError(OpOpen, name); err != nil {
return nil, err
}
f, err := fs.fs.Open(name)
f, err := fs.fs.OpenReadWrite(name)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ type checkpointOp struct {
}

func (o *checkpointOp) run(t *test, h historyRecorder) {
// TODO(josh): db.Checkpoint does not work with shared storage yet.
// It would be better to filter out ahead of calling run on the op,
// by setting the weight that generator.go uses to zero, or similar.
// But IIUC the ops are shared for ALL the metamorphic test runs, so
// not sure how to do that easily:
// https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
if t.testOpts.sharedStorageEnabled {
h.Recordf("%s // %v", o, nil)
return
}
var opts []pebble.CheckpointOption
if len(o.spans) > 0 {
opts = append(opts, pebble.WithRestrictToSpans(o.spans))
Expand Down Expand Up @@ -1209,6 +1219,11 @@ func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
type dbRestartOp struct{}

func (o *dbRestartOp) run(t *test, h historyRecorder) {
// TODO(josh): db.Restart does not work with shared storage yet.
if t.testOpts.sharedStorageEnabled {
h.Recordf("%s", o)
return
}
if err := t.restartDB(); err != nil {
h.Recordf("%s // %v", o, err)
h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
Expand Down
42 changes: 42 additions & 0 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage/shared"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -91,6 +92,17 @@ func parseOptions(
case "TestOptions.async_apply_to_db":
opts.asyncApplyToDB = true
return true
case "TestOptions.shared_storage_enabled":
opts.sharedStorageEnabled = true
opts.Opts.Experimental.SharedStorage = shared.MakeSimpleFactory(map[shared.Locator]shared.Storage{
"": shared.NewInMem(),
})
opts.Opts.Experimental.CreateOnShared = true
return true
case "TestOptions.secondary_cache_enabled":
opts.secondaryCacheEnabled = true
opts.Opts.Experimental.SecondaryCacheSize = 1024 * 1024 * 32 // 32 MBs
return true
default:
if customOptionParsers == nil {
return false
Expand Down Expand Up @@ -145,6 +157,12 @@ func optionsToString(opts *TestOptions) string {
if opts.asyncApplyToDB {
fmt.Fprint(&buf, " async_apply_to_db=true\n")
}
if opts.sharedStorageEnabled {
fmt.Fprint(&buf, " shared_storage_enabled=true\n")
}
if opts.secondaryCacheEnabled {
fmt.Fprint(&buf, " secondary_cache_enabled=true\n")
}
for _, customOpt := range opts.CustomOpts {
fmt.Fprintf(&buf, " %s=%s\n", customOpt.Name(), customOpt.Value())
}
Expand Down Expand Up @@ -208,6 +226,11 @@ type TestOptions struct {
enableValueBlocks bool
// Use DB.ApplyNoSyncWait for applies that want to sync the WAL.
asyncApplyToDB bool
// Enable the use of shared storage.
sharedStorageEnabled bool
// Enable the secondary cache. Only effective if sharedStorageEnabled is
// also true.
secondaryCacheEnabled bool
}

// CustomOption defines a custom option that configures the behavior of an
Expand Down Expand Up @@ -349,6 +372,11 @@ func standardOptions() []*TestOptions {
[Options]
format_major_version=%s
`, newestFormatMajorVersionTODO),
27: `
[TestOptions]
shared_storage_enabled=true
secondary_cache_enabled=true
`,
}

opts := make([]*TestOptions, len(stdOpts))
Expand Down Expand Up @@ -469,6 +497,20 @@ func randomOptions(
testOpts.Opts.Experimental.EnableValueBlocks = func() bool { return true }
}
testOpts.asyncApplyToDB = rng.Intn(2) != 0
// 20% of time, enable shared storage.
if rng.Intn(5) == 0 {
testOpts.sharedStorageEnabled = true
testOpts.Opts.Experimental.SharedStorage = shared.MakeSimpleFactory(map[shared.Locator]shared.Storage{
"": shared.NewInMem(),
})
testOpts.Opts.Experimental.CreateOnShared = true
// If shared storage is enabled, enable secondary cache 50% of time.
if rng.Intn(2) == 0 {
testOpts.secondaryCacheEnabled = true
// TODO(josh): Randomize various secondary cache settings.
testOpts.Opts.Experimental.SecondaryCacheSize = 1024 * 1024 * 32 // 32 MBs
}
}
return testOpts
}

Expand Down
1 change: 1 addition & 0 deletions metamorphic/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestOptionsRoundtrip(t *testing.T) {
"MaxConcurrentCompactions:",
"Experimental.EnableValueBlocks:",
"Experimental.DisableIngestAsFlushable:",
"Experimental.SharedStorage:",
// Floating points
"Experimental.PointTombstoneWeight:",
}
Expand Down
10 changes: 10 additions & 0 deletions metamorphic/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
}
h.log.Printf("// db.Open() %v", err)

if t.testOpts.sharedStorageEnabled {
err = withRetries(func() error {
return db.SetCreatorID(1)
})
if err != nil {
return err
}
h.log.Printf("// db.SetCreatorID() %v", err)
}

t.tmpDir = t.opts.FS.PathJoin(dir, "tmp")
if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
providerSettings.Shared.StorageFactory = opts.Experimental.SharedStorage
providerSettings.Shared.CreateOnShared = opts.Experimental.CreateOnShared
providerSettings.Shared.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
providerSettings.Shared.CacheSizeBytes = opts.Experimental.SecondaryCacheSize

d.objProvider, err = objstorageprovider.Open(providerSettings)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,10 @@ type Options struct {
// using CreateOnSharedLocator. Can only be used when SharedStorage is set.
CreateOnShared bool
CreateOnSharedLocator shared.Locator

// CacheSizeBytes is the size of the on-disk block cache for objects
// on shared storage. If it is 0, no cache is used.
SecondaryCacheSize int64
}

// Filters is a map from filter policy name to filter policy. It is used for
Expand Down
2 changes: 1 addition & 1 deletion vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, erro

// OpenReadWrite implements the FS interface.
func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
return d.fs.Open(name, opts...)
return d.fs.OpenReadWrite(name, opts...)
}

// OpenDir implements the FS interface.
Expand Down

0 comments on commit 168079a

Please sign in to comment.