diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 1fc6da1fb..1b52bb732 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -70,18 +70,6 @@ type CassandraSpec struct { ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } -type OnPremisesSpec struct { - StorageClassName string `json:"storageClassName"` - OSDiskSize string `json:"osDiskSize"` - DataDiskSize string `json:"dataDiskSize"` - SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"` - SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"` - NodeCPU int64 `json:"nodeCPU"` - NodeMemory string `json:"nodeMemory"` - OSImageURL string `json:"osImageURL"` - CloudInitScriptRef *NamespacedName `json:"cloudInitScriptRef"` -} - // CassandraStatus defines the observed state of Cassandra type CassandraStatus struct { ClusterStatus `json:",inline"` diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index a8f811b6c..49fa7abe9 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -765,6 +765,18 @@ func (cs *ClusterStatus) PrivateLinkStatusesEqual(iStatus *ClusterStatus) bool { return true } +type OnPremisesSpec struct { + StorageClassName string `json:"storageClassName"` + OSDiskSize string `json:"osDiskSize"` + DataDiskSize string `json:"dataDiskSize"` + SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"` + SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"` + NodeCPU int64 `json:"nodeCPU"` + NodeMemory string `json:"nodeMemory"` + OSImageURL string `json:"osImageURL"` + CloudInitScriptRef *NamespacedName `json:"cloudInitScriptRef"` +} + type UserReference struct { Namespace string `json:"namespace"` Name string `json:"name"` diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 1d102e3d2..4a77c8175 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -22,9 +22,7 @@ import ( "strconv" "github.com/go-logr/logr" - k8scorev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -326,6 +324,7 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster( bootstrap := newOnPremiseBootstrap( r.IcAdminAPI, r.Client, + r.EventRecorder, cassandra, iCassandra.Status.ID, iCassandra.Status.DataCentres[0].ID, @@ -366,7 +365,10 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster( "Cluster status check job is started", ) - err = r.startClusterOnPremisesIPsJob(iCassandra) + err = r.Scheduler.ScheduleJob( + cassandra.GetJobID(scheduler.OnPremisesIPsChecker), + scheduler.ClusterStatusInterval, + newWatchOnPremisesIPsJob(bootstrap)) if err != nil { l.Error(err, "Cannot start cluster on-premises IPs job", "cassandra cluster ID", cassandra.Status.ID) @@ -1023,235 +1025,6 @@ func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra) return nil } -func (r *CassandraReconciler) startClusterOnPremisesIPsJob(cluster *v1beta1.Cassandra) error { - job := r.newWatchOnPremisesIPsJob(cluster) - - err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job) - if err != nil { - return err - } - - return nil -} - -func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) scheduler.Job { - l := log.Log.WithValues("component", "cassandraOnPremStatusClusterJob") - return func() error { - if c.Spec.PrivateNetworkCluster { - gateways, err := r.IcAdminAPI.GetGateways(c.Status.DataCentres[0].ID) - if err != nil { - l.Error(err, "Cannot get Cassandra SSH-gateway nodes from the Instaclustr API", - "cluster name", c.Spec.Name, - "status", c.Status) - r.EventRecorder.Eventf( - c, models.Warning, models.FetchFailed, - "SSH-gateway nodes fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return err - } - - for _, gateway := range gateways { - gatewayPods := &k8scorev1.PodList{} - err = r.List(context.Background(), gatewayPods, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: gateway.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - l.Error(err, "Cannot list SSH-gateway pods", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Fetching SSH-gateways is failed. Reason: %v", - err, - ) - return err - } - - for _, pod := range gatewayPods.Items { - if (pod.Status.PodIP != "" && gateway.PrivateAddress == "") || - (pod.Status.PodIP != "" && pod.Status.PodIP != gateway.PrivateAddress) { - err = r.IcAdminAPI.SetPrivateGatewayIP(c.Status.DataCentres[0].ID, pod.Status.PodIP) - if err != nil { - l.Error(err, "Cannot set Private IP for the SSH-gateway node", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Setting Private IP for the SSH-gateway node is failed. Reason: %v", - err, - ) - return err - } - } - } - - gatewaySVCs := &k8scorev1.ServiceList{} - err = r.List(context.Background(), gatewaySVCs, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: gateway.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - l.Error(err, "Cannot get services backed by SSH-gateway pods", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Fetching services backed by SSH-gateway pods is failed. Reason: %v", - err, - ) - return err - } - - for _, svc := range gatewaySVCs.Items { - if (svc.Status.LoadBalancer.Ingress[0].IP != "" && gateway.PublicAddress == "") || - (svc.Status.LoadBalancer.Ingress[0].IP != gateway.PublicAddress) { - err = r.IcAdminAPI.SetPublicGatewayIP(c.Status.DataCentres[0].ID, svc.Status.LoadBalancer.Ingress[0].IP) - if err != nil { - l.Error(err, "Cannot set Public IP for the SSH-gateway node", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Setting Public IP for the SSH-gateway node is failed. Reason: %v", - err, - ) - return err - } - - l.Info("IPs for on-premises cluster ssh-gateway are set", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "SSH-gateway IPs are set", - ) - } - } - } - } - - request := &v1beta1.OnPremiseNode{} - nodes, err := r.IcAdminAPI.GetOnPremisesNodes(c.Status.ID) - if err != nil { - l.Error(err, "Cannot get Cassandra on-premises nodes from the Instaclustr API", - "cluster name", c.Spec.Name, - "status", c.Status) - r.EventRecorder.Eventf( - c, models.Warning, models.FetchFailed, - "On-premises nodes fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return err - } - - for _, node := range nodes { - nodePods := &k8scorev1.PodList{} - err = r.List(context.Background(), nodePods, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: node.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - l.Error(err, "Cannot get on-premises cluster pods", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Fetching on-premises cluster pods is failed. Reason: %v", - err, - ) - return err - } - - nodeSVCs := &k8scorev1.ServiceList{} - err = r.List(context.Background(), nodeSVCs, &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - models.ClusterIDLabel: c.Status.ID, - models.NodeIDLabel: node.ID, - }), - Namespace: c.Namespace, - }) - if err != nil { - l.Error(err, "Cannot get services backed by on-premises cluster pods", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Fetching services backed by on-premises cluster pods is failed. Reason: %v", - err, - ) - return err - } - - for _, pod := range nodePods.Items { - if (pod.Status.PodIP != "" && node.PrivateAddress == "") || - (pod.Status.PodIP != "" && pod.Status.PodIP != node.PrivateAddress) { - request.PrivateAddress = pod.Status.PodIP - } - } - - for _, svc := range nodeSVCs.Items { - if (svc.Status.LoadBalancer.Ingress[0].IP != "" && node.PublicAddress == "") || - (svc.Status.LoadBalancer.Ingress[0].IP != node.PublicAddress) { - request.PublicAddress = svc.Status.LoadBalancer.Ingress[0].IP - } - } - - if request.PublicAddress != "" || request.PrivateAddress != "" { - err = r.IcAdminAPI.SetNodeIPs(node.ID, request) - if err != nil { - l.Error(err, "Cannot set IPs for on-premises cluster nodes", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - ) - - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Setting IPs for on-premises cluster nodes is failed. Reason: %v", - err, - ) - return err - } - - l.Info("IPs for on-premises cluster node are set", - "cluster name", c.Spec.Name, - "clusterID", c.Status.ID, - "request", request, - ) - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Nodes IPs are set", - ) - } - } - return nil - } -} - func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "CassandraStatusClusterJob") return func() error { diff --git a/controllers/clusters/on_premise.go b/controllers/clusters/on_premise.go index a8227ad56..11f2c49c5 100644 --- a/controllers/clusters/on_premise.go +++ b/controllers/clusters/on_premise.go @@ -11,24 +11,28 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" virtcorev1 "kubevirt.io/api/core/v1" cdiv1beta1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/scheduler" ) type onPremiseBootstrap struct { IcAdminAPI instaclustr.IcadminAPI K8sClient client.Client + EventRecorder record.EventRecorder K8sObject client.Object ClusterID string CdcID string OnPremisesSpec *v1beta1.OnPremisesSpec - ExposeNodePorts []k8scorev1.ServicePort + ExposePorts []k8scorev1.ServicePort HeadlessPorts []k8scorev1.ServicePort PrivateNetworkCluster bool } @@ -36,6 +40,7 @@ type onPremiseBootstrap struct { func newOnPremiseBootstrap( icAdminAPI instaclustr.IcadminAPI, k8sClient client.Client, + e record.EventRecorder, o client.Object, clusterID, cdcID string, @@ -47,11 +52,12 @@ func newOnPremiseBootstrap( return &onPremiseBootstrap{ IcAdminAPI: icAdminAPI, K8sClient: k8sClient, + EventRecorder: e, K8sObject: o, ClusterID: clusterID, CdcID: cdcID, OnPremisesSpec: onPremisesSpec, - ExposeNodePorts: exposePorts, + ExposePorts: exposePorts, HeadlessPorts: headlessPorts, PrivateNetworkCluster: privateNetworkCluster, } @@ -383,7 +389,7 @@ func newExposeService( Finalizers: []string{models.DeletionFinalizer}, }, Spec: k8scorev1.ServiceSpec{ - Ports: bootstrap.ExposeNodePorts, + Ports: bootstrap.ExposePorts, Selector: map[string]string{ models.KubevirtDomainLabel: vmName, models.NodeIDLabel: nodeID, @@ -763,3 +769,179 @@ func deleteOnPremResources(ctx context.Context, K8sClient client.Client, cluster return nil } + +func newWatchOnPremisesIPsJob(b *onPremiseBootstrap) scheduler.Job { + l := log.Log.WithValues("component", "cassandraOnPremStatusClusterJob") + return func() error { + if b.PrivateNetworkCluster { + gateways, err := b.IcAdminAPI.GetGateways(b.CdcID) + if err != nil { + l.Error(err, "Cannot get Cassandra SSH-gateway nodes from the Instaclustr API", + "cluster name", b.K8sObject.GetName(), + "data centre ID", b.CdcID) + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.FetchFailed, + "SSH-gateway nodes fetch from the Instaclustr API is failed. Reason: %v", err) + return err + } + + for _, gateway := range gateways { + gatewayPods := &k8scorev1.PodList{} + err = b.K8sClient.List(context.Background(), gatewayPods, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: b.ClusterID, + models.NodeIDLabel: gateway.ID, + }), + Namespace: b.K8sObject.GetNamespace(), + }) + if err != nil { + l.Error(err, "Cannot list SSH-gateway pods", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Fetching SSH-gateways is failed. Reason: %v", err) + return err + } + + for _, pod := range gatewayPods.Items { + if (pod.Status.PodIP != "" && gateway.PrivateAddress == "") || + (pod.Status.PodIP != "" && pod.Status.PodIP != gateway.PrivateAddress) { + err = b.IcAdminAPI.SetPrivateGatewayIP(b.CdcID, pod.Status.PodIP) + if err != nil { + l.Error(err, "Cannot set Private IP for the SSH-gateway node", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Setting Private IP for the SSH-gateway node is failed. Reason: %v", err) + return err + } + } + } + + gatewaySVCs := &k8scorev1.ServiceList{} + err = b.K8sClient.List(context.Background(), gatewaySVCs, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: b.ClusterID, + models.NodeIDLabel: gateway.ID, + }), + Namespace: b.K8sObject.GetNamespace(), + }) + if err != nil { + l.Error(err, "Cannot get services backed by SSH-gateway pods", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Fetching services backed by SSH-gateway pods is failed. Reason: %v", err) + return err + } + + for _, svc := range gatewaySVCs.Items { + if (svc.Status.LoadBalancer.Ingress[0].IP != "" && gateway.PublicAddress == "") || + (svc.Status.LoadBalancer.Ingress[0].IP != gateway.PublicAddress) { + err = b.IcAdminAPI.SetPublicGatewayIP(b.CdcID, svc.Status.LoadBalancer.Ingress[0].IP) + if err != nil { + l.Error(err, "Cannot set Public IP for the SSH-gateway node", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Setting Public IP for the SSH-gateway node is failed. Reason: %v", err) + return err + } + + l.Info("IPs for on-premises cluster ssh-gateway are set", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Normal, models.Created, "SSH-gateway IPs are set") + } + } + } + } + + request := &v1beta1.OnPremiseNode{} + nodes, err := b.IcAdminAPI.GetOnPremisesNodes(b.ClusterID) + if err != nil { + l.Error(err, "Cannot get Cassandra on-premises nodes from the Instaclustr API", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.FetchFailed, + "On-premises nodes fetch from the Instaclustr API is failed. Reason: %v", err) + return err + } + + for _, node := range nodes { + nodePods := &k8scorev1.PodList{} + err = b.K8sClient.List(context.Background(), nodePods, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: b.ClusterID, + models.NodeIDLabel: node.ID, + }), + Namespace: b.K8sObject.GetNamespace(), + }) + if err != nil { + l.Error(err, "Cannot get on-premises cluster pods", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Fetching on-premises cluster pods is failed. Reason: %v", err) + return err + } + + nodeSVCs := &k8scorev1.ServiceList{} + err = b.K8sClient.List(context.Background(), nodeSVCs, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + models.ClusterIDLabel: b.ClusterID, + models.NodeIDLabel: node.ID, + }), + Namespace: b.K8sObject.GetNamespace(), + }) + if err != nil { + l.Error(err, "Cannot get services backed by on-premises cluster pods", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Fetching services backed by on-premises cluster pods is failed. Reason: %v", err) + return err + } + + for _, pod := range nodePods.Items { + if (pod.Status.PodIP != "" && node.PrivateAddress == "") || + (pod.Status.PodIP != "" && pod.Status.PodIP != node.PrivateAddress) { + request.PrivateAddress = pod.Status.PodIP + } + } + + for _, svc := range nodeSVCs.Items { + if (svc.Status.LoadBalancer.Ingress[0].IP != "" && node.PublicAddress == "") || + (svc.Status.LoadBalancer.Ingress[0].IP != node.PublicAddress) { + request.PublicAddress = svc.Status.LoadBalancer.Ingress[0].IP + } + } + + if request.PublicAddress != "" || request.PrivateAddress != "" { + err = b.IcAdminAPI.SetNodeIPs(node.ID, request) + if err != nil { + l.Error(err, "Cannot set IPs for on-premises cluster nodes", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID) + + b.EventRecorder.Eventf(b.K8sObject, models.Warning, models.CreationFailed, + "Setting IPs for on-premises cluster nodes is failed. Reason: %v", err) + return err + } + + l.Info("IPs for on-premises cluster node are set", + "cluster name", b.K8sObject.GetName(), + "clusterID", b.ClusterID, + "request", request) + b.EventRecorder.Eventf(b.K8sObject, models.Normal, models.Created, "Nodes IPs are set") + } + } + + return nil + } +}