Skip to content

Commit

Permalink
db: delete compaction outputs in case of late cancellation
Browse files Browse the repository at this point in the history
Previously, if we had a late cancellation of a compaction (i.e.
right before we were going to logAndApply), we would end up not marking
compaction outputs as obsolete/zombie, nor would we delete
them right away. This change updates the compaction running code
to also clean up compaction outputs in this case of compaction
cancellation.

Fixes cockroachdb/cockroach#132668.
  • Loading branch information
itsbilal committed Oct 17, 2024
1 parent 1ed5524 commit dd26bf0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 4 deletions.
74 changes: 71 additions & 3 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,53 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
})
}

// cleanupVersionEdit disposes of the on-disk artifacts that were produced for
// a versionEdit that will never be applied. It gathers the affected file
// backings, records each of them as a zombie table, and then hands them to the
// versionSet as obsolete files.
//
// The backings gathered are:
//   - the backing of every non-virtual entry in ve.NewFiles whose file number
//     is not also present in ve.DeletedFiles, and
//   - every entry of ve.CreatedBackingTables for which IsUnused reports true.
//
// d.mu must be held when calling this method.
func (d *DB) cleanupVersionEdit(ve *versionEdit) {
	// Set of file numbers that this edit deletes. A new file whose number is
	// in this set is being moved to a different level by this edit, not
	// created by it.
	moved := make(map[base.FileNum]struct{})
	for entry := range ve.DeletedFiles {
		moved[entry.FileNum] = struct{}{}
	}

	toDiscard := make([]*fileBacking, 0, len(ve.NewFiles))
	for _, added := range ve.NewFiles {
		meta := added.Meta
		if meta.Virtual {
			// Backings of virtual tables are covered by the
			// CreatedBackingTables pass below.
			continue
		}
		if _, isMove := moved[meta.FileNum]; isMove {
			// Moved, not newly written: must not be marked obsolete.
			continue
		}
		toDiscard = append(toDiscard, meta.PhysicalMeta().FileBacking)
	}
	for _, backing := range ve.CreatedBackingTables {
		if backing.IsUnused() {
			toDiscard = append(toDiscard, backing)
		}
	}

	// The versionSet asserts that every obsolete file was marked zombie at
	// some point, so register each backing as a zombie table before
	// reporting it as obsolete.
	for _, backing := range toDiscard {
		num := backing.DiskFileNum
		d.mu.versions.zombieTables[num] = tableInfo{
			fileInfo: fileInfo{
				FileNum:  num,
				FileSize: backing.Size,
			},
			// TODO(bilal): This is harmless if it's wrong, as it only causes
			// incorrect accounting for the size of it in metrics. Currently
			// all compactions only write to local files anyway except with
			// disaggregated storage; if this becomes the norm, we should do
			// an objprovider lookup here.
			isLocal: true,
		}
	}
	d.mu.versions.addObsoleteLocked(toDiscard)
}

// compact1 runs one compaction.
//
// d.mu must be held when calling this, but the mutex may be dropped and
Expand Down Expand Up @@ -2204,8 +2251,11 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// as only the holder of the manifest lock will ever write to it.
if c.cancel.Load() {
err = firstError(err, ErrCancelledCompaction)
}
if err != nil {
// This is the first time we've seen a cancellation during the
// life of this compaction (or the original condition on err == nil
// would not have been true). We should delete any tables already
// created, as d.runCompaction did not do that.
d.cleanupVersionEdit(ve)
// logAndApply calls logUnlock. If we didn't call it, we need to call
// logUnlock ourselves.
d.mu.versions.logUnlock()
Expand Down Expand Up @@ -2748,9 +2798,27 @@ func (d *DB) runCompaction(
}
if result.Err != nil {
// Delete any created tables.
obsoleteFiles := make([]*fileBacking, 0, len(result.Tables))
d.mu.Lock()
for i := range result.Tables {
_ = d.objProvider.Remove(fileTypeTable, result.Tables[i].ObjMeta.DiskFileNum)
backing := &fileBacking{
DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
Size: result.Tables[i].WriterMeta.Size,
}
obsoleteFiles = append(obsoleteFiles, backing)
// Add this file to zombie tables as well, as the versionSet
// asserts on whether every obsolete file was at one point
// marked zombie.
d.mu.versions.zombieTables[backing.DiskFileNum] = tableInfo{
fileInfo: fileInfo{
FileNum: backing.DiskFileNum,
FileSize: backing.Size,
},
isLocal: true,
}
}
d.mu.versions.addObsoleteLocked(obsoleteFiles)
d.mu.Unlock()
}
// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
Expand Down
6 changes: 6 additions & 0 deletions internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ func (b *FileBacking) Ref() {
b.refs.Add(1)
}

// IsUnused reports whether the backing's reference count is currently zero,
// i.e. it is not being used by any tables in a version or btree.
func (b *FileBacking) IsUnused() bool {
	refCount := b.refs.Load()
	return refCount == 0
}

// Unref decrements the backing's ref count (and returns the new count).
func (b *FileBacking) Unref() int32 {
v := b.refs.Add(-1)
Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/virtual_backings.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type backingWithMetadata struct {
}

// AddAndRef adds a new backing to the set and takes a reference on it. Another
// backing for the same DiskFilNum must not exist.
// backing for the same DiskFileNum must not exist.
//
// The added backing is unused until it is associated with a table via AddTable
// or protected via Protect.
Expand Down

0 comments on commit dd26bf0

Please sign in to comment.