Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaConnect code base refactor #709

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@
"filename": "apis/clusters/v1beta1/cassandra_types.go",
"hashed_secret": "331cc743251c3b9504229de4d139c539da121a33",
"is_verified": false,
"line_number": 261
"line_number": 238
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/cassandra_types.go",
"hashed_secret": "0ad8d7005e084d4f028a4277b73c6fab24269c17",
"is_verified": false,
"line_number": 347
"line_number": 324
},
{
"type": "Secret Keyword",
Expand Down Expand Up @@ -233,84 +233,84 @@
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "46fe9b29395041087f91b33bd8c5c6177cd42fd1",
"is_verified": false,
"line_number": 247
"line_number": 253
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "4b3af1508421e2fa591c5b260c36dd06fdd872a5",
"is_verified": false,
"line_number": 285
"line_number": 297
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "cf45830dd81b7e1a8b5ffbc2d95b112771524117",
"is_verified": false,
"line_number": 295
"line_number": 307
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "138905ac46675150bf790088ec56b2efc6a64697",
"is_verified": false,
"line_number": 306
"line_number": 318
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "3948059919ffeee8ecc42149cb386f43d2f06f74",
"is_verified": false,
"line_number": 311
"line_number": 323
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "87f1180476a944c4162d1af55efedc8f3e3b609c",
"is_verified": false,
"line_number": 520
"line_number": 474
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "f0f06c9167ce61a586749bb183ac6a3756dd6010",
"is_verified": false,
"line_number": 530
"line_number": 484
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "2042128e13ef5ede4af44271160c72f64564c632",
"is_verified": false,
"line_number": 541
"line_number": 495
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "82dc9ca8ba09262ce948227aeb5d9db8084eeb5d",
"is_verified": false,
"line_number": 546
"line_number": 500
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "5f915325aef923cdc945f639f14c2f854b4214d6",
"is_verified": false,
"line_number": 570
"line_number": 524
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "5ffe533b830f08a0326348a9160afafc8ada44db",
"is_verified": false,
"line_number": 603
"line_number": 557
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/kafkaconnect_types.go",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 608
"line_number": 562
}
],
"apis/clusters/v1beta1/postgresql_types.go": [
Expand Down Expand Up @@ -538,7 +538,7 @@
"filename": "controllers/clusters/cadence_controller.go",
"hashed_secret": "192d703e91a60432ce06bfe26adfd12f5c7b931f",
"is_verified": false,
"line_number": 702
"line_number": 701
}
],
"controllers/clusters/datatest/kafka_v1beta1.yaml": [
Expand Down Expand Up @@ -1146,5 +1146,5 @@
}
]
},
"generated_at": "2024-02-19T13:01:03Z"
"generated_at": "2024-02-20T12:39:50Z"
}
59 changes: 30 additions & 29 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type CassandraSpec struct {
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
PCICompliance bool `json:"pciCompliance,omitempty"`
UserRefs References `json:"userRefs,omitempty"`
ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"`
}
Expand Down Expand Up @@ -147,31 +148,7 @@ type CassandraDataCentreStatus struct {

func (s *CassandraDataCentreStatus) Equals(o *CassandraDataCentreStatus) bool {
return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) &&
s.nodesEqual(o.Nodes)
}

func (s *CassandraDataCentreStatus) nodesEqual(nodes []*Node) bool {
if len(s.Nodes) != len(nodes) {
return false
}

sNodes := map[string]*Node{}
for _, node := range s.Nodes {
sNodes[node.ID] = node
}

for _, node := range nodes {
sNode, ok := sNodes[node.ID]
if !ok {
return false
}

if !sNode.Equals(node) {
return false
}
}

return true
nodesEqual(s.Nodes, o.Nodes)
}

