diff --git a/api/replication.storage/v1alpha1/volumegroupreplication_types.go b/api/replication.storage/v1alpha1/volumegroupreplication_types.go index 8ad2bff70..6a2b1d60e 100644 --- a/api/replication.storage/v1alpha1/volumegroupreplication_types.go +++ b/api/replication.storage/v1alpha1/volumegroupreplication_types.go @@ -21,6 +21,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + VolumeGroupReplicationNameAnnotation = "replication.storage.openshift.io/volume-group-replication-name" +) + // VolumeGroupReplicationSpec defines the desired state of VolumeGroupReplication type VolumeGroupReplicationSpec struct { // volumeGroupReplicationClassName is the volumeGroupReplicationClass name for this VolumeGroupReplication resource @@ -28,9 +32,10 @@ type VolumeGroupReplicationSpec struct { // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeGroupReplicationClassName is immutable" VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"` - // volumeReplicationClassName is the volumeReplicationClass name for VolumeReplication object + // volumeReplicationClassName is the volumeReplicationClass name for the VolumeReplication object + // created for this volumeGroupReplication // +kubebuilder:validation:Required - // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumReplicationClassName is immutable" + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeReplicationClassName is immutable" VolumeReplicationClassName string `json:"volumeReplicationClassName"` // Name of the VolumeReplication object created for this volumeGroupReplication diff --git a/api/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go b/api/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go index fcbb7a95a..17db7c894 100644 --- a/api/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go +++ b/api/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go @@ -21,6 +21,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + VolumeGroupReplicationContentNameAnnotation = "replication.storage.openshift.io/volumegroupreplication-content-name" +) + // VolumeGroupReplicationContentSpec defines the desired state of VolumeGroupReplicationContent type VolumeGroupReplicationContentSpec struct { // VolumeGroupreplicationRef specifies the VolumeGroupReplication object to which this @@ -52,9 +56,8 @@ type VolumeGroupReplicationContentSpec struct { // +optional VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"` - // Source specifies whether the snapshot is (or should be) dynamically provisioned + // Source specifies whether the volume is (or should be) dynamically provisioned // or already exists, and just requires a Kubernetes object representation. - // This field is immutable after creation. // Required. Source VolumeGroupReplicationContentSource `json:"source"` } @@ -68,7 +71,7 @@ type VolumeGroupReplicationContentSource struct { // VolumeGroupReplicationContentStatus defines the status of VolumeGroupReplicationContent type VolumeGroupReplicationContentStatus struct { - // PersistentVolumeRefList is the list of of PV for the group replication + // PersistentVolumeRefList is the list of PV for the group replication // The maximum number of allowed PV in the group is 100. // +optional PersistentVolumeRefList []corev1.LocalObjectReference `json:"persistentVolumeRefList,omitempty"` diff --git a/cmd/manager/main.go b/cmd/manager/main.go index df218544b..8aa629f76 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -212,8 +212,10 @@ func main() { os.Exit(1) } if err = (&replicationController.VolumeGroupReplicationContentReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Connpool: connPool, + Timeout: defaultTimeout, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VolumeGroupReplicationContent") os.Exit(1) diff --git a/internal/controller/replication.storage/finalizers.go b/internal/controller/replication.storage/finalizers.go index ce880595f..bf3afdb1d 100644 --- a/internal/controller/replication.storage/finalizers.go +++ b/internal/controller/replication.storage/finalizers.go @@ -26,6 +26,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -65,43 +66,44 @@ func (r *VolumeReplicationReconciler) removeFinalizerFromVR(logger logr.Logger, return nil } -// addFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim. -func (r *VolumeReplicationReconciler) addFinalizerToPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error { - if !slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) { - logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer) - pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) - if err := r.Client.Update(context.TODO(), pvc); err != nil { +// AddFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim. +func AddFinalizerToPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + replicationFinalizer string) error { + if !slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) { + logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", replicationFinalizer) + pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, replicationFinalizer) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to add finalizer (%s) to PersistentVolumeClaim resource"+ " (%s/%s) %w", - pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err) + replicationFinalizer, pvc.Namespace, pvc.Name, err) } } return nil } -// removeFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim. -func (r *VolumeReplicationReconciler) removeFinalizerFromPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim, -) error { - if slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) { - logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer) - pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) - if err := r.Client.Update(context.TODO(), pvc); err != nil { +// RemoveFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim. +func RemoveFinalizerFromPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + replicationFinalizer string) error { + if slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) { + logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", replicationFinalizer) + pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, replicationFinalizer) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to remove finalizer (%s) from PersistentVolumeClaim resource"+ " (%s/%s), %w", - pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err) + replicationFinalizer, pvc.Namespace, pvc.Name, err) } } return nil } -// addFinalizerToVGR adds the VR finalizer on the VolumeGroupReplication. -func (r *VolumeReplicationReconciler) addFinalizerToVGR(logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { +// AddFinalizerToVGR adds the VGR finalizer on the VolumeGroupReplication resource +func AddFinalizerToVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { if !slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) { - logger.Info("adding finalizer to VolumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) + logger.Info("adding finalizer to volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) vgr.ObjectMeta.Finalizers = append(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) - if err := r.Client.Update(context.TODO(), vgr); err != nil { + if err := client.Update(context.TODO(), vgr); err != nil { return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplication resource"+ " (%s/%s) %w", vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err) @@ -111,13 +113,18 @@ func (r *VolumeReplicationReconciler) addFinalizerToVGR(logger logr.Logger, vgr return nil } -// removeFinalizerFromVGR removes the VR finalizer on VolumeGroupReplication. -func (r *VolumeReplicationReconciler) removeFinalizerFromVGR(logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication, -) error { +// RemoveFinalizerFromVGR removes the VGR finalizer from the VolumeGroupReplication instance. +func RemoveFinalizerFromVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { if slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) { - logger.Info("removing finalizer from VolumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) + logger.Info("removing finalizer from volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) + // Check if owner annotations are removed from the VGR resource + if vgr.Annotations[replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation] != "" || + vgr.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] != "" { + return fmt.Errorf("failed to remove finalizer from volumeGroupReplication object"+ + ",dependent resources are not yet deleted (%s/%s)", vgr.Namespace, vgr.Name) + } vgr.ObjectMeta.Finalizers = util.RemoveFromSlice(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) - if err := r.Client.Update(context.TODO(), vgr); err != nil { + if err := client.Update(context.TODO(), vgr); err != nil { return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplication resource"+ " (%s/%s), %w", vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err) @@ -126,3 +133,35 @@ func (r *VolumeReplicationReconciler) removeFinalizerFromVGR(logger logr.Logger, return nil } + +// addFinalizerToVGRContent adds the VGR finalizer on the VolumeGroupReplicationContent resource +func (r *VolumeGroupReplicationContentReconciler) addFinalizerToVGRContent(logger logr.Logger, + vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error { + if !slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("adding finalizer to volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer) + vgrContent.ObjectMeta.Finalizers = append(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := r.Client.Update(context.TODO(), vgrContent); err != nil { + return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplicationContent resource"+ + " (%s/%s) %w", + vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err) + } + } + + return nil +} + +// removeFinalizerFromVGRContent removes the VGR finalizer from the VolumeGroupReplicationContent instance. +func (r *VolumeGroupReplicationContentReconciler) removeFinalizerFromVGRContent(logger logr.Logger, + vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error { + if slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("removing finalizer from volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer) + vgrContent.ObjectMeta.Finalizers = util.RemoveFromSlice(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := r.Client.Update(context.TODO(), vgrContent); err != nil { + return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplicationContent resource"+ + " (%s/%s), %w", + vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err) + } + } + + return nil +} diff --git a/internal/controller/replication.storage/parameters.go b/internal/controller/replication.storage/parameters.go index 1f3f58404..08182c73f 100644 --- a/internal/controller/replication.storage/parameters.go +++ b/internal/controller/replication.storage/parameters.go @@ -29,8 +29,10 @@ const ( // Driver. replicationParameterPrefix = "replication.storage.openshift.io/" - prefixedReplicationSecretNameKey = replicationParameterPrefix + "replication-secret-name" // name key for secret - prefixedReplicationSecretNamespaceKey = replicationParameterPrefix + "replication-secret-namespace" // namespace key secret + prefixedReplicationSecretNameKey = replicationParameterPrefix + "replication-secret-name" // name key for secret + prefixedReplicationSecretNamespaceKey = replicationParameterPrefix + "replication-secret-namespace" // namespace key secret + prefixedGroupReplicationSecretNameKey = replicationParameterPrefix + "group-replication-secret-name" // name key for secret + prefixedGroupReplicationSecretNamespaceKey = replicationParameterPrefix + "group-replication-secret-namespace" // namespace key secret ) // filterPrefixedParameters removes all the reserved keys from the @@ -53,10 +55,14 @@ func validatePrefixedParameters(param map[string]string) error { if strings.HasPrefix(k, replicationParameterPrefix) { switch k { case prefixedReplicationSecretNameKey: + fallthrough + case prefixedGroupReplicationSecretNameKey: if v == "" { return errors.New("secret name cannot be empty") } case prefixedReplicationSecretNamespaceKey: + fallthrough + case prefixedGroupReplicationSecretNamespaceKey: if v == "" { return errors.New("secret namespace cannot be empty") } diff --git a/internal/controller/replication.storage/pvc.go b/internal/controller/replication.storage/pvc.go index 4f35321c2..4d0405bf9 100644 --- a/internal/controller/replication.storage/pvc.go +++ b/internal/controller/replication.storage/pvc.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" ) @@ -59,22 +60,27 @@ func (r VolumeReplicationReconciler) getPVCDataSource(logger logr.Logger, req ty return pvc, pv, nil } -// annotatePVCWithOwner will add the VolumeReplication details to the PVC annotations. -func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context, logger logr.Logger, reqOwnerName string, pvc *corev1.PersistentVolumeClaim) error { +// AnnotatePVCWithOwner will add the VolumeReplication/VolumeGroupReplication details to the PVC annotations. +func AnnotatePVCWithOwner(client client.Client, logger logr.Logger, reqOwnerName string, + pvc *corev1.PersistentVolumeClaim, pvcAnnotation string) error { if pvc.ObjectMeta.Annotations == nil { pvc.ObjectMeta.Annotations = map[string]string{} } - currentOwnerName := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] + if pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] != "" && + pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationNameAnnotation] != "" { + logger.Info("PVC can't be part of both VolumeGroupReplication and VolumeReplication") + return fmt.Errorf("PVC %q can't be owned by both VolumeReplication and VolumeGroupReplication", pvc.Name) + } + + currentOwnerName := pvc.ObjectMeta.Annotations[pvcAnnotation] if currentOwnerName == "" { logger.Info("setting owner on PVC annotation", "Name", pvc.Name, "owner", reqOwnerName) - pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] = reqOwnerName - err := r.Update(ctx, pvc) + pvc.ObjectMeta.Annotations[pvcAnnotation] = reqOwnerName + err := client.Update(context.TODO(), pvc) if err != nil { logger.Error(err, "Failed to update PVC annotation", "Name", pvc.Name) - - return fmt.Errorf("failed to update PVC %q annotation for VolumeReplication: %w", - pvc.Name, err) + return fmt.Errorf("failed to update PVC %q annotation for replication: %w", pvc.Name, err) } return nil @@ -86,22 +92,23 @@ func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context, "current owner", currentOwnerName, "requested owner", reqOwnerName) - return fmt.Errorf("PVC %q not owned by VolumeReplication %q", + return fmt.Errorf("PVC %q not owned by correct VolumeReplication/VolumeGroupReplication %q", pvc.Name, reqOwnerName) } return nil } -// removeOwnerFromPVCAnnotation removes the VolumeReplication owner from the PVC annotations. -func (r *VolumeReplicationReconciler) removeOwnerFromPVCAnnotation(ctx context.Context, logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error { - if _, ok := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation]; ok { - logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", replicationv1alpha1.VolumeReplicationNameAnnotation) - delete(pvc.ObjectMeta.Annotations, replicationv1alpha1.VolumeReplicationNameAnnotation) - if err := r.Client.Update(ctx, pvc); err != nil { +// RemoveOwnerFromPVCAnnotation removes the VolumeReplication/VolumeGroupReplication owner from the PVC annotations. +func RemoveOwnerFromPVCAnnotation(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + pvcAnnotation string) error { + if _, ok := pvc.ObjectMeta.Annotations[pvcAnnotation]; ok { + logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", pvcAnnotation) + delete(pvc.ObjectMeta.Annotations, pvcAnnotation) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to remove annotation %q from PersistentVolumeClaim "+ "%q %w", - replicationv1alpha1.VolumeReplicationNameAnnotation, pvc.Name, err) + pvcAnnotation, pvc.Name, err) } } diff --git a/internal/controller/replication.storage/pvc_test.go b/internal/controller/replication.storage/pvc_test.go index 1e2926fcf..ecac23aed 100644 --- a/internal/controller/replication.storage/pvc_test.go +++ b/internal/controller/replication.storage/pvc_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "fmt" "testing" replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" @@ -178,6 +179,7 @@ func TestGetVolumeHandle(t *testing.T) { func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { t.Parallel() vrName := "test-vr" + vrNamespace := "test-ns" testcases := []struct { name string @@ -196,7 +198,7 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { Name: "pvc-name", Namespace: mockNamespace, Annotations: map[string]string{ - replicationv1alpha1.VolumeReplicationNameAnnotation: vrName, + replicationv1alpha1.VolumeReplicationNameAnnotation: fmt.Sprintf("%s/%s", vrName, vrNamespace), }, }, }, @@ -220,13 +222,15 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { for _, tc := range testcases { volumeReplication := &replicationv1alpha1.VolumeReplication{} mockVolumeReplicationObj.DeepCopyInto(volumeReplication) + volumeReplication.Name = vrName testPVC := &corev1.PersistentVolumeClaim{} tc.pvc.DeepCopyInto(testPVC) ctx := context.TODO() reconciler := createFakeVolumeReplicationReconciler(t, testPVC, volumeReplication) - err := reconciler.annotatePVCWithOwner(ctx, log.FromContext(context.TODO()), vrName, testPVC) + reqOwner := fmt.Sprintf("%s/%s", volumeReplication.Name, volumeReplication.Namespace) + err := AnnotatePVCWithOwner(reconciler.Client, log.FromContext(context.TODO()), reqOwner, testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) if tc.errorExpected { assert.Error(t, err) } else { @@ -241,14 +245,14 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { err = reconciler.Get(ctx, pvcNamespacedName, testPVC) assert.NoError(t, err) - assert.Equal(t, testPVC.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation], vrName) + assert.Equal(t, testPVC.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation], reqOwner) } - err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC) + err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) // try calling delete again, it should not fail - err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC) + err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) } @@ -262,6 +266,6 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { } volumeReplication := &replicationv1alpha1.VolumeReplication{} reconciler := createFakeVolumeReplicationReconciler(t, pvc, volumeReplication) - err := reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), pvc) + err := RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), pvc, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) } diff --git a/internal/controller/replication.storage/utils.go b/internal/controller/replication.storage/utils.go new file mode 100644 index 000000000..714189392 --- /dev/null +++ b/internal/controller/replication.storage/utils.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "time" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func GetReplicationState(instanceState replicationv1alpha1.ReplicationState) replicationv1alpha1.State { + switch instanceState { + case replicationv1alpha1.Primary: + return replicationv1alpha1.PrimaryState + case replicationv1alpha1.Secondary: + return replicationv1alpha1.SecondaryState + case replicationv1alpha1.Resync: + return replicationv1alpha1.SecondaryState + } + + return replicationv1alpha1.UnknownState +} + +func GetCurrentReplicationState(instanceStatusState replicationv1alpha1.State) replicationv1alpha1.State { + if instanceStatusState == "" { + return replicationv1alpha1.UnknownState + } + + return instanceStatusState +} + +func WaitForVolumeReplicationResource(client client.Client, logger logr.Logger, resourceName string) error { + unstructuredResource := &unstructured.UnstructuredList{} + unstructuredResource.SetGroupVersionKind(schema.GroupVersionKind{ + Group: replicationv1alpha1.GroupVersion.Group, + Kind: resourceName, + Version: replicationv1alpha1.GroupVersion.Version, + }) + for { + err := client.List(context.TODO(), unstructuredResource) + if err == nil { + return nil + } + // return errors other than NoMatch + if !meta.IsNoMatchError(err) { + logger.Error(err, "got an unexpected error while waiting for resource", "Resource", resourceName) + return err + } + logger.Info("resource does not exist", "Resource", resourceName) + time.Sleep(5 * time.Second) + } +} diff --git a/internal/controller/replication.storage/volumegroupreplication_controller.go b/internal/controller/replication.storage/volumegroupreplication_controller.go index 3426fef2a..18c2379a7 100644 --- a/internal/controller/replication.storage/volumegroupreplication_controller.go +++ b/internal/controller/replication.storage/volumegroupreplication_controller.go @@ -18,13 +18,37 @@ package controller import ( "context" + "fmt" + "reflect" + "strings" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/replication.storage/replication" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + "github.com/go-logr/logr" +) + +const ( + volumeGroupReplicationClass = "VolumeGroupReplicationClass" + volumeGroupReplication = "VolumeGroupReplication" + volumeGroupReplicationContent = "VolumeGroupReplicationContent" + volumeGroupReplicationRef = "replication.storage.openshift.io/volumegroupref" ) // VolumeGroupReplicationReconciler reconciles a VolumeGroupReplication object @@ -33,21 +57,580 @@ type VolumeGroupReplicationReconciler struct { Scheme *runtime.Scheme } -//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications,verbs=get;list;watch;update;patch //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications/status,verbs=get;update;patch //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications/finalizers,verbs=update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationclasses,verbs=get;list;watch +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/status,verbs=get;list +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims;persistentvolumes,verbs=get;list;watch;update +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims/finalizers,verbs=update + +/* +Steps performed by the reconcile loop: +- Fetch and validate the VGRClass CR +- Fetch the matching PVCs based on the selector provided in the VGR CR, and check if they are already bounded to a CSI volume and the driver matches the driver provided in the VGRClass CR. +- Annotate to the PVCs with owner and add the VGR finalizer to them. +- Add the label selector to the VGR annotation, such that the PVC triggering a reconcile can fetch the VGR to reconcile +- Create the VGRContent with the PVs list fetched above, add VGR name/namespace as the annotation to it +- Wait for the volumes to be grouped, and the VGRContent to be updated with the group handle +- Then, create the VR CR and add VGR name/namespace as the annotation to it +- Update the VGR status with the VR status. + +In case of deletion: +- Remove the owner annotations and finalizers from the PVC +- Check if VR exists, then delete +- Check if VGRContent exists, then delete +- Remove VGR finalizer <- This won't happen until the dependent VR and VRContent is deleted. Validated using owner annotations set in both the dependent CRs. +*/ // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *VolumeGroupReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + logger := log.FromContext(ctx, "Request.Name", req.Name, "Request.Namespace", req.Namespace) + + // Fetch VolumeGroupReplication instance + instance := &replicationv1alpha1.VolumeGroupReplication{} + err := r.Client.Get(ctx, req.NamespacedName, instance) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeGroupReplication resource not found") + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // Get VolumeGroupReplicationClass instance + vgrClassObj, err := r.getVolumeGroupReplicationClass(logger, instance.Spec.VolumeGroupReplicationClassName) + if err != nil { + logger.Error(err, "failed to get volumeGroupReplicationClass resource", "VGRClassName", instance.Spec.VolumeGroupReplicationClassName) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Validate that required parameters are present in the VGRClass resource + err = validatePrefixedParameters(vgrClassObj.Spec.Parameters) + if err != nil { + logger.Error(err, "failed to validate parameters of volumeGroupReplicationClass", "VGRClassName", instance.Spec.VolumeGroupReplicationClassName) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Declare all dependent resources + vgrContentObj := &replicationv1alpha1.VolumeGroupReplicationContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vgrcontent-%s", instance.UID), + }, + } + vrObj := &replicationv1alpha1.VolumeReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vr-%s", instance.UID), + Namespace: instance.Namespace, + }, + } + + // Create/Update dependent resources only if the instance is not marked for deletion + if instance.GetDeletionTimestamp().IsZero() { + + // Add finalizer to VGR instance + if err = AddFinalizerToVGR(r.Client, logger, instance); err != nil { + logger.Error(err, "failed to add VolumeGroupReplication finalizer") + return reconcile.Result{}, err + } + + // Check if PVCs exist based on provided selectors + pvcList, pvHandlesList, labelSelector, err := r.getMatchingPVCsFromSource(instance, logger, vgrClassObj) + if err != nil { + logger.Error(err, "failed to get PVCs using selector") + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + if len(pvcList.Items) == 0 { + err = fmt.Errorf("no matching PVCs found for the given selectors") + logger.Error(err, "provided selector should match at least 1 PVC") + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } else if len(pvcList.Items) > 100 { + err = fmt.Errorf("more than 100 PVCs match the given selector") + logger.Error(err, "only 100 PVCs are allowed for volume group replication") + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Add the string representation of the labelSelector to the VGR annotation + if instance.ObjectMeta.Annotations == nil { + instance.ObjectMeta.Annotations = make(map[string]string) + } + + if instance.ObjectMeta.Annotations["pvcSelector"] != labelSelector { + instance.ObjectMeta.Annotations["pvcSelector"] = labelSelector + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to add pvc selector annotation to VGR") + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + + // Update PersistentVolumeClaimsRefList in VGR Status + tmpRefList := []corev1.LocalObjectReference{} + for _, pvc := range pvcList.Items { + tmpRefList = append(tmpRefList, corev1.LocalObjectReference{ + Name: pvc.Name, + }) + } + + // Annotate each PVC with owner and add finalizer to it + for _, pvc := range pvcList.Items { + reqOwner := fmt.Sprintf("%s/%s", instance.Name, instance.Namespace) + err = AnnotatePVCWithOwner(r.Client, logger, reqOwner, &pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation) + if err != nil { + logger.Error(err, "Failed to add VGR owner annotation on PVC") + return ctrl.Result{}, err + } - return ctrl.Result{}, nil + if err = AddFinalizerToPVC(r.Client, logger, &pvc, vgrReplicationFinalizer); err != nil { + logger.Error(err, "Failed to add VGR finalizer on PersistentVolumeClaim") + return reconcile.Result{}, err + } + } + + // Update PersistentVolumeClaimsRefList in VGR Status + if !reflect.DeepEqual(instance.Status.PersistentVolumeClaimsRefList, tmpRefList) { + instance.Status.PersistentVolumeClaimsRefList = tmpRefList + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update VolumeGroupReplication resource") + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + + // Create/Update VolumeGroupReplicationContent CR + err = r.createVolumeGroupReplicationContentCR(instance, vgrContentObj, vgrClassObj.Spec.Provisioner, pvHandlesList) + if err != nil { + logger.Error(err, "failed to create/update volumeGroupReplicationContent resource", "VGRContentName", vgrContentObj.Name) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Update the VGR with VGRContentName, if empty + if instance.Spec.VolumeGroupReplicationContentName == "" { + instance.Spec.VolumeGroupReplicationContentName = vgrContentObj.Name + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance", "VGRName", instance.Name) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + + // Since, the grouping may take few seconds to happen, just exit and wait for the reconcile + // to be triggered when the group handle is updated in the vgrcontent resource. + if vgrContentObj.Spec.VolumeGroupReplicationHandle == "" { + logger.Info("Either volumegroupreplicationcontent is not yet created or it is still grouping the volumes to be replicated") + return reconcile.Result{}, nil + } else { + // Create/Update VolumeReplication CR + err = r.createVolumeReplicationCR(instance, vrObj) + if err != nil { + logger.Error(err, "failed to create/update volumeReplication resource", "VRName", vrObj.Name) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Update the VGR with VolumeReplication resource name, if not present + if instance.Spec.VolumeReplicationName == "" { + instance.Spec.VolumeReplicationName = vrObj.Name + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance", "VGRName", instance.Name) + _ = r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } + } else { + // When the VGR resource is being deleted + // Remove the owner annotation and the finalizer from pvcs that exist in VGR resource's status + if instance.Status.PersistentVolumeClaimsRefList != nil { + for _, pvcRef := range instance.Status.PersistentVolumeClaimsRefList { + pvc := &corev1.PersistentVolumeClaim{} + err = r.Client.Get(ctx, types.NamespacedName{Name: pvcRef.Name, Namespace: req.Namespace}, pvc) + if err != nil { + logger.Error(err, "failed to fetch pvc from VGR status") + return reconcile.Result{}, err + } + + if err = RemoveOwnerFromPVCAnnotation(r.Client, logger, pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation); err != nil { + logger.Error(err, "Failed to remove VolumeReplication annotation from PersistentVolumeClaim") + + return reconcile.Result{}, err + } + + if err = RemoveFinalizerFromPVC(r.Client, logger, pvc, vgrReplicationFinalizer); err != nil { + logger.Error(err, "Failed to remove VGR finalizer from PersistentVolumeClaim") + return reconcile.Result{}, err + } + } + } + // If dependent VR was created, delete it + if instance.Spec.VolumeReplicationName != "" { + req := types.NamespacedName{Name: instance.Spec.VolumeReplicationName, Namespace: req.Namespace} + err = r.Client.Get(ctx, req, vrObj) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeReplication resource not found") + } else { + logger.Error(err, "failed to fetch dependent volumeReplication resource") + return reconcile.Result{}, err + } + } else { + err = r.Client.Delete(ctx, vrObj) + if err != nil { + logger.Error(err, "failed to delete dependent volumeReplication resource") + return reconcile.Result{}, err + } + } + } + + // If dependent VGRContent was created, delete it + if instance.Spec.VolumeGroupReplicationContentName != "" { + req := types.NamespacedName{Name: instance.Spec.VolumeGroupReplicationContentName} + err = r.Client.Get(ctx, req, vgrContentObj) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeGroupReplicationContent resource not found") + } else { + logger.Error(err, "failed to fetch dependent volumeGroupReplicationContent resource") + return reconcile.Result{}, err + } + } else { + err = r.Client.Delete(ctx, vgrContentObj) + if err != nil { + logger.Error(err, "failed to delete dependent volumeGroupReplicationContent resource") + return reconcile.Result{}, err + } + } + } + + // Just log error, and exit reconcile without error. The dependent resource will update the VGR + // to remove their names from the CR, that will trigger a reconcile. + if err = RemoveFinalizerFromVGR(r.Client, logger, instance); err != nil { + logger.Error(err, "failed to remove VolumeGroupReplication finalizer") + } + + return reconcile.Result{}, nil + } + + // Update VGR status based on VR Status + instance.Status.VolumeReplicationStatus = vrObj.Status + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance's status", "VGRName", instance.Name) + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *VolumeGroupReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Wait for the group CRDs to be present, i.e, VolumeGroupReplication, VolumeGroupReplicationClass and + // VolumeGroupReplicationContent + err := r.waitForGroupCrds() + if err != nil { + return err + } + + // Only reconcile for spec/status update events + skipUpdates := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + } + + // Watch for only status updates of the VR resource + watchOnlyStatusUpdates := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + oldObj := e.ObjectOld.(*replicationv1alpha1.VolumeReplication) + newObj := e.ObjectNew.(*replicationv1alpha1.VolumeReplication) + return !reflect.DeepEqual(oldObj.Status, newObj.Status) + }, + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + } + + // Watch for only spec updates of the VGRContent resource + watchOnlySpecUpdates := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + oldObj := e.ObjectOld.(*replicationv1alpha1.VolumeGroupReplicationContent) + newObj := e.ObjectNew.(*replicationv1alpha1.VolumeGroupReplicationContent) + return !reflect.DeepEqual(oldObj.Spec, newObj.Spec) + }, + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + } + + // Enqueue the VGR reconcile with the VGR name,namespace based on the annotation of the VR and VRContent CR + enqueueVGRRequest := handler.EnqueueRequestsFromMapFunc( + func(context context.Context, obj client.Object) []reconcile.Request { + // Get the VolumeGroupReplication name,namespace + var vgrName, vgrNamespace string + objAnnotations := obj.GetAnnotations() + for k, v := range objAnnotations { + if k == volumeGroupReplicationRef { + vgrName = strings.Split(v, "/")[0] + vgrNamespace = strings.Split(v, "/")[1] + break + } + } + + // Skip reconcile if the triggering resource is not a sub-resource of VGR + if vgrName == "" || vgrNamespace == "" { + return []reconcile.Request{} + } + + // Return a reconcile request with the name, namespace of the VolumeGroupReplication resource + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: vgrNamespace, + Name: vgrName, + }, + }, + } + }, + ) + + // Enqueue the VGR reconcile with the VGR name,namespace based on the labels of the VGR CR + enqueueVGRForPVCRequest := handler.EnqueueRequestsFromMapFunc( + func(context context.Context, obj client.Object) []reconcile.Request { + // Check if the PVC has any labels defined + objLabels := obj.GetLabels() + if len(objLabels) == 0 { + return []reconcile.Request{} + } + + // Check if the resource is present in the cluster + vgrObjsList := &replicationv1alpha1.VolumeGroupReplicationList{} + logger := log.FromContext(context) + err := r.Client.List(context, vgrObjsList) + if err != nil { + logger.Error(err, "failed to list VolumeGroupReplication instances") + return []reconcile.Request{} + } + + // Check if the pvc labels match any VGRs based on selectors present in it's annotation + for _, vgr := range vgrObjsList.Items { + if vgr.Annotations != nil && vgr.Annotations["pvcSelector"] != "" { + labelSelector, err := labels.Parse(vgr.Annotations["pvcSelector"]) + if err != nil { + logger.Error(err, "failed to parse selector from VolumeGroupReplication's annotation") + return []reconcile.Request{} + } + objLabelsSet := labels.Set(objLabels) + if labelSelector.Matches(objLabelsSet) { + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: vgr.Namespace, + Name: vgr.Name, + }, + }, + } + } + } + } + + return []reconcile.Request{} + }, + ) + return ctrl.NewControllerManagedBy(mgr). For(&replicationv1alpha1.VolumeGroupReplication{}). + Owns(&replicationv1alpha1.VolumeGroupReplicationContent{}, builder.WithPredicates(skipUpdates)). + Owns(&replicationv1alpha1.VolumeReplication{}, builder.WithPredicates(skipUpdates)). + Watches(&replicationv1alpha1.VolumeGroupReplicationContent{}, enqueueVGRRequest, builder.WithPredicates(watchOnlySpecUpdates)). + Watches(&replicationv1alpha1.VolumeReplication{}, enqueueVGRRequest, builder.WithPredicates(watchOnlyStatusUpdates)). + Watches(&corev1.PersistentVolumeClaim{}, enqueueVGRForPVCRequest, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) } + +// waitForGroupCrds waits for dependent CRDs to the available in the cluster +func (r *VolumeGroupReplicationReconciler) waitForGroupCrds() error { + logger := log.FromContext(context.TODO(), "Name", "checkingGroupDependencies") + + err := WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplicationClass) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplicationClass CRD") + return err + } + + err = WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplication) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplication CRD") + return err + } + + err = WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplicationContent) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplicationContent CRD") + return err + } + + return nil +} + +// setGroupReplicationFailure sets the failure replication status on the VolumeGroupReplication resource +func (r *VolumeGroupReplicationReconciler) setGroupReplicationFailure( + instance *replicationv1alpha1.VolumeGroupReplication, + logger logr.Logger, err error) error { + + instance.Status.State = GetCurrentReplicationState(instance.Status.State) + instance.Status.Message = replication.GetMessageFromError(err) + instance.Status.ObservedGeneration = instance.Generation + if err := r.Client.Status().Update(context.TODO(), instance); err != nil { + logger.Error(err, "failed to update volumeGroupReplication status", "VGRName", instance.Name) + return err + } + + return nil +} + +// getMatchingPVCsFromSource fecthes the PVCs based on the selectors defined in the VolumeGroupReplication resource +func (r *VolumeGroupReplicationReconciler) getMatchingPVCsFromSource(instance *replicationv1alpha1.VolumeGroupReplication, + logger logr.Logger, + vgrClass *replicationv1alpha1.VolumeGroupReplicationClass) (corev1.PersistentVolumeClaimList, []string, string, error) { + + pvcList := corev1.PersistentVolumeClaimList{} + newSelector := labels.NewSelector() + + if instance.Spec.Source.Selector.MatchLabels != nil { + for key, value := range instance.Spec.Source.Selector.MatchLabels { + req, err := labels.NewRequirement(key, selection.Equals, []string{value}) + if err != nil { + logger.Error(err, "failed to add label selector requirement") + return pvcList, nil, "", err + } + newSelector = newSelector.Add(*req) + } + } + + if instance.Spec.Source.Selector.MatchExpressions != nil { + for _, labelExp := range instance.Spec.Source.Selector.MatchExpressions { + req, err := labels.NewRequirement(labelExp.Key, selection.Operator(labelExp.Operator), labelExp.Values) + if err != nil { + logger.Error(err, "failed to add label selector requirement") + return pvcList, nil, "", err + } + newSelector = newSelector.Add(*req) + } + } + opts := []client.ListOption{ + client.MatchingLabelsSelector{Selector: newSelector}, + client.InNamespace(instance.Namespace), + } + err := r.Client.List(context.TODO(), &pvcList, opts...) + if err != nil { + logger.Error(err, "failed to list pvcs with the given selectors") + return pvcList, nil, "", err + } + + pvHandlesList := []string{} + for _, pvc := range pvcList.Items { + pvName := pvc.Spec.VolumeName + pv := &corev1.PersistentVolume{} + err := r.Client.Get(context.TODO(), types.NamespacedName{Name: pvName}, pv) + if err != nil { + logger.Error(err, "failed to get pv for corresponding pvc", "PVC Name", pvc.Name) + return pvcList, nil, "", err + } + if pv.Spec.CSI == nil { + err = fmt.Errorf("pvc %s is not bound to a CSI PV", pvc.Name) + return pvcList, nil, "", err + } + if pv.Spec.CSI.Driver != vgrClass.Spec.Provisioner { + err = fmt.Errorf("driver of PV for PVC %s is different than the VolumeGroupReplicationClass driver", pvc.Name) + return pvcList, nil, "", err + } + pvHandlesList = append(pvHandlesList, pv.Spec.CSI.VolumeHandle) + } + + return pvcList, pvHandlesList, newSelector.String(), nil +} + +func (r *VolumeGroupReplicationReconciler) createVolumeGroupReplicationContentCR(vgr *replicationv1alpha1.VolumeGroupReplication, + vgrContentObj *replicationv1alpha1.VolumeGroupReplicationContent, vgrClass string, pvHandlesList []string) error { + _, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, vgrContentObj, func() error { + if vgrContentObj.CreationTimestamp.IsZero() { + vgrContentObj.Annotations = map[string]string{ + volumeGroupReplicationRef: fmt.Sprintf("%s/%s", vgr.Name, vgr.Namespace), + } + vgrContentObj.Spec = replicationv1alpha1.VolumeGroupReplicationContentSpec{ + VolumeGroupReplicationRef: corev1.ObjectReference{ + APIVersion: vgr.APIVersion, + Kind: vgr.Kind, + Name: vgr.Name, + Namespace: vgr.Namespace, + UID: vgr.UID, + }, + Provisioner: vgrClass, + VolumeGroupReplicationClassName: vgr.Spec.VolumeGroupReplicationClassName, + } + } + + vgrContentObj.Spec.Source = replicationv1alpha1.VolumeGroupReplicationContentSource{ + VolumeHandles: pvHandlesList, + } + + return nil + }) + + return err +} + +func (r *VolumeGroupReplicationReconciler) createVolumeReplicationCR(vgr *replicationv1alpha1.VolumeGroupReplication, + vrObj *replicationv1alpha1.VolumeReplication) error { + apiGroup := "replication.storage.openshift.io" + _, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, vrObj, func() error { + if vrObj.CreationTimestamp.IsZero() { + vrObj.Annotations = map[string]string{ + volumeGroupReplicationRef: fmt.Sprintf("%s/%s", vgr.Name, vgr.Namespace), + } + vrObj.Spec = replicationv1alpha1.VolumeReplicationSpec{ + VolumeReplicationClass: vgr.Spec.VolumeReplicationClassName, + DataSource: corev1.TypedLocalObjectReference{ + APIGroup: &apiGroup, + Kind: vgr.Kind, + Name: vgr.Name, + }, + } + } + + vrObj.Spec.AutoResync = vgr.Spec.AutoResync + vrObj.Spec.ReplicationState = vgr.Spec.ReplicationState + + return controllerutil.SetOwnerReference(vgr, vrObj, r.Scheme) + }) + + return err +} diff --git a/internal/controller/replication.storage/volumegroupreplication_test.go b/internal/controller/replication.storage/volumegroupreplication_test.go new file mode 100644 index 000000000..3681864dc --- /dev/null +++ b/internal/controller/replication.storage/volumegroupreplication_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" +) + +const ( + mockPV = "test-vgr-pv" + mockPVC = "test-vgr-pvc" +) + +var mockVolumeGroupReplicationObj = &replicationv1alpha1.VolumeGroupReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "volume-group-replication", + Namespace: mockNamespace, + UID: "testname", + }, + Spec: replicationv1alpha1.VolumeGroupReplicationSpec{ + VolumeGroupReplicationClassName: "volume-group-replication-class", + VolumeReplicationClassName: "volume-replication-class", + Source: replicationv1alpha1.VolumeGroupReplicationSource{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "vgr_test", + }, + }, + }, + }, +} + +var mockVGRPersistentVolume = &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPV, + }, + Spec: corev1.PersistentVolumeSpec{ + PersistentVolumeSource: corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + Driver: "test-driver", + VolumeHandle: mockVolumeHandle, + }, + }, + }, +} + +var mockVGRPersistentVolumeClaim = &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPVC, + Namespace: mockNamespace, + Labels: map[string]string{ + "test": "vgr_test", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: mockPV, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimBound, + }, +} + +func createFakeVolumeGroupReplicationReconciler(t *testing.T, obj ...runtime.Object) VolumeGroupReplicationReconciler { + t.Helper() + scheme := createFakeScheme(t) + vgrInit := &replicationv1alpha1.VolumeGroupReplication{} + vgrContentInit := &replicationv1alpha1.VolumeGroupReplicationContent{} + client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(obj...).WithStatusSubresource(vgrInit, vgrContentInit).Build() + + return VolumeGroupReplicationReconciler{ + Client: client, + Scheme: scheme, + } +} + +func TestVolumeGroupReplication(t *testing.T) { + t.Parallel() + testcases := []struct { + name string + pv *corev1.PersistentVolume + pvc *corev1.PersistentVolumeClaim + expectedPVCList []string + pvcFound bool + }{ + { + name: "case 1: matching pvc available", + pv: mockVGRPersistentVolume, + pvc: mockVGRPersistentVolumeClaim, + expectedPVCList: []string{mockPVC}, + pvcFound: true, + }, + { + name: "case 2: matching pvc not found", + pv: mockVGRPersistentVolume, + pvc: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPVC, + Namespace: mockNamespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: mockPV, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimBound, + }, + }, + expectedPVCList: []string{}, + pvcFound: false, + }, + } + for _, tc := range testcases { + volumeGroupReplication := &replicationv1alpha1.VolumeGroupReplication{} + mockVolumeGroupReplicationObj.DeepCopyInto(volumeGroupReplication) + + volumeGroupReplicationClass := &replicationv1alpha1.VolumeGroupReplicationClass{} + mockVolumeGroupReplicationClassObj.DeepCopyInto(volumeGroupReplicationClass) + + volumeReplicationClass := &replicationv1alpha1.VolumeReplicationClass{} + mockVolumeReplicationClassObj.DeepCopyInto(volumeReplicationClass) + + testPV := &corev1.PersistentVolume{} + tc.pv.DeepCopyInto(testPV) + + testPVC := &corev1.PersistentVolumeClaim{} + tc.pvc.DeepCopyInto(testPVC) + + r := createFakeVolumeGroupReplicationReconciler(t, testPV, testPVC, volumeReplicationClass, volumeGroupReplicationClass, volumeGroupReplication) + nsKey := types.NamespacedName{ + Namespace: volumeGroupReplication.Namespace, + Name: volumeGroupReplication.Name, + } + req := reconcile.Request{ + NamespacedName: nsKey, + } + res, err := r.Reconcile(context.TODO(), req) + + if tc.pvcFound { + // Check reconcile didn't return any error + assert.Equal(t, reconcile.Result{}, res) + assert.NoError(t, err) + + pvc := &corev1.PersistentVolumeClaim{} + err = r.Client.Get(context.TODO(), types.NamespacedName{Name: testPVC.Name, Namespace: testPVC.Namespace}, pvc) + assert.NoError(t, err) + + vgr := &replicationv1alpha1.VolumeGroupReplication{} + err = r.Client.Get(context.TODO(), nsKey, vgr) + assert.NoError(t, err) + + vgrPVCRefList := vgr.Status.PersistentVolumeClaimsRefList + assert.Equal(t, 1, len(vgrPVCRefList)) + for _, pvc := range vgrPVCRefList { + assert.Equal(t, pvc.Name, mockVGRPersistentVolumeClaim.Name) + } + // Check PVC annotation + expectedOwner := fmt.Sprintf("%s/%s", volumeGroupReplication.Name, volumeGroupReplication.Namespace) + assert.Equal(t, expectedOwner, pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationNameAnnotation]) + // Check VGRContent Created + assert.NotEmpty(t, vgr.Spec.VolumeGroupReplicationContentName) + } else { + // Check reconcile returned an error + assert.Equal(t, reconcile.Result{}, res) + assert.Error(t, err) + + vgr := &replicationv1alpha1.VolumeGroupReplication{} + err = r.Client.Get(context.TODO(), nsKey, vgr) + assert.NoError(t, err) + + assert.Empty(t, vgr.Status.PersistentVolumeClaimsRefList) + assert.Empty(t, vgr.Spec.VolumeGroupReplicationContentName) + } + } +} diff --git a/internal/controller/replication.storage/volumegroupreplicationclass.go b/internal/controller/replication.storage/volumegroupreplicationclass.go new file mode 100644 index 000000000..209646098 --- /dev/null +++ b/internal/controller/replication.storage/volumegroupreplicationclass.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" +) + +// getVolumeGroupReplicationClass fetches the volumegroupreplicationclass object in the given namespace and return the same. +func (r VolumeGroupReplicationReconciler) getVolumeGroupReplicationClass(logger logr.Logger, vgrClassName string) (*replicationv1alpha1.VolumeGroupReplicationClass, error) { + vgrClassObj := &replicationv1alpha1.VolumeGroupReplicationClass{} + err := r.Client.Get(context.TODO(), types.NamespacedName{Name: vgrClassName}, vgrClassObj) + if err != nil { + if errors.IsNotFound(err) { + logger.Error(err, "VolumeGroupReplicationClass not found", "VolumeGroupReplicationClass", vgrClassName) + } else { + logger.Error(err, "Got an unexpected error while fetching VolumeReplicationClass", "VolumeReplicationClass", vgrClassName) + } + + return nil, err + } + + return vgrClassObj, nil +} diff --git a/internal/controller/replication.storage/volumegroupreplicationclass_test.go b/internal/controller/replication.storage/volumegroupreplicationclass_test.go new file mode 100644 index 000000000..40ed15872 --- /dev/null +++ b/internal/controller/replication.storage/volumegroupreplicationclass_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "testing" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +var mockVolumeGroupReplicationClassObj = &replicationv1alpha1.VolumeGroupReplicationClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "volume-group-replication-class", + }, + Spec: replicationv1alpha1.VolumeGroupReplicationClassSpec{ + Provisioner: "test-driver", + }, +} + +func TestGetVolumeGroupReplicationClass(t *testing.T) { + t.Parallel() + testcases := []struct { + createVgrc bool + errorExpected bool + isErrorNotFound bool + }{ + {createVgrc: true, errorExpected: false, isErrorNotFound: false}, + {createVgrc: false, errorExpected: true, isErrorNotFound: true}, + } + + for _, tc := range testcases { + var objects []runtime.Object + + volumeGroupReplication := &replicationv1alpha1.VolumeGroupReplication{} + mockVolumeGroupReplicationObj.DeepCopyInto(volumeGroupReplication) + objects = append(objects, volumeGroupReplication) + + if tc.createVgrc { + volumeGroupReplicationClass := &replicationv1alpha1.VolumeGroupReplicationClass{} + mockVolumeGroupReplicationClassObj.DeepCopyInto(volumeGroupReplicationClass) + objects = append(objects, volumeGroupReplicationClass) + } + + reconciler := createFakeVolumeGroupReplicationReconciler(t, objects...) + vgrClassObj, err := reconciler.getVolumeGroupReplicationClass(log.FromContext(context.TODO()), mockVolumeGroupReplicationClassObj.Name) + + if tc.errorExpected { + assert.Error(t, err) + if tc.isErrorNotFound { + assert.True(t, errors.IsNotFound(err)) + } + } else { + assert.NoError(t, err) + assert.NotEqual(t, nil, vgrClassObj) + } + } +} diff --git a/internal/controller/replication.storage/volumegroupreplicationcontent_controller.go b/internal/controller/replication.storage/volumegroupreplicationcontent_controller.go index eccfdd30e..1218df16b 100644 --- a/internal/controller/replication.storage/volumegroupreplicationcontent_controller.go +++ b/internal/controller/replication.storage/volumegroupreplicationcontent_controller.go @@ -18,36 +18,265 @@ package controller import ( "context" + "fmt" + "slices" + "time" + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + grpcClient "github.com/csi-addons/kubernetes-csi-addons/internal/client" + conn "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/go-logr/logr" + + "github.com/csi-addons/spec/lib/go/identity" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - - replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/replication.storage/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // VolumeGroupReplicationContentReconciler reconciles a VolumeGroupReplicationContent object type VolumeGroupReplicationContentReconciler struct { client.Client Scheme *runtime.Scheme + // ConnectionPool consists of map of Connection objects + Connpool *conn.ConnectionPool + // Timeout for the Reconcile operation. + Timeout time.Duration } //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents/status,verbs=get;update;patch //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents/finalizers,verbs=update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationclasses,verbs=get;list;watch + +/* +Steps performed by the reconcile loop: +- Watch for VGRContent CR +- Fetch the VGRClass using the name from the VGRContent CR, and extract the secrets from it +- Add VGRContent owner annotation to the VGR resource +- Add finalizer to the VGRContent resource +- Create/Modify the volume group based on the handle field +- Update the group handle in VGRContent CR +- Update the VGRContent status with the PV list +*/ // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *VolumeGroupReplicationContentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + logger := log.FromContext(ctx, "Request.Name", req.Name, "Request.Namespace", req.Namespace) + + // Fetch VolumeGroupReplicationContent instance + instance := &replicationv1alpha1.VolumeGroupReplicationContent{} + err := r.Client.Get(ctx, req.NamespacedName, instance) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeGroupReplicationContent resource not found") + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + volumeGroupClient, err := r.getVolumeGroupClient(ctx, instance.Spec.Provisioner) + if err != nil { + logger.Error(err, "Failed to get VolumeGroupClient") + return reconcile.Result{}, err + } + + // Fetch VolumeGroupReplicationClass + vgrClass := &replicationv1alpha1.VolumeGroupReplicationClass{} + err = r.Client.Get(ctx, types.NamespacedName{Name: instance.Spec.VolumeGroupReplicationClassName}, vgrClass) + if err != nil { + logger.Error(err, "failed to fetch volumeGroupReplicationClass resource") + return reconcile.Result{}, err + } + + // Get secrets and parameters + parameters := filterPrefixedParameters(replicationParameterPrefix, vgrClass.Spec.Parameters) + secretName := vgrClass.Spec.Parameters[prefixedGroupReplicationSecretNameKey] + secretNamespace := vgrClass.Spec.Parameters[prefixedGroupReplicationSecretNamespaceKey] + + // Get the VolumeGroupReplication resource + vgrObj := &replicationv1alpha1.VolumeGroupReplication{} + namespacedObj := types.NamespacedName{Namespace: instance.Spec.VolumeGroupReplicationRef.Namespace, + Name: instance.Spec.VolumeGroupReplicationRef.Name} + err = r.Client.Get(ctx, namespacedObj, vgrObj) + if err != nil { + logger.Error(err, "failed to get owner VolumeGroupReplication") + return reconcile.Result{}, err + } + + // Check if object is being deleted + if instance.GetDeletionTimestamp().IsZero() { + err = r.annotateVolumeGroupReplicationWithVGRContentOwner(ctx, logger, req.Name, vgrObj) + if err != nil { + logger.Error(err, "Failed to annotate VolumeGroupReplication owner") + return ctrl.Result{}, err + } + if err = r.addFinalizerToVGRContent(logger, instance); err != nil { + logger.Error(err, "failed to add VolumeGroupReplicationContent finalizer") + return reconcile.Result{}, err + } + } else { + // Check if the owner VGR is marked for deletion, only then remove the finalizer from VGRContent resource + if vgrObj.GetDeletionTimestamp().IsZero() { + logger.Info("cannot delete VolumeGroupReplicationContent resource, until owner VolumeGroupReplication instance is deleted") + return reconcile.Result{}, nil + } else { + // Delete the volume group, if created + if instance.Spec.VolumeGroupReplicationHandle != "" { + _, err := volumeGroupClient.DeleteVolumeGroup(instance.Spec.VolumeGroupReplicationHandle, secretName, secretNamespace) + if err != nil { + logger.Error(err, "failed to delete volume group") + return reconcile.Result{}, err + } + } + + // Remove the vgrcontent owner annotation from the VGR resource + if err = r.removeVGRContentOwnerFromVGRAnnotation(ctx, logger, vgrObj); err != nil { + logger.Error(err, "Failed to remove VolumeReplication annotation from VolumeGroupReplication") + return reconcile.Result{}, err + } + + // Remove the vgr finalizer from the vgrcontent resource + if err = r.removeFinalizerFromVGRContent(logger, instance); err != nil { + logger.Error(err, "failed to remove finalizer from VolumeGroupReplicationContent resource") + return reconcile.Result{}, err + } + } + } + + // Create/Update volume group + if instance.Spec.VolumeGroupReplicationHandle == "" { + groupName := fmt.Sprintf("volumegroup-%s", instance.UID) + resp, err := volumeGroupClient.CreateVolumeGroup(groupName, instance.Spec.Source.VolumeHandles, secretName, secretNamespace, parameters) + if err != nil { + logger.Error(err, "failed to group volumes") + return reconcile.Result{}, err + } + + // Update the group handle in the VolumeGroupReplicationContent CR + instance.Spec.VolumeGroupReplicationHandle = resp.GetVolumeGroup().VolumeGroupId + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update group id in VGRContent") + return reconcile.Result{}, err + } + } else { + groupID := instance.Spec.VolumeGroupReplicationHandle + _, err := volumeGroupClient.ModifyVolumeGroupMembership(groupID, instance.Spec.Source.VolumeHandles, secretName, secretNamespace) + if err != nil { + logger.Error(err, "failed to modify volume group") + return reconcile.Result{}, err + } + } + + // Update VGRContent resource status + pvList := &corev1.PersistentVolumeList{} + err = r.Client.List(ctx, pvList) + if err != nil { + logger.Error(err, "failed to list PVs") + return reconcile.Result{}, err + } + + pvRefList := []corev1.LocalObjectReference{} + for _, pv := range pvList.Items { + if slices.ContainsFunc(instance.Spec.Source.VolumeHandles, func(handle string) bool { + return pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle == handle + }) { + pvRefList = append(pvRefList, corev1.LocalObjectReference{ + Name: pv.Name, + }) + } + } + instance.Status.PersistentVolumeRefList = pvRefList + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update VGRContent status") + return reconcile.Result{}, err + } - return ctrl.Result{}, nil + return reconcile.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *VolumeGroupReplicationContentReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). For(&replicationv1alpha1.VolumeGroupReplicationContent{}). Complete(r) } + +func (r *VolumeGroupReplicationContentReconciler) getVolumeGroupClient(ctx context.Context, driverName string) (grpcClient.VolumeGroup, error) { + conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, driverName) + if err != nil { + return nil, fmt.Errorf("no leader for the ControllerService of driver %q", driverName) + } + + for _, cap := range conn.Capabilities { + // validate if VOLUME_GROUP capability is supported by the driver. + if cap.GetVolumeGroup() == nil { + continue + } + + // validate of VOLUME_GROUP capability is enabled by the storage driver. + if cap.GetVolumeGroup().GetType() == identity.Capability_VolumeGroup_VOLUME_GROUP { + return grpcClient.NewVolumeGroupClient(conn.Client, r.Timeout), nil + } + } + + return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support VolumeGroup", conn.Name, driverName) + +} + +// annotateVolumeGroupReplicationWithVGRContentOwner will add the VolumeGroupReplicationContent owner to the VGR annotations. +func (r *VolumeGroupReplicationContentReconciler) annotateVolumeGroupReplicationWithVGRContentOwner(ctx context.Context, logger logr.Logger, reqOwnerName string, vgr *replicationv1alpha1.VolumeGroupReplication) error { + if vgr.ObjectMeta.Annotations == nil { + vgr.ObjectMeta.Annotations = map[string]string{} + } + + currentOwnerName := vgr.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation] + if currentOwnerName == "" { + logger.Info("setting vgrcontent owner on VGR annotation", "Name", vgr.Name, "owner", reqOwnerName) + vgr.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation] = reqOwnerName + err := r.Update(ctx, vgr) + if err != nil { + logger.Error(err, "Failed to update VGR annotation", "Name", vgr.Name) + + return fmt.Errorf("failed to update VGR %q annotation for VolumeGroupReplicationContent: %w", + vgr.Name, err) + } + + return nil + } + + if currentOwnerName != reqOwnerName { + logger.Info("cannot change the owner of vgr", + "VGR name", vgr.Name, + "current owner", currentOwnerName, + "requested owner", reqOwnerName) + + return fmt.Errorf("VGR %q not owned by VolumeReplication %q", + vgr.Name, reqOwnerName) + } + + return nil +} + +// removeVGRContentOwnerFromVGRAnnotation removes the VolumeGroupReplicationContent owner from the VGR annotations. +func (r *VolumeGroupReplicationContentReconciler) removeVGRContentOwnerFromVGRAnnotation(ctx context.Context, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { + if _, ok := vgr.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation]; ok { + logger.Info("removing vgrcontent owner annotation from VolumeGroupReplication object", "Annotation", replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation) + delete(vgr.ObjectMeta.Annotations, replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation) + if err := r.Client.Update(ctx, vgr); err != nil { + return fmt.Errorf("failed to remove annotation %q from VolumeGroupReplication "+ + "%q %w", + replicationv1alpha1.VolumeGroupReplicationContentNameAnnotation, vgr.Name, err) + } + } + + return nil +} diff --git a/internal/controller/replication.storage/volumereplication_controller.go b/internal/controller/replication.storage/volumereplication_controller.go index c96e43d9a..87cc3a63c 100644 --- a/internal/controller/replication.storage/volumereplication_controller.go +++ b/internal/controller/replication.storage/volumereplication_controller.go @@ -35,11 +35,8 @@ import ( "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -113,7 +110,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re vrcObj, err := r.getVolumeReplicationClass(logger, instance.Spec.VolumeReplicationClass) if err != nil { setFailureCondition(instance, "failed to get volumeReplication class", err.Error(), instance.Spec.DataSource.Kind) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), err.Error()) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -125,7 +122,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if err != nil { logger.Error(err, "failed to validate parameters of volumeReplicationClass", "VRCName", instance.Spec.VolumeReplicationClass) setFailureCondition(instance, "failed to validate parameters of volumeReplicationClass", err.Error(), instance.Spec.DataSource.Kind) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), err.Error()) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -161,7 +158,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if pvErr != nil { logger.Error(pvErr, "failed to get PVC", "PVCName", instance.Spec.DataSource.Name) setFailureCondition(instance, "failed to find PVC", pvErr.Error(), instance.Spec.DataSource.Name) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), pvErr.Error()) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), pvErr.Error()) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -176,7 +173,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if vgrErr != nil { logger.Error(vgrErr, "failed to get VolumeGroupReplication", "VGRName", instance.Spec.DataSource.Name) setFailureCondition(instance, "failed to get VolumeGroupReplication", vgrErr.Error(), instance.Spec.DataSource.Name) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), vgrErr.Error()) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), vgrErr.Error()) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -188,7 +185,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re err = fmt.Errorf("unsupported datasource kind") logger.Error(err, "given kind not supported", "Kind", instance.Spec.DataSource.Kind) setFailureCondition(instance, "unsupported datasource", err.Error(), "") - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), err.Error()) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -231,26 +228,25 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } switch instance.Spec.DataSource.Kind { case pvcDataSource: - err = r.annotatePVCWithOwner(ctx, logger, req.Name, pvc) + reqOwner := fmt.Sprintf("%s/%s", instance.Name, instance.Namespace) + err = AnnotatePVCWithOwner(r.Client, logger, reqOwner, pvc, replicationv1alpha1.VolumeReplicationNameAnnotation) if err != nil { logger.Error(err, "Failed to annotate PVC owner") return ctrl.Result{}, err } - if err = r.addFinalizerToPVC(logger, pvc); err != nil { + if err = AddFinalizerToPVC(r.Client, logger, pvc, pvcReplicationFinalizer); err != nil { logger.Error(err, "Failed to add PersistentVolumeClaim finalizer") - - return reconcile.Result{}, err + return ctrl.Result{}, err } case volumeGroupReplicationDataSource: err = r.annotateVolumeGroupReplicationWithOwner(ctx, logger, req.Name, vgr) if err != nil { logger.Error(err, "Failed to annotate VolumeGroupReplication owner") - return ctrl.Result{}, err } - if err = r.addFinalizerToVGR(logger, vgr); err != nil { + if err = AddFinalizerToVGR(r.Client, logger, vgr); err != nil { logger.Error(err, "Failed to add VolumeGroupReplication finalizer") return reconcile.Result{}, err @@ -266,27 +262,18 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } switch instance.Spec.DataSource.Kind { case pvcDataSource: - if err = r.removeOwnerFromPVCAnnotation(ctx, logger, pvc); err != nil { + if err = RemoveOwnerFromPVCAnnotation(r.Client, logger, pvc, replicationv1alpha1.VolumeReplicationNameAnnotation); err != nil { logger.Error(err, "Failed to remove VolumeReplication annotation from PersistentVolumeClaim") - return reconcile.Result{}, err } - if err = r.removeFinalizerFromPVC(logger, pvc); err != nil { + if err = RemoveFinalizerFromPVC(r.Client, logger, pvc, pvcReplicationFinalizer); err != nil { logger.Error(err, "Failed to remove PersistentVolumeClaim finalizer") - return reconcile.Result{}, err } case volumeGroupReplicationDataSource: if err = r.removeOwnerFromVGRAnnotation(ctx, logger, vgr); err != nil { logger.Error(err, "Failed to remove VolumeReplication annotation from VolumeGroupReplication") - - return reconcile.Result{}, err - } - - if err = r.removeFinalizerFromVGR(logger, vgr); err != nil { - logger.Error(err, "Failed to remove VolumeGroupReplication finalizer") - return reconcile.Result{}, err } } @@ -324,7 +311,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if err = r.enableReplication(vr); err != nil { logger.Error(err, "failed to enable replication") msg := replication.GetMessageFromError(err) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), msg) + uErr := r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), msg) if uErr != nil { logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -344,7 +331,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info("volume is not ready to use") // set the status.State to secondary as the // instance.Status.State is primary for the first time. - err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), "volume is marked secondary and is degraded") + err = r.updateReplicationStatus(instance, logger, GetReplicationState(instance.Spec.ReplicationState), "volume is marked secondary and is degraded") if err != nil { return ctrl.Result{}, err } @@ -372,7 +359,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re replicationErr = fmt.Errorf("unsupported volume state") logger.Error(replicationErr, "given volume state is not supported", "ReplicationState", instance.Spec.ReplicationState) setFailureCondition(instance, "unsupported volume state", replicationErr.Error(), instance.Spec.DataSource.Kind) - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), replicationErr.Error()) + err = r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), replicationErr.Error()) if err != nil { logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -383,7 +370,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if replicationErr != nil { msg := replication.GetMessageFromError(replicationErr) logger.Error(replicationErr, "failed to Replicate", "ReplicationState", instance.Spec.ReplicationState) - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), msg) + err = r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), msg) if err != nil { logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -396,13 +383,13 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re }, nil } - return ctrl.Result{}, replicationErr + return reconcile.Result{}, replicationErr } if requeueForResync { logger.Info("volume is not ready to use, requeuing for resync") - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), "volume is degraded") + err = r.updateReplicationStatus(instance, logger, GetCurrentReplicationState(instance.Status.State), "volume is degraded") if err != nil { logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) } @@ -455,7 +442,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if instance.Spec.ReplicationState == replicationv1alpha1.Secondary { instance.Status.LastSyncTime = nil } - err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg) + err = r.updateReplicationStatus(instance, logger, GetReplicationState(instance.Spec.ReplicationState), msg) if err != nil { return ctrl.Result{}, err } @@ -554,7 +541,6 @@ func (r *VolumeReplicationReconciler) updateReplicationStatus( func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error { err := r.waitForCrds() if err != nil { - return err } @@ -570,46 +556,21 @@ func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOpt func (r *VolumeReplicationReconciler) waitForCrds() error { logger := log.FromContext(context.TODO(), "Name", "checkingDependencies") - err := r.waitForVolumeReplicationResource(logger, volumeReplicationClass) + err := WaitForVolumeReplicationResource(r.Client, logger, volumeReplicationClass) if err != nil { logger.Error(err, "failed to wait for VolumeReplicationClass CRD") - return err } - err = r.waitForVolumeReplicationResource(logger, volumeReplication) + err = WaitForVolumeReplicationResource(r.Client, logger, volumeReplication) if err != nil { logger.Error(err, "failed to wait for VolumeReplication CRD") - return err } return nil } -func (r *VolumeReplicationReconciler) waitForVolumeReplicationResource(logger logr.Logger, resourceName string) error { - unstructuredResource := &unstructured.UnstructuredList{} - unstructuredResource.SetGroupVersionKind(schema.GroupVersionKind{ - Group: replicationv1alpha1.GroupVersion.Group, - Kind: resourceName, - Version: replicationv1alpha1.GroupVersion.Version, - }) - for { - err := r.Client.List(context.TODO(), unstructuredResource) - if err == nil { - return nil - } - // return errors other than NoMatch - if !meta.IsNoMatchError(err) { - logger.Error(err, "got an unexpected error while waiting for resource", "Resource", resourceName) - - return err - } - logger.Info("resource does not exist", "Resource", resourceName) - time.Sleep(5 * time.Second) - } -} - // volumeReplicationInstance contains the attributes // that can be useful in reconciling a particular // instance of the VolumeReplication resource. @@ -783,27 +744,6 @@ func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplica return infoResponse, nil } -func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { - switch instance.Spec.ReplicationState { - case replicationv1alpha1.Primary: - return replicationv1alpha1.PrimaryState - case replicationv1alpha1.Secondary: - return replicationv1alpha1.SecondaryState - case replicationv1alpha1.Resync: - return replicationv1alpha1.SecondaryState - } - - return replicationv1alpha1.UnknownState -} - -func getCurrentReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { - if instance.Status.State == "" { - return replicationv1alpha1.UnknownState - } - - return instance.Status.State -} - func setFailureCondition(instance *replicationv1alpha1.VolumeReplication, errMessage string, errFromCephCSI string, dataSource string) { switch instance.Spec.ReplicationState { case replicationv1alpha1.Primary: