diff --git a/apis/kafkamanagement/v1beta1/kafkauser_types.go b/apis/kafkamanagement/v1beta1/kafkauser_types.go index ea7979c17..18952439e 100644 --- a/apis/kafkamanagement/v1beta1/kafkauser_types.go +++ b/apis/kafkamanagement/v1beta1/kafkauser_types.go @@ -27,15 +27,12 @@ import ( // KafkaUserSpec defines the desired state of KafkaUser type KafkaUserSpec struct { - Options *KafkaUserOptions `json:"options"` - SecretRef *v1beta1.SecretReference `json:"secretRef"` - CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"` - InitialPermissions string `json:"initialPermissions"` -} - -type KafkaUserOptions struct { - OverrideExistingUser bool `json:"overrideExistingUser,omitempty"` - SASLSCRAMMechanism string `json:"saslScramMechanism"` + SecretRef *v1beta1.SecretReference `json:"secretRef"` + CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"` + InitialPermissions string `json:"initialPermissions"` + OverrideExistingUser bool `json:"overrideExistingUser,omitempty"` + SASLSCRAMMechanism string `json:"saslScramMechanism"` + AuthMechanism string `json:"authMechanism"` } // KafkaUserStatus defines the observed state of KafkaUser @@ -121,20 +118,14 @@ func init() { func (ks *KafkaUserSpec) ToInstAPI(clusterID string, username string, password string) *models.KafkaUser { return &models.KafkaUser{ - ClusterID: clusterID, - InitialPermissions: ks.InitialPermissions, - Options: ks.Options.ToInstAPI(), - Username: username, - Password: password, - } -} - -func (ko *KafkaUserOptions) ToInstAPI() *models.KafkaUserOptions { - return &models.KafkaUserOptions{ - OverrideExistingUser: ko.OverrideExistingUser, - SASLSCRAMMechanism: ko.SASLSCRAMMechanism, + Password: password, + OverrideExistingUser: ks.OverrideExistingUser, + SASLSCRAMMechanism: ks.SASLSCRAMMechanism, + AuthMechanism: ks.AuthMechanism, + ClusterID: clusterID, + InitialPermissions: ks.InitialPermissions, + Username: username, } - } func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest { diff --git a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go index e4ce1a987..b0731c9d1 100644 --- a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go +++ b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go @@ -254,29 +254,9 @@ func (in *KafkaUserList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *KafkaUserOptions) DeepCopyInto(out *KafkaUserOptions) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaUserOptions. -func (in *KafkaUserOptions) DeepCopy() *KafkaUserOptions { - if in == nil { - return nil - } - out := new(KafkaUserOptions) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaUserSpec) DeepCopyInto(out *KafkaUserSpec) { *out = *in - if in.Options != nil { - in, out := &in.Options, &out.Options - *out = new(KafkaUserOptions) - **out = **in - } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef *out = new(clusterresourcesv1beta1.SecretReference) diff --git a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml index fac3c07b5..7cb6d462f 100644 --- a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml +++ b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml @@ -35,6 +35,8 @@ spec: spec: description: KafkaUserSpec defines the desired state of KafkaUser properties: + authMechanism: + type: string certificateRequests: items: properties: @@ -68,15 +70,10 @@ spec: type: array initialPermissions: type: string - options: - properties: - overrideExistingUser: - type: boolean - saslScramMechanism: - type: string - required: - - saslScramMechanism - type: object + overrideExistingUser: + type: boolean + saslScramMechanism: + type: string secretRef: properties: name: @@ -88,8 +85,9 @@ spec: - namespace type: object required: + - authMechanism - initialPermissions - - options + - saslScramMechanism - secretRef type: object status: diff --git a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml index 8fc250079..74d632245 100644 --- a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml +++ b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml @@ -4,7 +4,7 @@ metadata: name: secret-test data: username: "U2FuY2gtdHdvCg==" - password: "Qm9oZGFuXyF1YmVyX3VudGVyX3ZhZmVsb2sxNDg4X2lnb3JfdG9saWsxNDg4Cg==" + password: "Qm9oZGFuX3ViZXJfdW50ZXJfaWdvcl90b2xpazEK" --- apiVersion: kafkamanagement.instaclustr.com/v1beta1 kind: KafkaUser @@ -34,6 +34,6 @@ spec: # validPeriod: 6 # autoRenew: false initialPermissions: "standard" - options: - overrideExistingUser: true - saslScramMechanism: "SCRAM-SHA-256" \ No newline at end of file + overrideExistingUser: true + saslScramMechanism: "SCRAM-SHA-256" + authMechanism: "SASL" diff --git a/controllers/kafkamanagement/kafkauser_controller.go b/controllers/kafkamanagement/kafkauser_controller.go index 923eb64d5..9659b7758 100644 --- a/controllers/kafkamanagement/kafkauser_controller.go +++ b/controllers/kafkamanagement/kafkauser_controller.go @@ -144,7 +144,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Info( "Creating Kafka user", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) iKafkaUser := user.Spec.ToInstAPI(clusterID, username, password) _, err = r.API.CreateKafkaUser(instaclustr.KafkaUserEndpoint, iKafkaUser) @@ -178,7 +177,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot patch Kafka user resource status", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -193,7 +191,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Info( "Kafka user was created", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) continue @@ -205,7 +202,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "cannot delete Kafka user", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -228,7 +224,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot patch Kafka user resource status", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -242,7 +237,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Info("Kafka user has been deleted", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) r.EventRecorder.Eventf(user, models.Normal, models.Deleted, @@ -258,7 +252,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot patch Kafka user resource status", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -272,7 +265,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Info("Kafka user has been detached", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) r.EventRecorder.Eventf( user, models.Normal, models.Deleted, @@ -289,7 +281,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot update Kafka user", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) r.EventRecorder.Eventf( user, models.Warning, models.UpdateFailed, @@ -305,7 +296,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot patch Kafka user resource metadata", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -322,7 +312,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { l.Error(err, "Cannot patch Kafka user resource status", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, "kafka user metadata", user.ObjectMeta, ) r.EventRecorder.Eventf( @@ -336,7 +325,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Info("Kafka user resource has been updated", "initial permissions", user.Spec.InitialPermissions, - "kafka user options", user.Spec.Options, ) continue diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index bfa9ac122..fbfd87979 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -887,40 +887,6 @@ func (c *Client) UpdateKafkaTopic(url string, t *kafkamanagementv1beta1.Topic) e return nil } -func (c *Client) GetKafkaUserStatus( - kafkaUserID, - kafkaUserEndpoint string, -) (*kafkamanagementv1beta1.KafkaUserStatus, error) { - url := c.serverHostname + kafkaUserEndpoint + kafkaUserID - - resp, err := c.DoRequest(url, http.MethodGet, nil) - if err != nil { - return nil, err - } - - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - if resp.StatusCode == http.StatusNotFound { - return nil, NotFound - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) - } - - var kafkaUserStatus kafkamanagementv1beta1.KafkaUserStatus - err = json.Unmarshal(body, &kafkaUserStatus) - if err != nil { - return nil, err - } - - return &kafkaUserStatus, nil -} - func (c *Client) CreateKafkaUser( url string, kafkaUser *models.KafkaUser, diff --git a/pkg/instaclustr/config.go b/pkg/instaclustr/config.go index f814da91e..8bd94f499 100644 --- a/pkg/instaclustr/config.go +++ b/pkg/instaclustr/config.go @@ -39,7 +39,7 @@ const ( ClusterNetworkFirewallRuleEndpoint = "/cluster-management/v2/resources/network-firewall-rules/v2/" AWSSecurityGroupFirewallRuleEndpoint = "/cluster-management/v2/resources/providers/aws/security-group-firewall-rules/v2/" GCPPeeringEndpoint = "/cluster-management/v2/resources/providers/gcp/vpc-peers/v2/" - KafkaUserEndpoint = "/cluster-management/v2/resources/applications/kafka/users/v2/" + KafkaUserEndpoint = "/cluster-management/v2/resources/applications/kafka/users/v3/" KafkauserCertificatesEndpoint = "/cluster-management/v2/resources/applications/kafka/user-certificates/v2/" KafkaUserCertificatesRenewEndpoint = "/cluster-management/v2/operations/applications/kafka/user-certificates/renew/v2/" KafkaACLEndpoint = "/cluster-management/v2/resources/applications/kafka/acls/v2/" diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 8cf37aa3c..fca23f78d 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -41,7 +41,6 @@ type API interface { GetFirewallRuleStatus(firewallRuleID string, firewallRuleEndpoint string) (*clusterresourcesv1beta1.FirewallRuleStatus, error) CreateFirewallRule(url string, firewallRuleSpec any) (*clusterresourcesv1beta1.FirewallRuleStatus, error) DeleteFirewallRule(firewallRuleID string, firewallRuleEndpoint string) error - GetKafkaUserStatus(kafkaUserID, kafkaUserEndpoint string) (*kafkamanagementv1beta1.KafkaUserStatus, error) CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (*kafkamanagementv1beta1.KafkaUserStatus, error) UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error diff --git a/pkg/models/kafka_user_apv2.go b/pkg/models/kafka_user_apv2.go index 97d556123..0aa688f7a 100644 --- a/pkg/models/kafka_user_apv2.go +++ b/pkg/models/kafka_user_apv2.go @@ -17,16 +17,13 @@ limitations under the License. package models type KafkaUser struct { - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` - Options *KafkaUserOptions `json:"options"` - ClusterID string `json:"clusterId"` - InitialPermissions string `json:"initialPermissions"` -} - -type KafkaUserOptions struct { + Password string `json:"password,omitempty"` OverrideExistingUser bool `json:"overrideExistingUser,omitempty"` - SASLSCRAMMechanism string `json:"saslScramMechanism"` + SASLSCRAMMechanism string `json:"saslScramMechanism,omitempty"` + AuthMechanism string `json:"authMechanism"` + ClusterID string `json:"clusterId"` + InitialPermissions string `json:"initialPermissions"` + Username string `json:"username"` } type CertificateRequest struct {