Skip to content

Commit

Permalink
On premises flow was added for all cluster resources
Browse files Browse the repository at this point in the history
  • Loading branch information
testisnullus committed Nov 16, 2023
1 parent 1c8927a commit e446339
Show file tree
Hide file tree
Showing 39 changed files with 3,359 additions and 921 deletions.
79 changes: 78 additions & 1 deletion apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"regexp"

k8scorev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/instaclustr/operator/pkg/models"
Expand Down Expand Up @@ -62,7 +64,8 @@ type BundledOpenSearchSpec struct {

// CadenceSpec defines the desired state of Cadence
type CadenceSpec struct {
Cluster `json:",inline"`
Cluster `json:",inline"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
//+kubebuilder:validation:MinItems:=1
//+kubebuilder:validation:MaxItems:=1
DataCentres []*CadenceDataCentre `json:"dataCentres"`
Expand Down Expand Up @@ -793,3 +796,77 @@ func (o *BundledOpenSearchSpec) validate() error {

return nil
}

func (c *Cadence) NewExposePorts() []k8scorev1.ServicePort {
var ports []k8scorev1.ServicePort
ports = []k8scorev1.ServicePort{{
Name: models.SSH,
Port: models.Port22,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port22,
},
},
}

if !c.Spec.PrivateNetworkCluster {
additionalPorts := []k8scorev1.ServicePort{
{
Name: models.CadenceTChannel,
Port: models.Port7933,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7933,
},
},
{
Name: models.CadenceWeb,
Port: models.Port8088,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port8088,
},
},
}
if c.Spec.DataCentres[0].ClientEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CadenceGRPC,
Port: models.Port7833,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7833,
},
}
additionalPorts = append(additionalPorts, sslPort)
}
ports = append(ports, additionalPorts...)
}

return ports
}

func (c *Cadence) NewHeadlessPorts() []k8scorev1.ServicePort {
ports := []k8scorev1.ServicePort{
{
Name: models.CadenceTChannel,
Port: models.Port7933,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7933,
},
},
}
if c.Spec.DataCentres[0].ClientEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CadenceGRPC,
Port: models.Port7833,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7833,
},
}
ports = append(ports, sslPort)
}

return ports
}
50 changes: 47 additions & 3 deletions apis/clusters/v1beta1/cadence_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"context"
"fmt"
"regexp"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -81,6 +82,37 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
return err
}

if c.Spec.OnPremisesSpec != nil {
osDiskSizeMatched, err := regexp.Match(models.StorageRegExp, []byte(c.Spec.OnPremisesSpec.OSDiskSize))
if !osDiskSizeMatched || err != nil {
return fmt.Errorf("disk size field for node OS must fit pattern: %s",
models.StorageRegExp)
}

dataDiskSizeMatched, err := regexp.Match(models.StorageRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !dataDiskSizeMatched || err != nil {
return fmt.Errorf("disk size field for storring cluster data must fit pattern: %s",
models.StorageRegExp)
}

nodeMemoryMatched, err := regexp.Match(models.MemoryRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !nodeMemoryMatched || err != nil {
return fmt.Errorf("node memory field must fit pattern: %s",
models.MemoryRegExp)
}

if c.Spec.PrivateNetworkCluster {
if c.Spec.OnPremisesSpec.SSHGatewayCPU == 0 || c.Spec.OnPremisesSpec.SSHGatewayMemory == "" {
return fmt.Errorf("fields SSHGatewayCPU and SSHGatewayMemory must not be empty")
}
sshGatewayMemoryMatched, err := regexp.Match(models.MemoryRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !sshGatewayMemoryMatched || err != nil {
return fmt.Errorf("ssh gateway memory field must fit pattern: %s",
models.MemoryRegExp)
}
}
}

appVersions, err := cv.API.ListAppVersions(models.CadenceAppKind)
if err != nil {
return fmt.Errorf("cannot list versions for kind: %v, err: %w",
Expand Down Expand Up @@ -169,10 +201,22 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
return fmt.Errorf("data centres field is empty")
}

//TODO: add support of multiple DCs for OnPrem clusters
if len(c.Spec.DataCentres) > 1 && c.Spec.OnPremisesSpec != nil {
return fmt.Errorf("on-premises cluster can be provisioned with only one data centre")
}

for _, dc := range c.Spec.DataCentres {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
if c.Spec.OnPremisesSpec != nil {
err := dc.DataCentre.ValidateOnPremisesCreation()
if err != nil {
return err
}
} else {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
}
}

if !c.Spec.PrivateNetworkCluster && dc.PrivateLink != nil {
Expand Down
93 changes: 93 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"strconv"

k8scorev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -56,6 +58,7 @@ type CassandraRestoreFrom struct {
// CassandraSpec defines the desired state of Cassandra
type CassandraSpec struct {
RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"`
OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"`
Cluster `json:",inline"`
DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"`
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
Expand Down Expand Up @@ -522,3 +525,93 @@ func (c *Cassandra) SetClusterID(id string) {
func init() {
SchemeBuilder.Register(&Cassandra{}, &CassandraList{})
}

func (c *Cassandra) NewExposePorts() []k8scorev1.ServicePort {
var ports []k8scorev1.ServicePort
ports = []k8scorev1.ServicePort{{
Name: models.SSH,
Port: models.Port22,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port22,
},
},
}

if !c.Spec.PrivateNetworkCluster {
additionalPorts := []k8scorev1.ServicePort{
{
Name: models.CassandraInterNode,
Port: models.Port7000,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7000,
},
},
{
Name: models.CassandraCQL,
Port: models.Port9042,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9042,
},
},
{
Name: models.CassandraJMX,
Port: models.Port7199,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7199,
},
},
}
if c.Spec.DataCentres[0].ClientToClusterEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CassandraSSL,
Port: models.Port7001,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7001,
},
}
additionalPorts = append(additionalPorts, sslPort)
}
ports = append(ports, additionalPorts...)
}

