From 49d0b34381fc4dc936339f2f0037a3701aae7235 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Tue, 13 Feb 2024 14:29:02 +0200 Subject: [PATCH] KafkaConnect code base refactor --- .secrets.baseline | 6 +- apis/clusters/v1beta1/cassandra_types.go | 59 +-- apis/clusters/v1beta1/cassandra_webhook.go | 2 + apis/clusters/v1beta1/generic_spec.go | 7 - apis/clusters/v1beta1/kafka_types.go | 40 +- apis/clusters/v1beta1/kafka_webhook.go | 2 + apis/clusters/v1beta1/kafkaconnect_types.go | 349 +++++++++--------- .../v1beta1/kafkaconnect_types_test.go | 72 ++++ apis/clusters/v1beta1/kafkaconnect_webhook.go | 24 +- apis/clusters/v1beta1/opensearch_types.go | 3 + apis/clusters/v1beta1/opensearch_webhook.go | 2 + apis/clusters/v1beta1/redis_types.go | 40 +- apis/clusters/v1beta1/redis_webhook.go | 2 + apis/clusters/v1beta1/structs.go | 35 ++ apis/clusters/v1beta1/validation.go | 1 - .../clusters/v1beta1/zz_generated.deepcopy.go | 49 ++- .../clusters.instaclustr.com_cassandras.yaml | 4 - ...lusters.instaclustr.com_kafkaconnects.yaml | 153 +++++--- .../clusters.instaclustr.com_kafkas.yaml | 4 - ...clusters.instaclustr.com_opensearches.yaml | 4 - .../bases/clusters.instaclustr.com_redis.yaml | 4 - .../samples/clusters_v1beta1_cassandra.yaml | 20 +- config/samples/clusters_v1beta1_kafka.yaml | 18 +- .../clusters_v1beta1_kafkaconnect.yaml | 6 +- controllers/clusterresources/helpers.go | 6 +- controllers/clusters/cadence_controller.go | 9 +- .../datatest/kafkaconnect_v1beta1.yaml | 2 +- .../clusters/kafkaconnect_controller.go | 241 ++++++------ .../clusters/kafkaconnect_controller_test.go | 4 +- pkg/instaclustr/client.go | 10 +- pkg/instaclustr/interfaces.go | 2 +- pkg/instaclustr/mock/client.go | 2 +- pkg/models/apiv2_generic.go | 1 - pkg/models/cassandra_apiv2.go | 1 + pkg/models/kafka_apiv2.go | 1 + pkg/models/kafka_connect_apiv2.go | 28 +- pkg/models/opensearch_apiv2.go | 1 + pkg/models/redis_apiv2.go | 1 + 38 files changed, 661 insertions(+), 554 deletions(-) create mode 100644 apis/clusters/v1beta1/kafkaconnect_types_test.go diff --git a/.secrets.baseline b/.secrets.baseline index 014d253e1..a68c86679 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -185,14 +185,14 @@ "filename": "apis/clusters/v1beta1/cassandra_types.go", "hashed_secret": "331cc743251c3b9504229de4d139c539da121a33", "is_verified": false, - "line_number": 261 + "line_number": 238 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/cassandra_types.go", "hashed_secret": "0ad8d7005e084d4f028a4277b73c6fab24269c17", "is_verified": false, - "line_number": 347 + "line_number": 324 }, { "type": "Secret Keyword", @@ -1146,5 +1146,5 @@ } ] }, - "generated_at": "2024-02-19T13:01:03Z" + "generated_at": "2024-02-19T13:43:58Z" } diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 5808ee3f9..438b2eace 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -58,6 +58,7 @@ type CassandraSpec struct { LuceneEnabled bool `json:"luceneEnabled,omitempty"` PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + PCICompliance bool `json:"pciCompliance,omitempty"` UserRefs References `json:"userRefs,omitempty"` ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"` } @@ -147,31 +148,7 @@ type CassandraDataCentreStatus struct { func (s *CassandraDataCentreStatus) Equals(o *CassandraDataCentreStatus) bool { return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && - s.nodesEqual(o.Nodes) -} - -func (s *CassandraDataCentreStatus) nodesEqual(nodes []*Node) bool { - if len(s.Nodes) != len(nodes) { - return false - } - - sNodes := map[string]*Node{} - for _, node := range s.Nodes { - sNodes[node.ID] = node - } - - for _, node := range nodes { - sNode, ok := sNodes[node.ID] - if !ok { - return false - } - - if !sNode.Equals(node) { - return false - } - } - - return true + nodesEqual(s.Nodes, o.Nodes) } func (s *CassandraDataCentreStatus) FromInstAPI(instModel *models.CassandraDataCentre) { @@ -347,18 +324,36 @@ func (cs *CassandraSpec) FromInstAPI(instModel *models.CassandraCluster) { cs.PasswordAndUserAuth = instModel.PasswordAndUserAuth cs.BundledUseOnly = instModel.BundledUseOnly cs.Version = instModel.CassandraVersion + cs.PCICompliance = instModel.PCIComplianceMode cs.ResizeSettings.FromInstAPI(instModel.ResizeSettings) cs.dcsFromInstAPI(instModel.DataCentres) } func (cs *CassandraSpec) dcsFromInstAPI(instModels []*models.CassandraDataCentre) { - cs.DataCentres = make([]*CassandraDataCentre, len(instModels)) + dcs := make([]*CassandraDataCentre, len(instModels)) for i, instModel := range instModels { dc := &CassandraDataCentre{} + dcs[i] = dc + + if index := cs.getDC(instModel.Name); index > -1 { + dc.Debezium = cs.DataCentres[index].Debezium + } + dc.FromInstAPI(instModel) - cs.DataCentres[i] = dc } + + cs.DataCentres = dcs +} + +func (c *CassandraSpec) getDC(name string) int { + for i, dc := range c.DataCentres { + if dc.Name == name { + return i + } + } + + return -1 } func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) { @@ -377,15 +372,20 @@ func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) } func (cs *CassandraDataCentre) debeziumFromInstAPI(instModels []*models.Debezium) { - cs.Debezium = make([]*DebeziumCassandraSpec, len(instModels)) + debezium := make([]*DebeziumCassandraSpec, len(instModels)) for i, instModel := range instModels { - cs.Debezium[i] = &DebeziumCassandraSpec{ + debezium[i] = &DebeziumCassandraSpec{ KafkaVPCType: instModel.KafkaVPCType, KafkaTopicPrefix: instModel.KafkaTopicPrefix, KafkaDataCentreID: instModel.KafkaDataCentreID, Version: instModel.Version, } + + if len(cs.Debezium)-1 >= i { + debezium[i].ClusterRef = cs.Debezium[i].ClusterRef + } } + cs.Debezium = debezium } func (cs *CassandraDataCentre) shotoverProxyFromInstAPI(instModels []*models.ShotoverProxy) { @@ -411,6 +411,7 @@ func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster { LuceneEnabled: cs.LuceneEnabled, PasswordAndUserAuth: cs.PasswordAndUserAuth, BundledUseOnly: cs.BundledUseOnly, + PCIComplianceMode: cs.PCICompliance, DataCentres: cs.DCsToInstAPI(), ResizeSettings: cs.ResizeSettings.ToInstAPI(), } diff --git a/apis/clusters/v1beta1/cassandra_webhook.go b/apis/clusters/v1beta1/cassandra_webhook.go index 7f8d0cb01..7e0a4ec15 100644 --- a/apis/clusters/v1beta1/cassandra_webhook.go +++ b/apis/clusters/v1beta1/cassandra_webhook.go @@ -213,6 +213,7 @@ type immutableCassandraFields struct { type specificCassandra struct { LuceneEnabled bool PasswordAndUserAuth bool + PCICompliance bool } type immutableCassandraDCFields struct { @@ -233,6 +234,7 @@ func (cs *CassandraSpec) newImmutableFields() *immutableCassandraFields { specificCassandra: specificCassandra{ LuceneEnabled: cs.LuceneEnabled, PasswordAndUserAuth: cs.PasswordAndUserAuth, + PCICompliance: cs.PCICompliance, }, immutableCluster: cs.GenericClusterSpec.immutableFields(), } diff --git a/apis/clusters/v1beta1/generic_spec.go b/apis/clusters/v1beta1/generic_spec.go index 55aef4011..d2ea520c5 100644 --- a/apis/clusters/v1beta1/generic_spec.go +++ b/apis/clusters/v1beta1/generic_spec.go @@ -11,10 +11,6 @@ type GenericClusterSpec struct { Version string `json:"version,omitempty"` - // The PCI compliance standards relate to the security of user data and transactional information. - // Can only be applied clusters provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis. - PCICompliance bool `json:"pciCompliance,omitempty"` - PrivateNetwork bool `json:"privateNetwork,omitempty"` // Non-production clusters may receive lower priority support and reduced SLAs. @@ -30,7 +26,6 @@ type GenericClusterSpec struct { func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool { return s.Name == o.Name && s.Version == o.Version && - s.PCICompliance == o.PCICompliance && s.PrivateNetwork == o.PrivateNetwork && s.SLATier == o.SLATier && s.Description == o.Description && @@ -39,7 +34,6 @@ func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool { func (s *GenericClusterSpec) FromInstAPI(model *models.GenericClusterFields) { s.Name = model.Name - s.PCICompliance = model.PCIComplianceMode s.PrivateNetwork = model.PrivateNetworkCluster s.SLATier = model.SLATier s.Description = model.Description @@ -60,7 +54,6 @@ func (s *GenericClusterSpec) ToInstAPI() models.GenericClusterFields { return models.GenericClusterFields{ Name: s.Name, Description: s.Description, - PCIComplianceMode: s.PCICompliance, PrivateNetworkCluster: s.PrivateNetwork, SLATier: s.SLATier, TwoFactorDelete: s.TwoFactorDeleteToInstAPI(), diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index 3ca495dff..e1f6ee8ab 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -72,6 +72,7 @@ type KafkaSpec struct { ClientToClusterEncryption bool `json:"clientToClusterEncryption"` ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + PCICompliance bool `json:"pciCompliance,omitempty"` UserRefs References `json:"userRefs,omitempty"` // Provision additional dedicated nodes for Apache Zookeeper to run on. @@ -133,7 +134,7 @@ type KafkaDataCentreStatus struct { func (s *KafkaDataCentreStatus) Equals(o *KafkaDataCentreStatus) bool { return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && - s.nodesEqual(o.Nodes) && + nodesEqual(s.Nodes, o.Nodes) && slices.EqualsPtr(s.PrivateLink, o.PrivateLink) } @@ -178,30 +179,6 @@ func (s *KafkaStatus) ToOnPremises() ClusterStatus { } } -func (s *KafkaDataCentreStatus) nodesEqual(nodes []*Node) bool { - if len(s.Nodes) != len(nodes) { - return false - } - - sNodes := map[string]*Node{} - for _, node := range s.Nodes { - sNodes[node.ID] = node - } - - for _, node := range nodes { - sNode, ok := sNodes[node.ID] - if !ok { - return false - } - - if !sNode.Equals(node) { - return false - } - } - - return true -} - //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" @@ -260,6 +237,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster { KarapaceRestProxy: k.karapaceRestProxyToInstAPI(), KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(), ResizeSettings: k.ResizeSettings.ToInstAPI(), + PCIComplianceMode: k.PCICompliance, } } @@ -386,6 +364,7 @@ func (ks *KafkaSpec) FromInstAPI(instaModel *models.KafkaCluster) { ks.ClientBrokerAuthWithMTLS = instaModel.ClientBrokerAuthWithMtls ks.BundledUseOnly = instaModel.BundledUseOnly ks.Version = instaModel.KafkaVersion + ks.PCICompliance = instaModel.PCIComplianceMode ks.DCsFromInstAPI(instaModel.DataCentres) ks.kraftFromInstAPI(instaModel.Kraft) @@ -491,17 +470,8 @@ func (ks *KafkaStatus) DCsFromInstAPI(instaModels []*models.KafkaDataCentre) { func (s *KafkaDataCentreStatus) FromInstAPI(instaModel *models.KafkaDataCentre) { s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields) - s.nodesFromInstAPI(instaModel.Nodes) s.PrivateLink.FromInstAPI(instaModel.PrivateLink) -} - -func (s *KafkaDataCentreStatus) nodesFromInstAPI(instaModels []*models.Node) { - s.Nodes = make([]*Node, len(instaModels)) - for i, instaModel := range instaModels { - node := Node{} - node.FromInstAPI(instaModel) - s.Nodes[i] = &node - } + s.Nodes = nodesFromInstAPI(instaModel.Nodes) } func (a *KafkaSpec) IsEqual(b KafkaSpec) bool { diff --git a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go index e7475780a..7564d30a0 100644 --- a/apis/clusters/v1beta1/kafka_webhook.go +++ b/apis/clusters/v1beta1/kafka_webhook.go @@ -372,6 +372,7 @@ type specificKafkaFields struct { bundledUseOnly bool privateNetworkCluster bool clientBrokerAuthWithMtls bool + pciCompliance bool } func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields { @@ -385,6 +386,7 @@ func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields { bundledUseOnly: ks.BundledUseOnly, privateNetworkCluster: ks.PrivateNetwork, clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS, + pciCompliance: ks.PCICompliance, }, cluster: ks.GenericClusterSpec.immutableFields(), } diff --git a/apis/clusters/v1beta1/kafkaconnect_types.go b/apis/clusters/v1beta1/kafkaconnect_types.go index 8cd9cda8c..d7e171cc4 100644 --- a/apis/clusters/v1beta1/kafkaconnect_types.go +++ b/apis/clusters/v1beta1/kafkaconnect_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - "encoding/json" "fmt" k8scorev1 "k8s.io/api/core/v1" @@ -27,6 +26,7 @@ import ( clusterresource "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" ) type TargetCluster struct { @@ -102,13 +102,17 @@ type GCPConnectorSettings struct { } type KafkaConnectDataCentre struct { - DataCentre `json:",inline"` - ReplicationFactor int `json:"replicationFactor"` + GenericDataCentreSpec `json:",inline"` + + NodeSize string `json:"nodeSize"` + NumberOfNodes int `json:"numberOfNodes"` + ReplicationFactor int `json:"replicationFactor"` } // KafkaConnectSpec defines the desired state of KafkaConnect type KafkaConnectSpec struct { - Cluster `json:",inline"` + GenericClusterSpec `json:",inline"` + DataCentres []*KafkaConnectDataCentre `json:"dataCentres"` TargetCluster []*TargetCluster `json:"targetCluster"` @@ -118,8 +122,18 @@ type KafkaConnectSpec struct { // KafkaConnectStatus defines the observed state of KafkaConnect type KafkaConnectStatus struct { - ClusterStatus `json:",inline"` - TargetKafkaClusterID string `json:"targetKafkaClusterId,omitempty"` + GenericStatus `json:",inline"` + + DataCentres []*KafkaConnectDataCentreStatus `json:"dataCentres,omitempty"` + + DefaultUserSecretRef *Reference `json:"defaultUserSecretRef,omitempty"` +} + +type KafkaConnectDataCentreStatus struct { + GenericDataCentreStatus `json:",inline"` + + NumberOfNodes int `json:"numberOfNodes,omitempty"` + Nodes []*Node `json:"nodes,omitempty"` DefaultUserSecretRef *Reference `json:"defaultUserSecretRef,omitempty"` } @@ -177,67 +191,59 @@ func init() { SchemeBuilder.Register(&KafkaConnect{}, &KafkaConnectList{}) } -func (k *KafkaConnect) FromInst(iKCData []byte) (*KafkaConnect, error) { - iKC := &models.KafkaConnectCluster{} - err := json.Unmarshal(iKCData, iKC) - if err != nil { - return nil, err - } +func (k *KafkaConnect) FromInstAPI(instaModel *models.KafkaConnectCluster) { + k.Spec.FromInstAPI(instaModel) + k.Status.FromInstAPI(instaModel) +} + +func (ks *KafkaConnectSpec) FromInstAPI(instaModel *models.KafkaConnectCluster) { + ks.GenericClusterSpec.FromInstAPI(&instaModel.GenericClusterFields) - return &KafkaConnect{ - TypeMeta: k.TypeMeta, - ObjectMeta: k.ObjectMeta, - Spec: k.Spec.FromInstAPI(iKC), - Status: k.Status.FromInstAPI(iKC), - }, nil + ks.Version = instaModel.KafkaConnectVersion + + ks.DCsFromInstAPI(instaModel.DataCentres) + ks.TargetClustersFromInstAPI(instaModel.TargetCluster) + ks.CustomConnectorsFromInstAPI(instaModel.CustomConnectors) } -func (ks *KafkaConnectSpec) FromInstAPI(iKC *models.KafkaConnectCluster) KafkaConnectSpec { - return KafkaConnectSpec{ - Cluster: Cluster{ - Name: iKC.Name, - Version: iKC.KafkaConnectVersion, - Description: iKC.Description, - PrivateNetworkCluster: iKC.PrivateNetworkCluster, - SLATier: iKC.SLATier, - TwoFactorDelete: ks.Cluster.TwoFactorDeleteFromInstAPI(iKC.TwoFactorDelete), - }, - DataCentres: ks.DCsFromInstAPI(iKC.DataCentres), - TargetCluster: ks.TargetClustersFromInstAPI(iKC.TargetCluster), - CustomConnectors: ks.CustomConnectorsFromInstAPI(iKC.CustomConnectors), - } +func (ks *KafkaConnectStatus) FromInstAPI(instaModel *models.KafkaConnectCluster) { + ks.GenericStatus.FromInstAPI(&instaModel.GenericClusterFields) + ks.DCsFromInstAPI(instaModel.DataCentres) } -func (ks *KafkaConnectStatus) FromInstAPI(iKC *models.KafkaConnectCluster) KafkaConnectStatus { - return KafkaConnectStatus{ - ClusterStatus: ClusterStatus{ - ID: iKC.ID, - State: iKC.Status, - DataCentres: ks.DCsFromInstAPI(iKC.DataCentres), - CurrentClusterOperationStatus: iKC.CurrentClusterOperationStatus, - MaintenanceEvents: ks.MaintenanceEvents, - }, +func (ks *KafkaConnectSpec) DCsFromInstAPI(instaModels []*models.KafkaConnectDataCentre) { + ks.DataCentres = make([]*KafkaConnectDataCentre, len(instaModels)) + for i, instaModel := range instaModels { + dc := KafkaConnectDataCentre{} + dc.FromInstAPI(instaModel) + ks.DataCentres[i] = &dc } } -func (ks *KafkaConnectSpec) DCsFromInstAPI(iDCs []*models.KafkaConnectDataCentre) (dcs []*KafkaConnectDataCentre) { - for _, iDC := range iDCs { - dcs = append(dcs, &KafkaConnectDataCentre{ - DataCentre: ks.Cluster.DCFromInstAPI(iDC.DataCentre), - ReplicationFactor: iDC.ReplicationFactor, - }) - } - return +func (k *KafkaConnectDataCentre) FromInstAPI(instaModel *models.KafkaConnectDataCentre) { + k.GenericDataCentreSpec.FromInstAPI(&instaModel.GenericDataCentreFields) + + k.NodeSize = instaModel.NodeSize + k.NumberOfNodes = instaModel.NumberOfNodes + k.ReplicationFactor = instaModel.ReplicationFactor } -func (ks *KafkaConnectSpec) TargetClustersFromInstAPI(iClusters []*models.TargetCluster) (clusters []*TargetCluster) { - for _, iCluster := range iClusters { - clusters = append(clusters, &TargetCluster{ - ExternalCluster: ks.ExternalClustersFromInstAPI(iCluster.ExternalCluster), - ManagedCluster: ks.ManagedClustersFromInstAPI(iCluster.ManagedCluster), - }) +func (k *KafkaConnectDataCentre) Equals(o *KafkaConnectDataCentre) bool { + return k.GenericDataCentreSpec.Equals(&o.GenericDataCentreSpec) && + k.NumberOfNodes == o.NumberOfNodes && + k.ReplicationFactor == o.ReplicationFactor && + k.NodeSize == o.NodeSize +} + +func (ks *KafkaConnectSpec) TargetClustersFromInstAPI(instaModels []*models.TargetCluster) { + targetCluster := make([]*TargetCluster, len(instaModels)) + for i, instaModel := range instaModels { + targetCluster[i] = &TargetCluster{ + ExternalCluster: ks.ExternalClustersFromInstAPI(instaModel.ExternalCluster), + ManagedCluster: ks.ManagedClustersFromInstAPI(instaModel.ManagedCluster), + } } - return + ks.TargetCluster = targetCluster } func (ks *KafkaConnectSpec) ExternalClustersFromInstAPI(iClusters []*models.ExternalCluster) (clusters []*ExternalCluster) { @@ -257,24 +263,30 @@ func (ks *KafkaConnectSpec) ExternalClustersFromInstAPI(iClusters []*models.Exte } func (ks *KafkaConnectSpec) ManagedClustersFromInstAPI(iClusters []*models.ManagedCluster) (clusters []*ManagedCluster) { + var clusterRef *clusterresource.ClusterRef + if managedCluster := ks.GetManagedCluster(); managedCluster != nil { + clusterRef = managedCluster.ClusterRef + } + for _, iCluster := range iClusters { clusters = append(clusters, &ManagedCluster{ TargetKafkaClusterID: iCluster.TargetKafkaClusterID, KafkaConnectVPCType: iCluster.KafkaConnectVPCType, + ClusterRef: clusterRef, }) } return } -func (ks *KafkaConnectSpec) CustomConnectorsFromInstAPI(iConns []*models.CustomConnectors) (conns []*CustomConnectors) { - for _, iConn := range iConns { - conns = append(conns, &CustomConnectors{ +func (ks *KafkaConnectSpec) CustomConnectorsFromInstAPI(instaModels []*models.CustomConnectors) { + ks.CustomConnectors = make([]*CustomConnectors, len(instaModels)) + for i, iConn := range instaModels { + ks.CustomConnectors[i] = &CustomConnectors{ AzureConnectorSettings: ks.AzureConnectorSettingsFromInstAPI(iConn.AzureConnectorSettings), AWSConnectorSettings: ks.AWSConnectorSettingsFromInstAPI(iConn.AWSConnectorSettings), GCPConnectorSettings: ks.GCPConnectorSettingsFromInstAPI(iConn.GCPConnectorSettings), - }) + } } - return } func (ks *KafkaConnectSpec) AzureConnectorSettingsFromInstAPI(iSettings []*models.AzureConnectorSettings) (settings []*AzureConnectorSettings) { @@ -314,118 +326,45 @@ func (ks *KafkaConnectSpec) GCPConnectorSettingsFromInstAPI(iSettings []*models. return } -func (ks *KafkaConnectStatus) DCsFromInstAPI(iDCs []*models.KafkaConnectDataCentre) (dcs []*DataCentreStatus) { - for _, iDC := range iDCs { - dcs = append(dcs, ks.DCFromInstAPI(iDC.DataCentre)) +func (ks *KafkaConnectStatus) DCsFromInstAPI(instaModels []*models.KafkaConnectDataCentre) { + ks.DataCentres = make([]*KafkaConnectDataCentreStatus, len(instaModels)) + for i, instaModel := range instaModels { + dc := KafkaConnectDataCentreStatus{} + dc.FromInstAPI(instaModel) + ks.DataCentres[i] = &dc } - return } func (c *KafkaConnect) GetSpec() KafkaConnectSpec { return c.Spec } func (c *KafkaConnect) IsSpecEqual(spec KafkaConnectSpec) bool { - return c.Spec.IsEqual(spec) -} - -func (ks *KafkaConnectSpec) IsEqual(kc KafkaConnectSpec) bool { - return ks.Cluster.IsEqual(kc.Cluster) && - ks.AreDataCentresEqual(kc.DataCentres) && - ks.AreTargetClustersEqual(kc.TargetCluster) && - ks.AreCustomConnectorsEqual(kc.CustomConnectors) -} - -func (ks *KafkaConnectSpec) AreDataCentresEqual(dcs []*KafkaConnectDataCentre) bool { - if len(ks.DataCentres) != len(dcs) { - return false - } - - for i, iDC := range dcs { - dataCentre := ks.DataCentres[i] - - if iDC.Name != dataCentre.Name { - continue - } - - if !dataCentre.IsEqual(iDC.DataCentre) || - iDC.ReplicationFactor != dataCentre.ReplicationFactor { - return false - } - } - - return true -} - -func (ks *KafkaConnectSpec) AreTargetClustersEqual(tClusters []*TargetCluster) bool { - if len(ks.TargetCluster) != len(tClusters) { - return false - } - - for i, tCluster := range tClusters { - cluster := ks.TargetCluster[i] - if !cluster.AreExternalClustersEqual(tCluster.ExternalCluster) || - !cluster.AreManagedClustersEqual(tCluster.ManagedCluster) { - return false - } - } - - return true + return c.Spec.Equals(&spec) } -func (tc *TargetCluster) AreExternalClustersEqual(eClusters []*ExternalCluster) bool { - if len(tc.ExternalCluster) != len(eClusters) { - return false - } - - for i, eCluster := range eClusters { - cluster := tc.ExternalCluster[i] - if *eCluster != *cluster { - return false - } - } - - return true +func (ks *KafkaConnectSpec) Equals(instaModel *KafkaConnectSpec) bool { + return ks.GenericClusterSpec.Equals(&instaModel.GenericClusterSpec) && + ks.DCsEqual(instaModel.DataCentres) && + ks.TargetClustersEqual(instaModel.TargetCluster) && + ks.CustomConnectorsEqual(instaModel.CustomConnectors) } -func (tc *TargetCluster) AreManagedClustersEqual(mClusters []*ManagedCluster) bool { - if len(tc.ManagedCluster) != len(mClusters) { +func (ks *KafkaConnectSpec) DCsEqual(o []*KafkaConnectDataCentre) bool { + if len(ks.DataCentres) != len(o) { return false } - for i, mCluster := range mClusters { - cluster := tc.ManagedCluster[i] - if mCluster.KafkaConnectVPCType != cluster.KafkaConnectVPCType { - return false - } - } - - return true -} - -func (ks *KafkaConnectSpec) AreCustomConnectorsEqual(cConns []*CustomConnectors) bool { - if len(ks.CustomConnectors) != len(cConns) { - return false + m := map[string]*KafkaConnectDataCentre{} + for _, dc := range ks.DataCentres { + m[dc.Name] = dc } - for i, cConn := range cConns { - conn := ks.CustomConnectors[i] - if !conn.AreAzureConnectorSettingsEqual(cConn.AzureConnectorSettings) || - !conn.AreAWSConnectorSettingsEqual(cConn.AWSConnectorSettings) || - !conn.AreGCPConnectorSettingsEqual(cConn.GCPConnectorSettings) { + for _, dc := range o { + mDC, ok := m[dc.Name] + if !ok { return false } - } - return true -} - -func (cc *CustomConnectors) AreAzureConnectorSettingsEqual(aSettings []*AzureConnectorSettings) bool { - if len(cc.AzureConnectorSettings) != len(aSettings) { - return false - } - - for i, aSetting := range aSettings { - settings := cc.AzureConnectorSettings[i] - if *aSetting != *settings { + if !mDC.Equals(dc) { return false } } @@ -433,14 +372,14 @@ func (cc *CustomConnectors) AreAzureConnectorSettingsEqual(aSettings []*AzureCon return true } -func (cc *CustomConnectors) AreAWSConnectorSettingsEqual(aSettings []*AWSConnectorSettings) bool { - if len(cc.AWSConnectorSettings) != len(aSettings) { +func (ks *KafkaConnectSpec) TargetClustersEqual(o []*TargetCluster) bool { + if len(ks.TargetCluster) != len(o) { return false } - for i, aSetting := range aSettings { - settings := cc.AWSConnectorSettings[i] - if *aSetting != *settings { + for i := range o { + if !slices.EqualsPtr(ks.TargetCluster[i].ExternalCluster, o[i].ExternalCluster) || + !slices.EqualsPtr(ks.TargetCluster[i].ManagedCluster, o[i].ManagedCluster) { return false } } @@ -448,14 +387,15 @@ func (cc *CustomConnectors) AreAWSConnectorSettingsEqual(aSettings []*AWSConnect return true } -func (cc *CustomConnectors) AreGCPConnectorSettingsEqual(gSettings []*GCPConnectorSettings) bool { - if len(cc.GCPConnectorSettings) != len(gSettings) { +func (ks *KafkaConnectSpec) CustomConnectorsEqual(o []*CustomConnectors) bool { + if len(ks.CustomConnectors) != len(o) { return false } - for i, gSetting := range gSettings { - settings := cc.GCPConnectorSettings[i] - if *gSetting != *settings { + for i := range ks.CustomConnectors { + if !slices.EqualsPtr(ks.CustomConnectors[i].AWSConnectorSettings, o[i].AWSConnectorSettings) || + !slices.EqualsPtr(ks.CustomConnectors[i].GCPConnectorSettings, o[i].GCPConnectorSettings) || + !slices.EqualsPtr(ks.CustomConnectors[i].AzureConnectorSettings, o[i].AzureConnectorSettings) { return false } } @@ -469,17 +409,13 @@ func (ks *KafkaConnectSpec) NewDCsUpdate() models.KafkaConnectAPIUpdate { } } -func (ks *KafkaConnectSpec) ToInstAPI() models.KafkaConnectCluster { - return models.KafkaConnectCluster{ - Name: ks.Name, - KafkaConnectVersion: ks.Version, - PrivateNetworkCluster: ks.PrivateNetworkCluster, - Description: ks.Description, - SLATier: ks.SLATier, - TwoFactorDelete: ks.TwoFactorDeletesToInstAPI(), - CustomConnectors: ks.CustomConnectorsToInstAPI(), - TargetCluster: ks.TargetClustersToInstAPI(), - DataCentres: ks.DCsToInstAPI(), +func (ks *KafkaConnectSpec) ToInstAPI() *models.KafkaConnectCluster { + return &models.KafkaConnectCluster{ + GenericClusterFields: ks.GenericClusterSpec.ToInstAPI(), + KafkaConnectVersion: ks.Version, + CustomConnectors: ks.CustomConnectorsToInstAPI(), + TargetCluster: ks.TargetClustersToInstAPI(), + DataCentres: ks.DCsToInstAPI(), } } @@ -492,8 +428,10 @@ func (ks *KafkaConnectSpec) DCsToInstAPI() (iDCs []*models.KafkaConnectDataCentr func (kdc *KafkaConnectDataCentre) ToInstAPI() *models.KafkaConnectDataCentre { return &models.KafkaConnectDataCentre{ - DataCentre: kdc.DataCentre.ToInstAPI(), - ReplicationFactor: kdc.ReplicationFactor, + GenericDataCentreFields: kdc.GenericDataCentreSpec.ToInstAPI(), + NodeSize: kdc.NodeSize, + NumberOfNodes: kdc.NumberOfNodes, + ReplicationFactor: kdc.ReplicationFactor, } } @@ -612,7 +550,7 @@ func (k *KafkaConnect) NewDefaultUserSecret(username, password string) *k8scorev func (k *KafkaConnect) GetExposePorts() []k8scorev1.ServicePort { var exposePorts []k8scorev1.ServicePort - if !k.Spec.PrivateNetworkCluster { + if !k.Spec.PrivateNetwork { exposePorts = []k8scorev1.ServicePort{ { Name: models.KafkaConnectAPI, @@ -640,3 +578,56 @@ func (k *KafkaConnect) GetHeadlessPorts() []k8scorev1.ServicePort { } return headlessPorts } + +func (s *KafkaConnectDataCentreStatus) FromInstAPI(instaModel *models.KafkaConnectDataCentre) { + s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields) + s.NumberOfNodes = instaModel.NumberOfNodes + s.Nodes = nodesFromInstAPI(instaModel.Nodes) +} + +func (s *KafkaConnectDataCentreStatus) Equals(o *KafkaConnectDataCentreStatus) bool { + return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && + nodesEqual(s.Nodes, o.Nodes) && + s.NumberOfNodes == o.NumberOfNodes +} + +func (ks *KafkaConnectSpec) GetManagedCluster() *ManagedCluster { + if len(ks.TargetCluster) < 1 { + return nil + } + + if len(ks.TargetCluster[0].ManagedCluster) < 1 || ks.TargetCluster[0].ManagedCluster[0] == nil { + return nil + } + + return ks.TargetCluster[0].ManagedCluster[0] +} + +func (ks *KafkaConnectStatus) Equals(o *KafkaConnectStatus) bool { + return ks.GenericStatus.Equals(&o.GenericStatus) && + ks.DCsEqual(o.DataCentres) +} + +func (ks *KafkaConnectStatus) DCsEqual(o []*KafkaConnectDataCentreStatus) bool { + if len(ks.DataCentres) != len(o) { + return false + } + + m := map[string]*KafkaConnectDataCentreStatus{} + for _, dc := range ks.DataCentres { + m[dc.ID] = dc + } + + for _, dc := range o { + mDC, ok := m[dc.ID] + if !ok { + return false + } + + if !mDC.Equals(dc) { + return false + } + } + + return true +} diff --git a/apis/clusters/v1beta1/kafkaconnect_types_test.go b/apis/clusters/v1beta1/kafkaconnect_types_test.go new file mode 100644 index 000000000..7b5d42e32 --- /dev/null +++ b/apis/clusters/v1beta1/kafkaconnect_types_test.go @@ -0,0 +1,72 @@ +package v1beta1 + +import ( + "reflect" + "testing" + + clusterresource "github.com/instaclustr/operator/apis/clusterresources/v1beta1" + "github.com/instaclustr/operator/pkg/models" +) + +func TestKafkaConnectSpec_ManagedClustersFromInstAPI(t *testing.T) { + type fields struct { + GenericClusterSpec GenericClusterSpec + DataCentres []*KafkaConnectDataCentre + TargetCluster []*TargetCluster + CustomConnectors []*CustomConnectors + } + type args struct { + iClusters []*models.ManagedCluster + } + tests := []struct { + name string + fields fields + args args + wantClusters []*ManagedCluster + }{ + { + name: "empty response", + args: args{nil}, + wantClusters: nil, + }, + { + name: "1 managed cluster from inst API, in k8s we have ref to cluster", + fields: fields{ + TargetCluster: []*TargetCluster{{ManagedCluster: []*ManagedCluster{{ClusterRef: &clusterresource.ClusterRef{Name: "test-ref"}}}}}, + }, + args: args{iClusters: []*models.ManagedCluster{{ + TargetKafkaClusterID: "test-id", + KafkaConnectVPCType: "test-vpc", + }}}, + wantClusters: []*ManagedCluster{{ + TargetKafkaClusterID: "test-id", + KafkaConnectVPCType: "test-vpc", + ClusterRef: &clusterresource.ClusterRef{Name: "test-ref"}, + }}, + }, + { + name: "1 managed cluster from inst API, in k8s we don't have ref to cluster", + args: args{iClusters: []*models.ManagedCluster{{ + TargetKafkaClusterID: "test-id", + KafkaConnectVPCType: "test-vpc", + }}}, + wantClusters: []*ManagedCluster{{ + TargetKafkaClusterID: "test-id", + KafkaConnectVPCType: "test-vpc", + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ks := &KafkaConnectSpec{ + GenericClusterSpec: tt.fields.GenericClusterSpec, + DataCentres: tt.fields.DataCentres, + TargetCluster: tt.fields.TargetCluster, + CustomConnectors: tt.fields.CustomConnectors, + } + if gotClusters := ks.ManagedClustersFromInstAPI(tt.args.iClusters); !reflect.DeepEqual(gotClusters, tt.wantClusters) { + t.Errorf("ManagedClustersFromInstAPI() = %v, want %v", gotClusters, tt.wantClusters) + } + }) + } +} diff --git a/apis/clusters/v1beta1/kafkaconnect_webhook.go b/apis/clusters/v1beta1/kafkaconnect_webhook.go index 96c3ff2a2..de354016e 100644 --- a/apis/clusters/v1beta1/kafkaconnect_webhook.go +++ b/apis/clusters/v1beta1/kafkaconnect_webhook.go @@ -65,10 +65,6 @@ func (r *KafkaConnect) Default() { models.ResourceStateAnnotation: "", }) } - - for _, dataCentre := range r.Spec.DataCentres { - dataCentre.SetDefaultValues() - } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -90,7 +86,7 @@ func (kcv *kafkaConnectValidator) ValidateCreate(ctx context.Context, obj runtim return err } - err = kc.Spec.Cluster.ValidateCreation() + err = kc.Spec.GenericClusterSpec.ValidateCreation() if err != nil { return err } @@ -152,7 +148,7 @@ func (kcv *kafkaConnectValidator) ValidateCreate(ctx context.Context, obj runtim return models.ErrOnPremisesWithMultiDC } - err = dc.DataCentre.ValidateCreation() + err = dc.GenericDataCentreSpec.validateCreation() if err != nil { return err } @@ -162,7 +158,7 @@ func (kcv *kafkaConnectValidator) ValidateCreate(ctx context.Context, obj runtim return err } - if ((dc.NodesNumber*dc.ReplicationFactor)/dc.ReplicationFactor)%dc.ReplicationFactor != 0 { + if ((dc.NumberOfNodes*dc.ReplicationFactor)/dc.ReplicationFactor)%dc.ReplicationFactor != 0 { return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", dc.ReplicationFactor) } } @@ -179,6 +175,10 @@ func (kcv *kafkaConnectValidator) ValidateUpdate(ctx context.Context, old runtim kafkaconnectlog.Info("validate update", "name", kc.Name) + if kc.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return nil + } + // skip validation when we receive cluster specification update from the Instaclustr Console. if kc.Annotations[models.ExternalChangesAnnotation] == models.True { return nil @@ -230,7 +230,7 @@ type immutableKafkaConnectDCFields struct { func (kc *KafkaConnectSpec) newImmutableFields() *immutableKafkaConnectFields { return &immutableKafkaConnectFields{ - immutableCluster: kc.Cluster.newImmutableFields(), + immutableCluster: kc.GenericClusterSpec.immutableFields(), } } @@ -274,7 +274,7 @@ func (kc *KafkaConnectSpec) validateImmutableDataCentresFieldsUpdate(oldSpec Kaf return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields) } - err := newDC.validateImmutableCloudProviderSettingsUpdate(oldDC.CloudProviderSettings) + err := newDC.validateImmutableCloudProviderSettingsUpdate(&oldDC.GenericDataCentreSpec) if err != nil { return err } @@ -284,11 +284,11 @@ func (kc *KafkaConnectSpec) validateImmutableDataCentresFieldsUpdate(oldSpec Kaf return err } - if newDC.NodesNumber < oldDC.NodesNumber { - return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NodesNumber) + if newDC.NumberOfNodes < oldDC.NumberOfNodes { + return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NumberOfNodes) } - if ((newDC.NodesNumber*newDC.ReplicationFactor)/newDC.ReplicationFactor)%newDC.ReplicationFactor != 0 { + if ((newDC.NumberOfNodes*newDC.ReplicationFactor)/newDC.ReplicationFactor)%newDC.ReplicationFactor != 0 { return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", newDC.ReplicationFactor) } } diff --git a/apis/clusters/v1beta1/opensearch_types.go b/apis/clusters/v1beta1/opensearch_types.go index ec710cdde..11b4cf76b 100644 --- a/apis/clusters/v1beta1/opensearch_types.go +++ b/apis/clusters/v1beta1/opensearch_types.go @@ -49,6 +49,7 @@ type OpenSearchSpec struct { IndexManagementPlugin bool `json:"indexManagementPlugin,omitempty"` AlertingPlugin bool `json:"alertingPlugin,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + PCICompliance bool `json:"pciCompliance,omitempty"` UserRefs References `json:"userRefs,omitempty"` //+kubuilder:validation:MaxItems:=1 ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` @@ -71,6 +72,7 @@ func (s *OpenSearchSpec) FromInstAPI(instaModel *models.OpenSearchCluster) { s.IndexManagementPlugin = instaModel.IndexManagementPlugin s.AlertingPlugin = instaModel.AlertingPlugin s.BundledUseOnly = instaModel.BundledUseOnly + s.PCICompliance = instaModel.PCIComplianceMode s.dcsFromInstAPI(instaModel.DataCentres) s.dataNodesFromInstAPI(instaModel.DataNodes) @@ -191,6 +193,7 @@ func (oss *OpenSearchSpec) ToInstAPI() *models.OpenSearchCluster { IndexManagementPlugin: oss.IndexManagementPlugin, AlertingPlugin: oss.AlertingPlugin, IngestNodes: oss.ingestNodesToInstAPI(), + PCIComplianceMode: oss.PCICompliance, } } diff --git a/apis/clusters/v1beta1/opensearch_webhook.go b/apis/clusters/v1beta1/opensearch_webhook.go index b0e6bdef3..54293fd70 100644 --- a/apis/clusters/v1beta1/opensearch_webhook.go +++ b/apis/clusters/v1beta1/opensearch_webhook.go @@ -245,6 +245,7 @@ type specificOpenSearchFields struct { IndexManagementPlugin bool AlertingPlugin bool BundledUseOnly bool + PCICompliance bool } func (oss *OpenSearchSpec) newImmutableFields() *immutableOpenSearchFields { @@ -261,6 +262,7 @@ func (oss *OpenSearchSpec) newImmutableFields() *immutableOpenSearchFields { IndexManagementPlugin: oss.IndexManagementPlugin, AlertingPlugin: oss.AlertingPlugin, BundledUseOnly: oss.BundledUseOnly, + PCICompliance: oss.PCICompliance, }, cluster: oss.GenericClusterSpec.immutableFields(), } diff --git a/apis/clusters/v1beta1/redis_types.go b/apis/clusters/v1beta1/redis_types.go index 10fba2e8e..33f52d276 100644 --- a/apis/clusters/v1beta1/redis_types.go +++ b/apis/clusters/v1beta1/redis_types.go @@ -73,6 +73,7 @@ type RedisSpec struct { ClientEncryption bool `json:"clientEncryption"` // Enables Password Authentication and User Authorization PasswordAndUserAuth bool `json:"passwordAndUserAuth"` + PCICompliance bool `json:"pciCompliance,omitempty"` //+kubebuilder:validation:MaxItems:=2 DataCentres []*RedisDataCentre `json:"dataCentres"` @@ -166,6 +167,7 @@ func (rs *RedisSpec) ToInstAPI() *models.RedisCluster { RedisVersion: rs.Version, ClientToNodeEncryption: rs.ClientEncryption, PasswordAndUserAuth: rs.PasswordAndUserAuth, + PCIComplianceMode: rs.PCICompliance, DataCentres: rs.DCsToInstAPI(), } } @@ -267,6 +269,7 @@ func (rs *RedisSpec) FromInstAPI(instaModel *models.RedisCluster) { rs.ClientEncryption = instaModel.ClientToNodeEncryption rs.PasswordAndUserAuth = instaModel.PasswordAndUserAuth rs.Version = instaModel.RedisVersion + rs.PCICompliance = instaModel.PCIComplianceMode rs.DCsFromInstAPI(instaModel.DataCentres) } @@ -308,21 +311,12 @@ func (rs *RedisStatus) DCsFromInstAPI(instaModels []*models.RedisDataCentre) { func (s *RedisDataCentreStatus) FromInstAPI(instaModel *models.RedisDataCentre) { s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields) s.PrivateLink.FromInstAPI(instaModel.PrivateLink) - s.nodesFromInstAPI(instaModel.Nodes) -} - -func (s *RedisDataCentreStatus) nodesFromInstAPI(instaModels []*models.Node) { - s.Nodes = make([]*Node, len(instaModels)) - for i, instaModel := range instaModels { - n := Node{} - n.FromInstAPI(instaModel) - s.Nodes[i] = &n - } + s.Nodes = nodesFromInstAPI(instaModel.Nodes) } func (s *RedisDataCentreStatus) Equals(o *RedisDataCentreStatus) bool { return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && - s.nodesEqual(o.Nodes) && + nodesEqual(s.Nodes, o.Nodes) && slices.EqualsPtr(s.PrivateLink, o.PrivateLink) } @@ -355,30 +349,6 @@ func (rs *RedisStatus) DCsEqual(o []*RedisDataCentreStatus) bool { return true } -func (s *RedisDataCentreStatus) nodesEqual(o []*Node) bool { - if len(s.Nodes) != len(o) { - return false - } - - m := map[string]*Node{} - for _, node := range s.Nodes { - m[node.ID] = node - } - - for _, node := range o { - mNode, ok := m[node.ID] - if !ok { - return false - } - - if !mNode.Equals(node) { - return false - } - } - - return true -} - func (s *RedisStatus) ToOnPremises() ClusterStatus { dc := &DataCentreStatus{ ID: s.DataCentres[0].ID, diff --git a/apis/clusters/v1beta1/redis_webhook.go b/apis/clusters/v1beta1/redis_webhook.go index a174d9cda..b0aaa1176 100644 --- a/apis/clusters/v1beta1/redis_webhook.go +++ b/apis/clusters/v1beta1/redis_webhook.go @@ -226,6 +226,7 @@ type immutableRedisFields struct { type specificRedisFields struct { ClientEncryption bool PasswordAndUserAuth bool + PCICompliance bool } type immutableRedisDCFields struct { @@ -320,6 +321,7 @@ func (rs *RedisSpec) newImmutableFields() *immutableRedisFields { specificRedisFields: specificRedisFields{ ClientEncryption: rs.ClientEncryption, PasswordAndUserAuth: rs.PasswordAndUserAuth, + PCICompliance: rs.PCICompliance, }, immutableCluster: rs.GenericClusterSpec.immutableFields(), } diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index 8a9744104..09e8fe002 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -885,3 +885,38 @@ type AzureSettings struct { // The network must have a prefix length between /16 and /28, and must be part of a private address range. StorageNetwork string `json:"storageNetwork,omitempty"` } + +func nodesEqual(s1, s2 []*Node) bool { + if len(s1) != len(s2) { + return false + } + + m := map[string]*Node{} + for _, node := range s1 { + m[node.ID] = node + } + + for _, s2Node := range s2 { + s1Node, ok := m[s2Node.ID] + if !ok { + return false + } + + if !s1Node.Equals(s2Node) { + return false + } + } + + return true +} + +func nodesFromInstAPI(instaModels []*models.Node) []*Node { + nodes := make([]*Node, len(instaModels)) + for i, instaModel := range instaModels { + n := Node{} + n.FromInstAPI(instaModel) + nodes[i] = &n + } + + return nodes +} diff --git a/apis/clusters/v1beta1/validation.go b/apis/clusters/v1beta1/validation.go index 42ddb6ce1..3985c8b3e 100644 --- a/apis/clusters/v1beta1/validation.go +++ b/apis/clusters/v1beta1/validation.go @@ -339,7 +339,6 @@ func (s *GenericClusterSpec) immutableFields() immutableCluster { return immutableCluster{ Name: s.Name, Version: s.Version, - PCICompliance: s.PCICompliance, PrivateNetworkCluster: s.PrivateNetwork, SLATier: s.SLATier, } diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 50c6ab8cf..204f11f2d 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -1221,7 +1221,7 @@ func (in *KafkaConnect) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaConnectDataCentre) DeepCopyInto(out *KafkaConnectDataCentre) { *out = *in - in.DataCentre.DeepCopyInto(&out.DataCentre) + in.GenericDataCentreSpec.DeepCopyInto(&out.GenericDataCentreSpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaConnectDataCentre. @@ -1234,6 +1234,38 @@ func (in *KafkaConnectDataCentre) DeepCopy() *KafkaConnectDataCentre { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaConnectDataCentreStatus) DeepCopyInto(out *KafkaConnectDataCentreStatus) { + *out = *in + in.GenericDataCentreStatus.DeepCopyInto(&out.GenericDataCentreStatus) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + (*in).DeepCopyInto(*out) + } + } + } + if in.DefaultUserSecretRef != nil { + in, out := &in.DefaultUserSecretRef, &out.DefaultUserSecretRef + *out = new(apiextensions.ObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaConnectDataCentreStatus. +func (in *KafkaConnectDataCentreStatus) DeepCopy() *KafkaConnectDataCentreStatus { + if in == nil { + return nil + } + out := new(KafkaConnectDataCentreStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaConnectList) DeepCopyInto(out *KafkaConnectList) { *out = *in @@ -1269,7 +1301,7 @@ func (in *KafkaConnectList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaConnectSpec) DeepCopyInto(out *KafkaConnectSpec) { *out = *in - in.Cluster.DeepCopyInto(&out.Cluster) + in.GenericClusterSpec.DeepCopyInto(&out.GenericClusterSpec) if in.DataCentres != nil { in, out := &in.DataCentres, &out.DataCentres *out = make([]*KafkaConnectDataCentre, len(*in)) @@ -1318,7 +1350,18 @@ func (in *KafkaConnectSpec) DeepCopy() *KafkaConnectSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaConnectStatus) DeepCopyInto(out *KafkaConnectStatus) { *out = *in - in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + in.GenericStatus.DeepCopyInto(&out.GenericStatus) + if in.DataCentres != nil { + in, out := &in.DataCentres, &out.DataCentres + *out = make([]*KafkaConnectDataCentreStatus, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(KafkaConnectDataCentreStatus) + (*in).DeepCopyInto(*out) + } + } + } if in.DefaultUserSecretRef != nil { in, out := &in.DefaultUserSecretRef, &out.DefaultUserSecretRef *out = new(apiextensions.ObjectReference) diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index 7b16982a6..5ef899082 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -245,10 +245,6 @@ spec: passwordAndUserAuth: type: boolean pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. type: boolean privateNetwork: type: boolean diff --git a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml index 4f51936db..2ad0321c3 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml @@ -135,45 +135,131 @@ spec: items: properties: accountName: + default: INSTACLUSTR + description: For customers running in their own account. Your + provider account can be found on the Create Cluster page on + the Instaclustr Console, or the "Provider Account" property + on any existing cluster. For customers provisioning on Instaclustr's + cloud provider accounts, this property may be omitted. type: string - cloudProvider: - type: string - cloudProviderSettings: + awsSettings: + description: AWS specific settings for the Data Centre. Cannot + be provided with GCP or Azure settings. items: properties: backupBucket: + description: Specify the S3 bucket to use for storing + backup data for the cluster data centre. Only available + for customers running in their own cloud provider accounts. + Currently supported for OpenSearch clusters only. type: string customVirtualNetworkId: + description: VPC ID into which the Data Centre will be + provisioned. The Data Centre's network allocation must + match the IPv4 CIDR block of the specified VPC. type: string - disableSnapshotAutoExpiry: - type: boolean - diskEncryptionKey: + encryptionKey: + description: ID of a KMS encryption key to encrypt data + on nodes. KMS encryption key must be set in Cluster + Resources through the Instaclustr Console before provisioning + an encrypted Data Centre. + type: string + type: object + maxItems: 1 + type: array + azureSettings: + description: Azure specific settings for the Data Centre. Cannot + be provided with AWS or GCP settings. + items: + properties: + customVirtualNetworkId: + description: VNet ID into which the Data Centre will be + provisioned. The VNet must have an available address + space for the Data Centre's network allocation to be + appended to the VNet. Currently supported for PostgreSQL + clusters only. type: string resourceGroup: + description: The name of the Azure Resource Group into + which the Data Centre will be provisioned. + type: string + storageNetwork: + description: 'The private network address block to be + used for the storage network. This is only used for + certain node sizes, currently limited to those which + use Azure NetApp Files: for all other node sizes, this + field should not be provided. The network must have + a prefix length between /16 and /28, and must be part + of a private address range.' type: string type: object + maxItems: 1 + type: array + cloudProvider: + description: Name of a cloud provider service. + type: string + gcpSettings: + description: GCP specific settings for the Data Centre. Cannot + be provided with AWS or Azure settings. + items: + properties: + customVirtualNetworkId: + description: "Network name or a relative Network or Subnetwork + URI. The Data Centre's network allocation must match + the IPv4 CIDR block of the specified subnet. \n Examples: + Network URI: projects/{riyoa-gcp-project-name}/global/networks/{network-name}. + Network name: {network-name}, equivalent to projects/{riyoa-gcp-project-name}/global/networks/{network-name}. + Same-project subnetwork URI: projects/{riyoa-gcp-project-name}/regions/{region-id}/subnetworks/{subnetwork-name}. + Shared VPC subnetwork URI: projects/{riyoa-gcp-host-project-name}/regions/{region-id}/subnetworks/{subnetwork-name}." + type: string + disableSnapshotAutoExpiry: + description: Specify whether the GCS backup bucket should + automatically expire data after 7 days or not. Setting + this to true will disable automatic expiry and will + allow for creation of custom snapshot repositories with + customisable retention using the Index Management Plugin. + The storage will have to be manually cleared after the + cluster is deleted. Only available for customers running + in their own cloud provider accounts. Currently supported + for OpenSearch clusters only. + type: boolean + type: object + maxItems: 1 type: array name: + description: A logical name for the data centre within a cluster. + These names must be unique in the cluster. type: string network: + description: The private network address block for the Data + Centre specified using CIDR address notation. The network + must have a prefix length between /12 and /22 and must be + part of a private address space. type: string nodeSize: type: string - nodesNumber: + numberOfNodes: type: integer region: + description: Region of the Data Centre. type: string replicationFactor: type: integer tags: additionalProperties: type: string + description: List of tags to apply to the Data Centre. Tags + are metadata labels which allow you to identify, categorize + and filter clusters. This can be useful for grouping together + clusters into applications, environments, or any category + that you require. type: object required: - cloudProvider + - name - network - nodeSize - - nodesNumber + - numberOfNodes - region - replicationFactor type: object @@ -183,13 +269,7 @@ spec: name: description: Name [ 3 .. 32 ] characters. type: string - pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. - type: boolean - privateNetworkCluster: + privateNetwork: type: boolean slaTier: description: 'Non-production clusters may receive lower priority support @@ -274,15 +354,22 @@ spec: status: description: KafkaConnectStatus defines the observed state of KafkaConnect properties: - cdcid: - type: string currentClusterOperationStatus: type: string dataCentres: items: properties: - encryptionKeyId: - type: string + defaultUserSecretRef: + description: ObjectReference is namespaced reference to an object + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object id: type: string name: @@ -308,21 +395,8 @@ spec: type: string type: object type: array - nodesNumber: + numberOfNodes: type: integer - privateLink: - items: - properties: - advertisedHostname: - type: string - endPointServiceId: - type: string - endPointServiceName: - type: string - required: - - advertisedHostname - type: object - type: array resizeOperations: items: properties: @@ -480,21 +554,8 @@ spec: type: array type: object type: array - options: - properties: - dataNodeSize: - type: string - masterNodeSize: - type: string - openSearchDashboardsNodeSize: - type: string - type: object state: type: string - targetKafkaClusterId: - type: string - twoFactorDeleteEnabled: - type: boolean type: object type: object served: true diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index b81e1f630..faf584408 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -260,10 +260,6 @@ spec: new topics. type: integer pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. type: boolean privateNetwork: type: boolean diff --git a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml index 3cff6afc5..1688290ab 100644 --- a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml +++ b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml @@ -250,10 +250,6 @@ spec: type: object type: array pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. type: boolean privateNetwork: type: boolean diff --git a/config/crd/bases/clusters.instaclustr.com_redis.yaml b/config/crd/bases/clusters.instaclustr.com_redis.yaml index 95b8761b5..757936350 100644 --- a/config/crd/bases/clusters.instaclustr.com_redis.yaml +++ b/config/crd/bases/clusters.instaclustr.com_redis.yaml @@ -212,10 +212,6 @@ spec: description: Enables Password Authentication and User Authorization type: boolean pciCompliance: - description: The PCI compliance standards relate to the security of - user data and transactional information. Can only be applied clusters - provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch - and Redis. type: boolean privateNetwork: type: boolean diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index 1f3797102..9044a7461 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -1,9 +1,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Cassandra metadata: - name: cassandra-cluster + name: cassandra-cluster-2 spec: - name: "example-cassandra" #(immutable) + name: "bohdan-cassandra" #(immutable) version: "4.0.10" #(immutable) privateNetwork: false #(immutable) dataCentres: @@ -12,11 +12,11 @@ spec: # debezium: # - kafkaVpcType: "VPC_PEERED" #(immutable) # kafkaTopicPrefix: "test" #(immutable) -# kafkaCdcId: "556b283e-d094-4bcf-a068-14fa68c2a43a" #(mutable) -## clusterRef:1 -## name: kafka -## namespace: default -## clusterKind: Kafka +# kafkaCdcId: 8db44d06-3cd6-4bb3-86fb-c9b1e2a03941 #(mutable) +# clusterRef: +# name: kafka +# namespace: default +# clusterKind: Kafka # version: "2.0.1" #(immutable) cloudProvider: "AWS_VPC" #(immutable) continuousBackup: false #(immutable) @@ -51,9 +51,9 @@ spec: pciCompliance: false #(immutable) luceneEnabled: false #(immutable) # can be enabled only on 3.11.13 version of Cassandra (currently it could be enabled on 4.0.10 version as well) passwordAndUserAuth: true #(immutable) - # userRefs: - # - namespace: default - # name: cassandrauser-sample + userRefs: + - namespace: default + name: cassandrauser-sample # - namespace: default # name: cassandrauser-sample2 # - namespace: default diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index 514dc1c6e..2fb94d3d0 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -1,33 +1,33 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Kafka metadata: - name: kafka + name: kafka-2 spec: - name: "example-Kafka" + name: "bohdan-kafka-test" version: "3.5.1" pciCompliance: false replicationFactor: 3 partitionsNumber: 3 allowDeleteTopics: true autoCreateTopics: true - clientToClusterEncryption: false + clientToClusterEncryption: true privateNetwork: false slaTier: "NON_PRODUCTION" # bundledUseOnly: true -# clientBrokerAuthWithMtls: true + clientBrokerAuthWithMtls: true # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 # twoFactorDelete: # - email: "asdfadfsdsf" # phone: "ddsafasdf" -# karapaceSchemaRegistry: -# - version: "3.6.2" + karapaceSchemaRegistry: + - version: "3.6.2" # schemaRegistry: # - version: "3.0.0" -# karapaceRestProxy: -# - integrateRestProxyWithSchemaRegistry: true -# version: "3.6.2" + karapaceRestProxy: + - integrateRestProxyWithSchemaRegistry: true + version: "3.6.2" # kraft: # - controllerNodeCount: 3 # restProxy: diff --git a/config/samples/clusters_v1beta1_kafkaconnect.yaml b/config/samples/clusters_v1beta1_kafkaconnect.yaml index 681369028..fff310b53 100644 --- a/config/samples/clusters_v1beta1_kafkaconnect.yaml +++ b/config/samples/clusters_v1beta1_kafkaconnect.yaml @@ -3,9 +3,10 @@ kind: KafkaConnect metadata: name: kafkaconnect-sample spec: + name: "bohdan-KC-test" dataCentres: - name: "US_EAST_1_DC_KAFKA" - nodesNumber: 3 + numberOfNodes: 3 # nodesNumber: 6 cloudProvider: "AWS_VPC" replicationFactor: 3 @@ -15,9 +16,8 @@ spec: nodeSize: "KCN-DEV-t4g.medium-30" network: "10.15.0.0/16" region: "US_EAST_1" - name: "Username-KC" version: "3.5.1" - privateNetworkCluster: false + privateNetwork: false slaTier: "NON_PRODUCTION" targetCluster: - managedCluster: diff --git a/controllers/clusterresources/helpers.go b/controllers/clusterresources/helpers.go index bbd41807e..61d4303f9 100644 --- a/controllers/clusterresources/helpers.go +++ b/controllers/clusterresources/helpers.go @@ -144,10 +144,6 @@ func GetDataCentreID(cl client.Client, ctx context.Context, ref *v1beta1.Cluster func GetClusterID(cl client.Client, ctx context.Context, ref *v1beta1.ClusterRef) (string, error) { var obj ClusterIDProvider - ns := types.NamespacedName{ - Namespace: ref.Namespace, - Name: ref.Name, - } switch ref.ClusterKind { case models.RedisClusterKind: @@ -170,7 +166,7 @@ func GetClusterID(cl client.Client, ctx context.Context, ref *v1beta1.ClusterRef return "", models.ErrUnsupportedClusterKind } - err := cl.Get(ctx, ns, obj) + err := cl.Get(ctx, ref.AsNamespacedName(), obj) if err != nil { return "", err } diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index 8a196ef7b..6a502cce2 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -22,7 +22,6 @@ import ( "fmt" "github.com/go-logr/logr" - rlimiter "github.com/instaclustr/operator/pkg/ratelimiter" k8serrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -38,6 +37,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/ratelimiter" + rlimiter "github.com/instaclustr/operator/pkg/ratelimiter" + "github.com/instaclustr/operator/apis/clusters/v1beta1" "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" @@ -696,11 +697,11 @@ func (r *CadenceReconciler) newCassandraSpec(c *v1beta1.Cadence, latestCassandra SLATier: slaTier, PrivateNetwork: privateNetwork, TwoFactorDelete: twoFactorDelete, - PCICompliance: pciCompliance, }, DataCentres: cassandraDataCentres, PasswordAndUserAuth: cassPasswordAndUserAuth, BundledUseOnly: true, + PCICompliance: pciCompliance, } return &v1beta1.Cassandra{ @@ -955,7 +956,6 @@ func (r *CadenceReconciler) newKafkaSpec(c *v1beta1.Cadence, latestKafkaVersion SLATier: slaTier, PrivateNetwork: privateClusterNetwork, TwoFactorDelete: kafkaTFD, - PCICompliance: pciCompliance, }, DataCentres: kafkaDataCentres, ReplicationFactor: bundledKafkaSpec.ReplicationFactor, @@ -964,6 +964,7 @@ func (r *CadenceReconciler) newKafkaSpec(c *v1beta1.Cadence, latestKafkaVersion AutoCreateTopics: true, ClientToClusterEncryption: clientEncryption, BundledUseOnly: true, + PCICompliance: pciCompliance, } return &v1beta1.Kafka{ @@ -1046,11 +1047,11 @@ func (r *CadenceReconciler) newOpenSearchSpec(c *v1beta1.Cadence, oldestOpenSear SLATier: slaTier, PrivateNetwork: privateClusterNetwork, TwoFactorDelete: twoFactorDelete, - PCICompliance: pciCompliance, }, DataCentres: osDataCentres, ClusterManagerNodes: managerNodes, BundledUseOnly: true, + PCICompliance: pciCompliance, } return &v1beta1.OpenSearch{ diff --git a/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml b/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml index 97419dcc3..72ffd33a3 100644 --- a/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml +++ b/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml @@ -8,7 +8,7 @@ metadata: spec: dataCentres: - name: "US_EAST_1_DC_KAFKA" - nodesNumber: 3 + numberOfNodes: 3 # accountName: "TestAccName" cloudProvider: "AWS_VPC" replicationFactor: 3 diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index 7d3c4356d..e7fd49d6e 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -18,7 +18,9 @@ package clusters import ( "context" + "encoding/json" "errors" + "fmt" "github.com/go-logr/logr" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -101,120 +103,103 @@ func (r *KafkaConnectReconciler) Reconcile(ctx context.Context, req ctrl.Request } } +func (r *KafkaConnectReconciler) mergeManagedClusterFromRef(ctx context.Context, kc *v1beta1.KafkaConnect) error { + managedCluster := kc.Spec.GetManagedCluster() + if managedCluster == nil || managedCluster.ClusterRef == nil { + return nil + } + + targetClusterID, err := clusterresources.GetClusterID(r.Client, ctx, managedCluster.ClusterRef) + if err != nil { + return fmt.Errorf("failed to get managed cluster id by ref %s/%s for kind %s, err: %w", + managedCluster.ClusterRef.Namespace, + managedCluster.ClusterRef.Name, + managedCluster.ClusterRef.ClusterKind, + err) + } + + managedCluster.TargetKafkaClusterID = targetClusterID + + return nil +} + +func (r *KafkaConnectReconciler) createCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) error { + err := r.mergeManagedClusterFromRef(ctx, kc) + if err != nil { + return err + } + + b, err := r.API.CreateClusterRaw(instaclustr.KafkaConnectEndpoint, kc.Spec.ToInstAPI()) + if err != nil { + return fmt.Errorf("failed to create KafkaConnect cluster, err: %w", err) + } + + var instaModel models.KafkaConnectCluster + err = json.Unmarshal(b, &instaModel) + if err != nil { + return fmt.Errorf("failed to unmarshal body to KafkaConnect model, err: %w", err) + } + + kc.Spec.FromInstAPI(&instaModel) + + err = r.Update(ctx, kc) + if err != nil { + return fmt.Errorf("failed to update resource spec, err: %w", err) + } + + kc.Status.FromInstAPI(&instaModel) + err = r.Status().Update(ctx, kc) + if err != nil { + return fmt.Errorf("failed to update resource status, err: %w", err) + } + + l.Info("KafkaConnect cluster has been created", + "clusterID", kc.Status.ID, + ) + r.EventRecorder.Eventf( + kc, models.Normal, models.Created, + "Cluster creation request is sent. Cluster ID: %s", + kc.Status.ID, + ) + + err = r.createDefaultSecret(ctx, kc, l) + if err != nil { + return err + } + + return nil +} + func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) (reconcile.Result, error) { l = l.WithName("Creation Event") if kc.Status.ID == "" { - var err error - iKafkaConnectSpec := kc.Spec.ToInstAPI() - var targetClusterID string - - for i, targetCluster := range kc.Spec.TargetCluster { - for j, managedCluster := range targetCluster.ManagedCluster { - if managedCluster.ClusterRef != nil { - targetClusterID, err = clusterresources.GetClusterID(r.Client, ctx, managedCluster.ClusterRef) - if err != nil { - l.Error(err, "Cannot get cluster ID", - "Cluster reference", managedCluster.ClusterRef, - ) - return ctrl.Result{}, err - } - - iKafkaConnectSpec.TargetCluster[i].ManagedCluster[j].TargetKafkaClusterID = targetClusterID - l.Info( - "Creating KafkaConnect cluster from cluster reference", - "cluster reference", managedCluster.ClusterRef, - "cluster ID", targetClusterID, - ) - } else { - targetClusterID = managedCluster.TargetKafkaClusterID - l.Info( - "Creating Kafka Connect cluster", - "cluster name", kc.Spec.Name, - "cluster ID", targetClusterID, - "data centres", kc.Spec.DataCentres, - ) - } - } - } - - patch := kc.NewPatch() - kc.Status.ID, err = r.API.CreateCluster(instaclustr.KafkaConnectEndpoint, iKafkaConnectSpec) + err := r.createCluster(ctx, kc, l) if err != nil { - l.Error(err, "cannot create Kafka Connect in Instaclustr", "Kafka Connect manifest", kc.Spec) - r.EventRecorder.Eventf( - kc, models.Warning, models.CreationFailed, - "Cluster creation on the Instaclustr is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - kc.Status.TargetKafkaClusterID = targetClusterID - - r.EventRecorder.Eventf( - kc, models.Normal, models.Created, - "Cluster creation request is sent. Cluster ID: %s", - kc.Status.ID, - ) - err = r.Status().Patch(ctx, kc, patch) - if err != nil { - l.Error(err, "cannot patch Kafka Connect status ", "KC ID", kc.Status.ID) - r.EventRecorder.Eventf( - kc, models.Warning, models.PatchFailed, - "Cluster resource status patch is failed. Reason: %v", - err, + r.EventRecorder.Eventf(kc, models.Warning, models.CreationFailed, + "Failed to create cluster resource. Reason: %v", err, ) return reconcile.Result{}, err } + } + if kc.Status.State != models.DeletedStatus { + patch := kc.NewPatch() kc.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent controllerutil.AddFinalizer(kc, models.DeletionFinalizer) - err = r.Patch(ctx, kc, patch) + err := r.Patch(ctx, kc, patch) if err != nil { - l.Error(err, "Cannot patch Kafka Connect", "cluster name", kc.Spec.Name) - r.EventRecorder.Eventf( - kc, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, - ) return reconcile.Result{}, err } - err = r.createDefaultSecret(ctx, kc, l) + err = r.startSyncJob(kc) if err != nil { - l.Error(err, "Cannot create default secret for Kafka Connect", - "cluster name", kc.Spec.Name, - "clusterID", kc.Status.ID, - ) - r.EventRecorder.Eventf( - kc, models.Warning, models.CreationFailed, - "Default user secret creation on the Instaclustr is failed. Reason: %v", - err, - ) - - return reconcile.Result{}, err - } - - l.Info("Kafka Connect cluster has been created", - "cluster ID", kc.Status.ID, - ) - } - - if kc.Status.State != models.DeletedStatus { - err := r.startClusterStatusJob(kc) - if err != nil { - l.Error(err, "Cannot start cluster status job", "cluster ID", kc.Status.ID) - r.EventRecorder.Eventf( - kc, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err + return reconcile.Result{}, fmt.Errorf("failed to start sync job, err: %w", err) } r.EventRecorder.Eventf( kc, models.Normal, models.Created, - "Cluster status check job is started", + "Cluster sync job is started", ) } @@ -229,7 +214,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster( ) (reconcile.Result, error) { l = l.WithName("Update Event") - iData, err := r.API.GetKafkaConnect(kc.Status.ID) + instaModel, err := r.API.GetKafkaConnect(kc.Status.ID) if err != nil { l.Error(err, "Cannot get Kafka Connect from Instaclustr", "ClusterID", kc.Status.ID) @@ -241,24 +226,15 @@ func (r *KafkaConnectReconciler) handleUpdateCluster( return reconcile.Result{}, err } - iKC, err := kc.FromInst(iData) - if err != nil { - l.Error(err, "Cannot convert Kafka Connect from Instaclustr", - "ClusterID", kc.Status.ID) - r.EventRecorder.Eventf( - kc, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + iKC := &v1beta1.KafkaConnect{} + iKC.FromInstAPI(instaModel) if kc.Annotations[models.ExternalChangesAnnotation] == models.True || r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries { return handleExternalChanges[v1beta1.KafkaConnectSpec](r.EventRecorder, r.Client, kc, iKC, l) } - if kc.Spec.ClusterSettingsNeedUpdate(iKC.Spec.Cluster) { + if kc.Spec.ClusterSettingsNeedUpdate(&iKC.Spec.GenericClusterSpec) { l.Info("Updating cluster settings", "instaclustr description", iKC.Spec.Description, "instaclustr two factor delete", iKC.Spec.TwoFactorDelete) @@ -274,7 +250,7 @@ func (r *KafkaConnectReconciler) handleUpdateCluster( } } - if !kc.Spec.IsEqual(iKC.Spec) { + if !kc.Spec.Equals(&iKC.Spec) { l.Info("Update request to Instaclustr API has been sent", "spec data centres", kc.Spec.DataCentres) @@ -322,7 +298,7 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error(err, "Cannot get Kafka Connect cluster", "cluster name", kc.Spec.Name, - "cluster state", kc.Status.ClusterStatus.State) + "cluster state", kc.Status.State) r.EventRecorder.Eventf( kc, models.Warning, models.FetchFailed, "Cluster fetch from the Instaclustr API is failed. Reason: %v", @@ -489,8 +465,8 @@ func (r *KafkaConnectReconciler) startClusterOnPremisesIPsJob(k *v1beta1.KafkaCo return nil } -func (r *KafkaConnectReconciler) startClusterStatusJob(kc *v1beta1.KafkaConnect) error { - job := r.newWatchStatusJob(kc) +func (r *KafkaConnectReconciler) startSyncJob(kc *v1beta1.KafkaConnect) error { + job := r.newSyncJob(kc) err := r.Scheduler.ScheduleJob(kc.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -500,8 +476,9 @@ func (r *KafkaConnectReconciler) startClusterStatusJob(kc *v1beta1.KafkaConnect) return nil } -func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) scheduler.Job { +func (r *KafkaConnectReconciler) newSyncJob(kc *v1beta1.KafkaConnect) scheduler.Job { l := log.Log.WithValues("component", "kafkaConnectStatusClusterJob") + return func() error { namespacedName := client.ObjectKeyFromObject(kc) err := r.Get(context.Background(), namespacedName, kc) @@ -517,7 +494,7 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch return err } - iData, err := r.API.GetKafkaConnect(kc.Status.ID) + instaModel, err := r.API.GetKafkaConnect(kc.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { if kc.DeletionTimestamp != nil { @@ -533,22 +510,16 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch return err } - iKC, err := kc.FromInst(iData) - if err != nil { - l.Error(err, "Cannot convert Kafka Connect from Instaclustr", - "cluster ID", kc.Status.ID) - return err - } + iKC := &v1beta1.KafkaConnect{} + iKC.FromInstAPI(instaModel) - if !areStatusesEqual(&iKC.Status.ClusterStatus, &kc.Status.ClusterStatus) { - l.Info("Kafka Connect status of k8s is different from Instaclustr. Reconcile statuses..", - "instaclustr status", iKC.Status, - "status", kc.Status.ClusterStatus) + if !kc.Status.Equals(&iKC.Status) { + l.Info("Kafka Connect status of k8s is different from Instaclustr. Reconcile statuses..") - areDCsEqual := areDataCentresEqual(iKC.Status.ClusterStatus.DataCentres, kc.Status.ClusterStatus.DataCentres) + areDCsEqual := kc.Status.DCsEqual(iKC.Status.DataCentres) patch := kc.NewPatch() - kc.Status.ClusterStatus = iKC.Status.ClusterStatus + kc.Status.FromInstAPI(instaModel) err = r.Status().Patch(context.Background(), kc, patch) if err != nil { l.Error(err, "Cannot patch Kafka Connect cluster", @@ -559,14 +530,14 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch if !areDCsEqual { var nodes []*v1beta1.Node - for _, dc := range iKC.Status.ClusterStatus.DataCentres { + for _, dc := range iKC.Status.DataCentres { nodes = append(nodes, dc.Nodes...) } err = exposeservice.Create(r.Client, kc.Name, kc.Namespace, - kc.Spec.PrivateNetworkCluster, + kc.Spec.PrivateNetwork, nodes, models.KafkaConnectConnectionPort) if err != nil { @@ -575,19 +546,13 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch } } - equals := kc.Spec.IsEqual(iKC.Spec) + equals := kc.Spec.Equals(&iKC.Spec) if equals && kc.Annotations[models.ExternalChangesAnnotation] == models.True { - patch := kc.NewPatch() - delete(kc.Annotations, models.ExternalChangesAnnotation) - err := r.Patch(context.Background(), kc, patch) + err = reconcileExternalChanges(r.Client, r.EventRecorder, kc) if err != nil { return err } - - r.EventRecorder.Event(kc, models.Normal, models.ExternalChanges, - "External changes were automatically reconciled", - ) } else if kc.Status.CurrentClusterOperationStatus == models.NoOperation && kc.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && !equals { @@ -655,6 +620,10 @@ func (r *KafkaConnectReconciler) SetupWithManager(mgr ctrl.Manager) error { return false } + if newObj.Status.ID == "" && newObj.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return false + } + if newObj.Status.ID == "" { newObj.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent return true @@ -681,7 +650,7 @@ func (r *KafkaConnectReconciler) reconcileMaintenanceEvents(ctx context.Context, return err } - if !kc.Status.AreMaintenanceEventStatusesEqual(iMEStatuses) { + if !kc.Status.MaintenanceEventsEqual(iMEStatuses) { patch := kc.NewPatch() kc.Status.MaintenanceEvents = iMEStatuses err = r.Status().Patch(ctx, kc, patch) diff --git a/controllers/clusters/kafkaconnect_controller_test.go b/controllers/clusters/kafkaconnect_controller_test.go index 8fa6e945b..a305c6b44 100644 --- a/controllers/clusters/kafkaconnect_controller_test.go +++ b/controllers/clusters/kafkaconnect_controller_test.go @@ -90,7 +90,7 @@ var _ = Describe("Kafka Connect Controller", func() { Expect(k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect)).Should(Succeed()) patch := kafkaConnect.NewPatch() - kafkaConnect.Spec.DataCentres[0].NodesNumber = newKafkaConnectNodeNumbers + kafkaConnect.Spec.DataCentres[0].NumberOfNodes = newKafkaConnectNodeNumbers Expect(k8sClient.Patch(ctx, &kafkaConnect, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + @@ -104,7 +104,7 @@ var _ = Describe("Kafka Connect Controller", func() { return false } - return kafkaConnect.Status.DataCentres[0].NodesNumber == newKafkaConnectNodeNumbers + return kafkaConnect.Status.DataCentres[0].NumberOfNodes == newKafkaConnectNodeNumbers }, timeout, interval).Should(BeTrue()) }) }) diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index 90048aad2..c9ce45b59 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -448,7 +448,7 @@ func (c *Client) GetKafka(id string) (*models.KafkaCluster, error) { return &instaModel, nil } -func (c *Client) GetKafkaConnect(id string) ([]byte, error) { +func (c *Client) GetKafkaConnect(id string) (*models.KafkaConnectCluster, error) { url := c.serverHostname + KafkaConnectEndpoint + id resp, err := c.DoRequest(url, http.MethodGet, nil) @@ -470,7 +470,13 @@ func (c *Client) GetKafkaConnect(id string) ([]byte, error) { return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } - return body, nil + var instaModel models.KafkaConnectCluster + err = json.Unmarshal(body, &instaModel) + if err != nil { + return nil, err + } + + return &instaModel, nil } func (c *Client) UpdateKafkaConnect(id string, kc models.KafkaConnectAPIUpdate) error { diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 528be78c1..1ecb489a6 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -82,7 +82,7 @@ type API interface { GetCassandra(id string) (*models.CassandraCluster, error) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error GetKafka(id string) (*models.KafkaCluster, error) - GetKafkaConnect(id string) ([]byte, error) + GetKafkaConnect(id string) (*models.KafkaConnectCluster, error) UpdateKafkaConnect(id string, kc models.KafkaConnectAPIUpdate) error GetZookeeper(id string) ([]byte, error) RestoreCluster(restoreData any, clusterKind string) (string, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 64abb0c61..22bd521c0 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -310,7 +310,7 @@ func (c *mockClient) GetKafka(id string) (*models.KafkaCluster, error) { panic("GetKafka: is not implemented") } -func (c *mockClient) GetKafkaConnect(id string) ([]byte, error) { +func (c *mockClient) GetKafkaConnect(id string) (*models.KafkaConnectCluster, error) { panic("GetKafkaConnect: is not implemented") } diff --git a/pkg/models/apiv2_generic.go b/pkg/models/apiv2_generic.go index 8d4386ff1..e7ea9c85c 100644 --- a/pkg/models/apiv2_generic.go +++ b/pkg/models/apiv2_generic.go @@ -7,7 +7,6 @@ type GenericClusterFields struct { Name string `json:"name"` Description string `json:"description,omitempty"` - PCIComplianceMode bool `json:"pciComplianceMode"` PrivateNetworkCluster bool `json:"privateNetworkCluster"` SLATier string `json:"slaTier,omitempty"` TwoFactorDelete []*TwoFactorDelete `json:"twoFactorDelete,omitempty"` diff --git a/pkg/models/cassandra_apiv2.go b/pkg/models/cassandra_apiv2.go index 981bc56c8..718ddd20c 100644 --- a/pkg/models/cassandra_apiv2.go +++ b/pkg/models/cassandra_apiv2.go @@ -23,6 +23,7 @@ type CassandraCluster struct { LuceneEnabled bool `json:"luceneEnabled"` PasswordAndUserAuth bool `json:"passwordAndUserAuth"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + PCIComplianceMode bool `json:"pciComplianceMode"` DataCentres []*CassandraDataCentre `json:"dataCentres"` ResizeSettings []*ResizeSettings `json:"resizeSettings"` diff --git a/pkg/models/kafka_apiv2.go b/pkg/models/kafka_apiv2.go index 843c78576..468097540 100644 --- a/pkg/models/kafka_apiv2.go +++ b/pkg/models/kafka_apiv2.go @@ -25,6 +25,7 @@ type KafkaCluster struct { BundledUseOnly bool `json:"bundledUseOnly"` ClientBrokerAuthWithMtls bool `json:"clientBrokerAuthWithMtls"` ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + PCIComplianceMode bool `json:"pciComplianceMode"` DefaultNumberOfPartitions int `json:"defaultNumberOfPartitions"` DefaultReplicationFactor int `json:"defaultReplicationFactor"` diff --git a/pkg/models/kafka_connect_apiv2.go b/pkg/models/kafka_connect_apiv2.go index 2785bf06d..aadc86814 100644 --- a/pkg/models/kafka_connect_apiv2.go +++ b/pkg/models/kafka_connect_apiv2.go @@ -17,17 +17,14 @@ limitations under the License. package models type KafkaConnectCluster struct { - ClusterStatus `json:",inline"` - Name string `json:"name,omitempty"` - KafkaConnectVersion string `json:"kafkaConnectVersion,omitempty"` + GenericClusterFields `json:",inline"` + + KafkaConnectVersion string `json:"kafkaConnectVersion"` PrivateNetworkCluster bool `json:"privateNetworkCluster"` - SLATier string `json:"slaTier,omitempty"` - TwoFactorDelete []*TwoFactorDelete `json:"twoFactorDelete,omitempty"` CustomConnectors []*CustomConnectors `json:"customConnectors,omitempty"` - TargetCluster []*TargetCluster `json:"targetCluster,omitempty"` - DataCentres []*KafkaConnectDataCentre `json:"dataCentres,omitempty"` + TargetCluster []*TargetCluster `json:"targetCluster"` + DataCentres []*KafkaConnectDataCentre `json:"dataCentres"` ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` - Description string `json:"description,omitempty"` } type ManagedCluster struct { @@ -64,9 +61,9 @@ type AzureConnectorSettings struct { } type AWSConnectorSettings struct { - S3RoleArn string `json:"s3RoleArn"` - SecretKey string `json:"secretKey"` - AccessKey string `json:"accessKey"` + S3RoleArn string `json:"s3RoleArn,omitempty"` + SecretKey string `json:"secretKey,omitempty"` + AccessKey string `json:"accessKey,omitempty"` S3BucketName string `json:"s3BucketName"` } @@ -80,8 +77,13 @@ type GCPConnectorSettings struct { } type KafkaConnectDataCentre struct { - DataCentre `json:",inline"` - ReplicationFactor int `json:"replicationFactor"` + GenericDataCentreFields `json:",inline"` + + NodeSize string `json:"nodeSize"` + NumberOfNodes int `json:"numberOfNodes"` + ReplicationFactor int `json:"replicationFactor"` + + Nodes []*Node `json:"nodes,omitempty"` } type KafkaConnectAPIUpdate struct { diff --git a/pkg/models/opensearch_apiv2.go b/pkg/models/opensearch_apiv2.go index 429b321f7..c9f7851a6 100644 --- a/pkg/models/opensearch_apiv2.go +++ b/pkg/models/opensearch_apiv2.go @@ -31,6 +31,7 @@ type OpenSearchCluster struct { BundledUseOnly bool `json:"bundledUseOnly"` IndexManagementPlugin bool `json:"indexManagementPlugin"` AlertingPlugin bool `json:"alertingPlugin"` + PCIComplianceMode bool `json:"pciComplianceMode"` DataCentres []*OpenSearchDataCentre `json:"dataCentres"` DataNodes []*OpenSearchDataNodes `json:"dataNodes,omitempty"` OpenSearchDashboards []*OpenSearchDashboards `json:"opensearchDashboards,omitempty"` diff --git a/pkg/models/redis_apiv2.go b/pkg/models/redis_apiv2.go index 6434353af..003d88a40 100644 --- a/pkg/models/redis_apiv2.go +++ b/pkg/models/redis_apiv2.go @@ -22,6 +22,7 @@ type RedisCluster struct { RedisVersion string `json:"redisVersion"` ClientToNodeEncryption bool `json:"clientToNodeEncryption"` PasswordAndUserAuth bool `json:"passwordAndUserAuth"` + PCIComplianceMode bool `json:"pciComplianceMode"` DataCentres []*RedisDataCentre `json:"dataCentres,omitempty"` }