func (s *CassandraDataCentreStatus) FromInstAPI(instModel *models.CassandraDataCentre) {
Expand Down Expand Up @@ -347,18 +324,36 @@ func (cs *CassandraSpec) FromInstAPI(instModel *models.CassandraCluster) {
cs.PasswordAndUserAuth = instModel.PasswordAndUserAuth
cs.BundledUseOnly = instModel.BundledUseOnly
cs.Version = instModel.CassandraVersion
cs.PCICompliance = instModel.PCIComplianceMode
cs.ResizeSettings.FromInstAPI(instModel.ResizeSettings)

cs.dcsFromInstAPI(instModel.DataCentres)
}

func (cs *CassandraSpec) dcsFromInstAPI(instModels []*models.CassandraDataCentre) {
cs.DataCentres = make([]*CassandraDataCentre, len(instModels))
dcs := make([]*CassandraDataCentre, len(instModels))
for i, instModel := range instModels {
dc := &CassandraDataCentre{}
dcs[i] = dc

if index := cs.getDCIndexByName(instModel.Name); index > -1 {
dc.Debezium = cs.DataCentres[index].Debezium
}

dc.FromInstAPI(instModel)
cs.DataCentres[i] = dc
}

cs.DataCentres = dcs
}

func (c *CassandraSpec) getDCIndexByName(name string) int {
for i, dc := range c.DataCentres {
if dc.Name == name {
return i
}
}

return -1
}

func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) {
Expand All @@ -377,15 +372,20 @@ func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre)
}

func (cs *CassandraDataCentre) debeziumFromInstAPI(instModels []*models.Debezium) {
cs.Debezium = make([]*DebeziumCassandraSpec, len(instModels))
debezium := make([]*DebeziumCassandraSpec, len(instModels))
for i, instModel := range instModels {
cs.Debezium[i] = &DebeziumCassandraSpec{
debezium[i] = &DebeziumCassandraSpec{
KafkaVPCType: instModel.KafkaVPCType,
KafkaTopicPrefix: instModel.KafkaTopicPrefix,
KafkaDataCentreID: instModel.KafkaDataCentreID,
Version: instModel.Version,
}

if len(cs.Debezium)-1 >= i {
debezium[i].ClusterRef = cs.Debezium[i].ClusterRef
}
}
cs.Debezium = debezium
}

func (cs *CassandraDataCentre) shotoverProxyFromInstAPI(instModels []*models.ShotoverProxy) {
Expand All @@ -411,6 +411,7 @@ func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster {
LuceneEnabled: cs.LuceneEnabled,
PasswordAndUserAuth: cs.PasswordAndUserAuth,
BundledUseOnly: cs.BundledUseOnly,
PCIComplianceMode: cs.PCICompliance,
DataCentres: cs.DCsToInstAPI(),
ResizeSettings: cs.ResizeSettings.ToInstAPI(),
}
Expand Down
2 changes: 2 additions & 0 deletions apis/clusters/v1beta1/cassandra_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type immutableCassandraFields struct {
type specificCassandra struct {
LuceneEnabled bool
PasswordAndUserAuth bool
PCICompliance bool
}

type immutableCassandraDCFields struct {
Expand All @@ -233,6 +234,7 @@ func (cs *CassandraSpec) newImmutableFields() *immutableCassandraFields {
specificCassandra: specificCassandra{
LuceneEnabled: cs.LuceneEnabled,
PasswordAndUserAuth: cs.PasswordAndUserAuth,
PCICompliance: cs.PCICompliance,
},
immutableCluster: cs.GenericClusterSpec.immutableFields(),
}
Expand Down
7 changes: 0 additions & 7 deletions apis/clusters/v1beta1/generic_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ type GenericClusterSpec struct {

Version string `json:"version,omitempty"`

// The PCI compliance standards relate to the security of user data and transactional information.
// Can only be applied clusters provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis.
PCICompliance bool `json:"pciCompliance,omitempty"`

PrivateNetwork bool `json:"privateNetwork,omitempty"`

// Non-production clusters may receive lower priority support and reduced SLAs.
Expand All @@ -30,7 +26,6 @@ type GenericClusterSpec struct {
func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool {
return s.Name == o.Name &&
s.Version == o.Version &&
s.PCICompliance == o.PCICompliance &&
s.PrivateNetwork == o.PrivateNetwork &&
s.SLATier == o.SLATier &&
s.Description == o.Description &&
Expand All @@ -39,7 +34,6 @@ func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool {

func (s *GenericClusterSpec) FromInstAPI(model *models.GenericClusterFields) {
s.Name = model.Name
s.PCICompliance = model.PCIComplianceMode
s.PrivateNetwork = model.PrivateNetworkCluster
s.SLATier = model.SLATier
s.Description = model.Description
Expand All @@ -60,7 +54,6 @@ func (s *GenericClusterSpec) ToInstAPI() models.GenericClusterFields {
return models.GenericClusterFields{
Name: s.Name,
Description: s.Description,
PCIComplianceMode: s.PCICompliance,
PrivateNetworkCluster: s.PrivateNetwork,
SLATier: s.SLATier,
TwoFactorDelete: s.TwoFactorDeleteToInstAPI(),
Expand Down
40 changes: 5 additions & 35 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type KafkaSpec struct {
ClientToClusterEncryption bool `json:"clientToClusterEncryption"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
PCICompliance bool `json:"pciCompliance,omitempty"`
UserRefs References `json:"userRefs,omitempty"`

// Provision additional dedicated nodes for Apache Zookeeper to run on.
Expand Down Expand Up @@ -133,7 +134,7 @@ type KafkaDataCentreStatus struct {

func (s *KafkaDataCentreStatus) Equals(o *KafkaDataCentreStatus) bool {
return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) &&
s.nodesEqual(o.Nodes) &&
nodesEqual(s.Nodes, o.Nodes) &&
slices.EqualsPtr(s.PrivateLink, o.PrivateLink)
}

Expand Down Expand Up @@ -178,30 +179,6 @@ func (s *KafkaStatus) ToOnPremises() ClusterStatus {
}
}

func (s *KafkaDataCentreStatus) nodesEqual(nodes []*Node) bool {
if len(s.Nodes) != len(nodes) {
return false
}

sNodes := map[string]*Node{}
for _, node := range s.Nodes {
sNodes[node.ID] = node
}

for _, node := range nodes {
sNode, ok := sNodes[node.ID]
if !ok {
return false
}

if !sNode.Equals(node) {
return false
}
}

return true
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
Expand Down Expand Up @@ -260,6 +237,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster {
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
ResizeSettings: k.ResizeSettings.ToInstAPI(),
PCIComplianceMode: k.PCICompliance,
}
}

Expand Down Expand Up @@ -386,6 +364,7 @@ func (ks *KafkaSpec) FromInstAPI(instaModel *models.KafkaCluster) {
ks.ClientBrokerAuthWithMTLS = instaModel.ClientBrokerAuthWithMtls
ks.BundledUseOnly = instaModel.BundledUseOnly
ks.Version = instaModel.KafkaVersion
ks.PCICompliance = instaModel.PCIComplianceMode

ks.DCsFromInstAPI(instaModel.DataCentres)
ks.kraftFromInstAPI(instaModel.Kraft)
Expand Down Expand Up @@ -491,17 +470,8 @@ func (ks *KafkaStatus) DCsFromInstAPI(instaModels []*models.KafkaDataCentre) {

func (s *KafkaDataCentreStatus) FromInstAPI(instaModel *models.KafkaDataCentre) {
s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields)
s.nodesFromInstAPI(instaModel.Nodes)
s.PrivateLink.FromInstAPI(instaModel.PrivateLink)
}

func (s *KafkaDataCentreStatus) nodesFromInstAPI(instaModels []*models.Node) {
s.Nodes = make([]*Node, len(instaModels))
for i, instaModel := range instaModels {
node := Node{}
node.FromInstAPI(instaModel)
s.Nodes[i] = &node
}
s.Nodes = nodesFromInstAPI(instaModel.Nodes)
}

func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
Expand Down
2 changes: 2 additions & 0 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ type specificKafkaFields struct {
bundledUseOnly bool
privateNetworkCluster bool
clientBrokerAuthWithMtls bool
pciCompliance bool
}

func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
Expand All @@ -385,6 +386,7 @@ func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetwork,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
pciCompliance: ks.PCICompliance,
},
cluster: ks.GenericClusterSpec.immutableFields(),
}
Expand Down
Loading
Loading