PBM-1397: improve backup files check #1020

Merged
3 commits merged on Sep 27, 2024
Changes from all commits
134 changes: 97 additions & 37 deletions pbm/backup/storage.go
@@ -7,13 +7,12 @@ import (
"runtime"
"sync"

"golang.org/x/sync/errgroup"

"github.com/percona/percona-backup-mongodb/pbm/archive"
"github.com/percona/percona-backup-mongodb/pbm/defs"
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/pbm/storage"
sfs "github.com/percona/percona-backup-mongodb/pbm/storage/fs"
"github.com/percona/percona-backup-mongodb/pbm/util"
"github.com/percona/percona-backup-mongodb/pbm/version"
)

@@ -45,71 +44,132 @@ func ReadMetadata(stg storage.Storage, filename string) (*BackupMeta, error) {
func CheckBackupDataFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
switch bcp.Type {
case defs.LogicalBackup:
return checkLogicalBackupFiles(ctx, stg, bcp)
return checkLogicalBackupDataFiles(ctx, stg, bcp)
case defs.PhysicalBackup, defs.IncrementalBackup:
return checkPhysicalBackupFiles(ctx, stg, bcp)
return checkPhysicalBackupDataFiles(ctx, stg, bcp)
case defs.ExternalBackup:
return nil // no files available
}

return errors.Errorf("unknown backup type %s", bcp.Type)
}

func checkLogicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
func checkLogicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *BackupMeta) error {
legacy := version.IsLegacyArchive(bcp.PBMVersion)
eg, _ := errgroup.WithContext(ctx)

eg := util.NewErrorGroup(runtime.NumCPU() * 2)
for _, rs := range bcp.Replsets {
rs := rs
eg.Go(func() error {
eg.Go(func() error { return checkFile(stg, rs.DumpName) })

eg.Go(func() error { return checkFile(stg, rs.DumpName) })
eg.Go(func() error {
if version.IsLegacyBackupOplog(bcp.PBMVersion) {
return checkFile(stg, rs.OplogName)
}

eg.Go(func() error {
if version.IsLegacyBackupOplog(bcp.PBMVersion) {
return checkFile(stg, rs.OplogName)
files, err := stg.List(rs.OplogName, "")
if err != nil {
return errors.Wrap(err, "list")
}
if len(files) == 0 {
return errors.Wrap(err, "no oplog files")
}
for i := range files {
if files[i].Size == 0 {
return errors.Errorf("%q is empty", path.Join(rs.OplogName, files[i].Name))
}
}

return nil
})

if legacy {
return nil
}

files, err := stg.List(rs.OplogName, "")
nss, err := ReadArchiveNamespaces(stg, rs.DumpName)
if err != nil {
return errors.Wrap(err, "list")
return errors.Wrapf(err, "parse metafile %q", rs.DumpName)
}
if len(files) == 0 {
return errors.Wrap(err, "no oplog files")
}
for i := range files {
if files[i].Size == 0 {
return errors.Errorf("%q is empty", path.Join(rs.OplogName, files[i].Name))

for _, ns := range nss {
if ns.Size == 0 {
continue
}

ns := archive.NSify(ns.Database, ns.Collection)
f := path.Join(bcp.Name, rs.Name, ns+bcp.Compression.Suffix())

eg.Go(func() error { return checkFile(stg, f) })
}

return nil
})
}

if legacy {
continue
}

nss, err := ReadArchiveNamespaces(stg, rs.DumpName)
if err != nil {
return errors.Wrapf(err, "parse metafile %q", rs.DumpName)
}
errs := eg.Wait()
return errors.Join(errs...)
}

for _, ns := range nss {
if ns.Size == 0 {
continue
func checkPhysicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *BackupMeta) error {
eg := util.NewErrorGroup(runtime.NumCPU() * 2)
for _, rs := range bcp.Replsets {
eg.Go(func() error {
var filelist Filelist
if version.HasFilelistFile(bcp.PBMVersion) {
var err error
filelist, err = ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "read filelist for replset %s", rs.Name)
}
} else {
filelist = rs.Files
}
if len(filelist) == 0 {
return errors.Errorf("empty filelist for replset %s", rs.Name)
}

ns := archive.NSify(ns.Database, ns.Collection)
f := path.Join(bcp.Name, rs.Name, ns+bcp.Compression.Suffix())
for _, f := range filelist {
if f.Len <= 0 {
continue // no file expected
}

eg.Go(func() error { return checkFile(stg, f) })
}
eg.Go(func() error {
filepath := path.Join(bcp.Name, rs.Name, f.Path(bcp.Compression))
stat, err := stg.FileStat(filepath)
if err != nil {
return errors.Wrapf(err, "file %s", filepath)
}
if stat.Size == 0 {
return errors.Errorf("empty file %s", filepath)
}

return nil
})
}

return nil
})
}

return eg.Wait()
errs := eg.Wait()
return errors.Join(errs...)
}

func checkPhysicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
return nil
func ReadFilelistForReplset(stg storage.Storage, bcpName, rsName string) (Filelist, error) {
pfFilepath := path.Join(bcpName, rsName, FilelistName)
rdr, err := stg.SourceReader(pfFilepath)
if err != nil {
return nil, errors.Wrapf(err, "open %q", pfFilepath)
}
defer rdr.Close()

filelist, err := ReadFilelist(rdr)
if err != nil {
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
}

return filelist, nil
}

func ReadArchiveNamespaces(stg storage.Storage, metafile string) ([]*archive.Namespace, error) {
10 changes: 10 additions & 0 deletions pbm/backup/types.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -156,6 +157,15 @@ func (f File) String() string {
return fmt.Sprintf("%s [%d:%d]", f.Name, f.Off, f.Len)
}

func (f File) Path(c compress.CompressionType) string {
src := filepath.Join(f.Name + c.Suffix())
if f.Len == 0 {
return src
}

return fmt.Sprintf("%s.%d-%d", src, f.Off, f.Len)
}

func (f *File) WriteTo(w io.Writer) (int64, error) {
fd, err := os.Open(f.Name)
if err != nil {
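For illustration, a hedged sketch of what the new File.Path helper above produces. The file names, offsets, and the s2 compression constant here are hypothetical examples, not values from the PR; they only contrast the whole-file and chunk cases.

package main

import (
    "fmt"

    "github.com/percona/percona-backup-mongodb/pbm/backup"
    "github.com/percona/percona-backup-mongodb/pbm/compress"
)

func main() {
    // Whole file (Len == 0): only the compression suffix is appended.
    whole := backup.File{Name: "collection-7.wt"}
    fmt.Println(whole.Path(compress.CompressionTypeS2)) // collection-7.wt.s2

    // Partial chunk (Off/Len set): ".<off>-<len>" follows the suffix.
    chunk := backup.File{Name: "WiredTiger.wt", Off: 4096, Len: 8192}
    fmt.Println(chunk.Path(compress.CompressionTypeS2)) // WiredTiger.wt.s2.4096-8192
}

This mirrors the change in pbm/restore/physical.go below, where copyFiles drops its manual path construction in favor of f.Path(set.Cmpr).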
1 change: 0 additions & 1 deletion pbm/restore/logical.go
@@ -759,7 +759,6 @@ func (r *Restore) RunSnapshot(
usersAndRolesOpt restoreUsersAndRolesOption,
) error {
var rdr io.ReadCloser

var err error
if version.IsLegacyArchive(bcp.PBMVersion) {
sr, err := r.bcpStg.SourceReader(dump)
5 changes: 1 addition & 4 deletions pbm/restore/physical.go
@@ -1089,10 +1089,7 @@ func (r *PhysRestore) copyFiles() (*s3.DownloadStat, error) {
for i := len(r.files) - 1; i >= 0; i-- {
set := r.files[i]
for _, f := range set.Data {
src := filepath.Join(set.BcpName, setName, f.Name+set.Cmpr.Suffix())
if f.Len != 0 {
src += fmt.Sprintf(".%d-%d", f.Off, f.Len)
}
src := filepath.Join(set.BcpName, setName, f.Path(set.Cmpr))
// cut dbpath from destination if there is any (see PBM-1058)
fname := f.Name
if set.dbpath != "" {
44 changes: 44 additions & 0 deletions pbm/util/errgroup.go
@@ -0,0 +1,44 @@
package util

import (
"runtime"
"sync"
)

type errorGroup struct {
errs []error
mu sync.Mutex

wg sync.WaitGroup
sem chan struct{}
Review comment (Member): This is a nice semaphore implementation: https://pkg.go.dev/golang.org/x/sync/semaphore (a sketch built on it follows this file).

}

func NewErrorGroup(limit int) *errorGroup {
if limit <= 0 {
limit = runtime.NumCPU()
}
return &errorGroup{sem: make(chan struct{}, limit)}
}

func (g *errorGroup) Wait() []error {
g.wg.Wait()
return g.errs
}

func (g *errorGroup) Go(f func() error) {
g.wg.Add(1)
go func() {
g.sem <- struct{}{}

defer func() {
<-g.sem
g.wg.Done()
}()

if err := f(); err != nil {
g.mu.Lock()
g.errs = append(g.errs, err)
g.mu.Unlock()
}
}()
}
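
Following up on the review comment above, here is a hedged sketch of the same bounded-concurrency error group built on golang.org/x/sync/semaphore instead of a buffered channel. It is not part of the PR; the weightedGroup name and package layout are illustrative.

package util

import (
    "context"
    "runtime"
    "sync"

    "golang.org/x/sync/semaphore"
)

type weightedGroup struct {
    errs []error
    mu   sync.Mutex

    wg  sync.WaitGroup
    sem *semaphore.Weighted
}

func NewWeightedGroup(limit int) *weightedGroup {
    if limit <= 0 {
        limit = runtime.NumCPU()
    }
    return &weightedGroup{sem: semaphore.NewWeighted(int64(limit))}
}

func (g *weightedGroup) Go(f func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()

        // Block until a slot is free; with context.Background() Acquire
        // cannot be cancelled, so its error can be ignored here.
        _ = g.sem.Acquire(context.Background(), 1)
        defer g.sem.Release(1)

        if err := f(); err != nil {
            g.mu.Lock()
            g.errs = append(g.errs, err)
            g.mu.Unlock()
        }
    }()
}

func (g *weightedGroup) Wait() []error {
    g.wg.Wait()
    return g.errs
}

For this use case the behavior matches the buffered-channel version in the PR; the Weighted semaphore mainly adds context-aware acquisition, so a caller could pass a cancellable context to Acquire to stop queued work early.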
21 changes: 2 additions & 19 deletions sdk/impl.go
@@ -2,7 +2,6 @@ package sdk

import (
"context"
"path"
"runtime"
"time"

@@ -192,7 +191,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
rs := &bcp.Replsets[i]

eg.Go(func() error {
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "get filelist for %q [rs: %s] backup", bcp.Name, rs.Name)
}
@@ -226,7 +225,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
rs := &bcp.Replsets[i]

eg.Go(func() error {
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "fetch files for %q [rs: %s] backup", bcp.Name, rs.Name)
}
@@ -254,22 +253,6 @@ func getStorageForRead(ctx context.Context, bcp *backup.BackupMeta) (storage.Storage, error) {
return stg, nil
}

func getFilelistForReplset(stg storage.Storage, bcpName, rsName string) (backup.Filelist, error) {
pfFilepath := path.Join(bcpName, rsName, backup.FilelistName)
rdr, err := stg.SourceReader(pfFilepath)
if err != nil {
return nil, errors.Wrapf(err, "open %q", pfFilepath)
}
defer rdr.Close()

filelist, err := backup.ReadFilelist(rdr)
if err != nil {
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
}

return filelist, nil
}

func (c *Client) GetRestoreByName(ctx context.Context, name string) (*RestoreMetadata, error) {
return restore.GetRestoreMeta(ctx, c.conn, name)
}