Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka cluster create test #988

Merged
merged 74 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 66 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
9cbdff2
test(docs/install): added install e2e test
pregnor May 19, 2023
3f98e58
test(docs/install): added localversion 4 Koperator
pregnor May 21, 2023
9a7ce58
feat(e2e): added uninstall koperator and dep
bartam1 May 28, 2023
d1852e7
fix: make ordered root container
bartam1 May 28, 2023
9569f4c
fix: typos
bartam1 May 28, 2023
5b64596
refact 1
bartam1 May 28, 2023
3a9d96d
refact 2
bartam1 May 29, 2023
233cd0f
add(beta): topic produce-consume test
bartam1 May 29, 2023
470c0fc
refactor 3
bartam1 May 29, 2023
5e1b0d3
refactor 4
bartam1 May 30, 2023
c81245c
refactor 5
bartam1 May 30, 2023
0d967fd
add(beta) example for external produce consume
bartam1 May 30, 2023
ac685c1
Fix getK8sResources
bartam1 May 30, 2023
b44b042
fix: checking all resource type
bartam1 May 30, 2023
d9b806a
refactor 6
bartam1 May 31, 2023
0c2ca24
add: more log messages
bartam1 May 31, 2023
02aa315
add(beta) zookeepercluster remove
bartam1 May 31, 2023
d94f68d
fix: remove warnings from output
bartam1 May 31, 2023
438d797
add: const.go
bartam1 Jun 1, 2023
901f912
fix: zookeeper import
bartam1 Jun 1, 2023
f184144
refactor 7
bartam1 Jun 4, 2023
0c55e90
refactor 8
bartam1 Jun 5, 2023
b9ba76d
fix: remove David test
bartam1 Jun 5, 2023
6cbffe3
fix: typo
bartam1 Jun 5, 2023
dbf3ced
Initial simplekafkaclsuter test
Kuvesz May 30, 2023
2bb815a
Updated zookeper address to zookeeper-server
Kuvesz May 30, 2023
2e51ffb
Updated tests to use Marton's wait function
Kuvesz May 30, 2023
fdcac51
Various small fixes, added uninstall steps
Kuvesz May 30, 2023
88a076a
Various small fixes
Kuvesz May 31, 2023
f2c70a4
Random updates regarding reviews
Kuvesz May 31, 2023
10bbfd6
Update after rebase
Kuvesz May 31, 2023
1dece6d
add: const.go
bartam1 Jun 1, 2023
4dd384e
added ssl kafkacluster
Kuvesz Jun 1, 2023
fc6d1f5
Removed debugging stuff accidentally left there
Kuvesz Jun 1, 2023
f48c778
refactor 7
bartam1 Jun 4, 2023
2b2021b
refactor 8
bartam1 Jun 5, 2023
3b57397
Initial simplekafkaclsuter test
Kuvesz May 30, 2023
bde4d21
Updated tests to use Marton's wait function
Kuvesz May 30, 2023
61dbe7e
added ssl kafkacluster
Kuvesz Jun 1, 2023
6c72e68
Updated to latest of the uninstall branch
Kuvesz Jun 5, 2023
ef6db94
Fixes after messing up the rebase a bit
Kuvesz Jun 5, 2023
d6c4d79
fix: remove zookeeperclusterready fn
bartam1 Jun 5, 2023
8ee5d6d
remove: unnecessary tests
bartam1 Jun 5, 2023
5353f0d
add: externalListener go-template
bartam1 Jun 5, 2023
4e3e226
Updates requested by reviews
Kuvesz Jun 5, 2023
da35a73
Merge remote-tracking branch 'origin/test/de2e-koperator-uninstall' i…
Kuvesz Jun 5, 2023
1fb27d3
Using time.Duration everywhere
bartam1 Jun 6, 2023
ded5be7
remove: ordered keyword from unnecessary places
bartam1 Jun 6, 2023
1c73dd8
fix: uninstallHelmChartIfExists
bartam1 Jun 6, 2023
a52164a
fix: typos
bartam1 Jun 6, 2023
e72407e
refactor: based on Kuvesz review
bartam1 Jun 6, 2023
d857945
Fixed rest of review comments
Kuvesz Jun 6, 2023
6c1167b
remove: external consumer-producer test (another PR)
bartam1 Jun 6, 2023
2df9453
Remove unnecessary timeout check
bartam1 Jun 7, 2023
b905456
fix: requireUninstallingKoperator description
bartam1 Jun 7, 2023
be93824
Merge remote-tracking branch 'origin/test/de2e-koperator-uninstall' i…
Kuvesz Jun 7, 2023
d1ded5c
Update zookeeper_cluster_test.go
Kuvesz Jun 7, 2023
264080a
Update zookeeper_cluster_test.go
Kuvesz Jun 7, 2023
f27c658
Update tests/e2e/koperator_test.go
Kuvesz Jun 7, 2023
ee81836
Update kafka_cluster_test.go
Kuvesz Jun 7, 2023
9415e7f
Update zookeeper_cluster_test.go
Kuvesz Jun 7, 2023
2873698
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 20, 2023
1b37d58
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 20, 2023
8d8cbb9
Updated implementation to reflect changes on master
Kuvesz Jun 20, 2023
a522975
fixed rebase messup
Kuvesz Jun 21, 2023
795e397
Review updates, removed zookeeper renaming
Kuvesz Jun 22, 2023
6be8bef
Updated configmap_test zookeeper name to original
Kuvesz Jun 22, 2023
09efd20
updates requested in reviews
Kuvesz Jun 22, 2023
ee9a646
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 22, 2023
d4ac7e6
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 22, 2023
28c22c9
added debug error log
Kuvesz Jun 22, 2023
643d4bc
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 22, 2023
37b447b
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 23, 2023
1480b90
Merge branch 'master' into test/de2e-koperator-simplekafkacluster
Kuvesz Jun 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGetMountPathsFromBrokerConfigMap(t *testing.T) {
listeners=INTERNAL://:29092,CONTROLLER://:29093
log.dirs=/kafka-logs3/kafka,/kafka-logs/kafka,/kafka-logs2/kafka,/kafka-logs4/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter\noffsets.topic.replication.factor=2
zookeeper.connect=zookeeper-server-client.zookeeper:2181/
zookeeper.connect=zookeeper-client.zookeeper:2181/
pregnor marked this conversation as resolved.
Show resolved Hide resolved
`},
},
expectedLogDirs: []string{"/kafka-logs3/kafka", "/kafka-logs/kafka", "/kafka-logs2/kafka", "/kafka-logs4/kafka"},
Expand Down
13 changes: 10 additions & 3 deletions tests/e2e/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,23 @@ const (
zookeeperClusterName = "zookeeper-server"
managedByHelmLabelTemplate = "app.kubernetes.io/managed-by=Helm,app.kubernetes.io/instance=%s"

cruiseControlPodReadinessTimeout = 50 * time.Second
kafkaClusterResourceReadinessTimeout = 60 * time.Second
defaultDeletionTimeout = 20 * time.Second
defaultPodReadinessWaitTime = 10 * time.Second
defaultTopicCreationWaitTime = 10 * time.Second
kafkaClusterResourceCleanupTimeout = 30 * time.Second
kafkaClusterCreateTimeout = 500 * time.Second
kafkaClusterResourceCleanupTimeout = 120 * time.Second
zookeeperClusterCreateTimeout = 4 * time.Minute
zookeeperClusterResourceCleanupTimeout = 60 * time.Second
externalConsumerTimeout = 5 * time.Second
externalProducerTimeout = 5 * time.Second

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
zookeeperReplicaCount = 1

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl"

kubectlNotFoundErrorMsg = "NotFound"
)
Expand Down
6 changes: 6 additions & 0 deletions tests/e2e/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ func listHelmReleases(kubectlOptions k8s.KubectlOptions) ([]*HelmRelease, error)
"list",
"--output", "json",
)

fmt.Println("OUTPUT: " + output)
if strings.Contains(output, "[]") || strings.Contains(output, "WARNING") {
return nil, nil
}

Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.WrapIf(err, "listing Helm releases failed")
}
Expand Down
89 changes: 89 additions & 0 deletions tests/e2e/install_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"time"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/gruntwork-io/terratest/modules/k8s"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// requireCreatingKafkaCluster creates a KafkaCluster and
// checks the success of that operation.
func requireCreatingKafkaCluster(kubectlOptions k8s.KubectlOptions, koperatorVersion string, sampleFile string) {
It("Deploying a KafkaCluster", func() {

By("Checking existing KafkaClusters")
err := checkExistenceOfK8sResource(kubectlOptions, kafkaKind, kafkaClusterName)
if err == nil {
By(fmt.Sprintf("KafkaCluster %s already exists\n", kafkaClusterName))
} else {
By("Deploying a KafkaCluster")
err = applyKoperatorSampleResource(kubectlOptions, koperatorVersion, sampleFile)
Expect(err).NotTo(HaveOccurred())
}

By("Verifying the KafkaCluster state")
err = waitK8sResourceCondition(kubectlOptions, kafkaKind, fmt.Sprintf("jsonpath={.status.state}=%s", string(v1beta1.KafkaClusterRunning)), kafkaClusterCreateTimeout, "", kafkaClusterName)
Expect(err).NotTo(HaveOccurred())

By("Verifying the CruiseControl pod")
Eventually(context.Background(), func() error {
return waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", cruiseControlPodReadinessTimeout, v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+",app=cruisecontrol", "")
}, kafkaClusterResourceReadinessTimeout, 3*time.Second).ShouldNot(HaveOccurred())

By("Verifying all Kafka pods")
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
err = waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, v1beta1.KafkaCRLabelKey+"="+kafkaClusterName, "")
Expect(err).NotTo(HaveOccurred())
})
}

// requireCreatingZookeeperCluster creates a ZookeeperCluster and
// checks the success of that operation.
func requireCreatingZookeeperCluster(kubectlOptions k8s.KubectlOptions, zkClusterReplicaCount int) {
It("Deploying a ZookeeperCluster", func() {

By("Checking existing ZookeeperClusters")
err := checkExistenceOfK8sResource(kubectlOptions, zookeeperKind, zookeeperClusterName)
if err == nil {
By(fmt.Sprintf("ZookeeperCluster %s already exists\n", zookeeperClusterName))
} else {
By("Deploying the sample ZookeeperCluster")
err = applyK8sResourceFromTemplate(kubectlOptions,
zookeeperClusterTemplate,
map[string]interface{}{
"Name": zookeeperClusterName,
"Namespace": kubectlOptions.Namespace,
"Replicas": zkClusterReplicaCount,
},
)
Expect(err).NotTo(HaveOccurred())
}

By("Verifying the ZookeeperCluster resource")
err = waitK8sResourceCondition(kubectlOptions, zookeeperKind, "jsonpath={.status.readyReplicas}=1", zookeeperClusterCreateTimeout, "", zookeeperClusterName)
Expect(err).NotTo(HaveOccurred())

By("Verifying the ZookeeperCluster's pods")
err = waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, "app="+zookeeperClusterName, "")
Expect(err).NotTo(HaveOccurred())
})
}
11 changes: 11 additions & 0 deletions tests/e2e/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath st
k8s.KubectlApply(GinkgoT(), &kubectlOptions, manifestPath)
}

// checkExistenceOfK8sResource queries a Resource by it's kind, namespace and name and
// returns the output of stderr
func checkExistenceOfK8sResource(
kubectlOptions k8s.KubectlOptions,
resourceKind string,
resourceName string,
) error {
pregnor marked this conversation as resolved.
Show resolved Hide resolved
By(fmt.Sprintf("Checking the existence of resource %s in namespace %s (kind: %s)", resourceName, kubectlOptions.Namespace, resourceKind))
return k8s.RunKubectlE(GinkgoT(), &kubectlOptions, "get", resourceKind, resourceName)
}

// createOrReplaceK8sResourcesFromManifest creates non-existent Kubernetes
// resources or replaces existing ones from the specified manifest to the
// provided kubectl context and namespace.
Expand Down
79 changes: 79 additions & 0 deletions tests/e2e/koperator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"emperror.dev/errors"
"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo/v2"
)

// applyKoperatorSampleResource deploys the specified sample resource (config/samples).
// The full path of the manifest also can be specified.
// It supports different versions that can be specified with the koperatorVersion parameter.
func applyKoperatorSampleResource(kubectlOptions k8s.KubectlOptions, koperatorVersion Version, sampleFile string) error {
pregnor marked this conversation as resolved.
Show resolved Hide resolved
By(fmt.Sprintf("Retrieving Koperator sample resource: '%s' with version: '%s' ", sampleFile, koperatorVersion))

sampleFileSplit := strings.Split(sampleFile, "/")
if len(sampleFileSplit) == 0 {
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf("sample file path shouldn't be empty")
}

var err error
var rawKoperatorSampleResource []byte

switch koperatorVersion {
case LocalVersion:
if len(sampleFileSplit) == 1 {
sampleFile = fmt.Sprintf("../../config/samples/%s", sampleFile)
}

rawKoperatorSampleResource, err = os.ReadFile(sampleFile)
if err != nil {
return err
}
default:
httpClient := new(http.Client)
httpClient.Timeout = 5 * time.Second

if len(sampleFileSplit) != 1 {
return errors.Errorf("sample file path shouldn't contain a \"/\" character")
}

response, err := httpClient.Get("https://raw.githubusercontent.com/banzaicloud/koperator/" + koperatorVersion + "/config/samples/" + sampleFile)
if response != nil {
defer func() { _ = response.Body.Close() }()
}
if err != nil {
return err
}

rawKoperatorSampleResource, err = io.ReadAll(response.Body)
if err != nil {
return err
}
}

By(fmt.Sprintf("Applying K8s manifest %s", sampleFile))
k8s.KubectlApplyFromString(GinkgoT(), &kubectlOptions, string(rawKoperatorSampleResource))
return nil
Kuvesz marked this conversation as resolved.
Show resolved Hide resolved
}
7 changes: 4 additions & 3 deletions tests/e2e/koperator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ var _ = BeforeSuite(func() {
})

var _ = When("Testing e2e test altogether", Ordered, func() {
//testInstall()
//testProduceConsumeInternal()
testUninstallZookeeperCluster()
testInstall()
testInstallZookeeperCluster()
testInstallKafkaCluster("local", "simplekafkacluster.yaml")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SSL cluster install would basically be the same, just with the filename change to simplekafkacluster_ssl.yaml

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required: IMO we MUST add both variants here.

testUninstallKafkaCluster()
testUninstallZookeeperCluster()
testUninstall()
})
9 changes: 9 additions & 0 deletions tests/e2e/templates/zookeeper_cluster.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
name: {{ .Name }}
namespace: {{ or .Namespace "zookeeper" }}
spec:
replicas: {{ or .Replicas 1 }}
persistence:
reclaimPolicy: Delete
51 changes: 51 additions & 0 deletions tests/e2e/test_install_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func testInstallZookeeperCluster() bool {
return When("Installing Zookeeper cluster", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = zookeeperOperatorHelmDescriptor.Namespace
requireCreatingZookeeperCluster(kubectlOptions, zookeeperReplicaCount)
})
}
pregnor marked this conversation as resolved.
Show resolved Hide resolved

func testInstallKafkaCluster(koperatorVersion string, kafkaClusterSample string) bool {
return When("Installing Kafka cluster", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace
requireCreatingKafkaCluster(kubectlOptions, koperatorVersion, kafkaClusterSample)
})
}