From 055fe4f0abc67224e14d4be2554237114c40ee5d Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Fri, 2 Feb 2024 16:27:09 +0200 Subject: [PATCH] cassandra code base was refactored --- .secrets.baseline | 16 +- apis/clusters/v1beta1/cassandra_types.go | 338 +++++++----- apis/clusters/v1beta1/cassandra_webhook.go | 24 +- apis/clusters/v1beta1/generic_spec.go | 17 +- apis/clusters/v1beta1/opensearch_types.go | 48 +- apis/clusters/v1beta1/opensearch_webhook.go | 16 +- apis/clusters/v1beta1/structs.go | 36 ++ apis/clusters/v1beta1/validation.go | 23 + .../clusters/v1beta1/zz_generated.deepcopy.go | 89 +++- .../clusters.instaclustr.com_cassandras.yaml | 34 +- .../samples/clusters_v1beta1_cassandra.yaml | 14 +- .../samples/clusters_v1beta1_opensearch.yaml | 14 +- controllers/clusters/cadence_controller.go | 22 +- controllers/clusters/cassandra_controller.go | 492 +++++++++--------- controllers/clusters/opensearch_controller.go | 8 +- pkg/instaclustr/client.go | 10 +- pkg/instaclustr/interfaces.go | 2 +- pkg/instaclustr/mock/client.go | 2 +- pkg/models/cassandra_apiv2.go | 43 +- pkg/utils/slices/slices.go | 73 +++ pkg/utils/slices/slices_test.go | 300 +++++++++++ 21 files changed, 1069 insertions(+), 552 deletions(-) create mode 100644 pkg/utils/slices/slices.go create mode 100644 pkg/utils/slices/slices_test.go diff --git a/.secrets.baseline b/.secrets.baseline index 7f1664aa1..e2dcb91c9 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -188,23 +188,23 @@ { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "4d2d63a69bc8074b20a4edcdcbbfa2b81d791543", + "hashed_secret": "331cc743251c3b9504229de4d139c539da121a33", "is_verified": false, - "line_number": 257 + "line_number": 263 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "e0a46b27231f798fe22dc4d5d82b5feeb5dcf085", + "hashed_secret": "0ad8d7005e084d4f028a4277b73c6fab24269c17", "is_verified": false, - "line_number": 313 + "line_number": 349 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", - "hashed_secret": "e7f873437cda278898e12c04e623fcbefc193cb8", + "hashed_secret": "e0a46b27231f798fe22dc4d5d82b5feeb5dcf085", "is_verified": false, - "line_number": 349 + "line_number": 414 } ], "apis/clusters/v1beta1/cassandra_webhook.go": [ @@ -744,7 +744,7 @@ "filename": "pkg/instaclustr/client.go", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 2042 + "line_number": 2048 } ], "pkg/instaclustr/mock/client.go": [ @@ -1135,5 +1135,5 @@ } ] }, - "generated_at": "2024-02-05T15:02:47Z" + "generated_at": "2024-02-05T09:41:58Z" } diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 8129e7a1b..9d1a75c50 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - "encoding/json" "strconv" k8scorev1 "k8s.io/api/core/v1" @@ -52,36 +51,140 @@ type CassandraRestoreFrom struct { // CassandraSpec defines the desired state of Cassandra type CassandraSpec struct { - RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` - OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` - Cluster `json:",inline"` + GenericClusterSpec `json:",inline"` + + RestoreFrom *CassandraRestoreFrom `json:"restoreFrom,omitempty"` + OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` DataCentres []*CassandraDataCentre `json:"dataCentres,omitempty"` LuceneEnabled bool `json:"luceneEnabled,omitempty"` PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` UserRefs References `json:"userRefs,omitempty"` //+kubebuilder:validate:MaxItems:=1 - ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` + ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"` } // CassandraStatus defines the observed state of Cassandra type CassandraStatus struct { - ClusterStatus `json:",inline"` + GenericStatus `json:",inline"` + DataCentres []*CassandraDataCentreStatus `json:"dataCentres,omitempty"` + AvailableUsers References `json:"availableUsers,omitempty"` } -type CassandraDataCentre struct { - DataCentre `json:",inline"` - ContinuousBackup bool `json:"continuousBackup"` - PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - ReplicationFactor int `json:"replicationFactor"` +func (s *CassandraStatus) ToOnPremises() ClusterStatus { + dc := &DataCentreStatus{ + ID: s.DataCentres[0].ID, + Nodes: s.DataCentres[0].Nodes, + } + + return ClusterStatus{ + ID: s.ID, + DataCentres: []*DataCentreStatus{dc}, + } +} + +func (s *CassandraStatus) Equals(o *CassandraStatus) bool { + return s.GenericStatus.Equals(&o.GenericStatus) && + s.DataCentresEqual(o) +} + +func (s *CassandraStatus) DataCentresEqual(o *CassandraStatus) bool { + if len(s.DataCentres) != len(o.DataCentres) { + return false + } + + sMap := map[string]*CassandraDataCentreStatus{} + for _, dc := range s.DataCentres { + sMap[dc.Name] = dc + } + + for _, oDC := range o.DataCentres { + sDC, ok := sMap[oDC.Name] + if !ok { + return false + } + if !sDC.Equals(oDC) { + return false + } + } + + return true +} + +type CassandraDataCentre struct { + GenericDataCentreSpec `json:",inline"` + + ContinuousBackup bool `json:"continuousBackup"` + PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` + PrivateLink bool `json:"privateLink,omitempty"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + ReplicationFactor int `json:"replicationFactor"` + NodesNumber int `json:"nodesNumber"` + NodeSize string `json:"nodeSize"` // Adds the specified version of Debezium Connector Cassandra to the Cassandra cluster // +kubebuilder:validation:MaxItems=1 - Debezium []DebeziumCassandraSpec `json:"debezium,omitempty"` - PrivateLink bool `json:"privateLink,omitempty"` - ShotoverProxy []ShotoverProxySpec `json:"shotoverProxy,omitempty"` + Debezium []*DebeziumCassandraSpec `json:"debezium,omitempty"` + ShotoverProxy []*ShotoverProxySpec `json:"shotoverProxy,omitempty"` +} + +func (dc *CassandraDataCentre) Equals(o *CassandraDataCentre) bool { + return dc.GenericDataCentreSpec.Equals(&o.GenericDataCentreSpec) && + dc.ContinuousBackup == o.ContinuousBackup && + dc.PrivateIPBroadcastForDiscovery == o.PrivateIPBroadcastForDiscovery && + dc.PrivateLink == o.PrivateLink && + dc.ClientToClusterEncryption == o.ClientToClusterEncryption && + dc.ReplicationFactor == o.ReplicationFactor && + dc.NodesNumber == o.NodesNumber && + dc.NodeSize == o.NodeSize && + dc.DebeziumEquals(o) && + dc.ShotoverProxyEquals(o) +} + +type CassandraDataCentreStatus struct { + GenericDataCentreStatus `json:",inline"` + Nodes []*Node `json:"nodes"` +} + +func (s *CassandraDataCentreStatus) Equals(o *CassandraDataCentreStatus) bool { + return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && + s.nodesEqual(o.Nodes) +} + +func (s *CassandraDataCentreStatus) nodesEqual(nodes []*Node) bool { + if len(s.Nodes) != len(nodes) { + return false + } + + sNodes := map[string]*Node{} + for _, node := range s.Nodes { + sNodes[node.ID] = node + } + + for _, node := range nodes { + sNode, ok := sNodes[node.ID] + if !ok { + return false + } + + if !sNode.Equals(node) { + return false + } + } + + return true +} + +func (s *CassandraDataCentreStatus) FromInstAPI(instModel *models.CassandraDataCentre) { + s.GenericDataCentreStatus.FromInstAPI(&instModel.GenericDataCentreFields) + + s.Nodes = make([]*Node, len(instModel.Nodes)) + for i, instNode := range instModel.Nodes { + node := &Node{} + node.FromInstAPI(instNode) + s.Nodes[i] = node + } } type ShotoverProxySpec struct { @@ -153,6 +256,14 @@ func (d *CassandraDataCentre) ShotoverProxyEquals(new *CassandraDataCentre) bool return true } +func (c *CassandraSpec) IsEqual(o *CassandraSpec) bool { + return c.GenericClusterSpec.Equals(&o.GenericClusterSpec) && + c.AreDCsEqual(o.DataCentres) && + c.LuceneEnabled == o.LuceneEnabled && + c.PasswordAndUserAuth == o.PasswordAndUserAuth && + c.BundledUseOnly == o.BundledUseOnly +} + //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" @@ -211,19 +322,9 @@ func (c *Cassandra) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.C } } -func (c *Cassandra) FromInstAPI(iData []byte) (*Cassandra, error) { - iCass := &models.CassandraCluster{} - err := json.Unmarshal(iData, iCass) - if err != nil { - return nil, err - } - - return &Cassandra{ - TypeMeta: c.TypeMeta, - ObjectMeta: c.ObjectMeta, - Spec: c.Spec.FromInstAPI(iCass), - Status: c.Status.FromInstAPI(iCass), - }, nil +func (c *Cassandra) FromInstAPI(instModel *models.CassandraCluster) { + c.Spec.FromInstAPI(instModel) + c.Status.FromInstAPI(instModel) } func (cs *CassandraSpec) HasRestore() bool { @@ -237,88 +338,83 @@ func (cs *CassandraSpec) HasRestore() bool { func (cs *CassandraSpec) DCsUpdateToInstAPI() models.CassandraClusterAPIUpdate { return models.CassandraClusterAPIUpdate{ DataCentres: cs.DCsToInstAPI(), - ResizeSettings: resizeSettingsToInstAPI(cs.ResizeSettings), + ResizeSettings: cs.ResizeSettings.ToInstAPI(), } } -func (cs *CassandraSpec) FromInstAPI(iCass *models.CassandraCluster) CassandraSpec { - return CassandraSpec{ - Cluster: Cluster{ - Name: iCass.Name, - Version: iCass.CassandraVersion, - PCICompliance: iCass.PCIComplianceMode, - PrivateNetworkCluster: iCass.PrivateNetworkCluster, - SLATier: iCass.SLATier, - TwoFactorDelete: cs.Cluster.TwoFactorDeleteFromInstAPI(iCass.TwoFactorDelete), - Description: iCass.Description, - }, - DataCentres: cs.DCsFromInstAPI(iCass.DataCentres), - LuceneEnabled: iCass.LuceneEnabled, - PasswordAndUserAuth: iCass.PasswordAndUserAuth, - BundledUseOnly: iCass.BundledUseOnly, - ResizeSettings: resizeSettingsFromInstAPI(iCass.ResizeSettings), - } -} - -func (cs *CassandraSpec) DebeziumFromInstAPI(iDebeziums []*models.Debezium) (dcs []DebeziumCassandraSpec) { - var debeziums []DebeziumCassandraSpec - for _, iDebezium := range iDebeziums { - debeziums = append(debeziums, DebeziumCassandraSpec{ - KafkaVPCType: iDebezium.KafkaVPCType, - KafkaTopicPrefix: iDebezium.KafkaTopicPrefix, - KafkaDataCentreID: iDebezium.KafkaDataCentreID, - Version: iDebezium.Version, - }) +func (cs *CassandraSpec) FromInstAPI(instModel *models.CassandraCluster) { + cs.GenericClusterSpec.FromInstAPI(&instModel.GenericClusterFields) + + cs.LuceneEnabled = instModel.LuceneEnabled + cs.PasswordAndUserAuth = instModel.PasswordAndUserAuth + cs.BundledUseOnly = instModel.BundledUseOnly + cs.Version = instModel.CassandraVersion + cs.ResizeSettings.FromInstAPI(instModel.ResizeSettings) + + cs.dcsFromInstAPI(instModel.DataCentres) +} + +func (cs *CassandraSpec) dcsFromInstAPI(instModels []*models.CassandraDataCentre) { + cs.DataCentres = make([]*CassandraDataCentre, len(instModels)) + for i, instModel := range instModels { + dc := &CassandraDataCentre{} + dc.FromInstAPI(instModel) + cs.DataCentres[i] = dc } - return debeziums } -func (cs *CassandraSpec) ShotoverProxyFromInstAPI(iShotoverProxys []*models.ShotoverProxy) (sps []ShotoverProxySpec) { - for _, iShotoverProxy := range iShotoverProxys { - sps = append(sps, ShotoverProxySpec{ - NodeSize: iShotoverProxy.NodeSize, - }) +func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) { + d.GenericDataCentreSpec.FromInstAPI(&instModel.GenericDataCentreFields) + + d.ContinuousBackup = instModel.ContinuousBackup + d.PrivateIPBroadcastForDiscovery = instModel.PrivateIPBroadcastForDiscovery + d.PrivateLink = instModel.PrivateLink + d.ClientToClusterEncryption = instModel.ClientToClusterEncryption + d.ReplicationFactor = instModel.ReplicationFactor + d.NodesNumber = instModel.NumberOfNodes + d.NodeSize = instModel.NodeSize + + d.debeziumFromInstAPI(instModel.Debezium) + d.shotoverProxyFromInstAPI(instModel.ShotoverProxy) +} + +func (cs *CassandraDataCentre) debeziumFromInstAPI(instModels []*models.Debezium) { + cs.Debezium = make([]*DebeziumCassandraSpec, len(instModels)) + for i, instModel := range instModels { + cs.Debezium[i] = &DebeziumCassandraSpec{ + KafkaVPCType: instModel.KafkaVPCType, + KafkaTopicPrefix: instModel.KafkaTopicPrefix, + KafkaDataCentreID: instModel.KafkaDataCentreID, + Version: instModel.Version, + } } - return sps } -func (cs *CassandraSpec) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*CassandraDataCentre) { - for _, iDC := range iDCs { - dcs = append(dcs, &CassandraDataCentre{ - DataCentre: cs.Cluster.DCFromInstAPI(iDC.DataCentre), - ContinuousBackup: iDC.ContinuousBackup, - PrivateIPBroadcastForDiscovery: iDC.PrivateIPBroadcastForDiscovery, - ClientToClusterEncryption: iDC.ClientToClusterEncryption, - ReplicationFactor: iDC.ReplicationFactor, - PrivateLink: iDC.PrivateLink, - Debezium: cs.DebeziumFromInstAPI(iDC.Debezium), - ShotoverProxy: cs.ShotoverProxyFromInstAPI(iDC.ShotoverProxy), - }) +func (cs *CassandraDataCentre) shotoverProxyFromInstAPI(instModels []*models.ShotoverProxy) { + cs.ShotoverProxy = make([]*ShotoverProxySpec, len(instModels)) + for i, instModel := range instModels { + cs.ShotoverProxy[i] = &ShotoverProxySpec{ + NodeSize: instModel.NodeSize, + } } - return } -func (cs *CassandraSpec) DCsToInstAPI() (iDCs []*models.CassandraDataCentre) { +func (cs *CassandraSpec) DCsToInstAPI() (instaModels []*models.CassandraDataCentre) { for _, dc := range cs.DataCentres { - iDCs = append(iDCs, dc.ToInstAPI()) + instaModels = append(instaModels, dc.ToInstAPI()) } return } func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster { return &models.CassandraCluster{ - Name: cs.Name, - CassandraVersion: cs.Version, - LuceneEnabled: cs.LuceneEnabled, - PasswordAndUserAuth: cs.PasswordAndUserAuth, - DataCentres: cs.DCsToInstAPI(), - SLATier: cs.SLATier, - PrivateNetworkCluster: cs.PrivateNetworkCluster, - PCIComplianceMode: cs.PCICompliance, - TwoFactorDelete: cs.TwoFactorDeletesToInstAPI(), - BundledUseOnly: cs.BundledUseOnly, - Description: cs.Description, - ResizeSettings: resizeSettingsToInstAPI(cs.ResizeSettings), + GenericClusterFields: cs.GenericClusterSpec.ToInstAPI(), + CassandraVersion: cs.Version, + LuceneEnabled: cs.LuceneEnabled, + PasswordAndUserAuth: cs.PasswordAndUserAuth, + BundledUseOnly: cs.BundledUseOnly, + DataCentres: cs.DCsToInstAPI(), + ResizeSettings: cs.ResizeSettings.ToInstAPI(), } } @@ -342,18 +438,10 @@ func (c *Cassandra) RestoreInfoToInstAPI(restoreData *CassandraRestoreFrom) any return iRestore } -func (cs *CassandraSpec) IsEqual(spec CassandraSpec) bool { - return cs.Cluster.IsEqual(spec.Cluster) && - cs.AreDCsEqual(spec.DataCentres) && - cs.LuceneEnabled == spec.LuceneEnabled && - cs.PasswordAndUserAuth == spec.PasswordAndUserAuth && - cs.BundledUseOnly == spec.BundledUseOnly -} - func (c *Cassandra) GetSpec() CassandraSpec { return c.Spec } func (c *Cassandra) IsSpecEqual(spec CassandraSpec) bool { - return c.Spec.IsEqual(spec) + return c.Spec.IsEqual(&spec) } func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { @@ -361,21 +449,18 @@ func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { return false } - for i, iDC := range dcs { - dataCentre := cs.DataCentres[i] + k8sDCs := map[string]*CassandraDataCentre{} + for _, dc := range cs.DataCentres { + k8sDCs[dc.Name] = dc + } - if iDC.Name != dataCentre.Name { - continue + for _, instDC := range dcs { + k8sDC, ok := k8sDCs[instDC.Name] + if !ok { + return false } - if !dataCentre.IsEqual(iDC.DataCentre) || - iDC.ClientToClusterEncryption != dataCentre.ClientToClusterEncryption || - iDC.PrivateIPBroadcastForDiscovery != dataCentre.PrivateIPBroadcastForDiscovery || - iDC.PrivateLink != dataCentre.PrivateLink || - iDC.ContinuousBackup != dataCentre.ContinuousBackup || - iDC.ReplicationFactor != dataCentre.ReplicationFactor || - !dataCentre.DebeziumEquals(iDC) || - !dataCentre.ShotoverProxyEquals(iDC) { + if !k8sDC.Equals(instDC) { return false } } @@ -383,33 +468,30 @@ func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool { return true } -func (cs *CassandraStatus) FromInstAPI(iCass *models.CassandraCluster) CassandraStatus { - return CassandraStatus{ - ClusterStatus: ClusterStatus{ - ID: iCass.ID, - State: iCass.Status, - DataCentres: cs.DCsFromInstAPI(iCass.DataCentres), - CurrentClusterOperationStatus: iCass.CurrentClusterOperationStatus, - MaintenanceEvents: cs.MaintenanceEvents, - }, - } +func (cs *CassandraStatus) FromInstAPI(instModel *models.CassandraCluster) { + cs.GenericStatus.FromInstAPI(&instModel.GenericClusterFields) + cs.dcsFromInstAPI(instModel.DataCentres) } -func (cs *CassandraStatus) DCsFromInstAPI(iDCs []*models.CassandraDataCentre) (dcs []*DataCentreStatus) { - for _, iDC := range iDCs { - dcs = append(dcs, cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre)) +func (cs *CassandraStatus) dcsFromInstAPI(instModels []*models.CassandraDataCentre) { + cs.DataCentres = make([]*CassandraDataCentreStatus, len(instModels)) + for i, instModel := range instModels { + dc := &CassandraDataCentreStatus{} + dc.FromInstAPI(instModel) + cs.DataCentres[i] = dc } - return } func (cdc *CassandraDataCentre) ToInstAPI() *models.CassandraDataCentre { return &models.CassandraDataCentre{ - DataCentre: cdc.DataCentre.ToInstAPI(), + GenericDataCentreFields: cdc.GenericDataCentreSpec.ToInstAPI(), ClientToClusterEncryption: cdc.ClientToClusterEncryption, PrivateLink: cdc.PrivateLink, ContinuousBackup: cdc.ContinuousBackup, PrivateIPBroadcastForDiscovery: cdc.PrivateIPBroadcastForDiscovery, ReplicationFactor: cdc.ReplicationFactor, + NodeSize: cdc.NodeSize, + NumberOfNodes: cdc.NodesNumber, Debezium: cdc.DebeziumToInstAPI(), ShotoverProxy: cdc.ShotoverProxyToInstaAPI(), } @@ -457,7 +539,7 @@ func init() { func (c *Cassandra) GetExposePorts() []k8scorev1.ServicePort { var exposePorts []k8scorev1.ServicePort - if !c.Spec.PrivateNetworkCluster { + if !c.Spec.PrivateNetwork { exposePorts = []k8scorev1.ServicePort{ { Name: models.CassandraInterNode, diff --git a/apis/clusters/v1beta1/cassandra_webhook.go b/apis/clusters/v1beta1/cassandra_webhook.go index 2673ad9e7..c0b60a4f0 100644 --- a/apis/clusters/v1beta1/cassandra_webhook.go +++ b/apis/clusters/v1beta1/cassandra_webhook.go @@ -68,10 +68,6 @@ func (c *Cassandra) Default() { models.ResourceStateAnnotation: "", }) } - - for _, dataCentre := range c.Spec.DataCentres { - dataCentre.SetDefaultValues() - } } // ValidateCreate implements webhook.Validator so a webhook will be registered for the type @@ -91,7 +87,7 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob } } - err := c.Spec.Cluster.ValidateCreation() + err := c.Spec.GenericClusterSpec.ValidateCreation() if err != nil { return err } @@ -110,7 +106,7 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob if err != nil { return err } - if c.Spec.PrivateNetworkCluster { + if c.Spec.PrivateNetwork { err = c.Spec.OnPremisesSpec.ValidateSSHGatewayCreation() if err != nil { return err @@ -140,18 +136,18 @@ func (cv *cassandraValidator) ValidateCreate(ctx context.Context, obj runtime.Ob for _, dc := range c.Spec.DataCentres { if c.Spec.OnPremisesSpec != nil { - err = dc.DataCentre.ValidateOnPremisesCreation() + err = dc.GenericDataCentreSpec.ValidateOnPremisesCreation() if err != nil { return err } } else { - err = dc.DataCentre.ValidateCreation() + err = dc.GenericDataCentreSpec.validateCreation() if err != nil { return err } } - if !c.Spec.PrivateNetworkCluster && dc.PrivateIPBroadcastForDiscovery { + if !c.Spec.PrivateNetwork && dc.PrivateIPBroadcastForDiscovery { return fmt.Errorf("cannot use private ip broadcast for discovery on public network cluster") } @@ -182,6 +178,10 @@ func (cv *cassandraValidator) ValidateUpdate(ctx context.Context, old runtime.Ob cassandralog.Info("validate update", "name", c.Name) + if c.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return nil + } + // skip validation when we receive cluster specification update from the Instaclustr Console. if c.Annotations[models.ExternalChangesAnnotation] == models.True { return nil @@ -259,7 +259,7 @@ func (cs *CassandraSpec) newImmutableFields() *immutableCassandraFields { LuceneEnabled: cs.LuceneEnabled, PasswordAndUserAuth: cs.PasswordAndUserAuth, }, - immutableCluster: cs.Cluster.newImmutableFields(), + immutableCluster: cs.GenericClusterSpec.immutableFields(), } } @@ -307,11 +307,11 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error return fmt.Errorf("cannot change datacentre name: %v", newDC.Name) } - if err := newDC.ValidateCreation(); err != nil { + if err := newDC.validateCreation(); err != nil { return err } - if !cs.PrivateNetworkCluster && newDC.PrivateIPBroadcastForDiscovery { + if !cs.PrivateNetwork && newDC.PrivateIPBroadcastForDiscovery { return fmt.Errorf("cannot use private ip broadcast for discovery on public network cluster") } diff --git a/apis/clusters/v1beta1/generic_spec.go b/apis/clusters/v1beta1/generic_spec.go index fbeb465f1..be7024be2 100644 --- a/apis/clusters/v1beta1/generic_spec.go +++ b/apis/clusters/v1beta1/generic_spec.go @@ -2,6 +2,7 @@ package v1beta1 import ( "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" ) type GenericClusterSpec struct { @@ -33,19 +34,7 @@ func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool { s.PrivateNetwork == o.PrivateNetwork && s.SLATier == o.SLATier && s.Description == o.Description && - s.TwoFactorDeleteEquals(o) -} - -func (s *GenericClusterSpec) TwoFactorDeleteEquals(o *GenericClusterSpec) bool { - if len(s.TwoFactorDelete) != len(o.TwoFactorDelete) { - return false - } - - if len(s.TwoFactorDelete) > 0 { - return *s.TwoFactorDelete[0] == *o.TwoFactorDelete[0] - } - - return true + slices.EqualsPtr(s.TwoFactorDelete, o.TwoFactorDelete) } func (s *GenericClusterSpec) FromInstAPI(model *models.GenericClusterFields) { @@ -126,7 +115,7 @@ func (s *GenericDataCentreSpec) Equals(o *GenericDataCentreSpec) bool { s.ProviderAccountName == o.ProviderAccountName && s.Network == o.Network && areTagsEqual(s.Tags, o.Tags) && - areCloudProviderSettingsEqual(s.CloudProviderSettings, o.CloudProviderSettings) + slices.EqualsPtr(s.CloudProviderSettings, o.CloudProviderSettings) } func (s *GenericDataCentreSpec) FromInstAPI(model *models.GenericDataCentreFields) { diff --git a/apis/clusters/v1beta1/opensearch_types.go b/apis/clusters/v1beta1/opensearch_types.go index a9a89b59f..300e783aa 100644 --- a/apis/clusters/v1beta1/opensearch_types.go +++ b/apis/clusters/v1beta1/opensearch_types.go @@ -26,13 +26,9 @@ import ( clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" ) -// +kubebuilder:object:generate:=false -type OpenSearchNodeTypes interface { - OpenSearchDataNodes | OpenSearchDashboards | ClusterManagerNodes | OpenSearchIngestNodes -} - // OpenSearchSpec defines the desired state of OpenSearch type OpenSearchSpec struct { GenericClusterSpec `json:",inline"` @@ -333,22 +329,22 @@ func (c *OpenSearch) IsSpecEqual(spec OpenSearchSpec) bool { func (a *OpenSearchSpec) IsEqual(b OpenSearchSpec) bool { return a.GenericClusterSpec.Equals(&b.GenericClusterSpec) && - areOpenSearchSettingsEqual(a.DataNodes, b.DataNodes) && + slices.EqualsPtr(a.DataNodes, b.DataNodes) && + slices.EqualsPtr(a.Dashboards, b.Dashboards) && + slices.EqualsPtr(a.ClusterManagerNodes, b.ClusterManagerNodes) && + slices.EqualsPtr(a.IngestNodes, b.IngestNodes) && a.ICUPlugin == b.ICUPlugin && a.AsynchronousSearchPlugin == b.AsynchronousSearchPlugin && a.KNNPlugin == b.KNNPlugin && - areOpenSearchSettingsEqual(a.Dashboards, b.Dashboards) && a.ReportingPlugin == b.ReportingPlugin && a.SQLPlugin == b.SQLPlugin && a.NotificationsPlugin == b.NotificationsPlugin && a.AnomalyDetectionPlugin == b.AnomalyDetectionPlugin && a.LoadBalancer == b.LoadBalancer && - areOpenSearchSettingsEqual(a.ClusterManagerNodes, b.ClusterManagerNodes) && a.IndexManagementPlugin == b.IndexManagementPlugin && a.AlertingPlugin == b.AlertingPlugin && a.BundledUseOnly == b.BundledUseOnly && - a.areDCsEqual(b.DataCentres) && - areOpenSearchSettingsEqual(a.IngestNodes, b.IngestNodes) + a.areDCsEqual(b.DataCentres) } func (oss *OpenSearchSpec) areDCsEqual(b []*OpenSearchDataCentre) bool { @@ -370,20 +366,6 @@ func (oss *OpenSearchSpec) areDCsEqual(b []*OpenSearchDataCentre) bool { return true } -func areCloudProviderSettingsEqual(a, b []*CloudProviderSettings) bool { - if len(a) != len(b) { - return false - } - - for i := range b { - if *a[i] != *b[i] { - return false - } - } - - return true -} - func areTagsEqual(a, b map[string]string) bool { if len(a) != len(b) { return false @@ -398,24 +380,6 @@ func areTagsEqual(a, b map[string]string) bool { return true } -func areOpenSearchSettingsEqual[T OpenSearchNodeTypes](a, b []*T) bool { - if a == nil && b == nil { - return true - } - - if len(a) != len(b) { - return false - } - - for i := range a { - if *a[i] != *b[i] { - return false - } - } - - return true -} - func (oss *OpenSearchSpec) ToInstAPIUpdate() models.OpenSearchInstAPIUpdateRequest { return models.OpenSearchInstAPIUpdateRequest{ DataNodes: oss.dataNodesToInstAPI(), diff --git a/apis/clusters/v1beta1/opensearch_webhook.go b/apis/clusters/v1beta1/opensearch_webhook.go index b36fc7faf..fc3c69c3c 100644 --- a/apis/clusters/v1beta1/opensearch_webhook.go +++ b/apis/clusters/v1beta1/opensearch_webhook.go @@ -339,7 +339,7 @@ func (oss *OpenSearchSpec) validateImmutableDataCentresUpdate(oldDCs []*OpenSear return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields) } - err := validateImmutableCloudProviderSettingsUpdate(newDC.CloudProviderSettings, oldDC.CloudProviderSettings) + err := oldDC.validateImmutableCloudProviderSettingsUpdate(newDC.CloudProviderSettings) if err != nil { return err } @@ -368,20 +368,6 @@ func (dc *OpenSearchDataCentre) validateDataNode(nodes []*OpenSearchDataNodes) e return nil } -func validateImmutableCloudProviderSettingsUpdate(newSettings, oldSettings []*CloudProviderSettings) error { - if len(oldSettings) != len(newSettings) { - return models.ErrImmutableCloudProviderSettings - } - - for i := range newSettings { - if *newSettings[i] != *oldSettings[i] { - return models.ErrImmutableCloudProviderSettings - } - } - - return nil -} - func validateDataNode(newNodes, oldNodes []*OpenSearchDataNodes) error { for i := range oldNodes { if oldNodes[i].NodesNumber > newNodes[i].NodesNumber { diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index bec014884..a9daa97ff 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -761,3 +761,39 @@ func (old References) Diff(new References) (added, deleted References) { return added, deleted } + +type GenericResizeSettings []*ResizeSettings + +func (g *GenericResizeSettings) FromInstAPI(instModels []*models.ResizeSettings) { + *g = make(GenericResizeSettings, len(instModels)) + for i, instModel := range instModels { + (*g)[i] = &ResizeSettings{ + NotifySupportContacts: instModel.NotifySupportContacts, + Concurrency: instModel.Concurrency, + } + } +} + +func (g *GenericResizeSettings) ToInstAPI() []*models.ResizeSettings { + instaModels := make([]*models.ResizeSettings, len(*g)) + for i, setting := range *g { + instaModels[i] = &models.ResizeSettings{ + NotifySupportContacts: setting.NotifySupportContacts, + Concurrency: setting.Concurrency, + } + } + + return instaModels +} + +func (g GenericResizeSettings) Equal(o GenericResizeSettings) bool { + if len(g) != len(o) { + return false + } + + if len(g) > 0 { + return *g[0] == *o[0] + } + + return true +} diff --git a/apis/clusters/v1beta1/validation.go b/apis/clusters/v1beta1/validation.go index fa6461e51..a9b200a74 100644 --- a/apis/clusters/v1beta1/validation.go +++ b/apis/clusters/v1beta1/validation.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" "github.com/instaclustr/operator/pkg/validation" ) @@ -392,3 +393,25 @@ func (s *GenericDataCentreSpec) validateCreation() error { return nil } + +func (s *GenericDataCentreSpec) ValidateOnPremisesCreation() error { + if s.CloudProvider != models.ONPREMISES { + return fmt.Errorf("cloud provider %s is unavailable for data centre: %s, available value: %s", + s.CloudProvider, s.Name, models.ONPREMISES) + } + + if s.Region != models.CLIENTDC { + return fmt.Errorf("region %s is unavailable for data centre: %s, available value: %s", + s.Region, s.Name, models.CLIENTDC) + } + + return nil +} + +func (s *GenericDataCentreSpec) validateImmutableCloudProviderSettingsUpdate(oldSettings []*CloudProviderSettings) error { + if !slices.EqualsPtr(s.CloudProviderSettings, oldSettings) { + return models.ErrImmutableCloudProviderSettings + } + + return nil +} diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 364ee3429..8b798e270 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -384,18 +384,28 @@ func (in *Cassandra) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraDataCentre) DeepCopyInto(out *CassandraDataCentre) { *out = *in - in.DataCentre.DeepCopyInto(&out.DataCentre) + in.GenericDataCentreSpec.DeepCopyInto(&out.GenericDataCentreSpec) if in.Debezium != nil { in, out := &in.Debezium, &out.Debezium - *out = make([]DebeziumCassandraSpec, len(*in)) + *out = make([]*DebeziumCassandraSpec, len(*in)) for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(DebeziumCassandraSpec) + (*in).DeepCopyInto(*out) + } } } if in.ShotoverProxy != nil { in, out := &in.ShotoverProxy, &out.ShotoverProxy - *out = make([]ShotoverProxySpec, len(*in)) - copy(*out, *in) + *out = make([]*ShotoverProxySpec, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ShotoverProxySpec) + **out = **in + } + } } } @@ -409,6 +419,33 @@ func (in *CassandraDataCentre) DeepCopy() *CassandraDataCentre { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CassandraDataCentreStatus) DeepCopyInto(out *CassandraDataCentreStatus) { + *out = *in + in.GenericDataCentreStatus.DeepCopyInto(&out.GenericDataCentreStatus) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + (*in).DeepCopyInto(*out) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CassandraDataCentreStatus. +func (in *CassandraDataCentreStatus) DeepCopy() *CassandraDataCentreStatus { + if in == nil { + return nil + } + out := new(CassandraDataCentreStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraList) DeepCopyInto(out *CassandraList) { *out = *in @@ -470,6 +507,7 @@ func (in *CassandraRestoreFrom) DeepCopy() *CassandraRestoreFrom { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { *out = *in + in.GenericClusterSpec.DeepCopyInto(&out.GenericClusterSpec) if in.RestoreFrom != nil { in, out := &in.RestoreFrom, &out.RestoreFrom *out = new(CassandraRestoreFrom) @@ -480,7 +518,6 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { *out = new(OnPremisesSpec) (*in).DeepCopyInto(*out) } - in.Cluster.DeepCopyInto(&out.Cluster) if in.DataCentres != nil { in, out := &in.DataCentres, &out.DataCentres *out = make([]*CassandraDataCentre, len(*in)) @@ -505,7 +542,7 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { } if in.ResizeSettings != nil { in, out := &in.ResizeSettings, &out.ResizeSettings - *out = make([]*ResizeSettings, len(*in)) + *out = make(GenericResizeSettings, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] @@ -529,7 +566,18 @@ func (in *CassandraSpec) DeepCopy() *CassandraSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraStatus) DeepCopyInto(out *CassandraStatus) { *out = *in - in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + in.GenericStatus.DeepCopyInto(&out.GenericStatus) + if in.DataCentres != nil { + in, out := &in.DataCentres, &out.DataCentres + *out = make([]*CassandraDataCentreStatus, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(CassandraDataCentreStatus) + (*in).DeepCopyInto(*out) + } + } + } if in.AvailableUsers != nil { in, out := &in.AvailableUsers, &out.AvailableUsers *out = make(References, len(*in)) @@ -978,6 +1026,31 @@ func (in *GenericDataCentreStatus) DeepCopy() *GenericDataCentreStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in GenericResizeSettings) DeepCopyInto(out *GenericResizeSettings) { + { + in := &in + *out = make(GenericResizeSettings, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ResizeSettings) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GenericResizeSettings. +func (in GenericResizeSettings) DeepCopy() GenericResizeSettings { + if in == nil { + return nil + } + out := new(GenericResizeSettings) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GenericStatus) DeepCopyInto(out *GenericStatus) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index e06ce61ea..7818cfc5b 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -207,7 +207,7 @@ spec: provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis. type: boolean - privateNetworkCluster: + privateNetwork: type: boolean resizeSettings: items: @@ -319,15 +319,11 @@ spec: - namespace type: object type: array - cdcid: - type: string currentClusterOperationStatus: type: string dataCentres: items: properties: - encryptionKeyId: - type: string id: type: string name: @@ -353,21 +349,6 @@ spec: type: string type: object type: array - nodesNumber: - type: integer - privateLink: - items: - properties: - advertisedHostname: - type: string - endPointServiceId: - type: string - endPointServiceName: - type: string - required: - - advertisedHostname - type: object - type: array resizeOperations: items: properties: @@ -424,6 +405,8 @@ spec: type: array status: type: string + required: + - nodes type: object type: array id: @@ -514,19 +497,8 @@ spec: type: array type: object type: array - options: - properties: - dataNodeSize: - type: string - masterNodeSize: - type: string - openSearchDashboardsNodeSize: - type: string - type: object state: type: string - twoFactorDeleteEnabled: - type: boolean type: object type: object served: true diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index bf3759475..1f3797102 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -5,7 +5,7 @@ metadata: spec: name: "example-cassandra" #(immutable) version: "4.0.10" #(immutable) - privateNetworkCluster: true #(immutable) + privateNetwork: false #(immutable) dataCentres: - name: "AWS_cassandra" #(mutable) region: "US_EAST_1" #(immutable) @@ -20,7 +20,7 @@ spec: # version: "2.0.1" #(immutable) cloudProvider: "AWS_VPC" #(immutable) continuousBackup: false #(immutable) - nodesNumber: 4 #(mutable) + nodesNumber: 2 #(mutable) replicationFactor: 2 privateIpBroadcastForDiscovery: false #(immutable) network: "172.16.0.0/19" #(immutable) @@ -29,12 +29,12 @@ spec: clientToClusterEncryption: true #(immutable) # cloudProviderSettings: # - customVirtualNetworkId: "vpc-0b69c781969e980a9" -# nodeSize: "CAS-DEV-t4g.small-5" #(mutable) + nodeSize: "CAS-DEV-t4g.small-5" #(mutable) # nodeSize: "CAS-PRD-r6g.medium-80" #(mutable) # (production node size) - nodeSize: "CAS-PRD-r6g.medium-120" #(mutable) # (production node size) - privateLink: true - shotoverProxy: - - nodeSize: "CSO-PRD-c6g.xlarge-20" +# nodeSize: "CAS-PRD-r6g.medium-120" #(mutable) # (production node size) + privateLink: false +# shotoverProxy: +# - nodeSize: "CSO-PRD-c6g.xlarge-20" # accountName: "InstaclustrRIYOA" # - name: "AWS_cassandra2" # region: "US_EAST_1" diff --git a/config/samples/clusters_v1beta1_opensearch.yaml b/config/samples/clusters_v1beta1_opensearch.yaml index 08e961268..c5bef4e3e 100644 --- a/config/samples/clusters_v1beta1_opensearch.yaml +++ b/config/samples/clusters_v1beta1_opensearch.yaml @@ -11,7 +11,7 @@ metadata: annotations: test.annotation/first: testAnnotation spec: - name: bohdan-test + name: opensearch-test alertingPlugin: false anomalyDetectionPlugin: false asynchronousSearchPlugin: false @@ -21,8 +21,8 @@ spec: # - name: "test-user-2" # namespace: "default" clusterManagerNodes: - - dedicatedManager: true - nodeSize: SRH-DM-DEV-t4g.small-5 + - dedicatedManager: false + nodeSize: SRH-DEV-t4g.small-5 dataCentres: - cloudProvider: AWS_VPC name: AWS_VPC_US_EAST_1 @@ -32,11 +32,11 @@ spec: region: US_EAST_1 # ingestNodes: ## - nodeSize: SRH-DI-PRD-m6g.large-10 -# - nodeSize: SRH-DI-PRD-m6g.xlarge-10 +# - nodeSize: SRH-DI-DEV-t4g.small-5 # nodeCount: 3 - dataNodes: - - nodesNumber: 3 - nodeSize: SRH-DEV-t4g.small-5 +# dataNodes: +# - nodesNumber: 3 +# nodeSize: SRH-DEV-t4g.small-5 icuPlugin: false indexManagementPlugin: true knnPlugin: false diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index a443e1813..42a3a1473 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -748,7 +748,7 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra } slaTier := c.Spec.SLATier - privateClusterNetwork := c.Spec.PrivateNetworkCluster + privateNetwork := c.Spec.PrivateNetworkCluster pciCompliance := c.Spec.PCICompliance var twoFactorDelete []*v1beta1.TwoFactorDelete @@ -789,27 +789,27 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra cassandraDataCentres := []*v1beta1.CassandraDataCentre{ { - DataCentre: v1beta1.DataCentre{ + GenericDataCentreSpec: v1beta1.GenericDataCentreSpec{ Name: dcName, Region: dcRegion, CloudProvider: cloudProvider, ProviderAccountName: providerAccountName, - NodeSize: cassNodeSize, - NodesNumber: cassNodesNumber, Network: network, }, + NodeSize: cassNodeSize, + NodesNumber: cassNodesNumber, ReplicationFactor: cassReplicationFactor, PrivateIPBroadcastForDiscovery: cassPrivateIPBroadcastForDiscovery, }, } spec := v1beta1.CassandraSpec{ - Cluster: v1beta1.Cluster{ - Name: models.CassandraChildPrefix + c.Name, - Version: latestCassandraVersion, - SLATier: slaTier, - PrivateNetworkCluster: privateClusterNetwork, - TwoFactorDelete: twoFactorDelete, - PCICompliance: pciCompliance, + GenericClusterSpec: v1beta1.GenericClusterSpec{ + Name: models.CassandraChildPrefix + c.Name, + Version: latestCassandraVersion, + SLATier: slaTier, + PrivateNetwork: privateNetwork, + TwoFactorDelete: twoFactorDelete, + PCICompliance: pciCompliance, }, DataCentres: cassandraDataCentres, PasswordAndUserAuth: cassPasswordAndUserAuth, diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 71a65db23..ade4ab49a 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -18,7 +18,9 @@ package clusters import ( "context" + "encoding/json" "errors" + "fmt" "strconv" "github.com/go-logr/logr" @@ -115,233 +117,162 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } -func (r *CassandraReconciler) handleCreateCluster( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, -) (reconcile.Result, error) { - l = l.WithName("Cassandra creation event") - var err error - patch := c.NewPatch() - if c.Status.ID == "" { - var id string - if c.Spec.HasRestore() { - l.Info( - "Creating cluster from backup", - "original cluster ID", c.Spec.RestoreFrom.ClusterID, - ) +func (r *CassandraReconciler) createCassandraFromRestore(c *v1beta1.Cassandra, l logr.Logger) (*models.CassandraCluster, error) { + l.Info( + "Creating cluster from backup", + "original cluster ID", c.Spec.RestoreFrom.ClusterID, + ) - id, err = r.API.RestoreCluster(c.RestoreInfoToInstAPI(c.Spec.RestoreFrom), models.CassandraAppKind) - if err != nil { - l.Error(err, "Cannot restore cluster from backup", - "original cluster ID", c.Spec.RestoreFrom.ClusterID, - ) + id, err := r.API.RestoreCluster(c.RestoreInfoToInstAPI(c.Spec.RestoreFrom), models.CassandraAppKind) + if err != nil { + l.Error(err, "Cannot restore cluster from backup", + "original cluster ID", c.Spec.RestoreFrom.ClusterID, + ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster restore from backup on the Instaclustr is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster restore from backup on the Instaclustr is failed. Reason: %v", + err, + ) - return reconcile.Result{}, err - } + return nil, err + } - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster restore request is sent. Original cluster ID: %s, new cluster ID: %s", - c.Spec.RestoreFrom.ClusterID, - id, - ) - } else { - l.Info( - "Creating cluster", - "cluster name", c.Spec.Name, - "data centres", c.Spec.DataCentres, - ) - iCassandraSpec := c.Spec.ToInstAPI() - for i, dc := range c.Spec.DataCentres { - for j, debezium := range dc.Debezium { - if debezium.ClusterRef != nil { - cdcID, err := clusterresources.GetDataCentreID(r.Client, ctx, debezium.ClusterRef) - if err != nil { - l.Error(err, "Cannot get cluster ID", - "Cluster reference", debezium.ClusterRef, - ) - return ctrl.Result{}, err - } - iCassandraSpec.DataCentres[i].Debezium[j].KafkaDataCentreID = cdcID - } + instaModel, err := r.API.GetCassandra(id) + if err != nil { + return nil, fmt.Errorf("failed to get cassandra cluster details, err: %w", err) + } + + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster restore request is sent. Original cluster ID: %s, new cluster ID: %s", + c.Spec.RestoreFrom.ClusterID, + instaModel.ID, + ) + + return instaModel, nil +} + +func (r *CassandraReconciler) mergeDebezium(c *v1beta1.Cassandra, spec *models.CassandraCluster) error { + for i, dc := range c.Spec.DataCentres { + for j, debezium := range dc.Debezium { + if debezium.ClusterRef != nil { + cdcID, err := clusterresources.GetDataCentreID(r.Client, context.Background(), debezium.ClusterRef) + if err != nil { + return fmt.Errorf("failed to get kafka data centre id, err: %w", err) } + spec.DataCentres[i].Debezium[j].KafkaDataCentreID = cdcID } + } + } - id, err = r.API.CreateCluster(instaclustr.CassandraEndpoint, iCassandraSpec) - if err != nil { - l.Error( - err, "Cannot create cluster", - "cluster spec", c.Spec, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster creation on the Instaclustr is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + return nil +} - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster creation request is sent. Cluster ID: %s", - id, - ) - } +func (r *CassandraReconciler) createCassandra(c *v1beta1.Cassandra, l logr.Logger) (*models.CassandraCluster, error) { + l.Info( + "Creating cluster", + "cluster name", c.Spec.Name, + "data centres", c.Spec.DataCentres, + ) - c.Status.ID = id - err = r.Status().Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster status", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - "cluster metadata", c.ObjectMeta, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.PatchFailed, - "Cluster resource status patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + iCassandraSpec := c.Spec.ToInstAPI() - controllerutil.AddFinalizer(c, models.DeletionFinalizer) - c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent - err = r.Patch(ctx, c, patch) - if err != nil { - l.Error(err, "Cannot patch cluster", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, - "cluster metadata", c.ObjectMeta, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + err := r.mergeDebezium(c, iCassandraSpec) + if err != nil { + l.Error(err, "Cannot get debezium dependencies for Cassandra cluster") + return nil, err + } - l.Info( - "Cluster has been created", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - "kind", c.Kind, - "api Version", c.APIVersion, - "namespace", c.Namespace, + b, err := r.API.CreateClusterRaw(instaclustr.CassandraEndpoint, iCassandraSpec) + if err != nil { + l.Error( + err, "Cannot create cluster", + "cluster spec", c.Spec, + ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster creation on the Instaclustr is failed. Reason: %v", + err, ) + return nil, err } - if c.Status.State != models.DeletedStatus { - err = r.startClusterStatusJob(c) - if err != nil { - l.Error(err, "Cannot start cluster status job", - "c cluster ID", c.Status.ID) + var instModel models.CassandraCluster + err = json.Unmarshal(b, &instModel) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal json to model, err: %w", err) + } - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster creation request is sent. Cluster ID: %s", + instModel.ID, + ) - r.EventRecorder.Eventf( - c, models.Normal, models.Created, - "Cluster status check job is started", - ) + return &instModel, nil +} + +func (r *CassandraReconciler) createCluster(ctx context.Context, c *v1beta1.Cassandra, l logr.Logger) error { + var instModel *models.CassandraCluster + var err error + + if c.Spec.HasRestore() { + instModel, err = r.createCassandraFromRestore(c, l) + } else { + instModel, err = r.createCassandra(c, l) + } + if err != nil { + return fmt.Errorf("failed to create cluster, err: %w", err) } - if c.Spec.OnPremisesSpec != nil && c.Spec.OnPremisesSpec.EnableAutomation { - iData, err := r.API.GetCassandra(c.Status.ID) - if err != nil { - l.Error(err, "Cannot get cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "data centres", c.Spec.DataCentres, - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.FetchFailed, - "Cluster fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - iCassandra, err := c.FromInstAPI(iData) - if err != nil { - l.Error( - err, "Cannot convert cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + c.Spec.FromInstAPI(instModel) + err = r.Update(ctx, c) + if err != nil { + return fmt.Errorf("failed to update cassandra spec, err: %w", err) + } - bootstrap := newOnPremisesBootstrap( - r.Client, - c, - r.EventRecorder, - iCassandra.Status.ClusterStatus, - c.Spec.OnPremisesSpec, - newExposePorts(c.GetExposePorts()), - c.GetHeadlessPorts(), - c.Spec.PrivateNetworkCluster, - ) + c.Status.FromInstAPI(instModel) + err = r.Status().Update(ctx, c) + if err != nil { + return fmt.Errorf("failed to update cassandra status, err: %w", err) + } - err = handleCreateOnPremisesClusterResources(ctx, bootstrap) - if err != nil { - l.Error( - err, "Cannot create resources for on-premises cluster", - "cluster spec", c.Spec.OnPremisesSpec, - ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "Resources creation for on-premises cluster is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + controllerutil.AddFinalizer(c, models.DeletionFinalizer) + c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent + err = r.Update(ctx, c) + if err != nil { + return err + } - err = r.startClusterOnPremisesIPsJob(c, bootstrap) - if err != nil { - l.Error(err, "Cannot start on-premises cluster IPs check job", - "cluster ID", c.Status.ID, - ) + l.Info( + "Cluster has been created", + "cluster name", c.Spec.Name, + "cluster ID", c.Status.ID, + ) - r.EventRecorder.Eventf( - c, models.Warning, models.CreationFailed, - "On-premises cluster IPs check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + return nil +} - l.Info( - "On-premises resources have been created", - "cluster name", c.Spec.Name, - "on-premises Spec", c.Spec.OnPremisesSpec, - "cluster ID", c.Status.ID, +func (r *CassandraReconciler) startClusterJobs(c *v1beta1.Cassandra, l logr.Logger) error { + err := r.startSyncJob(c) + if err != nil { + l.Error(err, "Cannot start cluster synchronizer", + "c cluster ID", c.Status.ID) + + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, ) - return models.ExitReconcile, nil + return err } + r.EventRecorder.Eventf( + c, models.Normal, models.Created, + "Cluster status check job is started", + ) + err = r.startClusterBackupsJob(c) if err != nil { l.Error(err, "Cannot start cluster backups check job", @@ -353,7 +284,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster backups check job is failed. Reason: %v", err, ) - return reconcile.Result{}, err + return err } r.EventRecorder.Eventf( @@ -367,31 +298,24 @@ func (r *CassandraReconciler) handleCreateCluster( l.Error(err, "Failed to start user creation job") r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err) - return reconcile.Result{}, err + return err } r.EventRecorder.Event(c, models.Normal, models.Created, "Cluster user creation job is started") } - return models.ExitReconcile, nil + return nil } -func (r *CassandraReconciler) handleUpdateCluster( - ctx context.Context, - l logr.Logger, - req ctrl.Request, - c *v1beta1.Cassandra, -) (reconcile.Result, error) { - l = l.WithName("Cassandra update event") - - iData, err := r.API.GetCassandra(c.Status.ID) +func (r *CassandraReconciler) handleOnPremises(c *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) { + instaModel, err := r.API.GetCassandra(c.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", c.Spec.Name, + "data centres", c.Spec.DataCentres, "cluster ID", c.Status.ID, ) - r.EventRecorder.Eventf( c, models.Warning, models.FetchFailed, "Cluster fetch from the Instaclustr API is failed. Reason: %v", @@ -400,30 +324,120 @@ func (r *CassandraReconciler) handleUpdateCluster( return reconcile.Result{}, err } - iCassandra, err := c.FromInstAPI(iData) + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) + + bootstrap := newOnPremisesBootstrap( + r.Client, + c, + r.EventRecorder, + iCassandra.Status.ToOnPremises(), + c.Spec.OnPremisesSpec, + newExposePorts(c.GetExposePorts()), + c.GetHeadlessPorts(), + c.Spec.PrivateNetwork, + ) + + err = handleCreateOnPremisesClusterResources(context.Background(), bootstrap) if err != nil { l.Error( - err, "Cannot convert cluster from the Instaclustr API", + err, "Cannot create resources for on-premises cluster", + "cluster spec", c.Spec.OnPremisesSpec, + ) + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "Resources creation for on-premises cluster is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } + + err = r.startClusterOnPremisesIPsJob(c, bootstrap) + if err != nil { + l.Error(err, "Cannot start on-premises cluster IPs check job", + "cluster ID", c.Status.ID, + ) + + r.EventRecorder.Eventf( + c, models.Warning, models.CreationFailed, + "On-premises cluster IPs check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } + + l.Info( + "On-premises resources have been created", + "cluster name", c.Spec.Name, + "on-premises Spec", c.Spec.OnPremisesSpec, + "cluster ID", c.Status.ID, + ) + + return models.ExitReconcile, nil +} + +func (r *CassandraReconciler) handleCreateCluster( + ctx context.Context, + l logr.Logger, + c *v1beta1.Cassandra, +) (reconcile.Result, error) { + l = l.WithName("Cassandra creation event") + if c.Status.ID == "" { + err := r.createCluster(ctx, c, l) + if err != nil { + r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, "Failed to create cluster. Reason: %v", err) + return reconcile.Result{}, err + } + } + + if c.Status.State != models.DeletedStatus { + err := r.startClusterJobs(c, l) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to start cluster jobs, err: %w", err) + } + + if c.Spec.OnPremisesSpec != nil && c.Spec.OnPremisesSpec.EnableAutomation { + return r.handleOnPremises(c, l) + } + } + + return models.ExitReconcile, nil +} + +func (r *CassandraReconciler) handleUpdateCluster( + ctx context.Context, + l logr.Logger, + req ctrl.Request, + c *v1beta1.Cassandra, +) (reconcile.Result, error) { + l = l.WithName("Cassandra update event") + + instaModel, err := r.API.GetCassandra(c.Status.ID) + if err != nil { + l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", c.Spec.Name, "cluster ID", c.Status.ID, ) r.EventRecorder.Eventf( - c, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", + c, models.Warning, models.FetchFailed, + "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) return reconcile.Result{}, err } + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) + if c.Annotations[models.ExternalChangesAnnotation] == models.True || r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries { - return handleExternalChanges[v1beta1.CassandraSpec](r.EventRecorder, r.Client, c, iCassandra, l) + return handleExternalChanges[v1beta1.CassandraSpec](r.EventRecorder, r.Client, c, &iCassandra, l) } patch := c.NewPatch() - if c.Spec.ClusterSettingsNeedUpdate(iCassandra.Spec.Cluster) { + if c.Spec.ClusterSettingsNeedUpdate(&iCassandra.Spec.GenericClusterSpec) { l.Info("Updating cluster settings", "instaclustr description", iCassandra.Spec.Description, "instaclustr two factor delete", iCassandra.Spec.TwoFactorDelete) @@ -664,7 +678,7 @@ func (r *CassandraReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *CassandraReconciler) startClusterStatusJob(c *v1beta1.Cassandra) error { +func (r *CassandraReconciler) startSyncJob(c *v1beta1.Cassandra) error { job := r.newWatchStatusJob(c) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) @@ -722,7 +736,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. return nil } - iData, err := r.API.GetCassandra(c.Status.ID) + instaModel, err := r.API.GetCassandra(c.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { if c.DeletionTimestamp != nil { @@ -738,40 +752,32 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. return err } - iCassandra, err := c.FromInstAPI(iData) - if err != nil { - l.Error(err, "Cannot convert cluster from the Instaclustr API", - "cluster name", c.Spec.Name, - "cluster ID", c.Status.ID, - ) - return err - } + iCassandra := v1beta1.Cassandra{} + iCassandra.FromInstAPI(instaModel) - if !areStatusesEqual(&iCassandra.Status.ClusterStatus, &c.Status.ClusterStatus) { - l.Info("Updating cluster status", - "status from Instaclustr", iCassandra.Status.ClusterStatus, - "status from k8s", c.Status.ClusterStatus) + if !c.Status.Equals(&iCassandra.Status) { + l.Info("Updating cluster status") - areDCsEqual := areDataCentresEqual(iCassandra.Status.ClusterStatus.DataCentres, c.Status.ClusterStatus.DataCentres) + dcEqual := c.Status.DataCentresEqual(&iCassandra.Status) patch := c.NewPatch() - c.Status.ClusterStatus = iCassandra.Status.ClusterStatus + c.Status.FromInstAPI(instaModel) err = r.Status().Patch(context.Background(), c, patch) if err != nil { return err } - if !areDCsEqual { + if !dcEqual { var nodes []*v1beta1.Node - for _, dc := range iCassandra.Status.ClusterStatus.DataCentres { + for _, dc := range iCassandra.Status.DataCentres { nodes = append(nodes, dc.Nodes...) } err = exposeservice.Create(r.Client, c.Name, c.Namespace, - c.Spec.PrivateNetworkCluster, + c.Spec.PrivateNetwork, nodes, models.CassandraConnectionPort) if err != nil { @@ -780,7 +786,7 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. } } - equals := c.Spec.IsEqual(iCassandra.Spec) + equals := c.Spec.IsEqual(&iCassandra.Spec) if equals && c.Annotations[models.ExternalChangesAnnotation] == models.True { patch := c.NewPatch() @@ -1069,7 +1075,7 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c return err } - if !c.Status.AreMaintenanceEventStatusesEqual(iMEStatuses) { + if !c.Status.MaintenanceEventsEqual(iMEStatuses) { patch := c.NewPatch() c.Status.MaintenanceEvents = iMEStatuses err = r.Status().Patch(ctx, c, patch) @@ -1134,6 +1140,10 @@ func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { newObj := event.ObjectNew.(*v1beta1.Cassandra) + if newObj.Status.ID == "" && newObj.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return false + } + if newObj.Status.ID == "" { newObj.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent return true diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 7ecd4cc25..89bb81f54 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -224,11 +224,11 @@ func (r *OpenSearchReconciler) createCluster(ctx context.Context, o *v1beta1.Ope func (r *OpenSearchReconciler) startClusterJobs(o *v1beta1.OpenSearch) error { err := r.startClusterSyncJob(o) if err != nil { - return fmt.Errorf("failed to start cluster status job, err: %w", err) + return fmt.Errorf("failed to start cluster sync job, err: %w", err) } r.EventRecorder.Event(o, models.Normal, models.Created, - "Cluster status check job is started") + "Cluster sync job is started") err = r.startClusterBackupsJob(o) if err != nil { @@ -587,6 +587,8 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule if !iO.Status.Equals(&o.Status) { l.Info("Updating OpenSearch cluster status", "old", o.Status, "new", iO.Status) + dcEqual := o.Status.DataCentreEquals(&iO.Status) + patch := o.NewPatch() o.Status.FromInstAPI(instaModel) err = r.Status().Patch(context.Background(), o, patch) @@ -599,7 +601,7 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule return err } - if !o.Status.DataCentreEquals(&iO.Status) { + if !dcEqual { var nodes []*v1beta1.Node for _, dc := range iO.Status.DataCentres { diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index a13f737ab..27acd4645 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -351,7 +351,7 @@ func (c *Client) DeleteRedisUser(id string) error { return nil } -func (c *Client) GetCassandra(id string) ([]byte, error) { +func (c *Client) GetCassandra(id string) (*models.CassandraCluster, error) { url := c.serverHostname + CassandraEndpoint + id resp, err := c.DoRequest(url, http.MethodGet, nil) @@ -373,7 +373,13 @@ func (c *Client) GetCassandra(id string) ([]byte, error) { return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } - return body, nil + var instModel models.CassandraCluster + err = json.Unmarshal(body, &instModel) + if err != nil { + return nil, err + } + + return &instModel, nil } func (c *Client) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error { diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 23526d50f..8257d79b3 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -79,7 +79,7 @@ type API interface { GetKafkaACLStatus(kafkaACLID, kafkaACLEndpoint string) (*kafkamanagementv1beta1.KafkaACLStatus, error) DeleteKafkaACL(kafkaACLID, kafkaACLEndpoint string) error UpdateKafkaACL(kafkaACLID, kafkaACLEndpoint string, kafkaACLSpec any) error - GetCassandra(id string) ([]byte, error) + GetCassandra(id string) (*models.CassandraCluster, error) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error GetKafka(id string) ([]byte, error) GetKafkaConnect(id string) ([]byte, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 0082d757d..14d79d4de 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -294,7 +294,7 @@ func (c *mockClient) RestoreCluster(restoreData any, clusterKind string) (string panic("RestoreCluster: is not implemented") } -func (c *mockClient) GetCassandra(id string) ([]byte, error) { +func (c *mockClient) GetCassandra(id string) (*models.CassandraCluster, error) { panic("GetCassandra: is not implemented") } diff --git a/pkg/models/cassandra_apiv2.go b/pkg/models/cassandra_apiv2.go index 4cf029781..48a6f37ec 100644 --- a/pkg/models/cassandra_apiv2.go +++ b/pkg/models/cassandra_apiv2.go @@ -17,30 +17,31 @@ limitations under the License. package models type CassandraCluster struct { - ClusterStatus - CassandraVersion string `json:"cassandraVersion"` - LuceneEnabled bool `json:"luceneEnabled"` - PasswordAndUserAuth bool `json:"passwordAndUserAuth"` - DataCentres []*CassandraDataCentre `json:"dataCentres"` - Name string `json:"name"` - SLATier string `json:"slaTier"` - PrivateNetworkCluster bool `json:"privateNetworkCluster"` - PCIComplianceMode bool `json:"pciComplianceMode"` - TwoFactorDelete []*TwoFactorDelete `json:"twoFactorDelete,omitempty"` - BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - ResizeSettings []*ResizeSettings `json:"resizeSettings"` - Description string `json:"description,omitempty"` + GenericClusterFields `json:",inline"` + + CassandraVersion string `json:"cassandraVersion"` + LuceneEnabled bool `json:"luceneEnabled"` + PasswordAndUserAuth bool `json:"passwordAndUserAuth"` + BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + + DataCentres []*CassandraDataCentre `json:"dataCentres"` + ResizeSettings []*ResizeSettings `json:"resizeSettings"` } type CassandraDataCentre struct { - DataCentre `json:",inline"` - ReplicationFactor int `json:"replicationFactor"` - ContinuousBackup bool `json:"continuousBackup"` - PrivateLink bool `json:"privateLink,omitempty"` - PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - Debezium []*Debezium `json:"debezium,omitempty"` - ShotoverProxy []*ShotoverProxy `json:"shotoverProxy,omitempty"` + GenericDataCentreFields `json:",inline"` + + ContinuousBackup bool `json:"continuousBackup"` + PrivateLink bool `json:"privateLink,omitempty"` + PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + ReplicationFactor int `json:"replicationFactor"` + NumberOfNodes int `json:"numberOfNodes"` + NodeSize string `json:"nodeSize"` + + Nodes []*Node `json:"nodes,omitempty"` + Debezium []*Debezium `json:"debezium,omitempty"` + ShotoverProxy []*ShotoverProxy `json:"shotoverProxy,omitempty"` } type Debezium struct { diff --git a/pkg/utils/slices/slices.go b/pkg/utils/slices/slices.go new file mode 100644 index 000000000..2d9a25c4e --- /dev/null +++ b/pkg/utils/slices/slices.go @@ -0,0 +1,73 @@ +package slices + +func Equals[S ~[]T, T comparable](s1, s2 S) bool { + if len(s1) != len(s2) { + return false + } + + for i := range s1 { + if s1[i] != s2[i] { + return false + } + } + + return true +} + +func EqualsPtr[S ~[]*T, T comparable](s1, s2 S) bool { + if len(s1) != len(s2) { + return false + } + + for i := range s1 { + if *s1[i] != *s2[i] { + return false + } + } + + return true +} + +func EqualsUnordered[S ~[]T, T comparable](s1, s2 S) bool { + if len(s1) != len(s2) { + return false + } + + mapS1 := map[T]int{} + for _, el := range s1 { + mapS1[el]++ + } + + for _, el := range s2 { + count, ok := mapS1[el] + if !ok || count == 0 { + return false + } + + mapS1[el]-- + } + + return true +} + +func EqualsUnorderedPtr[S ~[]*T, T comparable](s1, s2 S) bool { + if len(s1) != len(s2) { + return false + } + + mapS1 := map[T]int{} + for _, el := range s1 { + mapS1[*el]++ + } + + for _, el := range s2 { + count, ok := mapS1[*el] + if !ok || count == 0 { + return false + } + + mapS1[*el]-- + } + + return true +} diff --git a/pkg/utils/slices/slices_test.go b/pkg/utils/slices/slices_test.go new file mode 100644 index 000000000..a95556eb9 --- /dev/null +++ b/pkg/utils/slices/slices_test.go @@ -0,0 +1,300 @@ +package slices + +import "testing" + +func TestEqualsUnordered(t *testing.T) { + type args[T comparable] struct { + s1 []T + s2 []T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: []int{1}, + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: []int{1}, + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: []int{1}, + s2: []int{1, 1}, + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: []int{1}, + s2: []int{2}, + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{1, 2, 3}, + }, + want: true, + }, + { + name: "equals but unordered", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{2, 3, 1}, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsUnordered(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsUnordered() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEquals(t *testing.T) { + type args[T comparable] struct { + s1 []T + s2 []T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: []int{1}, + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: []int{1}, + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: []int{1}, + s2: []int{1, 1}, + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: []int{1}, + s2: []int{2}, + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{1, 2, 3}, + }, + want: true, + }, + { + name: "unordered", + args: args[int]{ + s1: []int{1, 2, 3}, + s2: []int{2, 3, 1}, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Equals(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("Equals() = %v, want %v", got, tt.want) + } + }) + } +} + +func ptrOf[T any](values ...T) []*T { + var ptrs []*T + for i := 0; i < len(values); i++ { + ptrs = append(ptrs, &values[i]) + } + + return ptrs +} + +func TestEqualsPtr(t *testing.T) { + type args[T comparable] struct { + s1 []*T + s2 []*T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "same values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1), + }, + want: true, + }, + { + name: "different values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(2), + }, + want: false, + }, + { + name: "different length", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1, 1), + }, + want: false, + }, + { + name: "equals", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(1, 2, 3), + }, + want: true, + }, + { + name: "unordered", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(3, 2, 1), + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsPtr(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsPtr() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEqualsUnorderedPtr(t *testing.T) { + type args[T comparable] struct { + s1 []*T + s2 []*T + } + type testCase[T comparable] struct { + name string + args args[T] + want bool + } + tests := []testCase[int]{ + { + name: "empty", + args: args[int]{}, + want: true, + }, + { + name: "first is empty", + args: args[int]{ + s2: ptrOf(1), + }, + want: false, + }, + { + name: "second is empty", + args: args[int]{ + s1: ptrOf(1), + }, + want: false, + }, + { + name: "length differs", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(1, 1), + }, + want: false, + }, + { + name: "different values", + args: args[int]{ + s1: ptrOf(1), + s2: ptrOf(2), + }, + want: false, + }, + { + name: "equals with the same order", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(1, 2, 3), + }, + want: true, + }, + { + name: "equals but unordered", + args: args[int]{ + s1: ptrOf(1, 2, 3), + s2: ptrOf(2, 3, 1), + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EqualsUnorderedPtr(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("EqualsUnorderedPtr() = %v, want %v", got, tt.want) + } + }) + } +}