diff --git a/Makefile b/Makefile
index 154101031..f78ac97da 100644
--- a/Makefile
+++ b/Makefile
@@ -78,8 +78,12 @@ test-kafkamanagement:
 test-users:
 	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./controllers/tests
 
+.PHONY: test-webhooks
+test-webhooks:
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./apis/clusters/v1beta1 -coverprofile cover.out
+
 .PHONY: test
-test: manifests generate fmt vet docker-build-server-stub run-server-stub envtest test-clusters test-clusterresources test-kafkamanagement test-users stop-server-stub
+test: manifests generate fmt vet docker-build-server-stub run-server-stub envtest test-clusters test-clusterresources test-webhooks test-kafkamanagement test-users stop-server-stub
 
 .PHONY: goimports
 goimports:
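
A quick usage note (not part of the diff): the new target can be invoked on its own, and the aggregate `test` target now pulls it in automatically between `test-clusterresources` and `test-kafkamanagement`:

```sh
# run just the webhook envtest suite (coverage written to cover.out)
make test-webhooks

# or run it as part of the full test pipeline
make test
```
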
diff --git a/apis/clusters/v1beta1/cassandra_webhook.go b/apis/clusters/v1beta1/cassandra_webhook.go
index a70a6a25e..ce8a4cfbc 100644
--- a/apis/clusters/v1beta1/cassandra_webhook.go
+++ b/apis/clusters/v1beta1/cassandra_webhook.go
@@ -280,47 +280,19 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error
 		return models.ErrDecreasedDataCentresNumber
 	}
 
+	toValidate := map[string]*CassandraDataCentre{}
+	for _, dc := range oldSpec.DataCentres {
+		toValidate[dc.Name] = dc
+	}
+
 	for _, newDC := range cs.DataCentres {
-		var exists bool
-		for _, oldDC := range oldSpec.DataCentres {
-			if oldDC.Name == newDC.Name {
-				newDCImmutableFields := newDC.newImmutableFields()
-				oldDCImmutableFields := oldDC.newImmutableFields()
-
-				if *newDCImmutableFields != *oldDCImmutableFields {
-					return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
-				}
-
-				if ((newDC.NodesNumber*newDC.ReplicationFactor)/newDC.ReplicationFactor)%newDC.ReplicationFactor != 0 {
-					return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", newDC.ReplicationFactor)
-				}
-
-				if newDC.NodesNumber < oldDC.NodesNumber {
-					return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NodesNumber)
-				}
-
-				err := newDC.validateImmutableCloudProviderSettingsUpdate(oldDC.CloudProviderSettings)
-				if err != nil {
-					return err
-				}
-
-				err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
-				if err != nil {
-					return err
-				}
-
-				if !oldDC.DebeziumEquals(newDC) {
-					return models.ErrDebeziumImmutable
-				}
-
-				exists = true
-				break
+		oldDC, ok := toValidate[newDC.Name]
+		if !ok {
+			if len(cs.DataCentres) == len(oldSpec.DataCentres) {
+				return fmt.Errorf("cannot change datacentre name: %v", newDC.Name)
 			}
-		}
 
-		if !exists {
-			err := newDC.DataCentre.ValidateCreation()
-			if err != nil {
+			if err := newDC.ValidateCreation(); err != nil {
 				return err
 			}
 
@@ -328,7 +300,7 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error
 				return fmt.Errorf("cannot use private ip broadcast for discovery on public network cluster")
 			}
 
-			err = validateReplicationFactor(models.CassandraReplicationFactors, newDC.ReplicationFactor)
+			err := validateReplicationFactor(models.CassandraReplicationFactors, newDC.ReplicationFactor)
 			if err != nil {
 				return err
 			}
@@ -337,9 +309,38 @@ func (cs *CassandraSpec) validateDataCentresUpdate(oldSpec CassandraSpec) error
 				return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", newDC.ReplicationFactor)
 			}
 
-			return nil
+			continue // new data centre: the update checks below compare against oldDC, which is nil here
 		}
+
+		newDCImmutableFields := newDC.newImmutableFields()
+		oldDCImmutableFields := oldDC.newImmutableFields()
+
+		if *newDCImmutableFields != *oldDCImmutableFields {
+			return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
+		}
+
+		if ((newDC.NodesNumber*newDC.ReplicationFactor)/newDC.ReplicationFactor)%newDC.ReplicationFactor != 0 {
+			return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", newDC.ReplicationFactor)
+		}
+
+		if newDC.NodesNumber < oldDC.NodesNumber {
+			return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NodesNumber)
+		}
+
+		err := newDC.validateImmutableCloudProviderSettingsUpdate(oldDC.CloudProviderSettings)
+		if err != nil {
+			return err
+		}
+
+		err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
+		if err != nil {
+			return err
+		}
+
+		if !oldDC.DebeziumEquals(newDC) {
+			return models.ErrDebeziumImmutable
+		}
 	}
 
 	return nil
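
The refactor above replaces the quadratic name-matching loop with a map index built once from the old spec. A minimal, self-contained sketch of the pattern — the cut-down `DataCentre` type and the two rules shown are illustrative, not the operator's full validation set:

```go
package main

import "fmt"

// DataCentre is a stand-in for the operator's *CassandraDataCentre.
type DataCentre struct {
	Name        string
	NodesNumber int
}

// validateDCs mirrors the refactor: index old data centres by name once,
// then resolve each new data centre with an O(1) lookup instead of a
// nested loop over the old slice.
func validateDCs(oldDCs, newDCs []*DataCentre) error {
	toValidate := make(map[string]*DataCentre, len(oldDCs))
	for _, dc := range oldDCs {
		toValidate[dc.Name] = dc
	}

	for _, newDC := range newDCs {
		oldDC, ok := toValidate[newDC.Name]
		if !ok {
			// Equal lengths but an unknown name means a rename, which is rejected.
			if len(newDCs) == len(oldDCs) {
				return fmt.Errorf("cannot change datacentre name: %v", newDC.Name)
			}
			continue // genuinely new data centre: creation rules would run here
		}
		if newDC.NodesNumber < oldDC.NodesNumber {
			return fmt.Errorf("deleting nodes is not supported")
		}
	}

	return nil
}

func main() {
	oldDCs := []*DataCentre{{Name: "AWS_VPC_US_EAST_1", NodesNumber: 3}}
	newDCs := []*DataCentre{{Name: "AWS_VPC_US_EAST_1", NodesNumber: 6}}
	fmt.Println(validateDCs(oldDCs, newDCs)) // <nil>: scaling up is allowed
}
```
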
diff --git a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go
index 302acd491..ceee02308 100644
--- a/apis/clusters/v1beta1/kafka_webhook.go
+++ b/apis/clusters/v1beta1/kafka_webhook.go
@@ -186,12 +186,12 @@ func (kv *kafkaValidator) ValidateUpdate(ctx context.Context, old runtime.Object
 		return fmt.Errorf("cannot assert object %v to kafka", new.GetObjectKind())
 	}
 
-	kafkalog.Info("validate update", "name", k.Name)
-
 	if k.Status.ID == "" {
 		return kv.ValidateCreate(ctx, k)
 	}
 
+	kafkalog.Info("validate update", "name", k.Name)
+
 	// skip validation when handle external changes from Instaclustr
 	if k.Annotations[models.ExternalChangesAnnotation] == models.True {
 		return nil
@@ -347,34 +347,40 @@ func (ks *KafkaSpec) validateImmutableDataCentresFieldsUpdate(oldSpec *KafkaSpec
 		return models.ErrDecreasedDataCentresNumber
 	}
 
+	toValidate := map[string]*KafkaDataCentre{}
+	for _, dc := range oldSpec.DataCentres {
+		toValidate[dc.Name] = dc
+	}
+
 	for _, newDC := range ks.DataCentres {
-		for _, oldDC := range oldSpec.DataCentres {
-			if oldDC.Name == newDC.Name {
-				newDCImmutableFields := newDC.newImmutableFields()
-				oldDCImmutableFields := oldDC.newImmutableFields()
-
-				if *newDCImmutableFields != *oldDCImmutableFields {
-					return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
-				}
-
-				if newDC.NodesNumber < oldDC.NodesNumber {
-					return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NodesNumber)
-				}
-
-				err := newDC.validateImmutableCloudProviderSettingsUpdate(oldDC.CloudProviderSettings)
-				if err != nil {
-					return err
-				}
-
-				err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
-				if err != nil {
-					return err
-				}
-
-				if ok := isPrivateLinkValid(newDC.PrivateLink, oldDC.PrivateLink); !ok {
-					return fmt.Errorf("advertisedHostname field cannot be changed")
-				}
-			}
+		oldDC, ok := toValidate[newDC.Name]
+		if !ok {
+			return fmt.Errorf("cannot change datacentre name: %v", newDC.Name)
+		}
+
+		newDCImmutableFields := newDC.newImmutableFields()
+		oldDCImmutableFields := oldDC.newImmutableFields()
+
+		if *newDCImmutableFields != *oldDCImmutableFields {
+			return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
+		}
+
+		if newDC.NodesNumber < oldDC.NodesNumber {
+			return fmt.Errorf("deleting nodes is not supported. Number of nodes must be greater than: %v", oldDC.NodesNumber)
+		}
+
+		err := newDC.validateImmutableCloudProviderSettingsUpdate(oldDC.CloudProviderSettings)
+		if err != nil {
+			return err
+		}
+
+		err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
+		if err != nil {
+			return err
+		}
+
+		if ok = isPrivateLinkValid(newDC.PrivateLink, oldDC.PrivateLink); !ok {
+			return fmt.Errorf("advertisedHostname field cannot be changed")
 		}
 	}
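
The first hunk also reorders `ValidateUpdate` so that the "validate update" log line fires only for genuine updates: while `Status.ID` is empty the cluster has not yet been created on the Instaclustr side, so update requests are re-routed through the create rules. A toy, self-contained sketch of the guard (the types here are stand-ins, not the operator's):

```go
package main

import (
	"context"
	"fmt"
)

// kafka is a stand-in: the real webhook checks Kafka.Status.ID, which the
// operator fills in only once the cluster exists on the Instaclustr side.
type kafka struct{ id string }

type validator struct{}

func (v validator) ValidateCreate(_ context.Context, _ *kafka) error {
	fmt.Println("running create-time validation")
	return nil
}

// ValidateUpdate delegates to the create rules while the resource has no ID,
// so "validate update" is only logged for real updates.
func (v validator) ValidateUpdate(ctx context.Context, k *kafka) error {
	if k.id == "" {
		return v.ValidateCreate(ctx, k)
	}
	fmt.Println("validate update")
	return nil
}

func main() {
	v := validator{}
	_ = v.ValidateUpdate(context.Background(), &kafka{})           // still creating
	_ = v.ValidateUpdate(context.Background(), &kafka{id: "abc1"}) // a real update
}
```
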
"github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/yaml" + + "github.com/instaclustr/operator/pkg/models" +) + +var _ = Describe("Kafka Controller", Ordered, func() { + kafkaManifest := Kafka{} + kafkaNamespacedName := &types.NamespacedName{} + TestKafkaManifest := Kafka{} + + It("Reading kafka manifest", func() { + yfile, err := os.ReadFile("../../../controllers/clusters/datatest/kafka_v1beta1.yaml") + Expect(err).Should(Succeed()) + + err = yaml.Unmarshal(yfile, &kafkaManifest) + Expect(err).Should(Succeed()) + kafkaNamespacedName.Name = kafkaManifest.ObjectMeta.Name + kafkaNamespacedName.Namespace = defaultNS + TestKafkaManifest = kafkaManifest + }) + + ctx := context.Background() + + When("apply a Kafka manifest", func() { + It("should test kafka creation flow", func() { + + TestKafkaManifest.Spec.Description = "some description" + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Description = kafkaManifest.Spec.Description + + if len(TestKafkaManifest.Spec.TwoFactorDelete) == 0 { + TestKafkaManifest.Spec.TwoFactorDelete = []*TwoFactorDelete{{Email: "test@test.com"}, {Email: "test@test.com"}} + } else { + TestKafkaManifest.Spec.TwoFactorDelete = []*TwoFactorDelete{kafkaManifest.Spec.TwoFactorDelete[0], {Email: "some@gmail.com"}} + } + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.TwoFactorDelete = kafkaManifest.Spec.TwoFactorDelete + + TestKafkaManifest.Spec.SLATier = "some SLATier that is not support" + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.SLATier = kafkaManifest.Spec.SLATier + + if len(TestKafkaManifest.Spec.Kraft) == 0 { + TestKafkaManifest.Spec.Kraft = []*Kraft{{ControllerNodeCount: 1}, {ControllerNodeCount: 1}} + } else { + TestKafkaManifest.Spec.Kraft = []*Kraft{kafkaManifest.Spec.Kraft[0], {ControllerNodeCount: 1}} + } + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).ShouldNot(Succeed()) + + TestKafkaManifest.Spec.Kraft = []*Kraft{TestKafkaManifest.Spec.Kraft[0]} + TestKafkaManifest.Spec.Kraft[0].ControllerNodeCount = 4 + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Kraft[0].ControllerNodeCount = 3 + + done := NewChannelWithTimeout(timeout) + + <-done + }) + }) + + When("updating a Kafka manifest", func() { + It("should test kafka update flow", func() { + TestKafkaManifest.Status.State = models.RunningStatus + Expect(k8sClient.Create(ctx, &TestKafkaManifest)).Should(Succeed()) + + patch := TestKafkaManifest.NewPatch() + TestKafkaManifest.Status.ID = models.CreatedEvent + Expect(k8sClient.Status().Patch(ctx, &TestKafkaManifest, patch)).Should(Succeed()) + + TestKafkaManifest.Spec.Name += "newValue" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Name = kafkaManifest.Spec.Name + + TestKafkaManifest.Spec.Version += ".1" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Version = kafkaManifest.Spec.Version + + TestKafkaManifest.Spec.PCICompliance = !kafkaManifest.Spec.PCICompliance + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.PCICompliance = kafkaManifest.Spec.PCICompliance + + TestKafkaManifest.Spec.PrivateNetworkCluster = !kafkaManifest.Spec.PrivateNetworkCluster + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, 
patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.PrivateNetworkCluster = kafkaManifest.Spec.PrivateNetworkCluster + + TestKafkaManifest.Spec.PartitionsNumber += 1 + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.PartitionsNumber = kafkaManifest.Spec.PartitionsNumber + + TestKafkaManifest.Spec.AllowDeleteTopics = !kafkaManifest.Spec.AllowDeleteTopics + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.AllowDeleteTopics = kafkaManifest.Spec.AllowDeleteTopics + + TestKafkaManifest.Spec.AutoCreateTopics = !kafkaManifest.Spec.AutoCreateTopics + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.AutoCreateTopics = kafkaManifest.Spec.AutoCreateTopics + + TestKafkaManifest.Spec.ClientToClusterEncryption = !kafkaManifest.Spec.ClientToClusterEncryption + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.ClientToClusterEncryption = kafkaManifest.Spec.ClientToClusterEncryption + + TestKafkaManifest.Spec.BundledUseOnly = !kafkaManifest.Spec.BundledUseOnly + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.BundledUseOnly = kafkaManifest.Spec.BundledUseOnly + + TestKafkaManifest.Spec.PrivateNetworkCluster = !kafkaManifest.Spec.PrivateNetworkCluster + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.PrivateNetworkCluster = kafkaManifest.Spec.PrivateNetworkCluster + + TestKafkaManifest.Spec.ClientBrokerAuthWithMTLS = !kafkaManifest.Spec.ClientBrokerAuthWithMTLS + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.ClientBrokerAuthWithMTLS = kafkaManifest.Spec.ClientBrokerAuthWithMTLS + + prevSchemaRegistry := kafkaManifest.Spec.SchemaRegistry + TestKafkaManifest.Spec.SchemaRegistry = []*SchemaRegistry{prevSchemaRegistry[0], prevSchemaRegistry[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.SchemaRegistry = prevSchemaRegistry + + prevKarapaceSchemaRegistry := kafkaManifest.Spec.KarapaceSchemaRegistry + TestKafkaManifest.Spec.KarapaceSchemaRegistry = []*KarapaceSchemaRegistry{prevKarapaceSchemaRegistry[0], prevKarapaceSchemaRegistry[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.KarapaceSchemaRegistry = prevKarapaceSchemaRegistry + + prevRestProxy := kafkaManifest.Spec.RestProxy + TestKafkaManifest.Spec.RestProxy = []*RestProxy{prevRestProxy[0], prevRestProxy[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.RestProxy = prevRestProxy + + prevRestProxyVersion := prevRestProxy[0].Version + TestKafkaManifest.Spec.RestProxy[0].Version += ".0" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.RestProxy[0].Version = prevRestProxyVersion + + prevKraft := kafkaManifest.Spec.Kraft + TestKafkaManifest.Spec.Kraft = []*Kraft{prevKraft[0], prevKraft[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Kraft = prevKraft + + prevKraftControllerNodeCount := prevKraft[0].ControllerNodeCount + TestKafkaManifest.Spec.Kraft[0].ControllerNodeCount++ + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.Kraft[0].ControllerNodeCount = 
prevKraftControllerNodeCount + + prevKarapaceRestProxy := kafkaManifest.Spec.KarapaceRestProxy + TestKafkaManifest.Spec.KarapaceRestProxy = []*KarapaceRestProxy{prevKarapaceRestProxy[0], prevKarapaceRestProxy[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.KarapaceRestProxy = prevKarapaceRestProxy + + prevConcurrency := kafkaManifest.Spec.ResizeSettings[0].Concurrency + TestKafkaManifest.Spec.ResizeSettings[0].Concurrency++ + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.ResizeSettings[0].Concurrency = prevConcurrency + + prevDedicatedZookeeper := kafkaManifest.Spec.DedicatedZookeeper + TestKafkaManifest.Spec.DedicatedZookeeper = []*DedicatedZookeeper{prevDedicatedZookeeper[0], prevDedicatedZookeeper[0]} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DedicatedZookeeper = prevDedicatedZookeeper + + prevDedicatedZookeeperNodesNumber := kafkaManifest.Spec.DedicatedZookeeper[0].NodesNumber + TestKafkaManifest.Spec.DedicatedZookeeper[0].NodesNumber++ + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DedicatedZookeeper[0].NodesNumber = prevDedicatedZookeeperNodesNumber + + TestKafkaManifest.Spec.DataCentres = []*KafkaDataCentre{} + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres = kafkaManifest.Spec.DataCentres + + By("changing datacentres fields") + prevIntField := kafkaManifest.Spec.DataCentres[0].NodesNumber + TestKafkaManifest.Spec.DataCentres[0].NodesNumber++ + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].NodesNumber = prevIntField + + prevIntField = kafkaManifest.Spec.DataCentres[0].NodesNumber + TestKafkaManifest.Spec.DataCentres[0].NodesNumber = 0 + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].NodesNumber = prevIntField + + prevStringField := kafkaManifest.Spec.DataCentres[0].Name + TestKafkaManifest.Spec.DataCentres[0].Name += "test" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].Name = prevStringField + + prevStringField = kafkaManifest.Spec.DataCentres[0].Region + TestKafkaManifest.Spec.DataCentres[0].Region += "test" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].Region = prevStringField + + prevStringField = kafkaManifest.Spec.DataCentres[0].CloudProvider + TestKafkaManifest.Spec.DataCentres[0].CloudProvider += "test" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].CloudProvider = prevStringField + + prevStringField = kafkaManifest.Spec.DataCentres[0].ProviderAccountName + TestKafkaManifest.Spec.DataCentres[0].ProviderAccountName += "test" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].ProviderAccountName = prevStringField + + prevStringField = kafkaManifest.Spec.DataCentres[0].Network + TestKafkaManifest.Spec.DataCentres[0].Network += "test" + Expect(k8sClient.Patch(ctx, &TestKafkaManifest, patch)).ShouldNot(Succeed()) + TestKafkaManifest.Spec.DataCentres[0].Network = prevStringField + + prevCloudProviderSettings := kafkaManifest.Spec.DataCentres[0].CloudProviderSettings 
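
The spec above repeats one pattern dozens of times: mutate a field, expect the webhook to reject the patch, restore the original value. If the suite grows, a helper along these lines could fold the repetition — this is a hypothetical sketch, not part of the PR:

```go
package v1beta1

import (
	"context"

	. "github.com/onsi/gomega"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// expectImmutable is a hypothetical helper: apply mutate, assert that the
// validating webhook rejects the resulting patch, then restore the old value.
func expectImmutable(ctx context.Context, c client.Client, obj client.Object, patch client.Patch, mutate, restore func()) {
	mutate()
	Expect(c.Patch(ctx, obj, patch)).ShouldNot(Succeed())
	restore()
}

// Usage, matching one of the cases above:
//
//	expectImmutable(ctx, k8sClient, &TestKafkaManifest, patch,
//		func() { TestKafkaManifest.Spec.Name += "newValue" },
//		func() { TestKafkaManifest.Spec.Name = kafkaManifest.Spec.Name })
```
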
diff --git a/apis/clusters/v1beta1/opensearch_webhook.go b/apis/clusters/v1beta1/opensearch_webhook.go
index 417c04e03..d2992aa8a 100644
--- a/apis/clusters/v1beta1/opensearch_webhook.go
+++ b/apis/clusters/v1beta1/opensearch_webhook.go
@@ -363,31 +363,37 @@ func (oss *OpenSearchSpec) validateImmutableDataCentresUpdate(oldDCs []*OpenSear
 		return models.ErrImmutableDataCentresNumber
 	}
 
+	toValidate := map[string]*OpenSearchDataCentre{}
+	for _, dc := range oldDCs {
+		toValidate[dc.Name] = dc
+	}
+
 	for _, newDC := range newDCs {
-		for _, oldDC := range oldDCs {
-			if oldDC.Name == newDC.Name {
-				newDCImmutableFields := newDC.newImmutableFields()
-				oldDCImmutableFields := oldDC.newImmutableFields()
-
-				if *newDCImmutableFields != *oldDCImmutableFields {
-					return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
-				}
-
-				err := validateImmutableCloudProviderSettingsUpdate(newDC.CloudProviderSettings, oldDC.CloudProviderSettings)
-				if err != nil {
-					return err
-				}
-
-				err = newDC.validateDataNode(oss.DataNodes)
-				if err != nil {
-					return err
-				}
-
-				err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
-				if err != nil {
-					return err
-				}
-			}
+		oldDC, ok := toValidate[newDC.Name]
+		if !ok {
+			return fmt.Errorf("cannot change datacentre name: %v", newDC.Name)
+		}
+
+		newDCImmutableFields := newDC.newImmutableFields()
+		oldDCImmutableFields := oldDC.newImmutableFields()
+
+		if *newDCImmutableFields != *oldDCImmutableFields {
+			return fmt.Errorf("cannot update immutable data centre fields: new spec: %v: old spec: %v", newDCImmutableFields, oldDCImmutableFields)
+		}
+
+		err := validateImmutableCloudProviderSettingsUpdate(newDC.CloudProviderSettings, oldDC.CloudProviderSettings)
+		if err != nil {
+			return err
+		}
+
+		err = newDC.validateDataNode(oss.DataNodes)
+		if err != nil {
+			return err
+		}
+
+		err = validateTagsUpdate(newDC.Tags, oldDC.Tags)
+		if err != nil {
+			return err
 		}
 	}
diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go
index 118b9d346..f03c453d6 100644
--- a/apis/clusters/v1beta1/structs.go
+++ b/apis/clusters/v1beta1/structs.go
@@ -19,6 +19,7 @@ package v1beta1
 import (
 	"encoding/json"
 	"net"
+	"time"
 
 	clusterresource "github.com/instaclustr/operator/apis/clusterresources/v1beta1"
 	"github.com/instaclustr/operator/pkg/apiextensions"
@@ -725,6 +726,15 @@ func (cs *ClusterStatus) PrivateLinkStatusesEqual(iStatus *ClusterStatus) bool {
 	return true
 }
 
+func NewChannelWithTimeout(timeout time.Duration) chan struct{} {
+	done := make(chan struct{}, 1)
+	_ = time.AfterFunc(timeout, func() {
+		close(done)
+	})
+
+	return done
+}
+
 // +kubebuilder:object:generate:=false
 type Reference = apiextensions.ObjectReference
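
`NewChannelWithTimeout` gives the specs a bounded wait without any polling: receiving from the returned channel blocks until `time.AfterFunc` closes it, and a receive on a closed channel returns immediately. A standalone illustration of the same helper:

```go
package main

import (
	"fmt"
	"time"
)

// Same shape as the helper added to structs.go: a channel that is closed
// after the given timeout, letting a test block for a bounded period.
func NewChannelWithTimeout(timeout time.Duration) chan struct{} {
	done := make(chan struct{}, 1)
	_ = time.AfterFunc(timeout, func() {
		close(done)
	})

	return done
}

func main() {
	start := time.Now()
	<-NewChannelWithTimeout(100 * time.Millisecond) // blocks ~100ms, as in the specs above
	fmt.Println("waited", time.Since(start).Round(time.Millisecond))
}
```
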
diff --git a/apis/clusters/v1beta1/webhook_suite_test.go b/apis/clusters/v1beta1/webhook_suite_test.go
index ac04920c9..1d9616551 100644
--- a/apis/clusters/v1beta1/webhook_suite_test.go
+++ b/apis/clusters/v1beta1/webhook_suite_test.go
@@ -37,13 +37,21 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+	"github.com/instaclustr/operator/pkg/instaclustr/mock/appversionsmock"
 )
 
-var cfg *rest.Config
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var ctx context.Context
-var cancel context.CancelFunc
+var (
+	defaultNS = "default"
+
+	timeout = time.Millisecond * 1000
+
+	cfg       *rest.Config
+	k8sClient client.Client
+	testEnv   *envtest.Environment
+	ctx       context.Context
+	cancel    context.CancelFunc
+)
 
 func TestAPIs(t *testing.T) {
 	RegisterFailHandler(Fail)
@@ -84,6 +92,7 @@ var _ = BeforeSuite(func() {
 	Expect(err).NotTo(HaveOccurred())
 	Expect(k8sClient).NotTo(BeNil())
 
+	api := appversionsmock.NewInstAPI()
 	// start webhook server using Manager
 	webhookInstallOptions := &testEnv.WebhookInstallOptions
 	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
@@ -108,7 +117,7 @@ var _ = BeforeSuite(func() {
 	err = (&OpenSearch{}).SetupWebhookWithManager(mgr, nil)
 	Expect(err).NotTo(HaveOccurred())
 
-	err = (&Kafka{}).SetupWebhookWithManager(mgr, nil)
+	err = (&Kafka{}).SetupWebhookWithManager(mgr, api)
 	Expect(err).NotTo(HaveOccurred())
 
 	err = (&KafkaConnect{}).SetupWebhookWithManager(mgr, nil)
diff --git a/config/samples/clusters_v1beta1_opensearch.yaml b/config/samples/clusters_v1beta1_opensearch.yaml
index 11268d699..7ce9922f0 100644
--- a/config/samples/clusters_v1beta1_opensearch.yaml
+++ b/config/samples/clusters_v1beta1_opensearch.yaml
@@ -46,7 +46,7 @@ spec:
 #    - nodeSize: SRH-DEV-t4g.small-5
 #      oidcProvider: ''
 #      version: opensearch-dashboards:2.5.0
-  version: 2.9.0
+  version: 2.11.1
   pciCompliance: false
   privateNetworkCluster: false
   reportingPlugin: false
diff --git a/controllers/clusters/datatest/kafka_v1beta1.yaml b/controllers/clusters/datatest/kafka_v1beta1.yaml
index 400c5cab0..4d73b5344 100644
--- a/controllers/clusters/datatest/kafka_v1beta1.yaml
+++ b/controllers/clusters/datatest/kafka_v1beta1.yaml
@@ -17,29 +17,32 @@ spec:
   privateNetworkCluster: true
   slaTier: "NON_PRODUCTION"
 #  bundledUseOnly: true
-#  clientAuthBrokerWithEncryption: true
-#  clientAuthBrokerWithoutEncryption: true
 #  clientBrokerAuthWithMtls: true
-#  dedicatedZookeeper:
-#    - nodeSize: "KDZ-DEV-t4g.small-30"
-#      nodesNumber: 3
-#  twoFactorDelete:
-#    - email: "emailTEST"
-#      phone: "phoneTEST"
-#  karapaceSchemaRegistry:
-#    - version: "3.2.0"
-#  schemaRegistry:
-#    - version: "5.0.0"
-#  karapaceRestProxy:
-#    - integrateRestProxyWithSchemaRegistry: true
-#      version: "3.2.0"
-#  restProxy:
-#    - integrateRestProxyWithSchemaRegistry: true
-#      schemaRegistryUsername: "username"
-#      schemaRegistryPassword: "asdfasdf"
-#      schemaRegistryServerUrl: "schemaRegistryServerUrl"
-#      "useLocalSchemaRegistry": true
-#      version: "5.0.0"
+  dedicatedZookeeper:
+    - nodeSize: "KDZ-DEV-t4g.small-30"
+      nodesNumber: 3
+  twoFactorDelete:
+    - email: "emailTEST"
+      phone: "phoneTEST"
+  karapaceSchemaRegistry:
+    - version: "3.2.0"
+  schemaRegistry:
+    - version: "5.0.0"
+  karapaceRestProxy:
+    - integrateRestProxyWithSchemaRegistry: true
+      version: "3.2.0"
+  kraft:
+    - controllerNodeCount: 3
+  restProxy:
+    - integrateRestProxyWithSchemaRegistry: true
+      schemaRegistryUsername: "username"
+      schemaRegistryPassword: "asdfasdf"
+      schemaRegistryServerUrl: "schemaRegistryServerUrl"
+      "useLocalSchemaRegistry": true
+      version: "5.0.0"
+  resizeSettings:
+    - notifySupportContacts: false
+      concurrency: 1
   dataCentres:
     - name: "AWS_VPC_US_EAST_1"
       nodesNumber: 3
@@ -50,10 +53,10 @@ spec:
       nodeSize: "KFK-DEV-t4g.small-5"
       network: "10.0.0.0/16"
       region: "US_EAST_1"
-#      accountName: "Custrom"
-#      cloudProviderSettings:
+      accountName: "Custrom"
+      cloudProviderSettings:
 #        - customVirtualNetworkId: "vpc-12345678"
-#          diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000"
+        - diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000"
 #          resourceGroup: "asdfadfsdfas"
 #      privateLink:
 #        - advertisedHostname: "asdfadsf"
diff --git a/controllers/clusters/kafka_controller_test.go b/controllers/clusters/kafka_controller_test.go
index e3839142f..5af07c5c5 100644
--- a/controllers/clusters/kafka_controller_test.go
+++ b/controllers/clusters/kafka_controller_test.go
@@ -18,7 +18,6 @@ package clusters
 
 import (
 	"context"
-	openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go"
 	"os"
 
 	. "github.com/onsi/ginkgo/v2"
@@ -30,6 +29,7 @@ import (
 	"github.com/instaclustr/operator/apis/clusters/v1beta1"
 	"github.com/instaclustr/operator/controllers/tests"
 	"github.com/instaclustr/operator/pkg/instaclustr"
+	openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go"
 	"github.com/instaclustr/operator/pkg/models"
 )
diff --git a/pkg/instaclustr/mock/appversionsmock/models.go b/pkg/instaclustr/mock/appversionsmock/models.go
new file mode 100644
index 000000000..efc4ed1da
--- /dev/null
+++ b/pkg/instaclustr/mock/appversionsmock/models.go
@@ -0,0 +1,19 @@
+package appversionsmock
+
+import (
+	"net/http"
+
+	"github.com/instaclustr/operator/pkg/models"
+)
+
+type mockClient struct {
+	*http.Client
+}
+
+func NewInstAPI() *mockClient {
+	return &mockClient{}
+}
+
+func (c *mockClient) ListAppVersions(app string) ([]*models.AppVersions, error) {
+	return []*models.AppVersions{{Application: app, Versions: []string{"1.0.0"}}}, nil
+}
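
The mock works because Go interfaces are satisfied structurally: the Kafka webhook only depends on the version-listing method, so any type with a matching `ListAppVersions` can be passed to `SetupWebhookWithManager`. A self-contained sketch of that idea — the `versionLister` interface name and the trimmed `AppVersions` struct are illustrative stand-ins; the operator's real interface and `pkg/models` types are richer:

```go
package main

import "fmt"

// AppVersions mirrors the shape returned by the mock in pkg/models.
type AppVersions struct {
	Application string
	Versions    []string
}

// versionLister is a hypothetical narrow interface: just the method the
// version-validation code path needs.
type versionLister interface {
	ListAppVersions(app string) ([]*AppVersions, error)
}

// mockClient satisfies versionLister without any real HTTP traffic,
// exactly as appversionsmock does for the webhook suite.
type mockClient struct{}

func (mockClient) ListAppVersions(app string) ([]*AppVersions, error) {
	return []*AppVersions{{Application: app, Versions: []string{"1.0.0"}}}, nil
}

func main() {
	var api versionLister = mockClient{}
	versions, _ := api.ListAppVersions("kafka")
	fmt.Println(versions[0].Application, versions[0].Versions) // kafka [1.0.0]
}
```
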