From 1133a55c8f07ecc762abedef1830708b99de9e31 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Fri, 2 Feb 2024 16:27:09 +0200 Subject: [PATCH] cassandra code base was refactored --- .secrets.baseline | 16 +- apis/clusters/v1beta1/cassandra_types.go | 330 +++++++----- apis/clusters/v1beta1/cassandra_webhook.go | 24 +- apis/clusters/v1beta1/structs.go | 36 ++ apis/clusters/v1beta1/validation.go | 28 + .../clusters/v1beta1/zz_generated.deepcopy.go | 71 ++- .../clusters.instaclustr.com_cassandras.yaml | 34 +- .../samples/clusters_v1beta1_cassandra.yaml | 14 +- .../samples/clusters_v1beta1_opensearch.yaml | 2 +- controllers/clusters/cadence_controller.go | 22 +- controllers/clusters/cassandra_controller.go | 486 +++++++++--------- pkg/instaclustr/client.go | 10 +- pkg/instaclustr/interfaces.go | 2 +- pkg/instaclustr/mock/client.go | 2 +- pkg/models/cassandra_apiv2.go | 43 +- pkg/utils/slices/slices.go | 73 +++ pkg/utils/slices/slices_test.go | 300 +++++++++++ 17 files changed, 1027 insertions(+), 466 deletions(-) create mode 100644 pkg/utils/slices/slices.go create mode 100644 pkg/utils/slices/slices_test.go diff --git a/.secrets.baseline b/.secrets.baseline index de4dd6a77..4f5e10c6e 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -199,23 +199,23 @@ { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "4d2d63a69bc8074b20a4edcdcbbfa2b81d791543", + "hashed_secret": "331cc743251c3b9504229de4d139c539da121a33", "is_verified": false, - "line_number": 257 + "line_number": 263 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "e0a46b27231f798fe22dc4d5d82b5feeb5dcf085", + "hashed_secret": "0ad8d7005e084d4f028a4277b73c6fab24269c17", "is_verified": false, - "line_number": 313 + "line_number": 349 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "e7f873437cda278898e12c04e623fcbefc193cb8", + "hashed_secret": "e0a46b27231f798fe22dc4d5d82b5feeb5dcf085", "is_verified": false, - "line_number": 349 + "line_number": 414 } ], "apis/clusters/v1beta1/cassandra_webhook.go": [ @@ -783,7 +783,7 @@ "filename": "pkg/instaclustr/client.go", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 2017 + "line_number": 2048 } ], "pkg/instaclustr/mock/client.go": [ @@ -1174,5 +1174,5 @@ } ] }, - "generated_at": "2024-02-02T08:53:13Z" + "generated_at": "2024-02-05T09:41:58Z" } diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 8129e7a1b..c17bbc3d2 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - "encoding/json" "strconv" k8scorev1 "k8s.io/api/core/v1" @@ -52,38 +51,142 @@ type CassandraRestoreFrom struct { // CassandraSpec defines the desired state of Cassandra type CassandraSpec struct { - RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` - OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` - Cluster `json:",inline"` + GenericClusterSpec `json:",inline"` + + RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` + OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"` LuceneEnabled bool `json:"luceneEnabled,omitempty"` PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` UserRefs References `json:"userRefs,omitempty"` //+kubebuilder:validate:MaxItems:=1 - ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` + ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"` } // CassandraStatus defines the observed state of Cassandra type CassandraStatus struct { - ClusterStatus `json:",inline"` + GenericStatus `json:",inline"` + DataCentres []*CassandraDataCentreStatus `json:"dataCentres,omitempty"` + AvailableUsers References `json:"availableUsers,omitempty"` } -type CassandraDataCentre struct { - DataCentre `json:",inline"` - ContinuousBackup bool `json:"continuousBackup"` - PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - ReplicationFactor int `json:"replicationFactor"` +func (s *CassandraStatus) ToOnPremises() ClusterStatus { + dc := &DataCentreStatus{ + ID: s.DataCentres[0].ID, + Nodes: s.DataCentres[0].Nodes, + } + + return ClusterStatus{ + ID: s.ID, + DataCentres: []*DataCentreStatus{dc}, + } +} +func (s *CassandraStatus) Equals(o *CassandraStatus) bool { + return s.GenericStatus.Equals(&o.GenericStatus) && + s.DCEquals(o.DataCentres) +} + +func (s *CassandraStatus) DCEquals(o []*CassandraDataCentreStatus) bool { + if len(s.DataCentres) != len(o) { + return false + } + + sMap := map[string]*CassandraDataCentreStatus{} + for _, dc := range s.DataCentres { + sMap[dc.Name] = dc + } + + for _, oDC := range o { + sDC, ok := sMap[oDC.Name] + if !ok { + return false + } + + if !sDC.Equals(oDC) { + return false + } + } + + return true +} + +type CassandraDataCentre struct { + GenericDataCentreSpec `json:",inline"` + + ContinuousBackup bool `json:"continuousBackup"` + PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` + PrivateLink bool `json:"privateLink,omitempty"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + ReplicationFactor int `json:"replicationFactor"` + NodesNumber int `json:"nodesNumber"` + NodeSize string `json:"nodeSize"` // Adds the specified version of Debezium Connector Cassandra to the Cassandra cluster // +kubebuilder:validation:MaxItems=1 Debezium []DebeziumCassandraSpec `json:"debezium,omitempty"` - PrivateLink bool `json:"privateLink,omitempty"` ShotoverProxy []ShotoverProxySpec `json:"shotoverProxy,omitempty"` } +func (dc *CassandraDataCentre) Equals(o *CassandraDataCentre) bool { + return dc.GenericDataCentreSpec.Equals(&o.GenericDataCentreSpec) && + dc.ContinuousBackup == o.ContinuousBackup && + dc.PrivateIPBroadcastForDiscovery == o.PrivateIPBroadcastForDiscovery && + dc.PrivateLink == o.PrivateLink && + dc.ClientToClusterEncryption == o.ClientToClusterEncryption && + dc.ReplicationFactor == o.ReplicationFactor && + dc.NodesNumber == o.NodesNumber && + dc.NodeSize == o.NodeSize && + dc.DebeziumEquals(o) && + dc.ShotoverProxyEquals(o) +} + +type CassandraDataCentreStatus struct { + GenericDataCentreStatus `json:",inline"` + Nodes []*Node `json:"nodes"` +} + +func (s *CassandraDataCentreStatus) Equals(o *CassandraDataCentreStatus) bool { + return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && + s.nodesEqual(o.Nodes) +} + +func (s *CassandraDataCentreStatus) nodesEqual(nodes []*Node) bool { + if len(s.Nodes) != len(nodes) { + return false + } + + sNodes := map[string]*Node{} + for _, node := range s.Nodes { + sNodes[node.ID] = node + } + + for _, node := range nodes { + sNode, ok := sNodes[node.ID] + if !ok { + return false + } + + if !sNode.Equals(node) { + return false + } + } + + return true +} + +func (s *CassandraDataCentreStatus) FromInstAPI(instModel *models.CassandraDataCentre) { + s.GenericDataCentreStatus.FromInstAPI(&instModel.GenericDataCentreFields) + + s.Nodes = make([]*Node, len(instModel.Nodes)) + for i, instNode := range instModel.Nodes { + node := &Node{} + node.FromInstAPI(instNode) + s.Nodes[i] = node + } +} + type ShotoverProxySpec struct { NodeSize string `json:"nodeSize"` } @@ -153,6 +256,14 @@ func (d *CassandraDataCentre) ShotoverProxyEquals(new *CassandraDataCentre) bool return true } +func (c *CassandraSpec) IsEqual(o *CassandraSpec) bool { + return c.GenericClusterSpec.Equals(&o.GenericClusterSpec) && + c.AreDCsEqual(o.DataCentres) && + c.LuceneEnabled == o.LuceneEnabled && + c.PasswordAndUserAuth == o.PasswordAndUserAuth && + c.BundledUseOnly == o.BundledUseOnly +} + //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" @@ -211,19 +322,9 @@ func (c *Cassandra) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.C } } -func (c *Cassandra) FromInstAPI(iData []byte) (*Cassandra, error) { - iCass := &models.CassandraCluster{} - err := json.Unmarshal(iData, iCass) - if err != nil { - return nil, err - } - - return &Cassandra{ - TypeMeta: c.TypeMeta, - ObjectMeta: c.ObjectMeta, - Spec: c.Spec.FromInstAPI(iCass), - Status: c.Status.FromInstAPI(iCass), - }, nil +func (c *Cassandra) FromInstAPI(instModel *models.CassandraCluster) { + c.Spec.FromInstAPI(instModel) + c.Status.FromInstAPI(instModel) } func (cs *CassandraSpec) HasRestore() bool { @@ -237,88 +338,83 @@ func (cs *CassandraSpec) HasRestore() bool { func (cs *CassandraSpec) DCsUpdateToInstAPI() models.CassandraClusterAPIUpdate { return models.CassandraClusterAPIUpdate{ DataCentres: cs.DCsToInstAPI(), - ResizeSettings: resizeSettingsToInstAPI(cs.ResizeSettings), + ResizeSettings: cs.ResizeSettings.ToInstAPI(), } } -func (cs *CassandraSpec) FromInstAPI(iCass *models.CassandraCluster) CassandraSpec { - return CassandraSpec{ - Cluster: Cluster{ - Name: iCass.Name, - Version: iCass.CassandraVersion, - PCICompliance: iCass.PCIComplianceMode, - PrivateNetworkCluster: iCass.PrivateNetworkCluster, - SLATier: iCass.SLATier, - TwoFactorDelete: cs.Cluster.TwoFactorDeleteFromInstAPI(iCass.TwoFactorDelete), - Description: iCass.Description, - }, - DataCentres: cs.DCsFromInstAPI(iCass.DataCentres), - LuceneEnabled: iCass.LuceneEnabled, - PasswordAndUserAuth: iCass.PasswordAndUserAuth, - BundledUseOnly: iCass.BundledUseOnly, - ResizeSettings: resizeSettingsFromInstAPI(iCass.ResizeSettings), - } -} - -func (cs *CassandraSpec) DebeziumFromInstAPI(iDebeziums []*models.Debezium) (dcs []DebeziumCassandraSpec) { - var debeziums []DebeziumCassandraSpec - for _, iDebezium := range iDebeziums { - debeziums = append(debeziums, DebeziumCassandraSpec{ - KafkaVPCType: iDebezium.KafkaVPCType, - KafkaTopicPrefix: iDebezium.KafkaTopicPrefix, - KafkaDataCentreID: iDebezium.KafkaDataCentreID, - Version: iDebezium.Version, - }) +func (cs *CassandraSpec) FromInstAPI(instModel *models.CassandraCluster) { + cs.GenericClusterSpec.FromInstAPI(&instModel.GenericClusterFields) + + cs.LuceneEnabled = instModel.LuceneEnabled + cs.PasswordAndUserAuth = instModel.PasswordAndUserAuth + cs.BundledUseOnly = instModel.BundledUseOnly + cs.Version = instModel.CassandraVersion + cs.ResizeSettings.FromInstAPI(instModel.ResizeSettings) + + cs.dcsFromInstAPI(instModel.DataCentres) +} + +func (cs *CassandraSpec) dcsFromInstAPI(instModels []*models.CassandraDataCentre) { + cs.DataCentres = make([]*CassandraDataCentre, len(instModels)) + for i, instModel := range instModels { + dc := &CassandraDataCentre{} + dc.FromInstAPI(instModel) + cs.DataCentres[i] = dc } - return debeziums } -func (cs *CassandraSpec) ShotoverProxyFromInstAPI(iShotoverProxys []*models.ShotoverProxy) (sps []ShotoverProxySpec) { - for _, iShotoverProxy := range iShotoverProxys { - sps = append(sps, ShotoverProxySpec{ - NodeSize: iShotoverProxy.NodeSize, +func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) { + d.GenericDataCentreSpec.FromInstAPI(&instModel.GenericDataCentreFields) + + d.ContinuousBackup = instModel.ContinuousBackup + d.PrivateIPBroadcastForDiscovery = instModel.PrivateIPBroadcastForDiscovery + d.PrivateLink = instModel.PrivateLink + d.ClientToClusterEncryption = instModel.ClientToClusterEncryption + d.ReplicationFactor = instModel.ReplicationFactor + d.NodesNumber = instModel.NumberOfNodes + d.NodeSize = instModel.NodeSize + + d.debeziumFromInstAPI(instModel.Debezium) + d.shotoverProxyFromInstAPI(instModel.ShotoverProxy) +} + +func (cs *CassandraDataCentre) debeziumFromInstAPI(instModels []*models.Debezium) { + cs.Debezium = make([]DebeziumCassandraSpec, 0, len(instModels)) + for _, instModel := range instModels { + cs.Debezium = append(cs.Debezium, DebeziumCassandraSpec{ + KafkaVPCType: instModel.KafkaVPCType, + KafkaTopicPrefix: instModel.KafkaTopicPrefix, + KafkaDataCentreID: instModel.KafkaDataCentreID, + Version: instModel.Version, }) } - return sps } -func (cs *CassandraSpec) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*CassandraDataCentre) { - for _, iDC := range iDCs { - dcs = append(dcs, &CassandraDataCentre{ - DataCentre: cs.Cluster.DCFromInstAPI(iDC.DataCentre), - ContinuousBackup: iDC.ContinuousBackup, - PrivateIPBroadcastForDiscovery: iDC.PrivateIPBroadcastForDiscovery, - ClientToClusterEncryption: iDC.ClientToClusterEncryption, - ReplicationFactor: iDC.ReplicationFactor, - PrivateLink: iDC.PrivateLink, - Debezium: cs.DebeziumFromInstAPI(iDC.Debezium), - ShotoverProxy: cs.ShotoverProxyFromInstAPI(iDC.ShotoverProxy), +func (cs *CassandraDataCentre) shotoverProxyFromInstAPI(instModels []*models.ShotoverProxy) { + cs.ShotoverProxy = make([]ShotoverProxySpec, 0, len(instModels)) + for _, instModel := range instModels { + cs.ShotoverProxy = append(cs.ShotoverProxy, ShotoverProxySpec{ + NodeSize: instModel.NodeSize, }) } - return } -func (cs *CassandraSpec) DCsToInstAPI() (iDCs []*models.CassandraDataCentre) { +func (cs *CassandraSpec) DCsToInstAPI() (instaModels []*models.CassandraDataCentre) { for _, dc := range cs.DataCentres { - iDCs = append(iDCs, dc.ToInstAPI()) + instaModels = append(instaModels, dc.ToInstAPI()) } return } func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster { return &models.CassandraCluster{ - Name: cs.Name, - CassandraVersion: cs.Version, - LuceneEnabled: cs.LuceneEnabled, - PasswordAndUserAuth: cs.PasswordAndUserAuth, - DataCentres: cs.DCsToInstAPI(), - SLATier: cs.SLATier, - PrivateNetworkCluster: cs.PrivateNetworkCluster, - PCIComplianceMode: cs.PCICompliance, - TwoFactorDelete: cs.TwoFactorDeletesToInstAPI(), - BundledUseOnly: cs.BundledUseOnly, - Description: cs.Description, - ResizeSettings: resizeSettingsToInstAPI(cs.ResizeSettings), + GenericClusterFields: cs.GenericClusterSpec.ToInstAPI(), + CassandraVersion: cs.Version, + LuceneEnabled: cs.LuceneEnabled, + PasswordAndUserAuth: cs.PasswordAndUserAuth, + BundledUseOnly: cs.BundledUseOnly, + DataCentres: cs.DCsToInstAPI(), + ResizeSettings: cs.ResizeSettings.ToInstAPI(), } } @@ -342,18 +438,10 @@ func (c *Cassandra) RestoreInfoToInstAPI(restoreData *CassandraRestoreFrom) any return iRestore } -func (cs *CassandraSpec) IsEqual(spec CassandraSpec) bool { - return cs.Cluster.IsEqual(spec.Cluster) && - cs.AreDCsEqual(spec.DataCentres) && - cs.LuceneEnabled == spec.LuceneEnabled && - cs.PasswordAndUserAuth == spec.PasswordAndUserAuth && - cs.BundledUseOnly == spec.BundledUseOnly -} - func (c *Cassandra) GetSpec() CassandraSpec { return c.Spec } func (c *Cassandra) IsSpecEqual(spec CassandraSpec) bool { - return c.Spec.IsEqual(spec) + return c.Spec.IsEqual(&spec) } func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { @@ -361,21 +449,18 @@ func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { return false } - for i, iDC := range dcs { - dataCentre := cs.DataCentres[i] + k8sDCs := map[string]*CassandraDataCentre{} + for _, dc := range cs.DataCentres { + k8sDCs[dc.Name] = dc + } - if iDC.Name != dataCentre.Name { - continue + for _, instDC := range dcs { + k8sDC, ok := k8sDCs[instDC.Name] + if !ok { + return false } - if !dataCentre.IsEqual(iDC.DataCentre) || - iDC.ClientToClusterEncryption != dataCentre.ClientToClusterEncryption || - iDC.PrivateIPBroadcastForDiscovery != dataCentre.PrivateIPBroadcastForDiscovery || - iDC.PrivateLink != dataCentre.PrivateLink || - iDC.ContinuousBackup != dataCentre.ContinuousBackup || - iDC.ReplicationFactor != dataCentre.ReplicationFactor || - !dataCentre.DebeziumEquals(iDC) || - !dataCentre.ShotoverProxyEquals(iDC) { + if !k8sDC.Equals(instDC) { return false } } @@ -383,33 +468,30 @@ func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { return true } -func (cs *CassandraStatus) FromInstAPI(iCass *models.CassandraCluster) CassandraStatus { - return CassandraStatus{ - ClusterStatus: ClusterStatus{ - ID: iCass.ID, - State: iCass.Status, - DataCentres: cs.DCsFromInstAPI(iCass.DataCentres), - CurrentClusterOperationStatus: iCass.CurrentClusterOperationStatus, - MaintenanceEvents: cs.MaintenanceEvents, - }, - } +func (cs *CassandraStatus) FromInstAPI(instModel *models.CassandraCluster) { + cs.GenericStatus.FromInstAPI(&instModel.GenericClusterFields) + cs.dcsFromInstAPI(instModel.DataCentres) } -func (cs *CassandraStatus) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*DataCentreStatus) { - for _, iDC := range iDCs { - dcs = append(dcs, cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre)) +func (cs *CassandraStatus) dcsFromInstAPI(instModels []*models.CassandraDataCentre) { + cs.DataCentres = make([]*CassandraDataCentreStatus, len(instModels)) + for i, instModel := range instModels { + dc := &CassandraDataCentreStatus{} + dc.FromInstAPI(instModel) + cs.DataCentres[i] = dc } - return } func (cdc *CassandraDataCentre) ToInstAPI() *models.CassandraDataCentre { return &models.CassandraDataCentre{ - DataCentre: cdc.DataCentre.ToInstAPI(), + GenericDataCentreFields: cdc.GenericDataCentreSpec.ToInstAPI(), ClientToClusterEncryption: cdc.ClientToClusterEncryption, PrivateLink: cdc.PrivateLink, ContinuousBackup: cdc.ContinuousBackup, PrivateIPBroadcastForDiscovery: cdc.PrivateIPBroadcastForDiscovery, ReplicationFactor: cdc.ReplicationFactor, + NodeSize: cdc.NodeSize, + NumberOfNodes: cdc.NodesNumber, Debezium: cdc.DebeziumToInstAPI(), ShotoverProxy: cdc.ShotoverProxyToInstaAPI(), } @@ -457,7 +539,7 @@ func init() { func (c *Cassandra) GetExposePorts() []k8scorev1.ServicePort { var exposePorts []k8scorev1.ServicePort - if !c.Spec.PrivateNetworkCluster { + if !c.Spec.PrivateNetwork { exposePorts = []k8scorev1.ServicePort{ { Name: models.CassandraInterNode, diff --git a/apis/clusters/v1beta1/cassandra_webhook.go b/apis/clusters/v1beta1/cassandra_webhook.go index 2673ad9e7..c0b60a4f0 100644 --- a/apis/clusters/v1beta1/cassandra_webhook.go +++ b/apis/clusters/v1beta1/cassandra_webhook.go @@ -68,10 +68,6 @@ func (c *Cassandra) Default() { models.ResourceStateAnnotation: "", }) } - - for _, dataCentre := range c.Spec.DataCentres { - dataCentre.SetDefaultValues() - } } // ValidateCreate implements webhook.Validator so a webhook will be registered for the type @@ -91,7 +87,7 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob } } - err := c.Spec.Cluster.ValidateCreation() + err := c.Spec.GenericClusterSpec.ValidateCreation() if err != nil { return err } @@ -110,7 +106,7 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob if err != nil { return err } - if c.Spec.PrivateNetworkCluster { + if c.Spec.PrivateNetwork { err = c.Spec.OnPremisesSpec.ValidateSSHGatewayCreation() if err != nil { return err @@ -140,18 +136,18 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob for _, dc := range c.Spec.DataCentres { if c.Spec.OnPremisesSpec != nil { - err = dc.DataCentre.ValidateOnPremisesCreation() + err = dc.GenericDataCentreSpec.ValidateOnPremisesCreation() if err != nil { return err } } else { - err = dc.DataCentre.ValidateCreation() + err = dc.GenericDataCentreSpec.validateCreation() if err != nil { return err } } - if !c.Spec.PrivateNetworkCluster && dc.PrivateIPBroadcastForDiscovery { + if !c.Spec.PrivateNetwork && dc.PrivateIPBroadcastForDiscovery { return fmt.Errorf("cannot use private ip broadcast for discovery on public network cluster") } @@ -182,6 +178,10 @@ func (cv *cassandraValidator) ValidateUpdate(ctx context.Context, old runtime.Ob cassandralog.Info("validate update", "name", c.Name) + if c.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return nil + } + // skip validation when we receive cluster specification update from the Instaclustr Console. if c.Annotations[models.ExternalChangesAnnotation] == models.True { return nil @@ -259,7 +259,7 @@ func (cs *CassandraSpec) newImmutableFields() *immutableCassandraFields { LuceneEnabled: cs.LuceneEnabled, PasswordAndUserAuth: cs.PasswordAndUserAuth, }, - immutableCluster: cs.Cluster.newImmutableFields(), + immutableCluster: cs.GenericClusterSpec.immutableFields(), } } @@ -307,11 +307,11 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error return fmt.Errorf("cannot change datacentre name: %v", newDC.Name) } - if err := newDC.ValidateCreation(); err != nil { + if err := newDC.validateCreation(); err != nil { return err } - if !cs.PrivateNetworkCluster && newDC.PrivateIPBroadcastForDiscovery { + if !cs.PrivateNetwork && newDC.PrivateIPBroadcastForDiscovery { return fmt.Errorf("cannot use private ip broadcast for discovery on public network cluster") } diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index bec014884..8ae968eee 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -761,3 +761,39 @@ func (old References) Diff(new References) (added, deleted References) { return added, deleted } + +type GenericResizeSettings []*ResizeSettings + +func (g *GenericResizeSettings) FromInstAPI(instModels []*models.ResizeSettings) { + *g = make(GenericResizeSettings, 0, len(instModels)) + for _, instModel := range instModels { + *g = append(*g, &ResizeSettings{ + NotifySupportContacts: instModel.NotifySupportContacts, + Concurrency: instModel.Concurrency, + }) + } +} + +func (g *GenericResizeSettings) ToInstAPI() []*models.ResizeSettings { + instaModels := make([]*models.ResizeSettings, 0, len(*g)) + for _, setting := range *g { + instaModels = append(instaModels, &models.ResizeSettings{ + NotifySupportContacts: setting.NotifySupportContacts, + Concurrency: setting.Concurrency, + }) + } + + return instaModels +} + +func (g GenericResizeSettings) Equal(o GenericResizeSettings) bool { + if len(g) != len(o) { + return false + } + + if len(g) > 0 { + return *g[0] == *o[0] + } + + return true +} diff --git a/apis/clusters/v1beta1/validation.go b/apis/clusters/v1beta1/validation.go index fa6461e51..ca916133e 100644 --- a/apis/clusters/v1beta1/validation.go +++ b/apis/clusters/v1beta1/validation.go @@ -392,3 +392,31 @@ func (s *GenericDataCentreSpec) validateCreation() error { return nil } + +func (s *GenericDataCentreSpec) ValidateOnPremisesCreation() error { + if s.CloudProvider != models.ONPREMISES { + return fmt.Errorf("cloud provider %s is unavailable for data centre: %s, available value: %s", + s.CloudProvider, s.Name, models.ONPREMISES) + } + + if s.Region != models.CLIENTDC { + return fmt.Errorf("region %s is unavailable for data centre: %s, available value: %s", + s.Region, s.Name, models.CLIENTDC) + } + + return nil +} + +func (s *GenericDataCentreSpec) validateImmutableCloudProviderSettingsUpdate(oldSettings []*CloudProviderSettings) error { + if len(oldSettings) != len(s.CloudProviderSettings) { + return models.ErrImmutableCloudProviderSettings + } + + for i, newProviderSettings := range s.CloudProviderSettings { + if *newProviderSettings != *oldSettings[i] { + return models.ErrImmutableCloudProviderSettings + } + } + + return nil +} diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index f0928bf79..0ce0a9496 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -384,7 +384,7 @@ func (in *Cassandra) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraDataCentre) DeepCopyInto(out *CassandraDataCentre) { *out = *in - in.DataCentre.DeepCopyInto(&out.DataCentre) + in.GenericDataCentreSpec.DeepCopyInto(&out.GenericDataCentreSpec) if in.Debezium != nil { in, out := &in.Debezium, &out.Debezium *out = make([]DebeziumCassandraSpec, len(*in)) @@ -409,6 +409,33 @@ func (in *CassandraDataCentre) DeepCopy() *CassandraDataCentre { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CassandraDataCentreStatus) DeepCopyInto(out *CassandraDataCentreStatus) { + *out = *in + in.GenericDataCentreStatus.DeepCopyInto(&out.GenericDataCentreStatus) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + (*in).DeepCopyInto(*out) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CassandraDataCentreStatus. +func (in *CassandraDataCentreStatus) DeepCopy() *CassandraDataCentreStatus { + if in == nil { + return nil + } + out := new(CassandraDataCentreStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraList) DeepCopyInto(out *CassandraList) { *out = *in @@ -470,6 +497,7 @@ func (in *CassandraRestoreFrom) DeepCopy() *CassandraRestoreFrom { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { *out = *in + in.GenericClusterSpec.DeepCopyInto(&out.GenericClusterSpec) if in.RestoreFrom != nil { in, out := &in.RestoreFrom, &out.RestoreFrom *out = new(CassandraRestoreFrom) @@ -480,7 +508,6 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { *out = new(OnPremisesSpec) (*in).DeepCopyInto(*out) } - in.Cluster.DeepCopyInto(&out.Cluster) if in.DataCentres != nil { in, out := &in.DataCentres, &out.DataCentres *out = make([]*CassandraDataCentre, len(*in)) @@ -505,7 +532,7 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { } if in.ResizeSettings != nil { in, out := &in.ResizeSettings, &out.ResizeSettings - *out = make([]*ResizeSettings, len(*in)) + *out = make(GenericResizeSettings, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] @@ -529,7 +556,18 @@ func (in *CassandraSpec) DeepCopy() *CassandraSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraStatus) DeepCopyInto(out *CassandraStatus) { *out = *in - in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + in.GenericStatus.DeepCopyInto(&out.GenericStatus) + if in.DataCentres != nil { + in, out := &in.DataCentres, &out.DataCentres + *out = make([]*CassandraDataCentreStatus, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(CassandraDataCentreStatus) + (*in).DeepCopyInto(*out) + } + } + } if in.AvailableUsers != nil { in, out := &in.AvailableUsers, &out.AvailableUsers *out = make(References, len(*in)) @@ -978,6 +1016,31 @@ func (in *GenericDataCentreStatus) DeepCopy() *GenericDataCentreStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in GenericResizeSettings) DeepCopyInto(out *GenericResizeSettings) { + { + in := &in + *out = make(GenericResizeSettings, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ResizeSettings) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GenericResizeSettings. +func (in GenericResizeSettings) DeepCopy() GenericResizeSettings { + if in == nil { + return nil + } + out := new(GenericResizeSettings) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GenericStatus) DeepCopyInto(out *GenericStatus) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index e06ce61ea..7818cfc5b 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -207,7 +207,7 @@ spec: provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis. type: boolean - privateNetworkCluster: + privateNetwork: type: boolean resizeSettings: items: @@ -319,15 +319,11 @@ spec: - namespace type: object type: array - cdcid: - type: string currentClusterOperationStatus: type: string dataCentres: items: properties: - encryptionKeyId: - type: string id: type: string name: @@ -353,21 +349,6 @@ spec: type: string type: object type: array - nodesNumber: - type: integer - privateLink: - items: - properties: - advertisedHostname: - type: string - endPointServiceId: - type: string - endPointServiceName: - type: string - required: - - advertisedHostname - type: object - type: array resizeOperations: items: properties: @@ -424,6 +405,8 @@ spec: type: array status: type: string + required: + - nodes type: object type: array id: @@ -514,19 +497,8 @@ spec: type: array type: object type: array - options: - properties: - dataNodeSize: - type: string - masterNodeSize: - type: string - openSearchDashboardsNodeSize: - type: string - type: object state: type: string - twoFactorDeleteEnabled: - type: boolean type: object type: object served: true diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index bf3759475..1f3797102 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -5,7 +5,7 @@ metadata: spec: name: "example-cassandra" #(immutable) version: "4.0.10" #(immutable) - privateNetworkCluster: true #(immutable) + privateNetwork: false #(immutable) dataCentres: - name: "AWS_cassandra" #(mutable) region: "US_EAST_1" #(immutable) @@ -20,7 +20,7 @@ spec: # version: "2.0.1" #(immutable) cloudProvider: "AWS_VPC" #(immutable) continuousBackup: false #(immutable) - nodesNumber: 4 #(mutable) + nodesNumber: 2 #(mutable) replicationFactor: 2 privateIpBroadcastForDiscovery: false #(immutable) network: "172.16.0.0/19" #(immutable) @@ -29,12 +29,12 @@ spec: clientToClusterEncryption: true #(immutable) # cloudProviderSettings: # - customVirtualNetworkId: "vpc-0b69c781969e980a9" -# nodeSize: "CAS-DEV-t4g.small-5" #(mutable) + nodeSize: "CAS-DEV-t4g.small-5" #(mutable) # nodeSize: "CAS-PRD-r6g.medium-80" #(mutable) # (production node size) - nodeSize: "CAS-PRD-r6g.medium-120" #(mutable) # (production node size) - privateLink: true - shotoverProxy: - - nodeSize: "CSO-PRD-c6g.xlarge-20" +# nodeSize: "CAS-PRD-r6g.medium-120" #(mutable) # (production node size) + privateLink: false +# shotoverProxy: +# - nodeSize: "CSO-PRD-c6g.xlarge-20" # accountName: "InstaclustrRIYOA" # - name: "AWS_cassandra2" # region: "US_EAST_1" diff --git a/config/samples/clusters_v1beta1_opensearch.yaml b/config/samples/clusters_v1beta1_opensearch.yaml index 08e961268..7c49351a1 100644 --- a/config/samples/clusters_v1beta1_opensearch.yaml +++ b/config/samples/clusters_v1beta1_opensearch.yaml @@ -11,7 +11,7 @@ metadata: annotations: test.annotation/first: testAnnotation spec: - name: bohdan-test + name: opensearch-test alertingPlugin: false anomalyDetectionPlugin: false asynchronousSearchPlugin: false diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index a443e1813..42a3a1473 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -748,7 +748,7 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra } slaTier := c.Spec.SLATier - privateClusterNetwork := c.Spec.PrivateNetworkCluster + privateNetwork := c.Spec.PrivateNetworkCluster pciCompliance := c.Spec.PCICompliance var twoFactorDelete []*v1beta1.TwoFactorDelete @@ -789,27 +789,27 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra cassandraDataCentres := []*v1beta1.CassandraDataCentre{ { - DataCentre: v1beta1.DataCentre{ + GenericDataCentreSpec: v1beta1.GenericDataCentreSpec{ Name: dcName, Region: dcRegion, CloudProvider: cloudProvider, ProviderAccountName: providerAccountName, - NodeSize: cassNodeSize, - NodesNumber: cassNodesNumber, Network: network, }, + NodeSize: cassNodeSize, + NodesNumber: cassNodesNumber, ReplicationFactor: cassReplicationFactor, PrivateIPBroadcastForDiscovery: cassPrivateIPBroadcastForDiscovery, }, } spec := v1beta1.CassandraSpec{ - Cluster: v1beta1.Cluster{ - Name: models.CassandraChildPrefix + c.Name, - Version: latestCassandraVersion, - SLATier: slaTier, - PrivateNetworkCluster: privateClusterNetwork, - TwoFactorDelete: twoFactorDelete, - PCICompliance: pciCompliance, + GenericClusterSpec: v1beta1.GenericClusterSpec{ + Name: models.CassandraChildPrefix + c.Name, + Version: latestCassandraVersion, + SLATier: slaTier, + PrivateNetwork: privateNetwork, + TwoFactorDelete: twoFactorDelete, + PCICompliance: pciCompliance, }, DataCentres: cassandraDataCentres, PasswordAndUserAuth: cassPasswordAndUserAuth, diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 71a65db23..cb85e92b5 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -18,7 +18,9 @@ package clusters import ( "context" + "encoding/json" "errors" + "fmt" "strconv" "github.com/go-logr/logr" @@ -115,233 +117,154 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } -func (r *CassandraReconciler) handleCreateCluster( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, -) (reconcile.Result, error) { - l = l.WithName("Cassandra creation event") - var err error - patch := c.NewPatch() - if c.Status.ID == "" { - var id string - if c.Spec.HasRestore() { - l.Info( - "Creating cluster from backup", - "original cluster ID", c.Spec.RestoreFrom.ClusterID, - ) +func (r *CassandraReconciler) createCassandraFromRestore(c *v1beta1.Cassandra, l logr.Logger) (*models.CassandraCluster, error) { + l.Info( + "Creating cluster from backup", + "original cluster ID", c.Spec.RestoreFrom.ClusterID, + ) - id, err = r.API.RestoreCluster(c.RestoreInfoToInstAPI(c.Spec.RestoreFrom), models.CassandraAppKind) - if err != nil { - l.Error(err, "Cannot restore cluster from backup", - "original cluster ID", c.Spec.RestoreFrom.ClusterID, - ) + id, err := r.API.RestoreCluster(c.RestoreInfoToInstAPI(c.Spec.RestoreFrom), models.CassandraAppKind) + if err != nil { + l.Error(err, "Cannot restore cluster from backup", + "original cluster ID", c.Spec.RestoreFrom.ClusterID, + ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster restore from backup on the Instaclustr is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster restore from backup on the Instaclustr is failed. Reason: %v", + err, + ) - return reconcile.Result{}, err - } + return nil, err + } - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster restore request is sent. Original cluster ID: %s, new cluster ID: %s", - c.Spec.RestoreFrom.ClusterID, - id, - ) - } else { - l.Info( - "Creating cluster", - "cluster name", c.Spec.Name, - "data centres", c.Spec.DataCentres, - ) - iCassandraSpec := c.Spec.ToInstAPI() - for i, dc := range c.Spec.DataCentres { - for j, debezium := range dc.Debezium { - if debezium.ClusterRef != nil { - cdcID, err := clusterresources.GetDataCentreID(r.Client, ctx, debezium.ClusterRef) - if err != nil { - l.Error(err, "Cannot get cluster ID", - "Cluster reference", debezium.ClusterRef, - ) - return ctrl.Result{}, err - } - iCassandraSpec.DataCentres[i].Debezium[j].KafkaDataCentreID = cdcID - } - } - } + instaModel, err := r.API.GetCassandra(id) + if err != nil { + return nil, fmt.Errorf("failed to get cassandra cluster details, err: %w", err) + } - id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, iCassandraSpec) - if err != nil { - l.Error( - err, "Cannot create cluster", - "cluster spec", c.Spec, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster creation on the Instaclustr is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster restore request is sent. Original cluster ID: %s, new cluster ID: %s", + c.Spec.RestoreFrom.ClusterID, + instaModel.ID, + ) - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster creation request is sent. Cluster ID: %s", - id, - ) - } + return instaModel, nil +} - c.Status.ID = id - err = r.Status().Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster status", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - "cluster metadata", c.ObjectMeta, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.PatchFailed, - "Cluster resource status patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } +func (r *CassandraReconciler) createCassandra(ctx context.Context, c *v1beta1.Cassandra, l logr.Logger) (*models.CassandraCluster, error) { + l.Info( + "Creating cluster", + "cluster name", c.Spec.Name, + "data centres", c.Spec.DataCentres, + ) - controllerutil.AddFinalizer(c, models.DeletionFinalizer) - c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent - err = r.Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - "cluster metadata", c.ObjectMeta, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err + iCassandraSpec := c.Spec.ToInstAPI() + for i, dc := range c.Spec.DataCentres { + for j, debezium := range dc.Debezium { + if debezium.ClusterRef != nil { + cdcID, err := clusterresources.GetDataCentreID(r.Client, ctx, debezium.ClusterRef) + if err != nil { + l.Error(err, "Cannot get cluster ID", + "Cluster reference", debezium.ClusterRef, + ) + return nil, err + } + iCassandraSpec.DataCentres[i].Debezium[j].KafkaDataCentreID = cdcID + } } + } - l.Info( - "Cluster has been created", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, + b, err := r.API.CreateClusterRaw(instaclustr.CassandraEndpoint, iCassandraSpec) + if err != nil { + l.Error( + err, "Cannot create cluster", + "cluster spec", c.Spec, + ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster creation on the Instaclustr is failed. Reason: %v", + err, ) + return nil, err } - if c.Status.State != models.DeletedStatus { - err = r.startClusterStatusJob(c) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "c cluster ID", c.Status.ID) + var instModel models.CassandraCluster + err = json.Unmarshal(b, &instModel) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal json to model, err: %w", err) + } - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster creation request is sent. Cluster ID: %s", + instModel.ID, + ) - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster status check job is started", - ) + return &instModel, nil +} + +func (r *CassandraReconciler) createCluster(ctx context.Context, c *v1beta1.Cassandra, l logr.Logger) error { + var instModel *models.CassandraCluster + var err error + + if c.Spec.HasRestore() { + instModel, err = r.createCassandraFromRestore(c, l) + } else { + instModel, err = r.createCassandra(ctx, c, l) + } + if err != nil { + return fmt.Errorf("failed to create cluster, err: %w", err) } - if c.Spec.OnPremisesSpec != nil && c.Spec.OnPremisesSpec.EnableAutomation { - iData, err := r.API.GetCassandra(c.Status.ID) - if err != nil { - l.Error(err, "Cannot get cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "data centres", c.Spec.DataCentres, - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.FetchFailed, - "Cluster fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - iCassandra, err := c.FromInstAPI(iData) - if err != nil { - l.Error( - err, "Cannot convert cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + c.Spec.FromInstAPI(instModel) + err = r.Update(ctx, c) + if err != nil { + return fmt.Errorf("failed to update cassandra spec, err: %w", err) + } - bootstrap := newOnPremisesBootstrap( - r.Client, - c, - r.EventRecorder, - iCassandra.Status.ClusterStatus, - c.Spec.OnPremisesSpec, - newExposePorts(c.GetExposePorts()), - c.GetHeadlessPorts(), - c.Spec.PrivateNetworkCluster, - ) + c.Status.FromInstAPI(instModel) + err = r.Status().Update(ctx, c) + if err != nil { + return fmt.Errorf("failed to update cassandra status, err: %w", err) + } - err = handleCreateOnPremisesClusterResources(ctx, bootstrap) - if err != nil { - l.Error( - err, "Cannot create resources for on-premises cluster", - "cluster spec", c.Spec.OnPremisesSpec, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Resources creation for on-premises cluster is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + controllerutil.AddFinalizer(c, models.DeletionFinalizer) + c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent + err = r.Update(ctx, c) + if err != nil { + return err + } - err = r.startClusterOnPremisesIPsJob(c, bootstrap) - if err != nil { - l.Error(err, "Cannot start on-premises cluster IPs check job", - "cluster ID", c.Status.ID, - ) + l.Info( + "Cluster has been created", + "cluster name", c.Spec.Name, + "cluster ID", c.Status.ID, + ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "On-premises cluster IPs check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + return nil +} - l.Info( - "On-premises resources have been created", - "cluster name", c.Spec.Name, - "on-premises Spec", c.Spec.OnPremisesSpec, - "cluster ID", c.Status.ID, +func (r *CassandraReconciler) startClusterJobs(c *v1beta1.Cassandra, l logr.Logger) error { + err := r.startSyncJob(c) + if err != nil { + l.Error(err, "Cannot start cluster status job", + "c cluster ID", c.Status.ID) + + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, ) - return models.ExitReconcile, nil + return err } + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster status check job is started", + ) + err = r.startClusterBackupsJob(c) if err != nil { l.Error(err, "Cannot start cluster backups check job", @@ -353,7 +276,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster backups check job is failed. Reason: %v", err, ) - return reconcile.Result{}, err + return err } r.EventRecorder.Eventf( @@ -367,31 +290,24 @@ func (r *CassandraReconciler) handleCreateCluster( l.Error(err, "Failed to start user creation job") r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err) - return reconcile.Result{}, err + return err } r.EventRecorder.Event(c, models.Normal, models.Created, "Cluster user creation job is started") } - return models.ExitReconcile, nil + return nil } -func (r *CassandraReconciler) handleUpdateCluster( - ctx context.Context, - l logr.Logger, - req ctrl.Request, - c *v1beta1.Cassandra, -) (reconcile.Result, error) { - l = l.WithName("Cassandra update event") - - iData, err := r.API.GetCassandra(c.Status.ID) +func (r *CassandraReconciler) handleOnPremises(c *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) { + instaModel, err := r.API.GetCassandra(c.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", c.Spec.Name, + "data centres", c.Spec.DataCentres, "cluster ID", c.Status.ID, ) - r.EventRecorder.Eventf( c, models.Warning, models.FetchFailed, "Cluster fetch from the Instaclustr API is failed. Reason: %v", @@ -400,30 +316,120 @@ func (r *CassandraReconciler) handleUpdateCluster( return reconcile.Result{}, err } - iCassandra, err := c.FromInstAPI(iData) + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) + + bootstrap := newOnPremisesBootstrap( + r.Client, + c, + r.EventRecorder, + iCassandra.Status.ToOnPremises(), + c.Spec.OnPremisesSpec, + newExposePorts(c.GetExposePorts()), + c.GetHeadlessPorts(), + c.Spec.PrivateNetwork, + ) + + err = handleCreateOnPremisesClusterResources(context.Background(), bootstrap) if err != nil { l.Error( - err, "Cannot convert cluster from the Instaclustr API", + err, "Cannot create resources for on-premises cluster", + "cluster spec", c.Spec.OnPremisesSpec, + ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Resources creation for on-premises cluster is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } + + err = r.startClusterOnPremisesIPsJob(c, bootstrap) + if err != nil { + l.Error(err, "Cannot start on-premises cluster IPs check job", + "cluster ID", c.Status.ID, + ) + + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "On-premises cluster IPs check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } + + l.Info( + "On-premises resources have been created", + "cluster name", c.Spec.Name, + "on-premises Spec", c.Spec.OnPremisesSpec, + "cluster ID", c.Status.ID, + ) + + return models.ExitReconcile, nil +} + +func (r *CassandraReconciler) handleCreateCluster( + ctx context.Context, + l logr.Logger, + c *v1beta1.Cassandra, +) (reconcile.Result, error) { + l = l.WithName("Cassandra creation event") + if c.Status.ID == "" { + err := r.createCluster(ctx, c, l) + if err != nil { + r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, "Failed to create cluster. Reason: %v", err) + return reconcile.Result{}, err + } + } + + if c.Status.State != models.DeletedStatus { + err := r.startClusterJobs(c, l) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to start cluster jobs, err: %w", err) + } + + if c.Spec.OnPremisesSpec != nil && c.Spec.OnPremisesSpec.EnableAutomation { + return r.handleOnPremises(c, l) + } + } + + return models.ExitReconcile, nil +} + +func (r *CassandraReconciler) handleUpdateCluster( + ctx context.Context, + l logr.Logger, + req ctrl.Request, + c *v1beta1.Cassandra, +) (reconcile.Result, error) { + l = l.WithName("Cassandra update event") + + instaModel, err := r.API.GetCassandra(c.Status.ID) + if err != nil { + l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", c.Spec.Name, "cluster ID", c.Status.ID, ) r.EventRecorder.Eventf( - c, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", + c, models.Warning, models.FetchFailed, + "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) return reconcile.Result{}, err } + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) + if c.Annotations[models.ExternalChangesAnnotation] == models.True || r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries { - return handleExternalChanges[v1beta1.CassandraSpec](r.EventRecorder, r.Client, c, iCassandra, l) + return handleExternalChanges[v1beta1.CassandraSpec](r.EventRecorder, r.Client, c, &iCassandra, l) } patch := c.NewPatch() - if c.Spec.ClusterSettingsNeedUpdate(iCassandra.Spec.Cluster) { + if c.Spec.ClusterSettingsNeedUpdate(&iCassandra.Spec.GenericClusterSpec) { l.Info("Updating cluster settings", "instaclustr description", iCassandra.Spec.Description, "instaclustr two factor delete", iCassandra.Spec.TwoFactorDelete) @@ -664,7 +670,7 @@ func (r *CassandraReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error { +func (r *CassandraReconciler) startSyncJob(c *v1beta1.Cassandra) error { job := r.newWatchStatusJob(c) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) @@ -722,7 +728,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. return nil } - iData, err := r.API.GetCassandra(c.Status.ID) + instaModel, err := r.API.GetCassandra(c.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { if c.DeletionTimestamp != nil { @@ -738,40 +744,30 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. return err } - iCassandra, err := c.FromInstAPI(iData) - if err != nil { - l.Error(err, "Cannot convert cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - ) - return err - } - - if !areStatusesEqual(&iCassandra.Status.ClusterStatus, &c.Status.ClusterStatus) { - l.Info("Updating cluster status", - "status from Instaclustr", iCassandra.Status.ClusterStatus, - "status from k8s", c.Status.ClusterStatus) + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) - areDCsEqual := areDataCentresEqual(iCassandra.Status.ClusterStatus.DataCentres, c.Status.ClusterStatus.DataCentres) + if !c.Status.Equals(&iCassandra.Status) { + l.Info("Updating cluster status") patch := c.NewPatch() - c.Status.ClusterStatus = iCassandra.Status.ClusterStatus + c.Status.FromInstAPI(instaModel) err = r.Status().Patch(context.Background(), c, patch) if err != nil { return err } - if !areDCsEqual { + if !c.Status.DCEquals(iCassandra.Status.DataCentres) { var nodes []*v1beta1.Node - for _, dc := range iCassandra.Status.ClusterStatus.DataCentres { + for _, dc := range iCassandra.Status.DataCentres { nodes = append(nodes, dc.Nodes...) } err = exposeservice.Create(r.Client, c.Name, c.Namespace, - c.Spec.PrivateNetworkCluster, + c.Spec.PrivateNetwork, nodes, models.CassandraConnectionPort) if err != nil { @@ -780,7 +776,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. } } - equals := c.Spec.IsEqual(iCassandra.Spec) + equals := c.Spec.IsEqual(&iCassandra.Spec) if equals && c.Annotations[models.ExternalChangesAnnotation] == models.True { patch := c.NewPatch() @@ -1069,7 +1065,7 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c return err } - if !c.Status.AreMaintenanceEventStatusesEqual(iMEStatuses) { + if !c.Status.MaintenanceEventsEqual(iMEStatuses) { patch := c.NewPatch() c.Status.MaintenanceEvents = iMEStatuses err = r.Status().Patch(ctx, c, patch) @@ -1134,6 +1130,10 @@ func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { newObj := event.ObjectNew.(*v1beta1.Cassandra) + if newObj.Status.ID == "" && newObj.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return false + } + if newObj.Status.ID == "" { newObj.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent return true diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index a13f737ab..27acd4645 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -351,7 +351,7 @@ func (c *Client) DeleteRedisUser(id string) error { return nil } -func (c *Client) GetCassandra(id string) ([]byte, error) { +func (c *Client) GetCassandra(id string) (*models.CassandraCluster, error) { url := c.serverHostname + CassandraEndpoint + id resp, err := c.DoRequest(url, http.MethodGet, nil) @@ -373,7 +373,13 @@ func (c *Client) GetCassandra(id string) ([]byte, error) { return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } - return body, nil + var instModel models.CassandraCluster + err = json.Unmarshal(body, &instModel) + if err != nil { + return nil, err + } + + return &instModel, nil } func (c *Client) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error { diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 23526d50f..8257d79b3 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -79,7 +79,7 @@ type API interface { GetKafkaACLStatus(kafkaACLID, kafkaACLEndpoint string) (*kafkamanagementv1beta1.KafkaACLStatus, error) DeleteKafkaACL(kafkaACLID, kafkaACLEndpoint string) error UpdateKafkaACL(kafkaACLID, kafkaACLEndpoint string, kafkaACLSpec any) error - GetCassandra(id string) ([]byte, error) + GetCassandra(id string) (*models.CassandraCluster, error) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error GetKafka(id string) ([]byte, error) GetKafkaConnect(id string) ([]byte, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 0082d757d..14d79d4de 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -294,7 +294,7 @@ func (c *mockClient) RestoreCluster(restoreData any, clusterKind string) (string panic("RestoreCluster: is not implemented") } -func (c *mockClient) GetCassandra(id string) ([]byte, error) { +func (c *mockClient) GetCassandra(id string) (*models.CassandraCluster, error) { panic("GetCassandra: is not implemented") } diff --git a/pkg/models/cassandra_apiv2.go b/pkg/models/cassandra_apiv2.go index 4cf029781..48a6f37ec 100644 --- a/pkg/models/cassandra_apiv2.go +++ b/pkg/models/cassandra_apiv2.go @@ -17,30 +17,31 @@ limitations under the License. package models type CassandraCluster struct { - ClusterStatus - CassandraVersion string `json:"cassandraVersion"` - LuceneEnabled bool `json:"luceneEnabled"` - PasswordAndUserAuth bool `json:"passwordAndUserAuth"` - DataCentres []*CassandraDataCentre `json:"dataCentres"` - Name string `json:"name"` - SLATier string `json:"slaTier"` - PrivateNetworkCluster bool `json:"privateNetworkCluster"` - PCIComplianceMode bool `json:"pciComplianceMode"` - TwoFactorDelete []*TwoFactorDelete `json:"twoFactorDelete,omitempty"` - BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - ResizeSettings []*ResizeSettings `json:"resizeSettings"` - Description string `json:"description,omitempty"` + GenericClusterFields `json:",inline"` + + CassandraVersion string `json:"cassandraVersion"` + LuceneEnabled bool `json:"luceneEnabled"` + PasswordAndUserAuth bool `json:"passwordAndUserAuth"` + BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + + DataCentres []*CassandraDataCentre `json:"dataCentres"` + ResizeSettings []*ResizeSettings `json:"resizeSettings"` } type CassandraDataCentre struct { - DataCentre `json:",inline"` - ReplicationFactor int `json:"replicationFactor"` - ContinuousBackup bool `json:"continuousBackup"` - PrivateLink bool `json:"privateLink,omitempty"` - PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - Debezium []*Debezium `json:"debezium,omitempty"` - ShotoverProxy []*ShotoverProxy `json:"shotoverProxy,omitempty"` + GenericDataCentreFields `json:",inline"` + + ContinuousBackup bool `json:"continuousBackup"` + PrivateLink bool `json:"privateLink,omitempty"` + PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + ReplicationFactor int `json:"replicationFactor"` + NumberOfNodes int `json:"numberOfNodes"` + NodeSize string `json:"nodeSize"` + + Nodes []*Node `json:"nodes,omitempty"` + Debezium []*Debezium `json:"debezium,omitempty"` + ShotoverProxy []*ShotoverProxy `json:"shotoverProxy,omitempty"` } type Debezium struct { diff --git a/pkg/utils/slices/slices.go b/pkg/utils/slices/slices.go new file mode 100644 index 000000000..35d486b74 --- /dev/null +++ b/pkg/utils/slices/slices.go @@ -0,0 +1,73 @@ +package slices + +func Equals[T comparable](s1, s2 []T) bool { + if len(s1) != len(s2) { + return false + } + + for i := range s1 { + if s1[i] != s2[i] { + return false + } + } + + return true +} + +func EqualsPtr[T comparable](s1, s2 []*T) bool { + if len(s1) != len(s2) { + return false + } + + for i := range s1 { + if *s1[i] != *s2[i] { + return false + } + } + + return true +} + +func EqualsUnordered[T comparable](s1, s2 []T) bool { + if len(s1) != len(s2) { + return false + } + + mapS1 := map[T]int{} + for _, el := range s1 { + mapS1[el]++ + } + + for _, el := range s2 { + count, ok := mapS1[el] + if !ok || count == 0 { + return false + } + + mapS1[el]-- + } + + return true +} + +func EqualsUnorderedPtr[T comparable](s1, s2 []*T) bool { + if len(s1) != len(s2) { + return false + } + + mapS1 := map[T]int{} + for _, el := range s1 { + mapS1[*el]++ + } + + for _, el := range s2 { + count, ok := mapS1[*el] + if !ok || count == 0 { + return false + } + + mapS1[*el]-- + } + + return true +} diff --git a/pkg/utils/slices/slices_test.go b/pkg/utils/slices/slices_test.go new file mode 100644 index 000000000..a95556eb9 --- /dev/null +++ b/pkg/utils/slices/slices_test.go @@ -0,0 +1,300 @@ +package slices + +import "testing" + +func TestEqualsUnordered(t *testing.T) { + type args[T comparable] struct { + s1 []T + s2 []T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: []int{1}, + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: []int{1}, + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: []int{1}, + s2: []int{1, 1}, + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: []int{1}, + s2: []int{2}, + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{1, 2, 3}, + }, + want: true, + }, + { + name: "equals but unordered", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{2, 3, 1}, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsUnordered(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsUnordered() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEquals(t *testing.T) { + type args[T comparable] struct { + s1 []T + s2 []T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: []int{1}, + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: []int{1}, + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: []int{1}, + s2: []int{1, 1}, + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: []int{1}, + s2: []int{2}, + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{1, 2, 3}, + }, + want: true, + }, + { + name: "unordered", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{2, 3, 1}, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Equals(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("Equals() = %v, want %v", got, tt.want) + } + }) + } +} + +func ptrOf[T any](values ...T) []*T { + var ptrs []*T + for i := 0; i < len(values); i++ { + ptrs = append(ptrs, &values[i]) + } + + return ptrs +} + +func TestEqualsPtr(t *testing.T) { + type args[T comparable] struct { + s1 []*T + s2 []*T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "same values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1), + }, + want: true, + }, + { + name: "different values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(2), + }, + want: false, + }, + { + name: "different length", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1, 1), + }, + want: false, + }, + { + name: "equals", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(1, 2, 3), + }, + want: true, + }, + { + name: "unordered", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(3, 2, 1), + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsPtr(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsPtr() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEqualsUnorderedPtr(t *testing.T) { + type args[T comparable] struct { + s1 []*T + s2 []*T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: ptrOf(1), + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: ptrOf(1), + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1, 1), + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(2), + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(1, 2, 3), + }, + want: true, + }, + { + name: "equals but unordered", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(2, 3, 1), + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsUnorderedPtr(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsUnorderedPtr() = %v, want %v", got, tt.want) + } + }) + } +}