From 6ef35bba0a143012bf208d30d6528bd1a21815b9 Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Fri, 20 Sep 2024 15:21:42 +0200 Subject: [PATCH] PBM-1397: implement checkPhysicalBackupDataFiles() --- pbm/backup/storage.go | 63 +++++++++++++++++++++++++++++++++++++++-- pbm/backup/types.go | 10 +++++++ pbm/restore/logical.go | 1 - pbm/restore/physical.go | 5 +--- sdk/impl.go | 21 ++------------ 5 files changed, 73 insertions(+), 27 deletions(-) diff --git a/pbm/backup/storage.go b/pbm/backup/storage.go index 8a883a46a..acd55b1b5 100644 --- a/pbm/backup/storage.go +++ b/pbm/backup/storage.go @@ -46,7 +46,7 @@ func CheckBackupDataFiles(ctx context.Context, stg storage.Storage, bcp *BackupM case defs.LogicalBackup: return checkLogicalBackupDataFiles(ctx, stg, bcp) case defs.PhysicalBackup, defs.IncrementalBackup: - return checkPhysicalBackupFiles(ctx, stg, bcp) + return checkPhysicalBackupDataFiles(ctx, stg, bcp) case defs.ExternalBackup: return nil // no files available } @@ -111,8 +111,65 @@ func checkLogicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *Ba return errors.Join(errs...) } -func checkPhysicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error { - return nil +func checkPhysicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *BackupMeta) error { + eg := util.NewErrorGroup(runtime.NumCPU() * 2) + for _, rs := range bcp.Replsets { + eg.Go(func() error { + var filelist Filelist + if version.HasFilelistFile(bcp.PBMVersion) { + var err error + filelist, err = ReadFilelistForReplset(stg, bcp.Name, rs.Name) + if err != nil { + return errors.Wrapf(err, "read filelist for replset %s", rs.Name) + } + } else { + filelist = rs.Files + } + if len(filelist) == 0 { + return errors.Errorf("empty filelist for replset %s", rs.Name) + } + + for _, f := range filelist { + if f.Len <= 0 { + continue // no file expected + } + + eg.Go(func() error { + filepath := path.Join(bcp.Name, rs.Name, f.Path(bcp.Compression)) + stat, err := stg.FileStat(filepath) + if err != nil { + return errors.Wrapf(err, "file %s", filepath) + } + if stat.Size == 0 { + return errors.Errorf("empty file %s", filepath) + } + + return nil + }) + } + + return nil + }) + } + + errs := eg.Wait() + return errors.Join(errs...) +} + +func ReadFilelistForReplset(stg storage.Storage, bcpName, rsName string) (Filelist, error) { + pfFilepath := path.Join(bcpName, rsName, FilelistName) + rdr, err := stg.SourceReader(pfFilepath) + if err != nil { + return nil, errors.Wrapf(err, "open %q", pfFilepath) + } + defer rdr.Close() + + filelist, err := ReadFilelist(rdr) + if err != nil { + return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath) + } + + return filelist, nil } func ReadArchiveNamespaces(stg storage.Storage, metafile string) ([]*archive.Namespace, error) { diff --git a/pbm/backup/types.go b/pbm/backup/types.go index 52e387872..2997b88b4 100644 --- a/pbm/backup/types.go +++ b/pbm/backup/types.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "path/filepath" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -156,6 +157,15 @@ func (f File) String() string { return fmt.Sprintf("%s [%d:%d]", f.Name, f.Off, f.Len) } +func (f File) Path(c compress.CompressionType) string { + src := filepath.Join(f.Name + c.Suffix()) + if f.Len == 0 { + return src + } + + return fmt.Sprintf("%s.%d-%d", src, f.Off, f.Len) +} + func (f *File) WriteTo(w io.Writer) (int64, error) { fd, err := os.Open(f.Name) if err != nil { diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go index 886a8f637..29cdd9a73 100644 --- a/pbm/restore/logical.go +++ b/pbm/restore/logical.go @@ -759,7 +759,6 @@ func (r *Restore) RunSnapshot( usersAndRolesOpt restoreUsersAndRolesOption, ) error { var rdr io.ReadCloser - var err error if version.IsLegacyArchive(bcp.PBMVersion) { sr, err := r.bcpStg.SourceReader(dump) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 8d00016a8..905197081 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -1089,10 +1089,7 @@ func (r *PhysRestore) copyFiles() (*s3.DownloadStat, error) { for i := len(r.files) - 1; i >= 0; i-- { set := r.files[i] for _, f := range set.Data { - src := filepath.Join(set.BcpName, setName, f.Name+set.Cmpr.Suffix()) - if f.Len != 0 { - src += fmt.Sprintf(".%d-%d", f.Off, f.Len) - } + src := filepath.Join(set.BcpName, setName, f.Path(set.Cmpr)) // cut dbpath from destination if there is any (see PBM-1058) fname := f.Name if set.dbpath != "" { diff --git a/sdk/impl.go b/sdk/impl.go index 83afcc1ff..51043a554 100644 --- a/sdk/impl.go +++ b/sdk/impl.go @@ -2,7 +2,6 @@ package sdk import ( "context" - "path" "runtime" "time" @@ -192,7 +191,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error { rs := &bcp.Replsets[i] eg.Go(func() error { - filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name) + filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name) if err != nil { return errors.Wrapf(err, "get filelist for %q [rs: %s] backup", bcp.Name, rs.Name) } @@ -226,7 +225,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error { rs := &bcp.Replsets[i] eg.Go(func() error { - filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name) + filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name) if err != nil { return errors.Wrapf(err, "fetch files for %q [rs: %s] backup", bcp.Name, rs.Name) } @@ -254,22 +253,6 @@ func getStorageForRead(ctx context.Context, bcp *backup.BackupMeta) (storage.Sto return stg, nil } -func getFilelistForReplset(stg storage.Storage, bcpName, rsName string) (backup.Filelist, error) { - pfFilepath := path.Join(bcpName, rsName, backup.FilelistName) - rdr, err := stg.SourceReader(pfFilepath) - if err != nil { - return nil, errors.Wrapf(err, "open %q", pfFilepath) - } - defer rdr.Close() - - filelist, err := backup.ReadFilelist(rdr) - if err != nil { - return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath) - } - - return filelist, nil -} - func (c *Client) GetRestoreByName(ctx context.Context, name string) (*RestoreMetadata, error) { return restore.GetRestoreMeta(ctx, c.conn, name) }