diff --git a/objstorage/objstorageprovider/provider_test.go b/objstorage/objstorageprovider/provider_test.go
index 1ae8671179b..6f1f7517609 100644
--- a/objstorage/objstorageprovider/provider_test.go
+++ b/objstorage/objstorageprovider/provider_test.go
@@ -7,7 +7,9 @@ package objstorageprovider
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"strings"
+	"sync"
 	"testing"
 
 	"github.com/cockroachdb/datadriven"
@@ -537,3 +539,64 @@ func xor(n int) byte {
 	v ^= v >> 8
 	return byte(v)
 }
+
+// TestParallelSync checks that multiple goroutines can create and delete
+// objects and sync in parallel.
+func TestParallelSync(t *testing.T) {
+	for _, shared := range []bool{false, true} {
+		name := "local"
+		if shared {
+			name = "shared"
+		}
+		t.Run(name, func(t *testing.T) {
+			st := DefaultSettings(vfs.NewMem(), "")
+			st.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
+				"": remote.NewInMem(),
+			})
+
+			st.Remote.CreateOnShared = true
+			st.Remote.CreateOnSharedLocator = ""
+			p, err := Open(st)
+			require.NoError(t, err)
+			require.NoError(t, p.SetCreatorID(1))
+
+			const numGoroutines = 4
+			const numOps = 100
+			var wg sync.WaitGroup
+			for n := 0; n < numGoroutines; n++ {
+				wg.Add(1)
+				startNum := numOps * n
+				go func() {
+					defer wg.Done()
+					rng := rand.New(rand.NewSource(int64(startNum)))
+					for i := 0; i < numOps; i++ {
+						num := base.FileNum(startNum + i).DiskFileNum()
+						w, _, err := p.Create(context.Background(), base.FileTypeTable, num, objstorage.CreateOptions{
+							PreferSharedStorage: shared,
+						})
+						if err != nil {
+							panic(err)
+						}
+						if err := w.Finish(); err != nil {
+							panic(err)
+						}
+						if rng.Intn(2) == 0 {
+							if err := p.Sync(); err != nil {
+								panic(err)
+							}
+						}
+						if err := p.Remove(base.FileTypeTable, num); err != nil {
+							panic(err)
+						}
+						if rng.Intn(2) == 0 {
+							if err := p.Sync(); err != nil {
+								panic(err)
+							}
+						}
+					}
+				}()
+			}
+			wg.Wait()
+		})
+	}
+}
diff --git a/objstorage/objstorageprovider/remote.go b/objstorage/objstorageprovider/remote.go
index 4d7e29b9524..d4ab17e1171 100644
--- a/objstorage/objstorageprovider/remote.go
+++ b/objstorage/objstorageprovider/remote.go
@@ -24,7 +24,11 @@ import (
 // All fields remain unset if remote storage is not configured.
 type remoteSubsystem struct {
 	catalog *remoteobjcat.Catalog
-	cache   *sharedcache.Cache
+	// catalogSyncMutex is used to serialize concurrent sharedSync operations.
+	// It must be acquired before the provider mutex.
+	catalogSyncMutex sync.Mutex
+
+	cache *sharedcache.Cache
 
 	// shared contains the fields relevant to shared objects, i.e. objects that
 	// are created by Pebble and potentially shared between Pebble instances.
@@ -176,6 +180,12 @@ func (p *provider) sharedCheckInitialized() error {
 }
 
 func (p *provider) sharedSync() error {
+	// Serialize parallel sync operations. Note that ApplyBatch is already
+	// serialized internally, but we want to make sure the batches are applied
+	// in the right order.
+	p.remote.catalogSyncMutex.Lock()
+	defer p.remote.catalogSyncMutex.Unlock()
+
 	batch := func() remoteobjcat.Batch {
 		p.mu.Lock()
 		defer p.mu.Unlock()
@@ -188,9 +198,9 @@ func (p *provider) sharedSync() error {
 		return nil
 	}
 
-	err := p.remote.catalog.ApplyBatch(batch)
-	if err != nil {
-		// We have to put back the batch (for the next Sync).
+	if err := p.remote.catalog.ApplyBatch(batch); err != nil {
+		// Put back the batch (for the next Sync), appending any operations that
+		// happened in the meantime.
 		p.mu.Lock()
 		defer p.mu.Unlock()
 		batch.Append(p.mu.remote.catalogBatch)