Skip to content

Commit

Permalink
generic on-premises job.
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Nov 14, 2023
1 parent eaf4f64 commit 705e698
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 247 deletions.
12 changes: 0 additions & 12 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,6 @@ type CassandraSpec struct {
ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"`
}

type OnPremisesSpec struct {
StorageClassName string `json:"storageClassName"`
OSDiskSize string `json:"osDiskSize"`
DataDiskSize string `json:"dataDiskSize"`
SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"`
SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"`
NodeCPU int64 `json:"nodeCPU"`
NodeMemory string `json:"nodeMemory"`
OSImageURL string `json:"osImageURL"`
CloudInitScriptRef *NamespacedName `json:"cloudInitScriptRef"`
}

// CassandraStatus defines the observed state of Cassandra
type CassandraStatus struct {
ClusterStatus `json:",inline"`
Expand Down
12 changes: 12 additions & 0 deletions apis/clusters/v1beta1/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,18 @@ func (cs *ClusterStatus) PrivateLinkStatusesEqual(iStatus *ClusterStatus) bool {
return true
}

type OnPremisesSpec struct {
StorageClassName string `json:"storageClassName"`
OSDiskSize string `json:"osDiskSize"`
DataDiskSize string `json:"dataDiskSize"`
SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"`
SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"`
NodeCPU int64 `json:"nodeCPU"`
NodeMemory string `json:"nodeMemory"`
OSImageURL string `json:"osImageURL"`
CloudInitScriptRef *NamespacedName `json:"cloudInitScriptRef"`
}

type UserReference struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Expand Down
237 changes: 5 additions & 232 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
"strconv"

"github.com/go-logr/logr"
k8scorev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -326,6 +324,7 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster(
bootstrap := newOnPremiseBootstrap(
r.IcAdminAPI,
r.Client,
r.EventRecorder,
cassandra,
iCassandra.Status.ID,
iCassandra.Status.DataCentres[0].ID,
Expand Down Expand Up @@ -366,7 +365,10 @@ func (r *CassandraReconciler) handleCreateOnPremisesCluster(
"Cluster status check job is started",
)

err = r.startClusterOnPremisesIPsJob(iCassandra)
err = r.Scheduler.ScheduleJob(
cassandra.GetJobID(scheduler.OnPremisesIPsChecker),
scheduler.ClusterStatusInterval,
newWatchOnPremisesIPsJob(bootstrap))
if err != nil {
l.Error(err, "Cannot start cluster on-premises IPs job",
"cassandra cluster ID", cassandra.Status.ID)
Expand Down Expand Up @@ -1023,235 +1025,6 @@ func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra)
return nil
}

func (r *CassandraReconciler) startClusterOnPremisesIPsJob(cluster *v1beta1.Cassandra) error {
job := r.newWatchOnPremisesIPsJob(cluster)

err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.OnPremisesIPsChecker), scheduler.ClusterStatusInterval, job)
if err != nil {
return err
}

return nil
}

func (r *CassandraReconciler) newWatchOnPremisesIPsJob(c *v1beta1.Cassandra) scheduler.Job {
l := log.Log.WithValues("component", "cassandraOnPremStatusClusterJob")
return func() error {
if c.Spec.PrivateNetworkCluster {
gateways, err := r.IcAdminAPI.GetGateways(c.Status.DataCentres[0].ID)
if err != nil {
l.Error(err, "Cannot get Cassandra SSH-gateway nodes from the Instaclustr API",
"cluster name", c.Spec.Name,
"status", c.Status)
r.EventRecorder.Eventf(
c, models.Warning, models.FetchFailed,
"SSH-gateway nodes fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return err
}

for _, gateway := range gateways {
gatewayPods := &k8scorev1.PodList{}
err = r.List(context.Background(), gatewayPods, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
models.ClusterIDLabel: c.Status.ID,
models.NodeIDLabel: gateway.ID,
}),
Namespace: c.Namespace,
})
if err != nil {
l.Error(err, "Cannot list SSH-gateway pods",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Fetching SSH-gateways is failed. Reason: %v",
err,
)
return err
}

for _, pod := range gatewayPods.Items {
if (pod.Status.PodIP != "" && gateway.PrivateAddress == "") ||
(pod.Status.PodIP != "" && pod.Status.PodIP != gateway.PrivateAddress) {
err = r.IcAdminAPI.SetPrivateGatewayIP(c.Status.DataCentres[0].ID, pod.Status.PodIP)
if err != nil {
l.Error(err, "Cannot set Private IP for the SSH-gateway node",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Setting Private IP for the SSH-gateway node is failed. Reason: %v",
err,
)
return err
}
}
}

gatewaySVCs := &k8scorev1.ServiceList{}
err = r.List(context.Background(), gatewaySVCs, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
models.ClusterIDLabel: c.Status.ID,
models.NodeIDLabel: gateway.ID,
}),
Namespace: c.Namespace,
})
if err != nil {
l.Error(err, "Cannot get services backed by SSH-gateway pods",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Fetching services backed by SSH-gateway pods is failed. Reason: %v",
err,
)
return err
}