return ports
}

func (c *Cassandra) NewHeadlessPorts() []k8scorev1.ServicePort {
ports := []k8scorev1.ServicePort{
{
Name: models.CassandraInterNode,
Port: models.Port7000,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7000,
},
},
{
Name: models.CassandraCQL,
Port: models.Port9042,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port9042,
},
},
}
if c.Spec.DataCentres[0].ClientToClusterEncryption {
sslPort := k8scorev1.ServicePort{
Name: models.CassandraSSL,
Port: models.Port7001,
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: models.Port7001,
},
}
ports = append(ports, sslPort)
}

return ports
}
50 changes: 47 additions & 3 deletions apis/clusters/v1beta1/cassandra_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"context"
"fmt"
"regexp"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -91,6 +92,37 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob
return fmt.Errorf("spark should not have more than 1 item")
}

if c.Spec.OnPremisesSpec != nil {
osDiskSizeMatched, err := regexp.Match(models.StorageRegExp, []byte(c.Spec.OnPremisesSpec.OSDiskSize))
if !osDiskSizeMatched || err != nil {
return fmt.Errorf("disk size field for node OS must fit pattern: %s",
models.StorageRegExp)
}

dataDiskSizeMatched, err := regexp.Match(models.StorageRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !dataDiskSizeMatched || err != nil {
return fmt.Errorf("disk size field for storring cluster data must fit pattern: %s",
models.StorageRegExp)
}

nodeMemoryMatched, err := regexp.Match(models.MemoryRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !nodeMemoryMatched || err != nil {
return fmt.Errorf("node memory field must fit pattern: %s",
models.MemoryRegExp)
}

if c.Spec.PrivateNetworkCluster {
if c.Spec.OnPremisesSpec.SSHGatewayCPU == 0 || c.Spec.OnPremisesSpec.SSHGatewayMemory == "" {
return fmt.Errorf("fields SSHGatewayCPU and SSHGatewayMemory must not be empty")
}
sshGatewayMemoryMatched, err := regexp.Match(models.MemoryRegExp, []byte(c.Spec.OnPremisesSpec.DataDiskSize))
if !sshGatewayMemoryMatched || err != nil {
return fmt.Errorf("ssh gateway memory field must fit pattern: %s",
models.MemoryRegExp)
}
}
}

appVersions, err := cv.API.ListAppVersions(models.CassandraAppKind)
if err != nil {
return fmt.Errorf("cannot list versions for kind: %v, err: %w",
Expand All @@ -113,10 +145,22 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob
return fmt.Errorf("data centres field is empty")
}

//TODO: add support of multiple DCs for OnPrem clusters
if len(c.Spec.DataCentres) > 1 && c.Spec.OnPremisesSpec != nil {
return fmt.Errorf("on-premises cluster can be provisioned with only one data centre")
}

for _, dc := range c.Spec.DataCentres {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
if c.Spec.OnPremisesSpec != nil {
err := dc.DataCentre.ValidateOnPremisesCreation()
if err != nil {
return err
}
} else {
err := dc.DataCentre.ValidateCreation()
if err != nil {
return err
}
}

if !c.Spec.PrivateNetworkCluster && dc.PrivateIPBroadcastForDiscovery {
Expand Down
Loading

0 comments on commit e446339

Please sign in to comment.