Skip to content

Commit

Permalink
remoteobjcat: improve ApplyBatch code
Browse files Browse the repository at this point in the history
Improving the ApplyBatch code to first perform sanity checks, then
apply the change to the catalog file, then apply to the in-memory
state. This ensures correct state if writing to the catalog file
fails.

This also fixes a problem with the existing code when the catalog file
gets rotated: we write down the in-memory state in the new file then
write the batch; but because the in-memory state already reflected the
batch, we essentially were applying it twice. The fix is accompanied
by new sanity checks when loading the catalog.
  • Loading branch information
RaduBerinde committed Aug 28, 2023
1 parent c564927 commit 123e628
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 33 deletions.
8 changes: 7 additions & 1 deletion objstorage/objstorageprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ func DefaultSettings(fs vfs.FS, dirName string) Settings {

// Open creates the provider.
func Open(settings Settings) (objstorage.Provider, error) {
return open(settings)
// Note: we can't just `return open(settings)` because in an error case we
// would return (*provider)(nil) which is not objstorage.Provider(nil).
p, err := open(settings)
if err != nil {
return nil, err
}
return p, nil
}

func open(settings Settings) (p *provider, _ error) {
Expand Down
45 changes: 25 additions & 20 deletions objstorage/objstorageprovider/remoteobjcat/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,37 +218,40 @@ func (c *Catalog) ApplyBatch(b Batch) error {
c.mu.Lock()
defer c.mu.Unlock()

// Add new objects before deleting any objects. This allows for cases where
// the same batch adds and deletes an object.
for _, meta := range b.ve.NewObjects {
if _, exists := c.mu.objects[meta.FileNum]; exists {
return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
// Sanity checks.
toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
exists := func(n base.DiskFileNum) bool {
_, ok := c.mu.objects[n]
if !ok {
_, ok = toAdd[n]
}
return ok
}
for _, meta := range b.ve.NewObjects {
c.mu.objects[meta.FileNum] = meta
}
removeAddedObjects := func() {
for i := range b.ve.NewObjects {
delete(c.mu.objects, b.ve.NewObjects[i].FileNum)
if exists(meta.FileNum) {
return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
}
toAdd[meta.FileNum] = struct{}{}
}
for _, n := range b.ve.DeletedObjects {
if _, exists := c.mu.objects[n]; !exists {
removeAddedObjects()
if !exists(n) {
return errors.AssertionFailedf("deleting non-existent object %s", n)
}
}
// Apply the remainder of the batch to our current state.
for _, n := range b.ve.DeletedObjects {
delete(c.mu.objects, n)
}

if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
return errors.Wrapf(err, "pebble: could not write to remote object catalog: %v", err)
}

b.Reset()
// Add new objects before deleting any objects. This allows for cases where
// the same batch adds and deletes an object.
for _, meta := range b.ve.NewObjects {
c.mu.objects[meta.FileNum] = meta
}
for _, n := range b.ve.DeletedObjects {
delete(c.mu.objects, n)
}

return nil
}

Expand All @@ -273,13 +276,15 @@ func (c *Catalog) loadFromCatalogFile(filename string) error {
errors.Safe(filename))
}
var ve VersionEdit
err = ve.Decode(r)
if err != nil {
if err := ve.Decode(r); err != nil {
return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
errors.Safe(filename))
}
// Apply the version edit to the current state.
ve.Apply(&c.mu.creatorID, c.mu.objects)
if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
errors.Safe(filename))
}
}
return nil
}
Expand Down
22 changes: 15 additions & 7 deletions objstorage/objstorageprovider/remoteobjcat/testdata/catalog
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ sync: test/REMOTE-OBJ-CATALOG-000003
sync: test/REMOTE-OBJ-CATALOG-000003
sync: test/REMOTE-OBJ-CATALOG-000003
sync: test/REMOTE-OBJ-CATALOG-000003
sync: test/REMOTE-OBJ-CATALOG-000003
close: test/REMOTE-OBJ-CATALOG-000003
create: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
Expand All @@ -188,7 +189,6 @@ remove: test/REMOTE-OBJ-CATALOG-000003
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004

list test
----
Expand All @@ -210,6 +210,7 @@ sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
sync: test/REMOTE-OBJ-CATALOG-000004
close: test/REMOTE-OBJ-CATALOG-000004
create: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000005
Expand All @@ -224,7 +225,6 @@ sync: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000005

list test
----
Expand All @@ -245,7 +245,6 @@ sync: test
remove: test/REMOTE-OBJ-CATALOG-000005
sync: test/REMOTE-OBJ-CATALOG-000006
sync: test/REMOTE-OBJ-CATALOG-000006
sync: test/REMOTE-OBJ-CATALOG-000006
close: test/REMOTE-OBJ-CATALOG-000006
create: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
Expand All @@ -258,10 +257,19 @@ sync: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000007
close: test/REMOTE-OBJ-CATALOG-000007
create: test/REMOTE-OBJ-CATALOG-000008
sync: test/REMOTE-OBJ-CATALOG-000008
create: test/marker.remote-obj-catalog.000008.REMOTE-OBJ-CATALOG-000008
close: test/marker.remote-obj-catalog.000008.REMOTE-OBJ-CATALOG-000008
remove: test/marker.remote-obj-catalog.000007.REMOTE-OBJ-CATALOG-000007
sync: test
remove: test/REMOTE-OBJ-CATALOG-000007
sync: test/REMOTE-OBJ-CATALOG-000008
sync: test/REMOTE-OBJ-CATALOG-000008
sync: test/REMOTE-OBJ-CATALOG-000008

list test
----
REMOTE-OBJ-CATALOG-000007
marker.remote-obj-catalog.000007.REMOTE-OBJ-CATALOG-000007
REMOTE-OBJ-CATALOG-000008
marker.remote-obj-catalog.000008.REMOTE-OBJ-CATALOG-000008
15 changes: 11 additions & 4 deletions objstorage/objstorageprovider/remoteobjcat/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,21 @@ var errCorruptCatalog = base.CorruptionErrorf("pebble: corrupt remote object cat
// Apply the version edit to a creator ID and a map of objects.
func (v *VersionEdit) Apply(
creatorID *objstorage.CreatorID, objects map[base.DiskFileNum]RemoteObjectMetadata,
) {
) error {
if v.CreatorID.IsSet() {
*creatorID = v.CreatorID
}
for _, fileNum := range v.DeletedObjects {
delete(objects, fileNum)
}
for _, meta := range v.NewObjects {
if _, exists := objects[meta.FileNum]; exists {
return errors.AssertionFailedf("version edit adds existing object %s", meta.FileNum)
}
objects[meta.FileNum] = meta
}
for _, fileNum := range v.DeletedObjects {
if _, exists := objects[fileNum]; !exists {
return errors.AssertionFailedf("version edit deletes non-existent object %s", fileNum)
}
delete(objects, fileNum)
}
return nil
}
4 changes: 3 additions & 1 deletion tool/remotecat.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func (m *remoteCatalogT) runDumpOne(stdout io.Writer, filename string) error {
}
}
editIdx++
ve.Apply(&creatorID, objects)
if err := ve.Apply(&creatorID, objects); err != nil {
return err
}
}
fmt.Fprintf(stdout, "CreatorID: %v\n", creatorID)
var filenums []base.DiskFileNum
Expand Down

0 comments on commit 123e628

Please sign in to comment.