Skip to content

Commit

Permalink
fix(backup): enqueue volumes with correct names
Browse files Browse the repository at this point in the history
In `enqueueVolumesForBackupVolume` of volume controller, it needs
to use `BackupVolume.Spec.VolumeName` to get volume CR instead of
`BackupVolume.Name`.

ref: longhorn/longhorn 5411, 10057

Signed-off-by: James Lu <[email protected]>
  • Loading branch information
mantissahz authored and derekbit committed Dec 26, 2024
1 parent 1b30ba2 commit b4418eb
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 6 deletions.
9 changes: 9 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ func newStorageClass(name, provisioner string) *storagev1.StorageClass {
}
}

func newSnapshot(name string) *longhorn.Snapshot {
return &longhorn.Snapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: TestNamespace,
},
}
}

func newBackup(name string) *longhorn.Backup {
return &longhorn.Backup{
ObjectMeta: metav1.ObjectMeta{
Expand Down
20 changes: 19 additions & 1 deletion controller/system_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (c *SystemBackupController) WaitForVolumeBackupToComplete(backups map[strin
for name := range backups {
// Retrieve the latest backup
var backup *longhorn.Backup
backup, err = c.ds.GetBackup(name)
backup, err = c.ds.GetBackupRO(name)
if err != nil {
if apierrors.IsNotFound(err) {
log.Warnf("Backup %v not found when checking the volume backup status in system backup", name)
Expand All @@ -845,6 +845,9 @@ func (c *SystemBackupController) WaitForVolumeBackupToComplete(backups map[strin

switch backup.Status.State {
case longhorn.BackupStateCompleted:
if !c.isVolumeLastBackupSynced(backup) {
continue
}
delete(backups, name)
case longhorn.BackupStateError:
return errors.Wrapf(fmt.Errorf("%s", backup.Status.Error), "failed creating Volume backup %v", name)
Expand All @@ -864,6 +867,21 @@ func (c *SystemBackupController) WaitForVolumeBackupToComplete(backups map[strin
return fmt.Errorf("unexpected error: stopped waiting for Volume backups without completing, failing or timing out")
}

func (c *SystemBackupController) isVolumeLastBackupSynced(backup *longhorn.Backup) bool {
snapshot, err := c.ds.GetSnapshotRO(backup.Status.SnapshotName)
if err != nil {
c.logger.WithError(err).Warnf("Failed to get snapshot %v for backup %v", backup.Status.SnapshotName, backup.Name)
return false
}
volume, err := c.ds.GetVolumeRO(snapshot.Spec.Volume)
if err != nil {
c.logger.WithError(err).Warnf("Failed to get volume %v for snapshot %v", snapshot.Spec.Volume, snapshot.Name)
return false
}

return volume.Status.LastBackup == backup.Name
}

func (c *SystemBackupController) isVolumeBackupUpToDate(volume *longhorn.Volume, systemBackup *longhorn.SystemBackup) (bool, error) {
log := getLoggerForSystemBackup(c.logger, systemBackup)
log = log.WithField("volume", volume.Name)
Expand Down
6 changes: 6 additions & 0 deletions controller/system_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,13 @@ func (s *TestSuite) TestReconcileSystemBackup(c *C) {

for _, backup := range backups {
backup.Status.State = longhorn.BackupStateCompleted
backup.Status.SnapshotName = backup.Spec.SnapshotName
snapshot, err := lhClient.LonghornV1beta2().Snapshots(TestNamespace).Get(context.TODO(), backup.Status.SnapshotName, metav1.GetOptions{})
c.Assert(err, IsNil)
tc.existVolumes[SystemRolloutCRName(snapshot.Spec.Volume)].Status.LastBackup = backup.Name
fakeSystemRolloutSnapshot(snapshot, c, informerFactories.LhInformerFactory, lhClient)
}
fakeSystemRolloutVolumes(tc.existVolumes, c, informerFactories.LhInformerFactory, lhClient)
fakeSystemRolloutBackups(backups, c, informerFactories.LhInformerFactory, lhClient)
err = systemBackupController.WaitForVolumeBackupToComplete(backups, systemBackup)
c.Assert(err, IsNil)
Expand Down
32 changes: 32 additions & 0 deletions controller/system_restore_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,38 @@ func fakeSystemRolloutSettingDefaultEngineImage(c *C, informerFactory lhinformer
fakeSystemRolloutSettings(setting, c, informerFactory, client)
}

func fakeSystemRolloutSnapshot(fakeObj *longhorn.Snapshot, c *C, informerFactory lhinformers.SharedInformerFactory, client *lhfake.Clientset) {
indexer := informerFactory.Longhorn().V1beta2().Snapshots().Informer().GetIndexer()

clientInterface := client.LonghornV1beta2().Snapshots(TestNamespace)

exists, err := clientInterface.List(context.TODO(), metav1.ListOptions{})
c.Assert(err, IsNil)

for _, exist := range exists.Items {
if exist.Name != fakeObj.Name {
continue
}
exist, err := clientInterface.Get(context.TODO(), exist.Name, metav1.GetOptions{})
c.Assert(err, IsNil)

err = clientInterface.Delete(context.TODO(), exist.Name, metav1.DeleteOptions{})
c.Assert(err, IsNil)

err = indexer.Delete(exist)
c.Assert(err, IsNil)
}

snap := newSnapshot(fakeObj.Name)
snap.Spec.Volume = fakeObj.Spec.Volume
snap.Status = fakeObj.Status
exist, err := clientInterface.Create(context.TODO(), snap, metav1.CreateOptions{})
c.Assert(err, IsNil)

err = indexer.Add(exist)
c.Assert(err, IsNil)
}

func fakeSystemRolloutStorageClasses(fakeObjs map[SystemRolloutCRName]*storagev1.StorageClass, c *C, informerFactory informers.SharedInformerFactory, client *fake.Clientset) {
indexer := informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer()

Expand Down
15 changes: 10 additions & 5 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4694,23 +4694,28 @@ func (c *VolumeController) enqueueVolumesForBackupVolume(obj interface{}) {

// Update last backup for the volume name matches backup volume name
var matchedVolumeName string
_, err := c.ds.GetVolumeRO(bv.Name)
canonicalBackupVolumeName := bv.Spec.VolumeName
backupTargetName := bv.Spec.BackupTargetName
_, err := c.ds.GetVolumeRO(canonicalBackupVolumeName)
if err == nil {
matchedVolumeName = bv.Name
key := bv.Namespace + "/" + bv.Name
matchedVolumeName = canonicalBackupVolumeName
key := bv.Namespace + "/" + canonicalBackupVolumeName
c.queue.Add(key)
}

// Update last backup for DR volumes
volumes, err := c.ds.ListDRVolumesWithBackupVolumeNameRO(bv.Name)
volumes, err := c.ds.ListDRVolumesWithBackupVolumeNameRO(canonicalBackupVolumeName)
if err != nil {
return
}
for volumeName := range volumes {
for volumeName, volume := range volumes {
if volumeName == matchedVolumeName {
// Skip the volume which be enqueued already
continue
}
if backupTargetName != volume.Spec.BackupTargetName {
continue
}

key := bv.Namespace + "/" + volumeName
c.queue.Add(key)
Expand Down

0 comments on commit b4418eb

Please sign in to comment.