diff --git a/objstorage/objstorageprovider/provider_test.go b/objstorage/objstorageprovider/provider_test.go
index a8440e5fa4..73aeb23d07 100644
--- a/objstorage/objstorageprovider/provider_test.go
+++ b/objstorage/objstorageprovider/provider_test.go
@@ -10,7 +10,9 @@ import (
 	"math/rand"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/datadriven"
 	"github.com/cockroachdb/pebble/internal/base"
@@ -581,7 +583,8 @@ func TestParallelSync(t *testing.T) {
 			name = "shared"
 		}
 		t.Run(name, func(t *testing.T) {
-			st := DefaultSettings(vfs.NewMem(), "")
+			fs := vfs.NewStrictMem()
+			st := DefaultSettings(fs, "")
 			st.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
 				"": remote.NewInMem(),
 			})
@@ -592,15 +595,35 @@ func TestParallelSync(t *testing.T) {
 			require.NoError(t, err)
 			require.NoError(t, p.SetCreatorID(1))
 
-			const numGoroutines = 4
+			const numGoroutines = 32
 			const numOps = 100
 			var wg sync.WaitGroup
+			wg.Add(numGoroutines + 1)
+
+			var mustExistMu struct {
+				sync.Mutex
+				m map[base.DiskFileNum]struct{}
+			}
+			mustExistMu.m = make(map[base.DiskFileNum]struct{})
+			setMustExist := func(num base.DiskFileNum, val bool) {
+				mustExistMu.Lock()
+				defer mustExistMu.Unlock()
+				if val {
+					mustExistMu.m[num] = struct{}{}
+				} else {
+					delete(mustExistMu.m, num)
+				}
+			}
+
+			var stop atomic.Bool
 			for n := 0; n < numGoroutines; n++ {
-				wg.Add(1)
 				go func(startNum int, shared bool) {
 					defer wg.Done()
 					rng := rand.New(rand.NewSource(int64(startNum)))
 					for i := 0; i < numOps; i++ {
+						if stop.Load() {
+							return
+						}
 						num := base.DiskFileNum(startNum + i)
 						w, _, err := p.Create(context.Background(), base.FileTypeTable, num, objstorage.CreateOptions{
 							PreferSharedStorage: shared,
@@ -612,22 +635,68 @@ func TestParallelSync(t *testing.T) {
 							panic(err)
 						}
 						if rng.Intn(2) == 0 {
+							if stop.Load() {
+								return
+							}
 							if err := p.Sync(); err != nil {
 								panic(err)
 							}
+							setMustExist(num, true)
 						}
-						if err := p.Remove(base.FileTypeTable, num); err != nil {
-							panic(err)
-						}
-						if rng.Intn(2) == 0 {
-							if err := p.Sync(); err != nil {
+
+						if rng.Intn(4) == 0 {
+							setMustExist(num, false)
+							if err := p.Remove(base.FileTypeTable, num); err != nil {
 								panic(err)
+							}
+							if rng.Intn(2) == 0 {
+								if err := p.Sync(); err != nil {
+									panic(err)
+								}
 							}
 						}
 					}
 				}(numOps*(n+1), shared)
 			}
+			mustExist := make(map[base.DiskFileNum]struct{})
+			// "Crash" at a random time.
+			time.AfterFunc(time.Duration(rand.Int63n(int64(10*time.Millisecond))), func() {
+				defer wg.Done()
+				if shared {
+					// TODO(radu): we cannot simulate a crash in shared mode because we
+					// have no way to restore the remote.Storage to the state at the time
+					// of the crash.
+					return
+				}
+				// Grab a consistent snapshot of the current mustExist map.
+				mustExistMu.Lock()
+				for n := range mustExistMu.m {
+					mustExist[n] = struct{}{}
+				}
+				fs.SetIgnoreSyncs(true)
+				mustExistMu.Unlock()
+				stop.Store(true)
+			})
+			// Wait until the timer function above and all the goroutines finish.
 			wg.Wait()
+			// Now close the provider, reset the filesystem, and check that all files
+			// we expect to exist are there.
+			require.NoError(t, p.Close())
+			fs.ResetToSyncedState()
+
+			p, err = Open(st)
+			require.NoError(t, err)
+			// Check that all objects exist and can be opened.
+			for num := range mustExist {
+				if _, err := p.Lookup(base.FileTypeTable, num); err != nil {
+					t.Fatalf("object %s not present after crash", num)
+				}
+				r, err := p.OpenForReading(context.Background(), base.FileTypeTable, num, objstorage.OpenOptions{})
+				if err != nil {
+					t.Fatalf("object %s cannot be opened after crash: %s", num, err)
+				}
+				require.NoError(t, r.Close())
+			}
 		})
 	}
 }