objstorageprovider: fix race during sharedSync
Applying a batch to the remote object catalog is serialized through
the catalog mutex. However, two goroutines can attempt to sync in
parallel, and the one carrying the more recent operations can apply
its batch first. The catalog code can then see the deletion of an
object before its creation.

The fix is to guard the entire sync path with a mutex, making it
atomic with respect to concurrent syncs.

Fixes #2852.
RaduBerinde committed Aug 28, 2023
1 parent 0b401ee commit c564927
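
To make the interleaving concrete, here is a minimal standalone sketch of the pattern the fix enforces; the names (batch, syncer, apply) are hypothetical simplifications, not Pebble's actual types. Without the outer mutex, goroutine A can snapshot a batch containing an object's creation, goroutine B can then snapshot a later batch containing its deletion, and B's apply can still run first.

package demo

import "sync"

// batch is a stand-in for remoteobjcat.Batch.
type batch []string

type syncer struct {
    // syncMu makes the whole sync operation (snapshot + apply) atomic,
    // mirroring the catalogSyncMutex added by this commit. It is acquired
    // before mu, matching the documented lock order.
    syncMu sync.Mutex

    mu      sync.Mutex
    pending batch // operations accumulated since the last sync
}

func (s *syncer) sync(apply func(batch) error) error {
    s.syncMu.Lock()
    defer s.syncMu.Unlock()

    // Snapshot and reset the pending batch under the fine-grained mutex.
    s.mu.Lock()
    b := s.pending
    s.pending = nil
    s.mu.Unlock()

    if len(b) == 0 {
        return nil
    }
    // Because syncMu is still held, batches reach apply in snapshot order.
    return apply(b)
}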
Showing 2 changed files with 75 additions and 4 deletions.
objstorage/objstorageprovider/provider_test.go (61 additions, 0 deletions)

@@ -7,7 +7,9 @@ package objstorageprovider
 import (
     "context"
     "fmt"
+    "math/rand"
     "strings"
+    "sync"
     "testing"
 
     "github.com/cockroachdb/datadriven"
@@ -537,3 +539,62 @@ func xor(n int) byte {
     v ^= v >> 8
     return byte(v)
 }
+
+// TestParallelSync checks that multiple goroutines can create and delete
+// objects and sync in parallel.
+func TestParallelSync(t *testing.T) {
+    for _, shared := range []bool{false, true} {
+        name := "local"
+        if shared {
+            name = "shared"
+        }
+        t.Run(name, func(t *testing.T) {
+            st := DefaultSettings(vfs.NewMem(), "")
+            st.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
+                "": remote.NewInMem(),
+            })
+
+            st.Remote.CreateOnShared = true
+            st.Remote.CreateOnSharedLocator = ""
+            p, err := Open(st)
+            require.NoError(t, err)
+            require.NoError(t, p.SetCreatorID(1))
+
+            const numGoroutines = 4
+            const numOps = 100
+            var wg sync.WaitGroup
+            for n := 0; n < numGoroutines; n++ {
+                wg.Add(1)
+                startNum := numOps * n
+                go func() {
+                    defer wg.Done()
+                    rng := rand.New(rand.NewSource(int64(startNum)))
+                    for i := 0; i < numOps; i++ {
+                        num := base.FileNum(startNum + i).DiskFileNum()
+                        w, _, err := p.Create(context.Background(), base.FileTypeTable, num, objstorage.CreateOptions{
+                            PreferSharedStorage: shared,
+                        })
+                        if err != nil {
+                            panic(err)
+                        }
+                        if err := w.Finish(); err != nil {
+                            panic(err)
+                        }
+                        if rng.Intn(2) == 0 {
+                            if err := p.Sync(); err != nil {
+                                panic(err)
+                            }
+                        }
+                        if err := p.Remove(base.FileTypeTable, num); err != nil {
+                            panic(err)
+                        }
+                        if rng.Intn(2) == 0 {
+                            if err := p.Sync(); err != nil {
+                                panic(err)
+                            }
+                        }
+                    }
+                }()
+            }
+            wg.Wait()
+        })
+    }
+}
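
The new test exercises the interleaving the commit message describes: the panics inside the goroutines abort the test if a sync observes an inconsistent batch order. Assuming a local checkout of the repository, an invocation along these lines runs it (the -race flag additionally catches any plain data races):

go test -race -run TestParallelSync ./objstorage/objstorageprovider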
objstorage/objstorageprovider/remote.go (14 additions, 4 deletions)
@@ -24,7 +24,11 @@ import (
 // All fields remain unset if remote storage is not configured.
 type remoteSubsystem struct {
     catalog *remoteobjcat.Catalog
-    cache *sharedcache.Cache
+    // catalogSyncMutex is used to correctly serialize two sharedSync operations.
+    // It must be acquired before the provider mutex.
+    catalogSyncMutex sync.Mutex
+
+    cache *sharedcache.Cache
 
     // shared contains the fields relevant to shared objects, i.e. objects that
     // are created by Pebble and potentially shared between Pebble instances.
@@ -176,6 +180,12 @@ func (p *provider) sharedCheckInitialized() error {
 }
 
 func (p *provider) sharedSync() error {
+    // Serialize parallel sync operations. Note that ApplyBatch is already
+    // serialized internally, but we want to make sure they get called with
+    // batches in the right order.
+    p.remote.catalogSyncMutex.Lock()
+    defer p.remote.catalogSyncMutex.Unlock()
+
     batch := func() remoteobjcat.Batch {
         p.mu.Lock()
         defer p.mu.Unlock()
@@ -188,9 +198,9 @@ func (p *provider) sharedSync() error {
         return nil
     }
 
-    err := p.remote.catalog.ApplyBatch(batch)
-    if err != nil {
-        // We have to put back the batch (for the next Sync).
+    if err := p.remote.catalog.ApplyBatch(batch); err != nil {
+        // Put back the batch (for the next Sync), appending any operations that
+        // happened in the meantime.
         p.mu.Lock()
         defer p.mu.Unlock()
         batch.Append(p.mu.remote.catalogBatch)
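
One detail of the error path above is worth spelling out: when ApplyBatch fails, the operations that accumulated in the meantime are appended to the failed batch (not the reverse) before it is put back for the next Sync, so the original order is preserved on retry. A sketch of that ordering property with a hypothetical slice-backed batch:

package demo

// putBack rebuilds the pending batch after a failed apply. The failed
// batch's operations predate anything recorded during the apply, so they
// must come first; reversing the order could replay an object's deletion
// before its creation.
func putBack(failed, accumulated []string) []string {
    return append(failed, accumulated...)
}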
