Skip to content

Commit

Permalink
On-premises flow was added for all cluster resources
Browse files Browse the repository at this point in the history
  • Loading branch information
testisnullus committed Nov 15, 2023
1 parent 1c8927a commit 4661adc
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 58 deletions.
82 changes: 82 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"strconv"

k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -56,6 +58,7 @@ type CassandraRestoreFrom struct {
// CassandraSpec defines the desired state of Cassandra
type CassandraSpec struct {
RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
Cluster `json:",inline"`
DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"`
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
Expand Down Expand Up @@ -522,3 +525,82 @@ func (c *Cassandra) SetClusterID(id string) {
// init registers the Cassandra CRD types with the scheme builder so the
// manager can serve and reconcile them.
func init() {
SchemeBuilder.Register(&Cassandra{}, &CassandraList{})
}

// NewExposePorts builds the ServicePort list for the on-premises expose
// service. SSH (22) is always included; for clusters that are not
// private-network, the inter-node (7000), CQLSH (9042) and JMX (7199) ports
// are added, plus SSL (7001) when the first data centre has
// client-to-cluster encryption enabled.
func (c *Cassandra) NewExposePorts() []k8scorev1.ServicePort {
	ports := []k8scorev1.ServicePort{
		{
			Name: models.SSH,
			Port: models.Port22,
			TargetPort: intstr.IntOrString{
				Type:   intstr.Int,
				IntVal: models.Port22,
			},
		},
	}

	if !c.Spec.PrivateNetworkCluster {
		additionalPorts := []k8scorev1.ServicePort{
			{
				Name: models.InterNode,
				Port: models.Port7000,
				TargetPort: intstr.IntOrString{
					Type:   intstr.Int,
					IntVal: models.Port7000,
				},
			},
			{
				Name: models.CQLSH,
				Port: models.Port9042,
				TargetPort: intstr.IntOrString{
					Type:   intstr.Int,
					IntVal: models.Port9042,
				},
			},
			{
				Name: models.JMX,
				Port: models.Port7199,
				TargetPort: intstr.IntOrString{
					Type:   intstr.Int,
					IntVal: models.Port7199,
				},
			},
		}
		// Guard against an empty DataCentres slice — the original code
		// indexed [0] unconditionally and would panic on a spec with no
		// data centres. Encryption is declared per data centre.
		if len(c.Spec.DataCentres) > 0 && c.Spec.DataCentres[0].ClientToClusterEncryption {
			additionalPorts = append(additionalPorts, k8scorev1.ServicePort{
				Name: models.SSL,
				Port: models.Port7001,
				TargetPort: intstr.IntOrString{
					Type:   intstr.Int,
					IntVal: models.Port7001,
				},
			})
		}
		ports = append(ports, additionalPorts...)
	}

	return ports
}

// NewHeadlessPorts returns the ServicePorts published by the headless
// service for an on-premises Cassandra cluster: inter-node (7000) and
// CQLSH (9042).
func (c *Cassandra) NewHeadlessPorts() []k8scorev1.ServicePort {
	interNode := k8scorev1.ServicePort{
		Name: models.InterNode,
		Port: models.Port7000,
		TargetPort: intstr.IntOrString{
			Type:   intstr.Int,
			IntVal: models.Port7000,
		},
	}

	cqlsh := k8scorev1.ServicePort{
		Name: models.CQLSH,
		Port: models.Port9042,
		TargetPort: intstr.IntOrString{
			Type:   intstr.Int,
			IntVal: models.Port9042,
		},
	}

	return []k8scorev1.ServicePort{interNode, cqlsh}
}
12 changes: 12 additions & 0 deletions apis/clusters/v1beta1/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ type ClusteredMaintenanceEvent struct {
Upcoming []*clusterresource.MaintenanceEventStatus `json:"upcoming"`
}

// OnPremisesSpec describes the Kubernetes-side resources to provision for a
// cluster that runs on-premises instead of on a cloud provider.
type OnPremisesSpec struct {
	// StorageClassName is the name of the StorageClass used for the nodes' volumes.
	StorageClassName string `json:"storageClassName"`
	// OSDiskSize is the size of the OS disk (quantity string, e.g. "20Gi" — presumably; TODO confirm expected format).
	OSDiskSize string `json:"osDiskSize"`
	// DataDiskSize is the size of the data disk for each node.
	DataDiskSize string `json:"dataDiskSize"`
	// SSHGatewayCPU is the CPU allocation for the SSH gateway; optional.
	SSHGatewayCPU int64 `json:"sshGatewayCPU,omitempty"`
	// SSHGatewayMemory is the memory allocation for the SSH gateway; optional.
	SSHGatewayMemory string `json:"sshGatewayMemory,omitempty"`
	// NodeCPU is the CPU allocation for each cluster node.
	NodeCPU int64 `json:"nodeCPU"`
	// NodeMemory is the memory allocation for each cluster node.
	NodeMemory string `json:"nodeMemory"`
	// OSImageURL is the URL of the OS image used to boot the nodes.
	OSImageURL string `json:"osImageURL"`
	// CloudInitScriptRef references the cloud-init script resource.
	// NOTE(review): the json tag "cloudInitScriptNamespacedName" does not
	// match the field name — confirm this is the intended wire name before
	// release, since CRD field names are hard to change later.
	CloudInitScriptRef *Reference `json:"cloudInitScriptNamespacedName"`
}

type TwoFactorDelete struct {
// Email address which will be contacted when the cluster is requested to be deleted.
Email string `json:"email"`
Expand Down
149 changes: 92 additions & 57 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ import (
"github.com/instaclustr/operator/pkg/scheduler"
)

const (
StatusRUNNING = "RUNNING"
)

// CassandraReconciler reconciles a Cassandra object
type CassandraReconciler struct {
client.Client
Expand Down Expand Up @@ -112,27 +108,27 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
func (r *CassandraReconciler) handleCreateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
c *v1beta1.Cassandra,
) (reconcile.Result, error) {
l = l.WithName("Cassandra creation event")
var err error
patch := cassandra.NewPatch()
if cassandra.Status.ID == "" {
patch := c.NewPatch()
if c.Status.ID == "" {
var id string
if cassandra.Spec.HasRestore() {
if c.Spec.HasRestore() {
l.Info(
"Creating cluster from backup",
"original cluster ID", cassandra.Spec.RestoreFrom.ClusterID,
"original cluster ID", c.Spec.RestoreFrom.ClusterID,
)

id, err = r.API.RestoreCluster(cassandra.RestoreInfoToInstAPI(cassandra.Spec.RestoreFrom), models.CassandraAppKind)
id, err = r.API.RestoreCluster(c.RestoreInfoToInstAPI(c.Spec.RestoreFrom), models.CassandraAppKind)
if err != nil {
l.Error(err, "Cannot restore cluster from backup",
"original cluster ID", cassandra.Spec.RestoreFrom.ClusterID,
"original cluster ID", c.Spec.RestoreFrom.ClusterID,
)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
c, models.Warning, models.CreationFailed,
"Cluster restore from backup on the Instaclustr is failed. Reason: %v",
err,
)
Expand All @@ -141,72 +137,72 @@ func (r *CassandraReconciler) handleCreateCluster(
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
c, models.Normal, models.Created,
"Cluster restore request is sent. Original cluster ID: %s, new cluster ID: %s",
cassandra.Spec.RestoreFrom.ClusterID,
c.Spec.RestoreFrom.ClusterID,
id,
)
} else {
l.Info(
"Creating cluster",
"cluster name", cassandra.Spec.Name,
"data centres", cassandra.Spec.DataCentres,
"cluster name", c.Spec.Name,
"data centres", c.Spec.DataCentres,
)

id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, cassandra.Spec.ToInstAPI())
id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, c.Spec.ToInstAPI())
if err != nil {
l.Error(
err, "Cannot create cluster",
"cluster spec", cassandra.Spec,
"cluster spec", c.Spec,
)
r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
c, models.Warning, models.CreationFailed,
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
c, models.Normal, models.Created,
"Cluster creation request is sent. Cluster ID: %s",
id,
)
}

cassandra.Status.ID = id
err = r.Status().Patch(ctx, cassandra, patch)
c.Status.ID = id
err = r.Status().Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch cluster status",
"cluster name", cassandra.Spec.Name,
"cluster ID", cassandra.Status.ID,
"kind", cassandra.Kind,
"api Version", cassandra.APIVersion,
"namespace", cassandra.Namespace,
"cluster metadata", cassandra.ObjectMeta,
"cluster name", c.Spec.Name,
"cluster ID", c.Status.ID,
"kind", c.Kind,
"api Version", c.APIVersion,
"namespace", c.Namespace,
"cluster metadata", c.ObjectMeta,
)
r.EventRecorder.Eventf(
cassandra, models.Warning, models.PatchFailed,
c, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer)
cassandra.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
err = r.Patch(ctx, cassandra, patch)
controllerutil.AddFinalizer(c, models.DeletionFinalizer)
c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch cluster",
"cluster name", cassandra.Spec.Name,
"cluster ID", cassandra.Status.ID,
"kind", cassandra.Kind,
"api Version", cassandra.APIVersion,
"namespace", cassandra.Namespace,
"cluster metadata", cassandra.ObjectMeta,
"cluster name", c.Spec.Name,
"cluster ID", c.Status.ID,
"kind", c.Kind,
"api Version", c.APIVersion,
"namespace", c.Namespace,
"cluster metadata", c.ObjectMeta,
)
r.EventRecorder.Eventf(
cassandra, models.Warning, models.PatchFailed,
c, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v",
err,
)
Expand All @@ -215,62 +211,101 @@ func (r *CassandraReconciler) handleCreateCluster(

l.Info(
"Cluster has been created",
"cluster name", cassandra.Spec.Name,
"cluster ID", cassandra.Status.ID,
"kind", cassandra.Kind,
"api Version", cassandra.APIVersion,
"namespace", cassandra.Namespace,
"cluster name", c.Spec.Name,
"cluster ID", c.Status.ID,
"kind", c.Kind,
"api Version", c.APIVersion,
"namespace", c.Namespace,
)
}

if cassandra.Status.State != models.DeletedStatus {
err = r.startClusterStatusJob(cassandra)
if c.Status.State != models.DeletedStatus {
err = r.startClusterStatusJob(c)
if err != nil {
l.Error(err, "Cannot start cluster status job",
"cassandra cluster ID", cassandra.Status.ID)
"c cluster ID", c.Status.ID)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
c, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
c, models.Normal, models.Created,
"Cluster status check job is started",
)

err = r.startClusterBackupsJob(cassandra)
if c.Spec.OnPremisesSpec != nil {
if len(c.Status.DataCentres) > 0 && c.Status.State != models.RunningStatus {
err := handleCreateOnPremisesClusterResources(
ctx,
r.Client,
c,
c.Status.ID,
c.Status.DataCentres[0].ID,
c.Spec.OnPremisesSpec,
c.Status.ClusterStatus,
c.NewExposePorts(),
c.NewHeadlessPorts(),
c.Spec.PrivateNetworkCluster)
if err != nil {
l.Error(
err, "Cannot create resources for on-premises cluster",
"cluster spec", c.Spec.OnPremisesSpec,
)
r.EventRecorder.Eventf(
c, models.Warning, models.CreationFailed,
"Resources creation for on-premises cluster is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

l.Info(
"On-premises resources have been created",
"cluster name", c.Spec.Name,
"on-premises Spec", c.Spec.OnPremisesSpec,
"cluster ID", c.Status.ID,
)

} else {
l.Info("Waiting for Data Centres provisioning...")
return models.ReconcileRequeue, nil
}
}

err = r.startClusterBackupsJob(c)
if err != nil {
l.Error(err, "Cannot start cluster backups check job",
"cluster ID", cassandra.Status.ID,
"cluster ID", c.Status.ID,
)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
c, models.Warning, models.CreationFailed,
"Cluster backups check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
c, models.Normal, models.Created,
"Cluster backups check job is started",
)

if cassandra.Spec.UserRefs != nil && cassandra.Status.AvailableUsers == nil {
err = r.startUsersCreationJob(cassandra)
if c.Spec.UserRefs != nil && c.Status.AvailableUsers == nil {
err = r.startUsersCreationJob(c)
if err != nil {
l.Error(err, "Failed to start user creation job")
r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed,
r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err)
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.Created,
r.EventRecorder.Event(c, models.Normal, models.Created,
"Cluster user creation job is started")
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (r *KafkaReconciler) handleUpdateCluster(
return reconcile.Result{}, err
}

if iKafka.Status.ClusterStatus.State != StatusRUNNING {
if iKafka.Status.ClusterStatus.State != models.RunningStatus {
l.Error(instaclustr.ClusterNotRunning, "Unable to update cluster, cluster still not running",
"cluster name", k.Spec.Name,
"cluster state", iKafka.Status.ClusterStatus.State)
Expand Down
Loading

0 comments on commit 4661adc

Please sign in to comment.