Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update du/dd progress on completion #8608

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8608-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8497, update du/dd progress on completion
2 changes: 1 addition & 1 deletion pkg/datamover/backup_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestOnDataUploadCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result { false { }}: fake-marshal-error",
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
},
{
name: "succeed",
Expand Down
2 changes: 1 addition & 1 deletion pkg/datamover/restore_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestOnDataDownloadCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal restore result {{ }}: fake-marshal-error",
expectedErr: "Failed to marshal restore result {{ } 0}: fake-marshal-error",
},
{
name: "succeed",
Expand Down
8 changes: 4 additions & 4 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
fs.wgDataPath.Done()
}()

snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)

if err == provider.ErrorCanceled {
Expand All @@ -193,7 +193,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
}
}()

Expand All @@ -215,7 +215,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
fs.wgDataPath.Done()
}()

err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)
totalBytes, err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -226,7 +226,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target, TotalBytes: totalBytes}})
}
}()

Expand Down
8 changes: 5 additions & 3 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TestAsyncBackup(t *testing.T) {
SnapshotID: "fake-snapshot",
EmptySnapshot: false,
Source: AccessPoint{ByPath: "fake-path"},
TotalBytes: 1000,
},
},
path: "fake-path",
Expand All @@ -95,7 +96,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.result.Backup.TotalBytes, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
Expand Down Expand Up @@ -167,7 +168,8 @@ func TestAsyncRestore(t *testing.T) {
},
result: Result{
Restore: RestoreResult{
Target: AccessPoint{ByPath: "fake-path"},
Target: AccessPoint{ByPath: "fake-path"},
TotalBytes: 1000,
},
},
path: "fake-path",
Expand All @@ -179,7 +181,7 @@ func TestAsyncRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Restore.TotalBytes, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true
Expand Down
17 changes: 16 additions & 1 deletion pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@
logger.Info("Calling callback on data path pod termination")

if lastPod.Status.Phase == v1.PodSucceeded {
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log))
result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result))
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result)
} else {
if strings.HasSuffix(terminateMessage, ErrCancelled) {
ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
Expand Down Expand Up @@ -390,6 +392,19 @@
return progress
}

func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress {
progress := &uploader.Progress{}
if taskType == TaskTypeBackup {
progress.BytesDone = result.Backup.TotalBytes
progress.TotalBytes = result.Backup.TotalBytes

Check warning on line 399 in pkg/datapath/micro_service_watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/micro_service_watcher.go#L398-L399

Added lines #L398 - L399 were not covered by tests
} else {
progress.BytesDone = result.Restore.TotalBytes
progress.TotalBytes = result.Restore.TotalBytes
}

return progress
}

func (ms *microServiceBRWatcher) Cancel() {
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/datapath/micro_service_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed",
Expand All @@ -223,6 +224,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed with redirect error",
Expand All @@ -244,6 +246,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "complete but terminated event not received in time",
Expand All @@ -263,6 +266,7 @@ func TestStartWatch(t *testing.T) {
},
expectStartEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "complete but terminated event not received immediately",
Expand All @@ -286,6 +290,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 1,
},
{
name: "completed with progress",
Expand Down Expand Up @@ -313,7 +318,7 @@ func TestStartWatch(t *testing.T) {
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 2,
expectProgress: 3,
},
{
name: "failed",
Expand Down
4 changes: 3 additions & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type BackupResult struct {
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// RestoreResult represents the result of a restore
type RestoreResult struct {
Target AccessPoint `json:"target,omitempty"`
Target AccessPoint `json:"target,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
}

// Callbacks defines the collection of callbacks during backup/restore
Expand Down
20 changes: 10 additions & 10 deletions pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, error) {
updater uploader.ProgressUpdater) (string, bool, int64, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
return "", false, 0, errors.New("Need to initial backup progress updater first")

Check warning on line 125 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L125

Added line #L125 was not covered by tests
}

if path == "" {
return "", false, errors.New("path is empty")
return "", false, 0, errors.New("path is empty")

Check warning on line 129 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L129

Added line #L129 was not covered by tests
}

log := kp.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -175,9 +175,9 @@

if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, ErrorCanceled
return snapshotID, false, 0, ErrorCanceled

Check warning on line 178 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L178

Added line #L178 was not covered by tests
}
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup")
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -189,7 +189,7 @@
)

log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, false, nil
return snapshotInfo.ID, false, snapshotInfo.Size, nil
}

func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
Expand All @@ -211,7 +211,7 @@
volumePath string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) error {
updater uploader.ProgressUpdater) (int64, error) {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
Expand All @@ -235,12 +235,12 @@
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, uploaderCfg, log, restoreCancel)

if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
return 0, errors.Wrapf(err, "Failed to run kopia restore")
}

if atomic.LoadInt32(&kp.canceling) == 1 {
log.Error("Kopia restore is canceled")
return ErrorCanceled
return 0, ErrorCanceled

Check warning on line 243 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L243

Added line #L243 was not covered by tests
}

// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
Expand All @@ -253,5 +253,5 @@

log.Info(output)

return nil
return size, nil
}
4 changes: 2 additions & 2 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
_, _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestRunRestore(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
_, err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down
62 changes: 45 additions & 17 deletions pkg/uploader/provider/mocks/Provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading