diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 7f3ddc2f4..1fc6da1fb 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -21,7 +21,9 @@ import ( "fmt" "strconv" + k8scorev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -55,8 +57,8 @@ type CassandraRestoreFrom struct { // CassandraSpec defines the desired state of Cassandra type CassandraSpec struct { - RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` - OnPremisesSpec *CassandraOnPremisesSpec `json:"onPremisesSpec,omitempty"` + RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` + OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` Cluster `json:",inline"` DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"` LuceneEnabled bool `json:"luceneEnabled,omitempty"` @@ -68,16 +70,16 @@ type CassandraSpec struct { ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } -type CassandraOnPremisesSpec struct { - StorageClassName string `json:"storageClassName"` - OSDiskSize string `json:"osDiskSize"` - DataDiskSize string `json:"dataDiskSize"` - SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"` - SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"` - NodeCPU int64 `json:"nodeCPU"` - NodeMemory string `json:"nodeMemory"` - OSImageURL string `json:"osImageURL"` - CloudInitScriptNamespacedName *NamespacedName `json:"cloudInitScriptNamespacedName"` +type OnPremisesSpec struct { + StorageClassName string `json:"storageClassName"` + OSDiskSize string `json:"osDiskSize"` + DataDiskSize string `json:"dataDiskSize"` + SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"` + SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"` + NodeCPU int64 `json:"nodeCPU"` + NodeMemory string `json:"nodeMemory"` + OSImageURL string `json:"osImageURL"` + CloudInitScriptRef *NamespacedName `json:"cloudInitScriptRef"` } // CassandraStatus defines the observed state of Cassandra @@ -170,6 +172,85 @@ func (c *Cassandra) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.C } } +func (c *Cassandra) NewExposePorts() []k8scorev1.ServicePort { + var ports []k8scorev1.ServicePort + ports = []k8scorev1.ServicePort{{ + Name: models.SSH, + Port: models.Port22, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port22, + }, + }, + } + + if !c.Spec.PrivateNetworkCluster { + additionalPorts := []k8scorev1.ServicePort{ + { + Name: models.InterNode, + Port: models.Port7000, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port7000, + }, + }, + { + Name: models.CQLSH, + Port: models.Port9042, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port9042, + }, + }, + { + Name: models.JMX, + Port: models.Port7199, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port7199, + }, + }, + } + if c.Spec.DataCentres[0].ClientToClusterEncryption { + sslPort := k8scorev1.ServicePort{ + Name: models.SSL, + Port: models.Port7001, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port7001, + }, + } + additionalPorts = append(additionalPorts, sslPort) + } + ports = append(ports, additionalPorts...) + } + + return ports +} + +func (c *Cassandra) NewHeadlessPorts() []k8scorev1.ServicePort { + ports := []k8scorev1.ServicePort{ + { + Name: models.InterNode, + Port: models.Port7000, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port7000, + }, + }, + { + Name: models.CQLSH, + Port: models.Port9042, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: models.Port9042, + }, + }, + } + + return ports +} + func (c *Cassandra) FromInstAPI(iData []byte) (*Cassandra, error) { iCass := &models.CassandraCluster{} err := json.Unmarshal(iData, iCass) diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 2f53f6749..df4644083 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -423,26 +423,6 @@ func (in *CassandraList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *CassandraOnPremisesSpec) DeepCopyInto(out *CassandraOnPremisesSpec) { - *out = *in - if in.CloudInitScriptNamespacedName != nil { - in, out := &in.CloudInitScriptNamespacedName, &out.CloudInitScriptNamespacedName - *out = new(NamespacedName) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CassandraOnPremisesSpec. -func (in *CassandraOnPremisesSpec) DeepCopy() *CassandraOnPremisesSpec { - if in == nil { - return nil - } - out := new(CassandraOnPremisesSpec) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraRestoreFrom) DeepCopyInto(out *CassandraRestoreFrom) { *out = *in @@ -479,7 +459,7 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { } if in.OnPremisesSpec != nil { in, out := &in.OnPremisesSpec, &out.OnPremisesSpec - *out = new(CassandraOnPremisesSpec) + *out = new(OnPremisesSpec) (*in).DeepCopyInto(*out) } in.Cluster.DeepCopyInto(&out.Cluster) @@ -1387,6 +1367,26 @@ func (in *OnPremiseNode) DeepCopy() *OnPremiseNode { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnPremisesSpec) DeepCopyInto(out *OnPremisesSpec) { + *out = *in + if in.CloudInitScriptRef != nil { + in, out := &in.CloudInitScriptRef, &out.CloudInitScriptRef + *out = new(NamespacedName) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnPremisesSpec. +func (in *OnPremisesSpec) DeepCopy() *OnPremisesSpec { + if in == nil { + return nil + } + out := new(OnPremisesSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OpenSearch) DeepCopyInto(out *OpenSearch) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index 8fda040e9..b43559d8a 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -111,7 +111,7 @@ spec: type: string onPremisesSpec: properties: - cloudInitScriptNamespacedName: + cloudInitScriptRef: properties: name: type: string @@ -140,7 +140,7 @@ spec: storageClassName: type: string required: - - cloudInitScriptNamespacedName + - cloudInitScriptRef - dataDiskSize - nodeCPU - nodeMemory diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index b7f63121c..9ea63764b 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -15,7 +15,7 @@ spec: nodeCPU: 2 nodeMemory: 8192Mi osImageURL: "https://s3.amazonaws.com/debian-bucket/debian-11-generic-amd64-20230601-1398.raw" - cloudInitScriptNamespacedName: + cloudInitScriptRef: namespace: default name: instaclustr-cloud-init-secret dataCentres: diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 6f0aef021..1d102e3d2 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -19,22 +19,15 @@ package clusters import ( "context" "errors" - "fmt" "strconv" - "strings" "github.com/go-logr/logr" k8scorev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" - virtcorev1 "kubevirt.io/api/core/v1" - cdiv1beta1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -59,7 +52,7 @@ type CassandraReconciler struct { client.Client Scheme *runtime.Scheme API instaclustr.API - IcadminAPI instaclustr.IcadminAPI + IcAdminAPI instaclustr.IcadminAPI Scheduler scheduler.Interface EventRecorder record.EventRecorder } @@ -101,16 +94,10 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( switch cassandra.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - if cassandra.Spec.OnPremisesSpec != nil { - return r.handleCreateOnPremisesCluster(ctx, l, cassandra) - } return r.handleCreateCluster(ctx, l, cassandra) case models.UpdatingEvent: return r.handleUpdateCluster(ctx, l, cassandra) case models.DeletingEvent: - if cassandra.Spec.OnPremisesSpec != nil { - return r.handleDeleteOnPremisesCluster(ctx, l, cassandra) - } return r.handleDeleteCluster(ctx, l, cassandra) case models.GenericEvent: l.Info("Event isn't handled", @@ -241,6 +228,10 @@ func (r *CassandraReconciler) handleCreateCluster( ) } + if cassandra.Spec.OnPremisesSpec != nil { + return r.handleCreateOnPremisesCluster(ctx, l, cassandra) + } + if cassandra.Status.State != models.DeletedStatus { err = r.startClusterStatusJob(cassandra) if err != nil { @@ -301,149 +292,81 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster( l logr.Logger, cassandra *v1beta1.Cassandra, ) (reconcile.Result, error) { - l = l.WithName("On-premises Cassandra creation event") - patch := cassandra.NewPatch() - if cassandra.Status.ID == "" { - l.Info( - "Creating on-premises cluster", + iData, err := r.API.GetCassandra(cassandra.Status.ID) + if err != nil { + l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", cassandra.Spec.Name, - "data centres", cassandra.Spec.DataCentres, + "cluster ID", cassandra.Status.ID, ) - id, err := r.API.CreateCluster(instaclustr.CassandraEndpoint, cassandra.Spec.ToInstAPI()) - if err != nil { - l.Error( - err, "Cannot create cluster", - "cluster spec", cassandra.Spec, - ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster creation on the Instaclustr is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - r.EventRecorder.Eventf( - cassandra, models.Normal, models.Created, - "Cluster creation request is sent. Cluster ID: %s", - id, + cassandra, models.Warning, models.FetchFailed, + "Cluster fetch from the Instaclustr API is failed. Reason: %v", + err, ) + return reconcile.Result{}, err + } - cassandra.Status.ID = id - err = r.Status().Patch(ctx, cassandra, patch) - if err != nil { - l.Error(err, "Cannot patch cluster status", - "cluster name", cassandra.Spec.Name, - "cluster ID", cassandra.Status.ID, - "kind", cassandra.Kind, - "api Version", cassandra.APIVersion, - "namespace", cassandra.Namespace, - "cluster metadata", cassandra.ObjectMeta, - ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.PatchFailed, - "Cluster resource status patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - cassandra.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent - err = r.Patch(ctx, cassandra, patch) - if err != nil { - l.Error(err, "Cannot patch cluster", - "cluster name", cassandra.Spec.Name, - "cluster ID", cassandra.Status.ID, - "kind", cassandra.Kind, - "api Version", cassandra.APIVersion, - "namespace", cassandra.Namespace, - "cluster metadata", cassandra.ObjectMeta, - ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - err = r.startClusterStatusJob(cassandra) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "cassandra cluster ID", cassandra.Status.ID) - - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + iCassandra, err := cassandra.FromInstAPI(iData) + if err != nil { + l.Error( + err, "Cannot convert cluster from the Instaclustr API", + "cluster name", cassandra.Spec.Name, + "cluster ID", cassandra.Status.ID, + ) r.EventRecorder.Eventf( - cassandra, models.Normal, models.Created, - "Cluster status check job is started", + cassandra, models.Warning, models.ConversionFailed, + "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", + err, ) + return reconcile.Result{}, err } - if len(cassandra.Status.DataCentres) > 0 && cassandra.Status.State != models.RunningStatus { - err := r.reconcileOnPremResources(ctx, cassandra) - if err != nil { - l.Error( - err, "Cannot create resources for on-premises cluster", - "cluster spec", cassandra.Spec.OnPremisesSpec, - ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Resources creation for on-premises cluster is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + bootstrap := newOnPremiseBootstrap( + r.IcAdminAPI, + r.Client, + cassandra, + iCassandra.Status.ID, + iCassandra.Status.DataCentres[0].ID, + cassandra.Spec.OnPremisesSpec, + cassandra.NewExposePorts(), + cassandra.NewHeadlessPorts(), + cassandra.Spec.PrivateNetworkCluster) - l.Info( - "On-premises resources have been created", - "cluster name", cassandra.Spec.Name, - "on-premises Spec", cassandra.Spec.OnPremisesSpec, - "cluster ID", cassandra.Status.ID, + err = reconcileOnPremResources(ctx, bootstrap) + if err != nil { + l.Error( + err, "Cannot create resources for on-premises cluster", + "cluster spec", cassandra.Spec.OnPremisesSpec, ) - - } else { - l.Info("Waiting for Data Centres provisioning...") - return models.ReconcileRequeue, nil + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Resources creation for on-premises cluster is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err } - controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer) - cassandra.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent - err := r.Patch(ctx, cassandra, patch) + err = r.startClusterStatusJob(cassandra) if err != nil { - l.Error(err, "Cannot patch cluster", - "cluster name", cassandra.Spec.Name, - "cluster ID", cassandra.Status.ID, - "kind", cassandra.Kind, - "api Version", cassandra.APIVersion, - "namespace", cassandra.Namespace, - "cluster metadata", cassandra.ObjectMeta, - ) + l.Error(err, "Cannot start cluster status job", + "cassandra cluster ID", cassandra.Status.ID) + r.EventRecorder.Eventf( - cassandra, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", + cassandra, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", err, ) return reconcile.Result{}, err } - l.Info( - "Cluster has been created", - "cluster name", cassandra.Spec.Name, - "cluster ID", cassandra.Status.ID, - "kind", cassandra.Kind, - "api Version", cassandra.APIVersion, - "namespace", cassandra.Namespace, + r.EventRecorder.Eventf( + cassandra, models.Normal, models.Created, + "Cluster status check job is started", ) - err = r.startClusterOnPremisesIPsJob(cassandra) + err = r.startClusterOnPremisesIPsJob(iCassandra) if err != nil { l.Error(err, "Cannot start cluster on-premises IPs job", "cassandra cluster ID", cassandra.Status.ID) @@ -455,6 +378,12 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster( ) return reconcile.Result{}, err } + + l.Info("On-premises resources have been created", + "cluster name", cassandra.Spec.Name, + "on-premises Spec", cassandra.Spec.OnPremisesSpec, + "cluster ID", cassandra.Status.ID) + return models.ExitReconcile, nil } @@ -727,6 +656,43 @@ func (r *CassandraReconciler) handleDeleteCluster( r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.BackupsChecker)) r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.StatusChecker)) + if cassandra.Spec.OnPremisesSpec != nil { + r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.OnPremisesIPsChecker)) + + err = deleteOnPremResources(ctx, r.Client, cassandra.Status.ID, cassandra.Namespace) + if err != nil { + l.Error(err, "Cannot delete cluster on-premises resources", + "cluster ID", cassandra.Status.ID) + r.EventRecorder.Eventf(cassandra, models.Warning, models.DeletionFailed, + "Cluster on-premises resources deletion is failed. Reason: %v", err) + return reconcile.Result{}, err + } + + l.Info("Cluster on-premises resources are deleted", + "cluster ID", cassandra.Status.ID) + r.EventRecorder.Eventf(cassandra, models.Normal, models.Deleted, + "Cluster on-premises resources are deleted deleted") + + controllerutil.RemoveFinalizer(cassandra, models.DeletionFinalizer) + + err = r.Patch(ctx, cassandra, patch) + if err != nil { + l.Error(err, "Cannot patch cluster resource", + "cluster name", cassandra.Spec.Name, + "cluster ID", cassandra.Status.ID, + "kind", cassandra.Kind, + "api Version", cassandra.APIVersion, + "namespace", cassandra.Namespace, + "cluster metadata", cassandra.ObjectMeta, + ) + r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed, + "Cluster resource patch is failed. Reason: %v", err) + return reconcile.Result{}, err + } + + return reconcile.Result{}, err + } + l.Info("Deleting cluster backup resources", "cluster ID", cassandra.Status.ID) err = r.deleteBackups(ctx, cassandra.Status.ID, cassandra.Namespace) @@ -803,140 +769,6 @@ func (r *CassandraReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *CassandraReconciler) handleDeleteOnPremisesCluster( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, -) (reconcile.Result, error) { - l = l.WithName("On-premises Cassandra deletion event") - - _, err := r.API.GetCassandra(c.Status.ID) - if err != nil && !errors.Is(err, instaclustr.NotFound) { - l.Error( - err, "Cannot get cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.FetchFailed, - "Cluster fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - patch := c.NewPatch() - - if !errors.Is(err, instaclustr.NotFound) { - l.Info("Sending cluster deletion to the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID) - - err = r.API.DeleteCluster(c.Status.ID, instaclustr.CassandraEndpoint) - if err != nil { - l.Error(err, "Cannot delete cluster", - "cluster name", c.Spec.Name, - "state", c.Status.State, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.DeletionFailed, - "Cluster deletion on the Instaclustr API is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - r.EventRecorder.Event(c, models.Normal, models.DeletionStarted, - "Cluster deletion request is sent to the Instaclustr API.") - - if c.Spec.TwoFactorDelete != nil { - c.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent - c.Annotations[models.ClusterDeletionAnnotation] = models.Triggered - err = r.Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster resource", - "cluster name", c.Spec.Name, - "cluster state", c.Status.State) - r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", err) - - return reconcile.Result{}, err - } - - l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", c.Status.ID) - - r.EventRecorder.Event(c, models.Normal, models.DeletionStarted, - "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - - return reconcile.Result{}, err - } - } - - r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker)) - r.Scheduler.RemoveJob(c.GetJobID(scheduler.OnPremisesIPsChecker)) - - err = r.deleteOnPremResources(ctx, c) - if err != nil { - l.Error(err, "Cannot delete cluster on-premises resources", - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.DeletionFailed, - "Cluster on-premises resources deletion is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - l.Info("Cluster on-premises resources are deleted", - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Normal, models.Deleted, - "Cluster on-premises resources are deleted deleted", - ) - - controllerutil.RemoveFinalizer(c, models.DeletionFinalizer) - c.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent - err = r.Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster resource", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - "cluster metadata", c.ObjectMeta, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - l.Info("Cluster has been deleted", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion) - - r.EventRecorder.Eventf( - c, models.Normal, models.Deleted, - "Cluster resource is deleted", - ) - - return models.ExitReconcile, nil -} - func (r *CassandraReconciler) handleUsersCreate( ctx context.Context, l logr.Logger, @@ -1204,11 +1036,9 @@ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(cluster *v1beta1.Cass func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "cassandraOnPremStatusClusterJob") - return func() error { - - if c.Spec.OnPremisesSpec != nil && c.Spec.PrivateNetworkCluster { - gateways, err := r.IcadminAPI.GetGateways(c.Status.DataCentres[0].ID) + if c.Spec.PrivateNetworkCluster { + gateways, err := r.IcAdminAPI.GetGateways(c.Status.DataCentres[0].ID) if err != nil { l.Error(err, "Cannot get Cassandra SSH-gateway nodes from the Instaclustr API", "cluster name", c.Spec.Name, @@ -1247,7 +1077,7 @@ func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) sch for _, pod := range gatewayPods.Items { if (pod.Status.PodIP != "" && gateway.PrivateAddress == "") || (pod.Status.PodIP != "" && pod.Status.PodIP != gateway.PrivateAddress) { - err = r.IcadminAPI.SetPrivateGatewayIP(c.Status.DataCentres[0].ID, pod.Status.PodIP) + err = r.IcAdminAPI.SetPrivateGatewayIP(c.Status.DataCentres[0].ID, pod.Status.PodIP) if err != nil { l.Error(err, "Cannot set Private IP for the SSH-gateway node", "cluster name", c.Spec.Name, @@ -1289,7 +1119,7 @@ func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) sch for _, svc := range gatewaySVCs.Items { if (svc.Status.LoadBalancer.Ingress[0].IP != "" && gateway.PublicAddress == "") || (svc.Status.LoadBalancer.Ingress[0].IP != gateway.PublicAddress) { - err = r.IcadminAPI.SetPublicGatewayIP(c.Status.DataCentres[0].ID, svc.Status.LoadBalancer.Ingress[0].IP) + err = r.IcAdminAPI.SetPublicGatewayIP(c.Status.DataCentres[0].ID, svc.Status.LoadBalancer.Ingress[0].IP) if err != nil { l.Error(err, "Cannot set Public IP for the SSH-gateway node", "cluster name", c.Spec.Name, @@ -1319,7 +1149,7 @@ func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) sch } request := &v1beta1.OnPremiseNode{} - nodes, err := r.IcadminAPI.GetOnPremisesNodes(c.Status.ID) + nodes, err := r.IcAdminAPI.GetOnPremisesNodes(c.Status.ID) if err != nil { l.Error(err, "Cannot get Cassandra on-premises nodes from the Instaclustr API", "cluster name", c.Spec.Name, @@ -1392,7 +1222,7 @@ func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) sch } if request.PublicAddress != "" || request.PrivateAddress != "" { - err = r.IcadminAPI.SetNodeIPs(node.ID, request) + err = r.IcAdminAPI.SetNodeIPs(node.ID, request) if err != nil { l.Error(err, "Cannot set IPs for on-premises cluster nodes", "cluster name", c.Spec.Name, @@ -1785,817 +1615,6 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c return nil } -func (r *CassandraReconciler) reconcileOnPremResources( - ctx context.Context, - c *v1beta1.Cassandra, -) error { - if c.Spec.PrivateNetworkCluster { - err := r.reconcileSSHGatewayResources(ctx, c) - if err != nil { - return err - } - } - - err := r.reconcileNodesResources(ctx, c) - if err != nil { - return err - } - - return nil -} - -func (r *CassandraReconciler) reconcileSSHGatewayResources( - ctx context.Context, - c *v1beta1.Cassandra, -) error { - gateways, err := r.IcadminAPI.GetGateways(c.Status.DataCentres[0].ID) - if err != nil { - return err - } - - for i, gateway := range gateways { - gatewayDVSize, err := resource.ParseQuantity(c.Spec.OnPremisesSpec.OSDiskSize) - if err != nil { - return err - } - - gatewayDVName := fmt.Sprintf("%s-%d-%s", models.GatewayDVPrefix, i, strings.ToLower(c.Spec.Name)) - gatewayDV, err := r.createDV(ctx, c, gatewayDVName, gateway.ID, gatewayDVSize, true) - if err != nil { - return err - } - - gatewayCPU := resource.Quantity{} - gatewayCPU.Set(c.Spec.OnPremisesSpec.SSHGatewayCPU) - - gatewayMemory, err := resource.ParseQuantity(c.Spec.OnPremisesSpec.SSHGatewayMemory) - if err != nil { - return err - } - - gatewayName := fmt.Sprintf("%s-%d-%s", models.GatewayVMPrefix, i, strings.ToLower(c.Spec.Name)) - - secretName, err := r.reconcileIgnitionScriptSecret(ctx, c, gatewayName, gateway.ID, gateway.Rack) - if err != nil { - return err - } - - gatewayVM := &virtcorev1.VirtualMachine{} - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: gatewayName, - }, gatewayVM) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - gatewayVM, err = r.newVM( - ctx, - c, - gatewayName, - gateway.ID, - gateway.Rack, - gatewayDV.Name, - secretName, - gatewayCPU, - gatewayMemory) - if err != nil { - return err - } - err = r.Client.Create(ctx, gatewayVM) - if err != nil { - return err - } - } - - gatewaySvcName := fmt.Sprintf("%s-%s", models.GatewaySvcPrefix, gatewayName) - gatewayExposeService := &k8scorev1.Service{} - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: gatewaySvcName, - }, gatewayExposeService) - - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - gatewayExposeService = r.newExposeService(c, gatewaySvcName, gatewayName, gateway.ID) - err = r.Client.Create(ctx, gatewayExposeService) - if err != nil { - return err - } - } - } - - return nil -} - -func (r *CassandraReconciler) reconcileNodesResources( - ctx context.Context, - c *v1beta1.Cassandra, -) error { - nodes, err := r.IcadminAPI.GetOnPremisesNodes(c.Status.ID) - if err != nil { - return err - } - - for i, node := range nodes { - nodeOSDiskSize, err := resource.ParseQuantity(c.Spec.OnPremisesSpec.OSDiskSize) - if err != nil { - return err - } - - nodeOSDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeOSDVPrefix, i, strings.ToLower(c.Name)) - nodeOSDV, err := r.createDV(ctx, c, nodeOSDiskDVName, node.ID, nodeOSDiskSize, true) - if err != nil { - return err - } - - nodeDataDiskDVSize, err := resource.ParseQuantity(c.Spec.OnPremisesSpec.DataDiskSize) - if err != nil { - return err - } - - nodeDataDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeDVPrefix, i, strings.ToLower(c.Name)) - nodeDataDV, err := r.createDV(ctx, c, nodeDataDiskDVName, node.ID, nodeDataDiskDVSize, false) - if err != nil { - return err - } - - nodeCPU := resource.Quantity{} - nodeCPU.Set(c.Spec.OnPremisesSpec.NodeCPU) - - nodeMemory, err := resource.ParseQuantity(c.Spec.OnPremisesSpec.NodeMemory) - if err != nil { - return err - } - - nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, strings.ToLower(c.Name)) - - secretName, err := r.reconcileIgnitionScriptSecret(ctx, c, nodeName, node.ID, node.Rack) - if err != nil { - return err - } - - nodeVM := &virtcorev1.VirtualMachine{} - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: nodeName, - }, nodeVM) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeVM, err = r.newVM( - ctx, - c, - nodeName, - node.ID, - node.Rack, - nodeOSDV.Name, - secretName, - nodeCPU, - nodeMemory, - nodeDataDV.Name) - if err != nil { - return err - } - err = r.Client.Create(ctx, nodeVM) - if err != nil { - return err - } - } - - if !c.Spec.PrivateNetworkCluster { - nodeExposeName := fmt.Sprintf("%s-%s", models.NodeSvcPrefix, nodeName) - nodeExposeService := &k8scorev1.Service{} - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: nodeExposeName, - }, nodeExposeService) - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - nodeExposeService = r.newExposeService(c, nodeExposeName, nodeName, node.ID) - err = r.Client.Create(ctx, nodeExposeService) - if err != nil { - return err - } - } - } - - headlessServiceName := fmt.Sprintf("%s-%s", models.KubevirtSubdomain, c.Spec.Name) - headlessSVC := &k8scorev1.Service{} - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: headlessServiceName, - }, headlessSVC) - - if client.IgnoreNotFound(err) != nil { - return err - } - if k8serrors.IsNotFound(err) { - ports := []k8scorev1.ServicePort{ - { - Name: models.InterNode, - Port: models.Port7000, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port7000, - }, - }, - { - Name: models.CQLSH, - Port: models.Port9042, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port9042, - }, - }, - } - headlessSVC = &k8scorev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: models.ServiceKind, - APIVersion: models.K8sAPIVersionV1, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: headlessServiceName, - Namespace: c.Namespace, - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }, - //Finalizers: []string{models.DeletionFinalizer}, - }, - Spec: k8scorev1.ServiceSpec{ - ClusterIP: "None", - Ports: ports, - Selector: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }, - }, - } - err = r.Client.Create(ctx, headlessSVC) - if err != nil { - return err - } - } - - } - return nil -} - -func (r *CassandraReconciler) createDV( - ctx context.Context, - c *v1beta1.Cassandra, - name, - nodeID string, - size resource.Quantity, - isOSDisk bool, -) (*cdiv1beta1.DataVolume, error) { - dv := &cdiv1beta1.DataVolume{} - pvc := &k8scorev1.PersistentVolumeClaim{} - err := r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: name, - }, pvc) - if client.IgnoreNotFound(err) != nil { - return nil, err - } - if k8serrors.IsNotFound(err) { - err = r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: name, - }, dv) - if client.IgnoreNotFound(err) != nil { - return nil, err - } - if k8serrors.IsNotFound(err) { - if isOSDisk { - dv = r.newOSDiskDV(c, name, nodeID, size) - } else { - dv = r.newDataDiskDV(c, name, nodeID, size) - } - err = r.Client.Create(ctx, dv) - if err != nil { - return nil, err - } - } - } - return dv, nil -} - -func (r *CassandraReconciler) newOSDiskDV( - c *v1beta1.Cassandra, - name, - nodeID string, - size resource.Quantity, -) *cdiv1beta1.DataVolume { - return &cdiv1beta1.DataVolume{ - TypeMeta: metav1.TypeMeta{ - Kind: models.DVKind, - APIVersion: models.CDIKubevirtV1beta1APIVersion, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: c.Namespace, - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - }, - Finalizers: []string{models.DeletionFinalizer}, - }, - Spec: cdiv1beta1.DataVolumeSpec{ - Source: &cdiv1beta1.DataVolumeSource{ - HTTP: &cdiv1beta1.DataVolumeSourceHTTP{ - URL: c.Spec.OnPremisesSpec.OSImageURL, - }, - }, - PVC: &k8scorev1.PersistentVolumeClaimSpec{ - AccessModes: []k8scorev1.PersistentVolumeAccessMode{ - k8scorev1.ReadWriteOnce, - }, - Resources: k8scorev1.ResourceRequirements{ - Requests: k8scorev1.ResourceList{ - models.Storage: size, - }, - }, - StorageClassName: &c.Spec.OnPremisesSpec.StorageClassName, - }, - }, - } -} - -func (r *CassandraReconciler) newDataDiskDV( - c *v1beta1.Cassandra, - name, - nodeID string, - size resource.Quantity, -) *cdiv1beta1.DataVolume { - return &cdiv1beta1.DataVolume{ - TypeMeta: metav1.TypeMeta{ - Kind: models.DVKind, - APIVersion: models.CDIKubevirtV1beta1APIVersion, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: c.Namespace, - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - }, - Finalizers: []string{models.DeletionFinalizer}, - }, - Spec: cdiv1beta1.DataVolumeSpec{ - Source: &cdiv1beta1.DataVolumeSource{ - Blank: &cdiv1beta1.DataVolumeBlankImage{}, - }, - PVC: &k8scorev1.PersistentVolumeClaimSpec{ - AccessModes: []k8scorev1.PersistentVolumeAccessMode{ - k8scorev1.ReadWriteOnce, - }, - Resources: k8scorev1.ResourceRequirements{ - Requests: k8scorev1.ResourceList{ - models.Storage: size, - }, - }, - StorageClassName: &c.Spec.OnPremisesSpec.StorageClassName, - }, - }, - } -} - -func (r *CassandraReconciler) newExposeService( - c *v1beta1.Cassandra, - name, - vmName, - nodeID string, -) *k8scorev1.Service { - var ports []k8scorev1.ServicePort - ports = []k8scorev1.ServicePort{{ - Name: models.SSH, - Port: models.Port22, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port22, - }, - }, - } - - if !c.Spec.PrivateNetworkCluster { - additionalPorts := []k8scorev1.ServicePort{ - { - Name: models.InterNode, - Port: models.Port7000, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port7000, - }, - }, - { - Name: models.CQLSH, - Port: models.Port9042, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port9042, - }, - }, - { - Name: models.JMX, - Port: models.Port7199, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port7199, - }, - }, - } - if c.Spec.DataCentres[0].ClientToClusterEncryption { - sslPort := k8scorev1.ServicePort{ - Name: models.SSL, - Port: models.Port7001, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: models.Port7001, - }, - } - additionalPorts = append(additionalPorts, sslPort) - } - ports = append(ports, additionalPorts...) - } - - return &k8scorev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: models.ServiceKind, - APIVersion: models.K8sAPIVersionV1, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: c.Namespace, - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - }, - Finalizers: []string{models.DeletionFinalizer}, - }, - Spec: k8scorev1.ServiceSpec{ - Ports: ports, - Selector: map[string]string{ - models.KubevirtDomainLabel: vmName, - models.NodeIDLabel: nodeID, - }, - Type: models.LBType, - }, - } -} - -func (r *CassandraReconciler) newVM( - ctx context.Context, - c *v1beta1.Cassandra, - vmName, - nodeID, - nodeRack, - OSDiskDVName, - ignitionSecretName string, - cpu, - memory resource.Quantity, - storageDVNames ...string, -) (*virtcorev1.VirtualMachine, error) { - runStrategy := virtcorev1.RunStrategyAlways - bootOrder1 := uint(1) - - cloudInitSecret := &k8scorev1.Secret{} - err := r.Get(ctx, types.NamespacedName{ - Namespace: c.Spec.OnPremisesSpec.CloudInitScriptNamespacedName.Namespace, - Name: c.Spec.OnPremisesSpec.CloudInitScriptNamespacedName.Name, - }, cloudInitSecret) - if err != nil { - return nil, err - } - - vm := &virtcorev1.VirtualMachine{ - TypeMeta: metav1.TypeMeta{ - Kind: models.VirtualMachineKind, - APIVersion: models.KubevirtV1APIVersion, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: vmName, - Namespace: c.Namespace, - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - models.NodeRackLabel: nodeRack, - models.KubevirtDomainLabel: vmName, - }, - Finalizers: []string{models.DeletionFinalizer}, - }, - Spec: virtcorev1.VirtualMachineSpec{ - RunStrategy: &runStrategy, - Template: &virtcorev1.VirtualMachineInstanceTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - models.NodeRackLabel: nodeRack, - models.KubevirtDomainLabel: vmName, - }, - }, - Spec: virtcorev1.VirtualMachineInstanceSpec{ - Hostname: vmName, - Subdomain: fmt.Sprintf("%s-%s", models.KubevirtSubdomain, c.Spec.Name), - Domain: virtcorev1.DomainSpec{ - Resources: virtcorev1.ResourceRequirements{ - Requests: k8scorev1.ResourceList{ - models.CPU: cpu, - models.Memory: memory, - }, - }, - Devices: virtcorev1.Devices{ - Disks: []virtcorev1.Disk{ - { - Name: models.Boot, - BootOrder: &bootOrder1, - IO: models.Native, - Cache: models.None, - DiskDevice: virtcorev1.DiskDevice{ - Disk: &virtcorev1.DiskTarget{ - Bus: models.Virtio, - }, - }, - }, - { - Name: models.CloudInit, - DiskDevice: virtcorev1.DiskDevice{}, - Cache: models.None, - }, - { - Name: models.IgnitionDisk, - DiskDevice: virtcorev1.DiskDevice{}, - Serial: models.IgnitionSerial, - Cache: models.None, - }, - }, - Interfaces: []virtcorev1.Interface{ - { - Name: models.Default, - InterfaceBindingMethod: virtcorev1.InterfaceBindingMethod{ - Bridge: &virtcorev1.InterfaceBridge{}, - }, - }, - }, - }, - }, - Volumes: []virtcorev1.Volume{ - { - Name: models.Boot, - VolumeSource: virtcorev1.VolumeSource{ - PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ - PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ - ClaimName: OSDiskDVName, - }, - }, - }, - }, - { - Name: models.CloudInit, - VolumeSource: virtcorev1.VolumeSource{ - CloudInitNoCloud: &virtcorev1.CloudInitNoCloudSource{ - UserDataSecretRef: &k8scorev1.LocalObjectReference{ - Name: c.Spec.OnPremisesSpec.CloudInitScriptNamespacedName.Name, - }, - }, - }, - }, - { - Name: models.IgnitionDisk, - VolumeSource: virtcorev1.VolumeSource{ - Secret: &virtcorev1.SecretVolumeSource{ - SecretName: ignitionSecretName, - }, - }, - }, - }, - Networks: []virtcorev1.Network{ - { - Name: models.Default, - NetworkSource: virtcorev1.NetworkSource{ - Pod: &virtcorev1.PodNetwork{}, - }, - }, - }, - }, - }, - }, - } - - for i, dvName := range storageDVNames { - diskName := fmt.Sprintf("%s-%d-%s", models.DataDisk, i, vm.Name) - vm.Spec.Template.Spec.Domain.Devices.Disks = append(vm.Spec.Template.Spec.Domain.Devices.Disks, virtcorev1.Disk{ - Name: diskName, - IO: models.Native, - Cache: models.None, - DiskDevice: virtcorev1.DiskDevice{ - Disk: &virtcorev1.DiskTarget{ - Bus: models.Virtio, - }, - }, - Serial: models.DataDiskSerial, - }) - vm.Spec.Template.Spec.Volumes = append(vm.Spec.Template.Spec.Volumes, virtcorev1.Volume{ - Name: diskName, - VolumeSource: virtcorev1.VolumeSource{ - PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ - PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ - ClaimName: dvName, - }, - }, - }, - }) - } - - return vm, nil -} - -func (r *CassandraReconciler) reconcileIgnitionScriptSecret( - ctx context.Context, - c *v1beta1.Cassandra, - nodeName, - nodeID, - nodeRack string, -) (string, error) { - ignitionSecret := &k8scorev1.Secret{} - err := r.Get(ctx, types.NamespacedName{ - Namespace: c.Namespace, - Name: fmt.Sprintf("%s-%s", models.IgnitionScriptSecretPrefix, nodeName), - }, ignitionSecret) - if client.IgnoreNotFound(err) != nil { - return "", err - } - if k8serrors.IsNotFound(err) { - script, err := r.IcadminAPI.GetIgnitionScript(nodeID) - if err != nil { - return "", err - } - - ignitionSecret = &k8scorev1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: models.SecretKind, - APIVersion: models.K8sAPIVersionV1, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", models.IgnitionScriptSecretPrefix, nodeName), - Namespace: c.Namespace, - Labels: map[string]string{ - models.ControlledByLabel: c.Name, - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: nodeID, - models.NodeRackLabel: nodeRack, - }, - Finalizers: []string{models.DeletionFinalizer}, - }, - StringData: map[string]string{ - models.Script: script, - }, - } - err = r.Create(ctx, ignitionSecret) - if err != nil { - return "", err - } - } - - return ignitionSecret.Name, nil -} - -func (r *CassandraReconciler) deleteOnPremResources( - ctx context.Context, - c *v1beta1.Cassandra, -) error { - vms := &virtcorev1.VirtualMachineList{} - err := r.List(ctx, vms, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - return err - } - - for _, vm := range vms.Items { - err = r.Delete(ctx, &vm) - if err != nil { - return err - } - - patch := client.MergeFrom(vm.DeepCopy()) - controllerutil.RemoveFinalizer(&vm, models.DeletionFinalizer) - err = r.Patch(ctx, &vm, patch) - if err != nil { - return err - } - } - - vmis := &virtcorev1.VirtualMachineInstanceList{} - err = r.List(ctx, vmis, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - return err - } - - for _, vmi := range vmis.Items { - err = r.Delete(ctx, &vmi) - if err != nil { - return err - } - - patch := client.MergeFrom(vmi.DeepCopy()) - controllerutil.RemoveFinalizer(&vmi, models.DeletionFinalizer) - err = r.Patch(ctx, &vmi, patch) - if err != nil { - return err - } - } - - dvs := &cdiv1beta1.DataVolumeList{} - err = r.List(ctx, dvs, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - return err - } - - for _, dv := range dvs.Items { - err = r.Delete(ctx, &dv) - if err != nil { - return err - } - - patch := client.MergeFrom(dv.DeepCopy()) - controllerutil.RemoveFinalizer(&dv, models.DeletionFinalizer) - err = r.Patch(ctx, &dv, patch) - if err != nil { - return err - } - } - - svcs := &k8scorev1.ServiceList{} - err = r.List(ctx, svcs, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - return err - } - - for _, svc := range svcs.Items { - err = r.Delete(ctx, &svc) - if err != nil { - return err - } - - patch := client.MergeFrom(svc.DeepCopy()) - controllerutil.RemoveFinalizer(&svc, models.DeletionFinalizer) - err = r.Patch(ctx, &svc, patch) - if err != nil { - return err - } - } - - secrets := &k8scorev1.SecretList{} - err = r.List(ctx, secrets, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - return err - } - - for _, secret := range secrets.Items { - err = r.Delete(ctx, &secret) - if err != nil { - return err - } - - patch := client.MergeFrom(secret.DeepCopy()) - controllerutil.RemoveFinalizer(&secret, models.DeletionFinalizer) - err = r.Patch(ctx, &secret, patch) - if err != nil { - return err - } - } - return nil -} - func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cassandra) error { l := log.FromContext(ctx) diff --git a/controllers/clusters/on_premise.go b/controllers/clusters/on_premise.go new file mode 100644 index 000000000..a8227ad56 --- /dev/null +++ b/controllers/clusters/on_premise.go @@ -0,0 +1,765 @@ +package clusters + +import ( + "context" + "fmt" + "strings" + + k8scorev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + virtcorev1 "kubevirt.io/api/core/v1" + cdiv1beta1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/instaclustr/operator/apis/clusters/v1beta1" + "github.com/instaclustr/operator/pkg/instaclustr" + "github.com/instaclustr/operator/pkg/models" +) + +type onPremiseBootstrap struct { + IcAdminAPI instaclustr.IcadminAPI + K8sClient client.Client + K8sObject client.Object + ClusterID string + CdcID string + OnPremisesSpec *v1beta1.OnPremisesSpec + ExposeNodePorts []k8scorev1.ServicePort + HeadlessPorts []k8scorev1.ServicePort + PrivateNetworkCluster bool +} + +func newOnPremiseBootstrap( + icAdminAPI instaclustr.IcadminAPI, + k8sClient client.Client, + o client.Object, + clusterID, + cdcID string, + onPremisesSpec *v1beta1.OnPremisesSpec, + exposePorts, + headlessPorts []k8scorev1.ServicePort, + privateNetworkCluster bool, +) *onPremiseBootstrap { + return &onPremiseBootstrap{ + IcAdminAPI: icAdminAPI, + K8sClient: k8sClient, + K8sObject: o, + ClusterID: clusterID, + CdcID: cdcID, + OnPremisesSpec: onPremisesSpec, + ExposeNodePorts: exposePorts, + HeadlessPorts: headlessPorts, + PrivateNetworkCluster: privateNetworkCluster, + } +} + +func createDV( + ctx context.Context, + bootstrap *onPremiseBootstrap, + name, + nodeID string, + size resource.Quantity, + isOSDisk bool, +) (*cdiv1beta1.DataVolume, error) { + ns := bootstrap.K8sObject.GetNamespace() + dv := &cdiv1beta1.DataVolume{} + pvc := &k8scorev1.PersistentVolumeClaim{} + err := bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: name, + }, pvc) + if client.IgnoreNotFound(err) != nil { + return nil, err + } + if k8serrors.IsNotFound(err) { + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: name, + }, dv) + if client.IgnoreNotFound(err) != nil { + return nil, err + } + if k8serrors.IsNotFound(err) { + dv = newDataDiskDV(bootstrap, name, nodeID, size, isOSDisk) + err = bootstrap.K8sClient.Create(ctx, dv) + if err != nil { + return nil, err + } + } + } + + return dv, nil +} + +func newDataDiskDV( + bootstrap *onPremiseBootstrap, + name, + nodeID string, + storageSize resource.Quantity, + isOSDisk bool, +) *cdiv1beta1.DataVolume { + dvSource := &cdiv1beta1.DataVolumeSource{} + + if isOSDisk { + dvSource.HTTP = &cdiv1beta1.DataVolumeSourceHTTP{URL: bootstrap.OnPremisesSpec.OSImageURL} + } else { + dvSource.Blank = &cdiv1beta1.DataVolumeBlankImage{} + } + + return &cdiv1beta1.DataVolume{ + TypeMeta: metav1.TypeMeta{ + Kind: models.DVKind, + APIVersion: models.CDIKubevirtV1beta1APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: bootstrap.K8sObject.GetNamespace(), + Labels: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + models.NodeIDLabel: nodeID, + }, + Finalizers: []string{models.DeletionFinalizer}, + }, + Spec: cdiv1beta1.DataVolumeSpec{ + Source: dvSource, + PVC: &k8scorev1.PersistentVolumeClaimSpec{ + AccessModes: []k8scorev1.PersistentVolumeAccessMode{ + k8scorev1.ReadWriteOnce, + }, + Resources: k8scorev1.ResourceRequirements{ + Requests: k8scorev1.ResourceList{ + models.Storage: storageSize, + }, + }, + StorageClassName: &bootstrap.OnPremisesSpec.StorageClassName, + }, + }, + } +} + +func reconcileIgnitionScriptSecret( + ctx context.Context, + bootstrap *onPremiseBootstrap, + nodeName, + nodeID, + nodeRack string, +) (string, error) { + ns := bootstrap.K8sObject.GetNamespace() + ignitionSecretName := fmt.Sprintf("%s-%s", models.IgnitionScriptSecretPrefix, nodeName) + ignitionSecret := &k8scorev1.Secret{} + err := bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: ignitionSecretName, + }, ignitionSecret) + if client.IgnoreNotFound(err) != nil { + return "", err + } + if k8serrors.IsNotFound(err) { + script, err := bootstrap.IcAdminAPI.GetIgnitionScript(nodeID) + if err != nil { + return "", err + } + + ignitionSecret = &k8scorev1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: models.SecretKind, + APIVersion: models.K8sAPIVersionV1, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: ignitionSecretName, + Namespace: ns, + Labels: map[string]string{ + models.ControlledByLabel: bootstrap.K8sObject.GetName(), + models.ClusterIDLabel: bootstrap.ClusterID, + models.NodeIDLabel: nodeID, + models.NodeRackLabel: nodeRack, + }, + Finalizers: []string{models.DeletionFinalizer}, + }, + StringData: map[string]string{ + models.Script: script, + }, + } + err = bootstrap.K8sClient.Create(ctx, ignitionSecret) + if err != nil { + return "", err + } + } + + return ignitionSecretName, nil +} + +func newVM( + ctx context.Context, + bootstrap *onPremiseBootstrap, + vmName, + nodeID, + nodeRack, + OSDiskDVName, + ignitionSecretName string, + cpu, + memory resource.Quantity, + storageDVNames ...string, +) (*virtcorev1.VirtualMachine, error) { + runStrategy := virtcorev1.RunStrategyAlways + bootOrder1 := uint(1) + + cloudInitSecret := &k8scorev1.Secret{} + err := bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: bootstrap.OnPremisesSpec.CloudInitScriptRef.Namespace, + Name: bootstrap.OnPremisesSpec.CloudInitScriptRef.Name, + }, cloudInitSecret) + if err != nil { + return nil, err + } + + vm := &virtcorev1.VirtualMachine{ + TypeMeta: metav1.TypeMeta{ + Kind: models.VirtualMachineKind, + APIVersion: models.KubevirtV1APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: vmName, + Namespace: bootstrap.K8sObject.GetNamespace(), + Labels: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + models.NodeIDLabel: nodeID, + models.NodeRackLabel: nodeRack, + models.KubevirtDomainLabel: vmName, + }, + Finalizers: []string{models.DeletionFinalizer}, + }, + Spec: virtcorev1.VirtualMachineSpec{ + RunStrategy: &runStrategy, + Template: &virtcorev1.VirtualMachineInstanceTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + models.NodeIDLabel: nodeID, + models.NodeRackLabel: nodeRack, + models.KubevirtDomainLabel: vmName, + }, + }, + Spec: virtcorev1.VirtualMachineInstanceSpec{ + Hostname: vmName, + Subdomain: fmt.Sprintf("%s-%s", models.KubevirtSubdomain, bootstrap.K8sObject.GetName()), + Domain: virtcorev1.DomainSpec{ + Resources: virtcorev1.ResourceRequirements{ + Requests: k8scorev1.ResourceList{ + models.CPU: cpu, + models.Memory: memory, + }, + }, + Devices: virtcorev1.Devices{ + Disks: []virtcorev1.Disk{ + { + Name: models.Boot, + BootOrder: &bootOrder1, + IO: models.Native, + Cache: models.None, + DiskDevice: virtcorev1.DiskDevice{ + Disk: &virtcorev1.DiskTarget{ + Bus: models.Virtio, + }, + }, + }, + { + Name: models.CloudInit, + DiskDevice: virtcorev1.DiskDevice{}, + Cache: models.None, + }, + { + Name: models.IgnitionDisk, + DiskDevice: virtcorev1.DiskDevice{}, + Serial: models.IgnitionSerial, + Cache: models.None, + }, + }, + Interfaces: []virtcorev1.Interface{ + { + Name: models.Default, + InterfaceBindingMethod: virtcorev1.InterfaceBindingMethod{ + Bridge: &virtcorev1.InterfaceBridge{}, + }, + }, + }, + }, + }, + Volumes: []virtcorev1.Volume{ + { + Name: models.Boot, + VolumeSource: virtcorev1.VolumeSource{ + PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ + PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ + ClaimName: OSDiskDVName, + }, + }, + }, + }, + { + Name: models.CloudInit, + VolumeSource: virtcorev1.VolumeSource{ + CloudInitNoCloud: &virtcorev1.CloudInitNoCloudSource{ + UserDataSecretRef: &k8scorev1.LocalObjectReference{ + Name: bootstrap.OnPremisesSpec.CloudInitScriptRef.Name, + }, + }, + }, + }, + { + Name: models.IgnitionDisk, + VolumeSource: virtcorev1.VolumeSource{ + Secret: &virtcorev1.SecretVolumeSource{ + SecretName: ignitionSecretName, + }, + }, + }, + }, + Networks: []virtcorev1.Network{ + { + Name: models.Default, + NetworkSource: virtcorev1.NetworkSource{ + Pod: &virtcorev1.PodNetwork{}, + }, + }, + }, + }, + }, + }, + } + + for i, dvName := range storageDVNames { + diskName := fmt.Sprintf("%s-%d-%s", models.DataDisk, i, vm.Name) + + vm.Spec.Template.Spec.Domain.Devices.Disks = append(vm.Spec.Template.Spec.Domain.Devices.Disks, virtcorev1.Disk{ + Name: diskName, + IO: models.Native, + Cache: models.None, + DiskDevice: virtcorev1.DiskDevice{ + Disk: &virtcorev1.DiskTarget{ + Bus: models.Virtio, + }, + }, + Serial: models.DataDiskSerial, + }) + + vm.Spec.Template.Spec.Volumes = append(vm.Spec.Template.Spec.Volumes, virtcorev1.Volume{ + Name: diskName, + VolumeSource: virtcorev1.VolumeSource{ + PersistentVolumeClaim: &virtcorev1.PersistentVolumeClaimVolumeSource{ + PersistentVolumeClaimVolumeSource: k8scorev1.PersistentVolumeClaimVolumeSource{ + ClaimName: dvName, + }, + }, + }, + }) + } + + return vm, nil +} + +func newExposeService( + bootstrap *onPremiseBootstrap, + svcName, + vmName, + nodeID string, +) *k8scorev1.Service { + return &k8scorev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: models.ServiceKind, + APIVersion: models.K8sAPIVersionV1, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: bootstrap.K8sObject.GetNamespace(), + Labels: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + models.NodeIDLabel: nodeID, + }, + Finalizers: []string{models.DeletionFinalizer}, + }, + Spec: k8scorev1.ServiceSpec{ + Ports: bootstrap.ExposeNodePorts, + Selector: map[string]string{ + models.KubevirtDomainLabel: vmName, + models.NodeIDLabel: nodeID, + }, + Type: models.LBType, + }, + } +} + +func newHeadlessService(bootstrap *onPremiseBootstrap, svcName string) *k8scorev1.Service { + return &k8scorev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: models.ServiceKind, + APIVersion: models.K8sAPIVersionV1, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: bootstrap.K8sObject.GetNamespace(), + Labels: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + }, + //Finalizers: []string{models.DeletionFinalizer}, + }, + Spec: k8scorev1.ServiceSpec{ + ClusterIP: "None", + Ports: bootstrap.HeadlessPorts, + Selector: map[string]string{ + models.ClusterIDLabel: bootstrap.ClusterID, + }, + }, + } +} + +func reconcileOnPremResources(ctx context.Context, bootstrap *onPremiseBootstrap) error { + if bootstrap.PrivateNetworkCluster { + err := reconcileSSHGatewayResources(ctx, bootstrap) + if err != nil { + return err + } + } + + err := reconcileNodesResources(ctx, bootstrap) + if err != nil { + return err + } + + return nil +} + +func reconcileSSHGatewayResources(ctx context.Context, bootstrap *onPremiseBootstrap) error { + gateways, err := bootstrap.IcAdminAPI.GetGateways(bootstrap.CdcID) + if err != nil { + return err + } + + for i, gateway := range gateways { + gatewayDVSize, err := resource.ParseQuantity(bootstrap.OnPremisesSpec.OSDiskSize) + if err != nil { + return err + } + + gatewayDVName := fmt.Sprintf("%s-%d-%s", models.GatewayDVPrefix, i, strings.ToLower(bootstrap.K8sObject.GetName())) + gatewayDV, err := createDV(ctx, bootstrap, gatewayDVName, gateway.ID, gatewayDVSize, true) + if err != nil { + return err + } + + gatewayCPU := resource.Quantity{} + gatewayCPU.Set(bootstrap.OnPremisesSpec.SSHGatewayCPU) + + gatewayMemory, err := resource.ParseQuantity(bootstrap.OnPremisesSpec.SSHGatewayMemory) + if err != nil { + return err + } + + gatewayVMName := fmt.Sprintf("%s-%d-%s", models.GatewayVMPrefix, i, strings.ToLower(bootstrap.K8sObject.GetName())) + secretName, err := reconcileIgnitionScriptSecret( + ctx, + bootstrap, + gatewayVMName, + gateway.ID, + gateway.Rack) + if err != nil { + return err + } + + gatewayVM := &virtcorev1.VirtualMachine{} + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: bootstrap.K8sObject.GetNamespace(), + Name: gatewayVMName, + }, gatewayVM) + if client.IgnoreNotFound(err) != nil { + return err + } + if k8serrors.IsNotFound(err) { + gatewayVM, err = newVM( + ctx, + bootstrap, + gatewayVMName, + gateway.ID, + gateway.Rack, + gatewayDV.Name, + secretName, + gatewayCPU, + gatewayMemory) + if err != nil { + return err + } + err = bootstrap.K8sClient.Create(ctx, gatewayVM) + if err != nil { + return err + } + } + + gatewaySvcName := fmt.Sprintf("%s-%s", models.GatewaySvcPrefix, gatewayVMName) + gatewayExposeService := &k8scorev1.Service{} + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: bootstrap.K8sObject.GetNamespace(), + Name: gatewaySvcName, + }, gatewayExposeService) + if client.IgnoreNotFound(err) != nil { + return err + } + if k8serrors.IsNotFound(err) { + gatewayExposeService = newExposeService(bootstrap, gatewaySvcName, gatewayVMName, gateway.ID) + + err = bootstrap.K8sClient.Create(ctx, gatewayExposeService) + if err != nil { + return err + } + } + } + + return nil +} + +func reconcileNodesResources(ctx context.Context, bootstrap *onPremiseBootstrap) error { + nodes, err := bootstrap.IcAdminAPI.GetOnPremisesNodes(bootstrap.ClusterID) + if err != nil { + return err + } + + for i, node := range nodes { + nodeOSDiskSize, err := resource.ParseQuantity(bootstrap.OnPremisesSpec.OSDiskSize) + if err != nil { + return err + } + + clusterName := strings.ToLower(bootstrap.K8sObject.GetName()) + nodeOSDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeOSDVPrefix, i, clusterName) + nodeOSDV, err := createDV(ctx, bootstrap, nodeOSDiskDVName, node.ID, nodeOSDiskSize, true) + if err != nil { + return err + } + + nodeDataDiskDVSize, err := resource.ParseQuantity(bootstrap.OnPremisesSpec.DataDiskSize) + if err != nil { + return err + } + + nodeDataDiskDVName := fmt.Sprintf("%s-%d-%s", models.NodeDVPrefix, i, clusterName) + nodeDataDV, err := createDV(ctx, bootstrap, nodeDataDiskDVName, node.ID, nodeDataDiskDVSize, false) + if err != nil { + return err + } + + nodeCPU := resource.Quantity{} + nodeCPU.Set(bootstrap.OnPremisesSpec.NodeCPU) + + nodeMemory, err := resource.ParseQuantity(bootstrap.OnPremisesSpec.NodeMemory) + if err != nil { + return err + } + + nodeName := fmt.Sprintf("%s-%d-%s", models.NodeVMPrefix, i, clusterName) + + secretName, err := reconcileIgnitionScriptSecret(ctx, bootstrap, nodeName, node.ID, node.Rack) + if err != nil { + return err + } + + ns := bootstrap.K8sObject.GetNamespace() + nodeVM := &virtcorev1.VirtualMachine{} + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: nodeName, + }, nodeVM) + if client.IgnoreNotFound(err) != nil { + return err + } + if k8serrors.IsNotFound(err) { + nodeVM, err = newVM( + ctx, + bootstrap, + nodeName, + node.ID, + node.Rack, + nodeOSDV.Name, + secretName, + nodeCPU, + nodeMemory, + nodeDataDV.Name) + if err != nil { + return err + } + err = bootstrap.K8sClient.Create(ctx, nodeVM) + if err != nil { + return err + } + } + + if !bootstrap.PrivateNetworkCluster { + nodeExposeName := fmt.Sprintf("%s-%s", models.NodeSvcPrefix, nodeName) + nodeExposeService := &k8scorev1.Service{} + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: nodeExposeName, + }, nodeExposeService) + if client.IgnoreNotFound(err) != nil { + return err + } + if k8serrors.IsNotFound(err) { + nodeExposeService = newExposeService(bootstrap, nodeExposeName, nodeName, node.ID) + err = bootstrap.K8sClient.Create(ctx, nodeExposeService) + if err != nil { + return err + } + } + } + + headlessServiceName := fmt.Sprintf("%s-%s", models.KubevirtSubdomain, clusterName) + headlessSVC := &k8scorev1.Service{} + err = bootstrap.K8sClient.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: headlessServiceName, + }, headlessSVC) + if client.IgnoreNotFound(err) != nil { + return err + } + if k8serrors.IsNotFound(err) { + headlessSVC = newHeadlessService(bootstrap, headlessServiceName) + err = bootstrap.K8sClient.Create(ctx, headlessSVC) + if err != nil { + return err + } + } + + } + return nil +} + +func deleteOnPremResources(ctx context.Context, K8sClient client.Client, clusterID, namespace string) error { + vms := &virtcorev1.VirtualMachineList{} + err := K8sClient.List(ctx, vms, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: clusterID, + }), + Namespace: namespace, + }) + if err != nil { + return err + } + + for _, vm := range vms.Items { + err = K8sClient.Delete(ctx, &vm) + if err != nil { + return err + } + + patch := client.MergeFrom(vm.DeepCopy()) + controllerutil.RemoveFinalizer(&vm, models.DeletionFinalizer) + err = K8sClient.Patch(ctx, &vm, patch) + if err != nil { + return err + } + } + + vmis := &virtcorev1.VirtualMachineInstanceList{} + err = K8sClient.List(ctx, vmis, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: clusterID, + }), + Namespace: namespace, + }) + if err != nil { + return err + } + + for _, vmi := range vmis.Items { + err = K8sClient.Delete(ctx, &vmi) + if err != nil { + return err + } + + patch := client.MergeFrom(vmi.DeepCopy()) + controllerutil.RemoveFinalizer(&vmi, models.DeletionFinalizer) + err = K8sClient.Patch(ctx, &vmi, patch) + if err != nil { + return err + } + } + + dvs := &cdiv1beta1.DataVolumeList{} + err = K8sClient.List(ctx, dvs, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: clusterID, + }), + Namespace: namespace, + }) + if err != nil { + return err + } + + for _, dv := range dvs.Items { + err = K8sClient.Delete(ctx, &dv) + if err != nil { + return err + } + + patch := client.MergeFrom(dv.DeepCopy()) + controllerutil.RemoveFinalizer(&dv, models.DeletionFinalizer) + err = K8sClient.Patch(ctx, &dv, patch) + if err != nil { + return err + } + } + + svcs := &k8scorev1.ServiceList{} + err = K8sClient.List(ctx, svcs, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: clusterID, + }), + Namespace: namespace, + }) + if err != nil { + return err + } + + for _, svc := range svcs.Items { + err = K8sClient.Delete(ctx, &svc) + if err != nil { + return err + } + + patch := client.MergeFrom(svc.DeepCopy()) + controllerutil.RemoveFinalizer(&svc, models.DeletionFinalizer) + err = K8sClient.Patch(ctx, &svc, patch) + if err != nil { + return err + } + } + + secrets := &k8scorev1.SecretList{} + err = K8sClient.List(ctx, secrets, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: clusterID, + }), + Namespace: namespace, + }) + if err != nil { + return err + } + + for _, secret := range secrets.Items { + err = K8sClient.Delete(ctx, &secret) + if err != nil { + return err + } + + patch := client.MergeFrom(secret.DeepCopy()) + controllerutil.RemoveFinalizer(&secret, models.DeletionFinalizer) + err = K8sClient.Patch(ctx, &secret, patch) + if err != nil { + return err + } + } + + return nil +} diff --git a/main.go b/main.go index 88ff4173c..404f5fb82 100644 --- a/main.go +++ b/main.go @@ -138,7 +138,7 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), API: instaClient, - IcadminAPI: icadminClient, + IcAdminAPI: icadminClient, Scheduler: s, EventRecorder: eventRecorder, }).SetupWithManager(mgr); err != nil {