for _, svc := range gatewaySVCs.Items {
if (svc.Status.LoadBalancer.Ingress[0].IP != "" && gateway.PublicAddress == "") ||
(svc.Status.LoadBalancer.Ingress[0].IP != gateway.PublicAddress) {
err = r.IcAdminAPI.SetPublicGatewayIP(c.Status.DataCentres[0].ID, svc.Status.LoadBalancer.Ingress[0].IP)
if err != nil {
l.Error(err, "Cannot set Public IP for the SSH-gateway node",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Setting Public IP for the SSH-gateway node is failed. Reason: %v",
err,
)
return err
}

l.Info("IPs for on-premises cluster ssh-gateway are set",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Normal, models.Created,
"SSH-gateway IPs are set",
)
}
}
}
}

request := &v1beta1.OnPremiseNode{}
nodes, err := r.IcAdminAPI.GetOnPremisesNodes(c.Status.ID)
if err != nil {
l.Error(err, "Cannot get Cassandra on-premises nodes from the Instaclustr API",
"cluster name", c.Spec.Name,
"status", c.Status)
r.EventRecorder.Eventf(
c, models.Warning, models.FetchFailed,
"On-premises nodes fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return err
}

for _, node := range nodes {
nodePods := &k8scorev1.PodList{}
err = r.List(context.Background(), nodePods, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
models.ClusterIDLabel: c.Status.ID,
models.NodeIDLabel: node.ID,
}),
Namespace: c.Namespace,
})
if err != nil {
l.Error(err, "Cannot get on-premises cluster pods",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Fetching on-premises cluster pods is failed. Reason: %v",
err,
)
return err
}

nodeSVCs := &k8scorev1.ServiceList{}
err = r.List(context.Background(), nodeSVCs, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
models.ClusterIDLabel: c.Status.ID,
models.NodeIDLabel: node.ID,
}),
Namespace: c.Namespace,
})
if err != nil {
l.Error(err, "Cannot get services backed by on-premises cluster pods",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Fetching services backed by on-premises cluster pods is failed. Reason: %v",
err,
)
return err
}

for _, pod := range nodePods.Items {
if (pod.Status.PodIP != "" && node.PrivateAddress == "") ||
(pod.Status.PodIP != "" && pod.Status.PodIP != node.PrivateAddress) {
request.PrivateAddress = pod.Status.PodIP
}
}

for _, svc := range nodeSVCs.Items {
if (svc.Status.LoadBalancer.Ingress[0].IP != "" && node.PublicAddress == "") ||
(svc.Status.LoadBalancer.Ingress[0].IP != node.PublicAddress) {
request.PublicAddress = svc.Status.LoadBalancer.Ingress[0].IP
}
}

if request.PublicAddress != "" || request.PrivateAddress != "" {
err = r.IcAdminAPI.SetNodeIPs(node.ID, request)
if err != nil {
l.Error(err, "Cannot set IPs for on-premises cluster nodes",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
)

r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Setting IPs for on-premises cluster nodes is failed. Reason: %v",
err,
)
return err
}

l.Info("IPs for on-premises cluster node are set",
"cluster name", c.Spec.Name,
"clusterID", c.Status.ID,
"request", request,
)
r.EventRecorder.Eventf(
c, models.Normal, models.Created,
"Nodes IPs are set",
)
}
}
return nil
}
}

func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) scheduler.Job {
l := log.Log.WithValues("component", "CassandraStatusClusterJob")
return func() error {
Expand Down
Loading

0 comments on commit 705e698

Please sign in to comment.