Skip to content

Commit

Permalink
update du/dd progress on completion
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Jan 7, 2025
1 parent dce9777 commit 05858fa
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 56 deletions.
8 changes: 4 additions & 4 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
fs.wgDataPath.Done()
}()

snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)

if err == provider.ErrorCanceled {
Expand All @@ -193,7 +193,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
}
}()

Expand All @@ -215,7 +215,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
fs.wgDataPath.Done()
}()

err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -226,7 +226,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
}
}()

Expand Down
17 changes: 16 additions & 1 deletion pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (ms *microServiceBRWatcher) startWatch() {
logger.Info("Calling callback on data path pod termination")

if lastPod.Status.Phase == v1.PodSucceeded {
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log))
result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result))
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result)
} else {
if strings.HasSuffix(terminateMessage, ErrCancelled) {
ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
Expand Down Expand Up @@ -390,6 +392,19 @@ func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader
return progress
}

func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress {
progress := &uploader.Progress{}
if taskType == TaskTypeBackup {
progress.BytesDone = result.Backup.TotalBytes
progress.TotalBytes = result.Backup.TotalBytes
} else {
progress.BytesDone = result.Restore.TotalBytes
progress.TotalBytes = result.Restore.TotalBytes
}

return progress
}

func (ms *microServiceBRWatcher) Cancel() {
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type BackupResult struct {
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// RestoreResult represents the result of a restore
type RestoreResult struct {
Target AccessPoint `json:"target,omitempty"`
Target AccessPoint `json:"target,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// Callbacks defines the collection of callbacks during backup/restore
Expand Down
20 changes: 10 additions & 10 deletions pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (kp *kopiaProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")
}

if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")
}

log := kp.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -175,9 +175,9 @@ func (kp *kopiaProvider) RunBackup(

if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, ErrorCanceled
return snapshotID, false, 0, ErrorCanceled
}
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup")
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -189,7 +189,7 @@ func (kp *kopiaProvider) RunBackup(
)

log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, false, nil
return snapshotInfo.ID, false, snapshotInfo.Size, nil
}

func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
Expand All @@ -211,7 +211,7 @@ func (kp *kopiaProvider) RunRestore(
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
Expand All @@ -235,12 +235,12 @@ func (kp *kopiaProvider) RunRestore(
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)

if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
return 0, errors.Wrapf(err, "Failed to run kopia restore")
}

if atomic.LoadInt32(&kp.canceling) == 1 {
log.Error("Kopia restore is canceled")
return ErrorCanceled
return 0, ErrorCanceled
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -253,5 +253,5 @@ func (kp *kopiaProvider) RunRestore(

log.Info(output)

return nil
return size, nil
}
4 changes: 2 additions & 2 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
_, _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestRunRestore(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
_, err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down
62 changes: 45 additions & 17 deletions pkg/uploader/provider/mocks/Provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/uploader/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Provider interface {
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error)
updater uploader.ProgressUpdater) (string, bool, int64, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
RunRestore(
Expand All @@ -59,7 +59,7 @@ type Provider interface {
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderConfig map[string]string,
updater uploader.ProgressUpdater) error
updater uploader.ProgressUpdater) (int64, error)
// Close which will close related repository
Close(ctx context.Context) error
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/uploader/provider/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,21 @@ func (rp *resticProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")
}

if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")
}

if realSource != "" {
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
return "", false, 0, errors.New("real source is not empty, this is not supported by restic uploader")
}

if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to support block mode")
return "", false, 0, errors.New("unable to support block mode")
}

log := rp.log.WithFields(logrus.Fields{
Expand All @@ -149,7 +149,7 @@ func (rp *resticProvider) RunBackup(
if len(uploaderCfg) > 0 {
parallelFilesUpload, err := uploaderutil.GetParallelFilesUpload(uploaderCfg)
if err != nil {
return "", false, errors.Wrap(err, "failed to get uploader config")
return "", false, 0, errors.Wrap(err, "failed to get uploader config")
}
if parallelFilesUpload > 0 {
log.Warnf("ParallelFilesUpload is set to %d, but restic does not support parallel file uploads. Ignoring.", parallelFilesUpload)
Expand All @@ -171,9 +171,9 @@ func (rp *resticProvider) RunBackup(
if err != nil {
if strings.Contains(stderrBuf, "snapshot is empty") {
log.Debugf("Restic backup got empty dir with %s path", path)
return "", true, nil
return "", true, 0, nil
}
return "", false, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
return "", false, 0, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
}
// GetSnapshotID
snapshotIDCmd := resticGetSnapshotFunc(rp.repoIdentifier, rp.credentialsFile, tags)
Expand All @@ -184,10 +184,10 @@ func (rp *resticProvider) RunBackup(
}
snapshotID, err := resticGetSnapshotIDFunc(snapshotIDCmd)
if err != nil {
return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
return "", false, 0, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
}
log.Infof("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf)
return snapshotID, false, nil
return snapshotID, false, 0, nil
}

// RunRestore runs a `restore` command and monitors the volume size to
Expand All @@ -198,17 +198,17 @@ func (rp *resticProvider) RunRestore(
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
if updater == nil {
return errors.New("Need to initial backup progress updater first")
return 0, errors.New("Need to initial backup progress updater first")
}
log := rp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})

if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to support block mode")
return 0, errors.New("unable to support block mode")
}

restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
Expand All @@ -220,15 +220,15 @@ func (rp *resticProvider) RunRestore(

extraFlags, err := rp.parseRestoreExtraFlags(uploaderCfg)
if err != nil {
return errors.Wrap(err, "failed to parse uploader config")
return 0, errors.Wrap(err, "failed to parse uploader config")
} else if len(extraFlags) != 0 {
restoreCmd.ExtraFlags = append(restoreCmd.ExtraFlags, extraFlags...)
}

stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater)

log.Infof("Run command=%v, stdout=%s, stderr=%s", restoreCmd, stdout, stderr)
return err
return 0, err
}

func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string) ([]string, error) {
Expand Down
Loading

0 comments on commit 05858fa

Please sign in to comment.