Skip to content

Commit

Permalink
issue-639, integration tests for kafkauser controller
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Dec 11, 2023
1 parent 8dd9a0e commit 9e94866
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 1 deletion.
13 changes: 13 additions & 0 deletions controllers/kafkamanagement/datatest/kafkauser_v1beta1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: kafkamanagement.instaclustr.com/v1beta1
kind: KafkaUser
metadata:
name: kafkauser-test-sample
namespace: default
spec:
secretRef:
name: "secret-test-sample"
namespace: "default"
initialPermissions: "standard"
overrideExistingUser: true
saslScramMechanism: "SCRAM-SHA-256"
authMechanism: "SASL"
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: secret-test-sample
namespace: default
data:
username: "U2FuY2gtdHdvCg=="
password: "Qm9oZGFuX3ViZXJfdW50ZXJfaWdvcl90b2xpazEK"
7 changes: 6 additions & 1 deletion controllers/kafkamanagement/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,12 @@ func (r *KafkaUserReconciler) findSecretObjects(secret client.Object) []ctrl.Req
requests := make([]ctrl.Request, len(kafkaUserList.Items))
for i, item := range kafkaUserList.Items {
patch := item.NewPatch()
item.GetAnnotations()[models.ResourceStateAnnotation] = models.SecretEvent
annotations := item.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
item.SetAnnotations(annotations)
}
annotations[models.ResourceStateAnnotation] = models.SecretEvent
err = r.Patch(context.TODO(), &item, patch)
if err != nil {
return []ctrl.Request{}
Expand Down
77 changes: 77 additions & 0 deletions controllers/kafkamanagement/kafkauser_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package kafkamanagement

import (
"os"
"path/filepath"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/instaclustr/operator/apis/kafkamanagement/v1beta1"
"github.com/instaclustr/operator/pkg/models"
)

var _ = Describe("Kafka User Controller", func() {
secret := &v1.Secret{}
kafkaUser := &v1beta1.KafkaUser{}

b, err := os.ReadFile(filepath.Join(".", "datatest", "kafkauser_v1beta1.yaml"))
Expect(err).NotTo(HaveOccurred())

err = yaml.Unmarshal(b, kafkaUser)
Expect(err).NotTo(HaveOccurred())

b, err = os.ReadFile(filepath.Join(".", "datatest", "kafkauser_v1beta1_secret.yaml"))
Expect(err).NotTo(HaveOccurred())

err = yaml.Unmarshal(b, secret)
Expect(err).NotTo(HaveOccurred())

const testClusterID = "testClusterID"

When("Apply kafka user manifest", func() {
It("creates kafka user resource and waits until status is filled", func() {
Expect(k8sClient.Create(ctx, secret)).Should(Succeed())
Expect(k8sClient.Create(ctx, kafkaUser)).Should(Succeed())

Expect(kafkaUser.Status.ClustersEvents).Should(BeEmpty())
})
})

When("Add kafka user reference in kafka cluster resource", func() {
It("should create user for the given cluster", func() {
patch := kafkaUser.NewPatch()
kafkaUser.Status.ClustersEvents = map[string]string{
testClusterID: models.CreatingEvent,
}

Expect(k8sClient.Status().Patch(ctx, kafkaUser, patch)).Should(Succeed())

Expect(kafkaUser.Status.ClustersEvents).ShouldNot(BeEmpty())

Eventually(func(g Gomega) {
g.Expect(kafkaUser.Status.ClustersEvents[testClusterID]).Should(Or(
Equal(models.Created),
Equal(models.UpdatedEvent),
))
}, timeout, interval)
})
})

When("Remove kafka user reference from kafka cluster resource", func() {
It("should delete user from the given cluster", func() {
patch := kafkaUser.NewPatch()
kafkaUser.Status.ClustersEvents = map[string]string{
testClusterID: models.DeletingEvent,
}

Expect(k8sClient.Status().Patch(ctx, kafkaUser, patch)).Should(Succeed())

Eventually(func(g Gomega) {
g.Expect(kafkaUser.Status.ClustersEvents).Should(BeEmpty())
}, timeout, interval)
})
})
})
42 changes: 42 additions & 0 deletions pkg/instaclustr/mock/server/go/api_bundle_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ func (c *BundleUserAPIController) Routes() Routes {
"/provisioning/v1/{clusterId}",
c.GetDefaultCreds,
},
"CreateKafkaUser": Route{
Method: strings.ToUpper("POST"),
Pattern: "/cluster-management/v2/resources/applications/kafka/users/v3/",
HandlerFunc: c.CreateKafkaUser,
},
"DeleteKafkaUser": Route{
Method: strings.ToUpper("POST"),
Pattern: "/cluster-management/v2/resources/applications/kafka/users/v3/{userId}",
HandlerFunc: c.CreateKafkaUser,
},
}
}

