Skip to content

Commit

Permalink
Merge pull request #8608 from Lyndon-Li/update-du-dd-progress-when-te…
Browse files Browse the repository at this point in the history
…rminal-event-is-missing

Update du/dd progress on completion
  • Loading branch information
Lyndon-Li authored Jan 14, 2025
2 parents ddc1bcb + 97ce566 commit 2ef7711
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 62 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8608-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8497, update du/dd progress on completion
2 changes: 1 addition & 1 deletion pkg/datamover/backup_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestOnDataUploadCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result { false { }}: fake-marshal-error",
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
},
{
name: "succeed",
Expand Down
2 changes: 1 addition & 1 deletion pkg/datamover/restore_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestOnDataDownloadCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal restore result {{ }}: fake-marshal-error",
expectedErr: "Failed to marshal restore result {{ } 0}: fake-marshal-error",
},
{
name: "succeed",
Expand Down
8 changes: 4 additions & 4 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
fs.wgDataPath.Done()
}()

snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)

if err == provider.ErrorCanceled {
Expand All @@ -193,7 +193,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
}
}()

Expand All @@ -215,7 +215,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
fs.wgDataPath.Done()
}()

err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -226,7 +226,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
}
}()

Expand Down
8 changes: 5 additions & 3 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TestAsyncBackup(t *testing.T) {
SnapshotID: "fake-snapshot",
EmptySnapshot: false,
Source: AccessPoint{ByPath: "fake-path"},
TotalBytes: 1000,
},
},
path: "fake-path",
Expand All @@ -95,7 +96,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.result.Backup.TotalBytes, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
Expand Down Expand Up @@ -167,7 +168,8 @@ func TestAsyncRestore(t *testing.T) {
},
result: Result{
Restore: RestoreResult{
Target: AccessPoint{ByPath: "fake-path"},
Target: AccessPoint{ByPath: "fake-path"},
TotalBytes: 1000,
},
},
path: "fake-path",
Expand All @@ -179,7 +181,7 @@ func TestAsyncRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Restore.TotalBytes, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
Expand Down
17 changes: 16 additions & 1 deletion pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (ms *microServiceBRWatcher) startWatch() {
logger.Info("Calling callback on data path pod termination")

if lastPod.Status.Phase == v1.PodSucceeded {
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log))
result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result))
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result)
} else {
if strings.HasSuffix(terminateMessage, ErrCancelled) {
ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
Expand Down Expand Up @@ -390,6 +392,19 @@ func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader
return progress
}

func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress {
progress := &uploader.Progress{}
if taskType == TaskTypeBackup {
progress.BytesDone = result.Backup.TotalBytes
progress.TotalBytes = result.Backup.TotalBytes
} else {
progress.BytesDone = result.Restore.TotalBytes
progress.TotalBytes = result.Restore.TotalBytes
}

return progress
}

func (ms *microServiceBRWatcher) Cancel() {
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/datapath/micro_service_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed",
Expand All @@ -223,6 +224,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed with redirect error",
Expand All @@ -244,6 +246,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "complete but terminated event not received in time",
Expand All @@ -263,6 +266,7 @@ func TestStartWatch(t *testing.T) {
},
expectStartEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "complete but terminated event not received immediately",
Expand All @@ -286,6 +290,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed with progress",
Expand Down Expand Up @@ -313,7 +318,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 2,
expectProgress: 3,
},
{
name: "failed",
Expand Down
4 changes: 3 additions & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type BackupResult struct {
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// RestoreResult represents the result of a restore
type RestoreResult struct {
Target AccessPoint `json:"target,omitempty"`
Target AccessPoint `json:"target,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// Callbacks defines the collection of callbacks during backup/restore
Expand Down
20 changes: 10 additions & 10 deletions pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (kp *kopiaProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")
}

if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")
}

log := kp.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -175,9 +175,9 @@ func (kp *kopiaProvider) RunBackup(

if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, ErrorCanceled
return snapshotID, false, 0, ErrorCanceled
}
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup")
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -189,7 +189,7 @@ func (kp *kopiaProvider) RunBackup(
)

log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, false, nil
return snapshotInfo.ID, false, snapshotInfo.Size, nil
}

func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
Expand All @@ -211,7 +211,7 @@ func (kp *kopiaProvider) RunRestore(
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
Expand All @@ -235,12 +235,12 @@ func (kp *kopiaProvider) RunRestore(
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)

if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
return 0, errors.Wrapf(err, "Failed to run kopia restore")
}

if atomic.LoadInt32(&kp.canceling) == 1 {
log.Error("Kopia restore is canceled")
return ErrorCanceled
return 0, ErrorCanceled
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -253,5 +253,5 @@ func (kp *kopiaProvider) RunRestore(

log.Info(output)

return nil
return size, nil
}
4 changes: 2 additions & 2 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
_, _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestRunRestore(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
_, err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down
62 changes: 45 additions & 17 deletions pkg/uploader/provider/mocks/Provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2ef7711

Please sign in to comment.