Skip to content

Commit

Permalink
kafkauser endpoints were migrated to APIv3
Browse files Browse the repository at this point in the history
  • Loading branch information
tengu-alt committed Oct 3, 2023
1 parent 3723b60 commit 21e3445
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 113 deletions.
35 changes: 13 additions & 22 deletions apis/kafkamanagement/v1beta1/kafkauser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@ import (

// KafkaUserSpec defines the desired state of KafkaUser
type KafkaUserSpec struct {
Options *KafkaUserOptions `json:"options"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"`
InitialPermissions string `json:"initialPermissions"`
}

type KafkaUserOptions struct {
OverrideExistingUser bool `json:"overrideExistingUser,omitempty"`
SASLSCRAMMechanism string `json:"saslScramMechanism"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"`
InitialPermissions string `json:"initialPermissions"`
OverrideExistingUser bool `json:"overrideExistingUser,omitempty"`
SASLSCRAMMechanism string `json:"saslScramMechanism"`
AuthMechanism string `json:"authMechanism"`
}

// KafkaUserStatus defines the observed state of KafkaUser
Expand Down Expand Up @@ -121,20 +118,14 @@ func init() {

func (ks *KafkaUserSpec) ToInstAPI(clusterID string, username string, password string) *models.KafkaUser {
return &models.KafkaUser{
ClusterID: clusterID,
InitialPermissions: ks.InitialPermissions,
Options: ks.Options.ToInstAPI(),
Username: username,
Password: password,
}
}

func (ko *KafkaUserOptions) ToInstAPI() *models.KafkaUserOptions {
return &models.KafkaUserOptions{
OverrideExistingUser: ko.OverrideExistingUser,
SASLSCRAMMechanism: ko.SASLSCRAMMechanism,
Password: password,
OverrideExistingUser: ks.OverrideExistingUser,
SASLSCRAMMechanism: ks.SASLSCRAMMechanism,
AuthMechanism: ks.AuthMechanism,
ClusterID: clusterID,
InitialPermissions: ks.InitialPermissions,
Username: username,
}

}

func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest {
Expand Down
20 changes: 0 additions & 20 deletions apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 8 additions & 10 deletions config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
spec:
description: KafkaUserSpec defines the desired state of KafkaUser
properties:
authMechanism:
type: string
certificateRequests:
items:
properties:
Expand Down Expand Up @@ -68,15 +70,10 @@ spec:
type: array
initialPermissions:
type: string
options:
properties:
overrideExistingUser:
type: boolean
saslScramMechanism:
type: string
required:
- saslScramMechanism
type: object
overrideExistingUser:
type: boolean
saslScramMechanism:
type: string
secretRef:
properties:
name:
Expand All @@ -88,8 +85,9 @@ spec:
- namespace
type: object
required:
- authMechanism
- initialPermissions
- options
- saslScramMechanism
- secretRef
type: object
status:
Expand Down
8 changes: 4 additions & 4 deletions config/samples/kafkamanagement_v1beta1_kafkauser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: secret-test
data:
username: "U2FuY2gtdHdvCg=="
password: "Qm9oZGFuXyF1YmVyX3VudGVyX3ZhZmVsb2sxNDg4X2lnb3JfdG9saWsxNDg4Cg=="
password: "Qm9oZGFuX3ViZXJfdW50ZXJfaWdvcl90b2xpazEK"
---
apiVersion: kafkamanagement.instaclustr.com/v1beta1
kind: KafkaUser
Expand Down Expand Up @@ -34,6 +34,6 @@ spec:
# validPeriod: 6
# autoRenew: false
initialPermissions: "standard"
options:
overrideExistingUser: true
saslScramMechanism: "SCRAM-SHA-256"
overrideExistingUser: true
saslScramMechanism: "SCRAM-SHA-256"
authMechanism: "SASL"
12 changes: 0 additions & 12 deletions controllers/kafkamanagement/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
l.Info(
"Creating Kafka user",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)
iKafkaUser := user.Spec.ToInstAPI(clusterID, username, password)
_, err = r.API.CreateKafkaUser(instaclustr.KafkaUserEndpoint, iKafkaUser)
Expand Down Expand Up @@ -178,7 +177,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot patch Kafka user resource status",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -193,7 +191,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
l.Info(
"Kafka user was created",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)

continue
Expand All @@ -205,7 +202,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "cannot delete Kafka user",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -228,7 +224,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot patch Kafka user resource status",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -242,7 +237,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

l.Info("Kafka user has been deleted",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)

r.EventRecorder.Eventf(user, models.Normal, models.Deleted,
Expand All @@ -258,7 +252,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot patch Kafka user resource status",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -272,7 +265,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

l.Info("Kafka user has been detached",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)
r.EventRecorder.Eventf(
user, models.Normal, models.Deleted,
Expand All @@ -289,7 +281,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot update Kafka user",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)
r.EventRecorder.Eventf(
user, models.Warning, models.UpdateFailed,
Expand All @@ -305,7 +296,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot patch Kafka user resource metadata",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -322,7 +312,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
l.Error(err, "Cannot patch Kafka user resource status",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
"kafka user metadata", user.ObjectMeta,
)
r.EventRecorder.Eventf(
Expand All @@ -336,7 +325,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

l.Info("Kafka user resource has been updated",
"initial permissions", user.Spec.InitialPermissions,
"kafka user options", user.Spec.Options,
)

continue
Expand Down
34 changes: 0 additions & 34 deletions pkg/instaclustr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,40 +887,6 @@ func (c *Client) UpdateKafkaTopic(url string, t *kafkamanagementv1beta1.Topic) e
return nil
}

func (c *Client) GetKafkaUserStatus(
kafkaUserID,
kafkaUserEndpoint string,
) (*kafkamanagementv1beta1.KafkaUserStatus, error) {
url := c.serverHostname + kafkaUserEndpoint + kafkaUserID

resp, err := c.DoRequest(url, http.MethodGet, nil)
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode == http.StatusNotFound {
return nil, NotFound
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body)
}

var kafkaUserStatus kafkamanagementv1beta1.KafkaUserStatus
err = json.Unmarshal(body, &kafkaUserStatus)
if err != nil {
return nil, err
}

return &kafkaUserStatus, nil
}

func (c *Client) CreateKafkaUser(
url string,
kafkaUser *models.KafkaUser,
Expand Down
2 changes: 1 addition & 1 deletion pkg/instaclustr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
ClusterNetworkFirewallRuleEndpoint = "/cluster-management/v2/resources/network-firewall-rules/v2/"
AWSSecurityGroupFirewallRuleEndpoint = "/cluster-management/v2/resources/providers/aws/security-group-firewall-rules/v2/"
GCPPeeringEndpoint = "/cluster-management/v2/resources/providers/gcp/vpc-peers/v2/"
KafkaUserEndpoint = "/cluster-management/v2/resources/applications/kafka/users/v2/"
KafkaUserEndpoint = "/cluster-management/v2/resources/applications/kafka/users/v3/"
KafkauserCertificatesEndpoint = "/cluster-management/v2/resources/applications/kafka/user-certificates/v2/"
KafkaUserCertificatesRenewEndpoint = "/cluster-management/v2/operations/applications/kafka/user-certificates/renew/v2/"
KafkaACLEndpoint = "/cluster-management/v2/resources/applications/kafka/acls/v2/"
Expand Down
1 change: 0 additions & 1 deletion pkg/instaclustr/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type API interface {
GetFirewallRuleStatus(firewallRuleID string, firewallRuleEndpoint string) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
CreateFirewallRule(url string, firewallRuleSpec any) (*clusterresourcesv1beta1.FirewallRuleStatus, error)
DeleteFirewallRule(firewallRuleID string, firewallRuleEndpoint string) error
GetKafkaUserStatus(kafkaUserID, kafkaUserEndpoint string) (*kafkamanagementv1beta1.KafkaUserStatus, error)
CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (*kafkamanagementv1beta1.KafkaUserStatus, error)
UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error
DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error
Expand Down
15 changes: 6 additions & 9 deletions pkg/models/kafka_user_apv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ limitations under the License.
package models

type KafkaUser struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Options *KafkaUserOptions `json:"options"`
ClusterID string `json:"clusterId"`
InitialPermissions string `json:"initialPermissions"`
}

type KafkaUserOptions struct {
Password string `json:"password,omitempty"`
OverrideExistingUser bool `json:"overrideExistingUser,omitempty"`
SASLSCRAMMechanism string `json:"saslScramMechanism"`
SASLSCRAMMechanism string `json:"saslScramMechanism,omitempty"`
AuthMechanism string `json:"authMechanism"`
ClusterID string `json:"clusterId"`
InitialPermissions string `json:"initialPermissions"`
Username string `json:"username"`
}

type CertificateRequest struct {
Expand Down

0 comments on commit 21e3445

Please sign in to comment.