Expand Down Expand Up @@ -160,3 +170,35 @@ func (c *BundleUserAPIController) GetDefaultCreds(w http.ResponseWriter, r *http

EncodeJSONResponse(result.Body, &result.Code, w)
}

func (c *BundleUserAPIController) CreateKafkaUser(w http.ResponseWriter, r *http.Request) {
kafkaUserCreate := &KafkaUserV3{}
err := json.NewDecoder(r.Body).Decode(kafkaUserCreate)
if err != nil {
c.errorHandler(w, r, &ParsingError{Err: err}, nil)
return
}

result, err := c.service.CreateUser(r.Context(), kafkaUserCreate.ClusterId, "", BundleUserCreateRequest{Username: kafkaUserCreate.Username})
if err != nil {
c.errorHandler(w, r, err, &result)
return
}

EncodeJSONResponse(result.Body, &result.Code, w)
}

func (c *BundleUserAPIController) DeleteKafkaUser(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
parts := strings.Split(params["userId"], "_")

clusterID, username := parts[0], parts[1]

result, err := c.service.DeleteUser(r.Context(), clusterID, "", BundleUserDeleteRequest{Username: username})
if err != nil {
c.errorHandler(w, r, err, &result)
return
}

EncodeJSONResponse(result.Body, &result.Code, w)
}
19 changes: 19 additions & 0 deletions pkg/instaclustr/mock/server/go/api_bundle_user_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"fmt"
"net/http"
"sync"

"k8s.io/utils/strings/slices"
)
Expand All @@ -21,6 +22,7 @@ import (
// This service should implement the business logic for every endpoint for the BundleUserAPI API.
// Include any external packages or services that will be required by this service.
type BundleUserAPIService struct {
mu sync.RWMutex
users map[string][]string
}

Expand All @@ -37,6 +39,9 @@ func (s *BundleUserAPIService) CreateUser(ctx context.Context, clusterId string,
// TODO: Uncomment the next line to return response Response(404, ErrorMessage{}) or use other options such as http.Ok ...
// return Response(404, ErrorMessage{}), nil

s.mu.Lock()
defer s.mu.Unlock()

if slices.Contains(s.users[clusterId], bundleUserCreateRequest.Username) {
return Response(400, GenericResponse{fmt.Sprintf("The user already exists, Username: %s", bundleUserCreateRequest.Username)}), nil
}
Expand Down Expand Up @@ -74,11 +79,25 @@ func (s *BundleUserAPIService) DeleteUser(ctx context.Context, clusterId string,
// TODO: Uncomment the next line to return response Response(504, {}) or use other options such as http.Ok ...
// return Response(504, nil),nil

s.mu.Lock()
defer s.mu.Unlock()

users := s.users[clusterId]
for i, username := range users {
if username == bundleUserDeleteRequest.Username {
users = append(users[:i], users[i+1:]...)
break
}
}
s.users[clusterId] = users

return Response(200, GenericResponse{}), nil
}

// FetchUsers - Fetch a bundle users
func (s *BundleUserAPIService) FetchUsers(ctx context.Context, clusterId string, bundle string) (ImplResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()

users := s.users[clusterId]

Expand Down

0 comments on commit 9e94866

Please sign in to comment.