diff --git a/apis/clusterresources/v1beta1/cassandrauser_types.go b/apis/clusterresources/v1beta1/cassandrauser_types.go index f68c18434..b3906302e 100644 --- a/apis/clusterresources/v1beta1/cassandrauser_types.go +++ b/apis/clusterresources/v1beta1/cassandrauser_types.go @@ -76,3 +76,11 @@ func (r *CassandraUser) ToInstAPI(username, password string) *models.InstaUser { func (r *CassandraUser) GetDeletionFinalizer() string { return models.DeletionFinalizer + "_" + r.Namespace + "_" + r.Name } + +func (r *CassandraUser) GetClusterEvents() map[string]string { + return r.Status.ClustersEvents +} + +func (r *CassandraUser) SetClusterEvents(events map[string]string) { + r.Status.ClustersEvents = events +} diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 71fdf78dc..cd19c9374 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -62,14 +62,15 @@ type CassandraSpec struct { PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"` Spark []*Spark `json:"spark,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + UserRefs References `json:"userRefs,omitempty"` //+kubebuilder:validate:MaxItems:=1 ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } // CassandraStatus defines the observed state of Cassandra type CassandraStatus struct { - ClusterStatus `json:",inline"` + ClusterStatus `json:",inline"` + AvailableUsers References `json:"availableUsers,omitempty"` } type CassandraDataCentre struct { @@ -494,6 +495,30 @@ func (c *CassandraSpec) validateResizeSettings(nodeNumber int) error { return nil } +func (c *Cassandra) GetAvailableUsers() References { + return c.Status.AvailableUsers +} + +func (c *Cassandra) SetAvailableUsers(users References) { + c.Status.AvailableUsers = users +} + +func (c *Cassandra) GetUserRefs() References { + return c.Spec.UserRefs +} + +func (c *Cassandra) SetUserRefs(refs References) { + c.Spec.UserRefs = refs +} + +func (c *Cassandra) GetClusterID() string { + return c.Status.ID +} + +func (c *Cassandra) SetClusterID(id string) { + c.Status.ID = id +} + func init() { SchemeBuilder.Register(&Cassandra{}, &CassandraList{}) } diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index 2c6900c26..52acd5d51 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -88,7 +88,7 @@ type KafkaSpec struct { KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + UserRefs []*Reference `json:"userRefs,omitempty"` Kraft []*Kraft `json:"kraft,omitempty"` } diff --git a/apis/clusters/v1beta1/opensearch_types.go b/apis/clusters/v1beta1/opensearch_types.go index 959e60f9e..a0ca59d28 100644 --- a/apis/clusters/v1beta1/opensearch_types.go +++ b/apis/clusters/v1beta1/opensearch_types.go @@ -53,7 +53,7 @@ type OpenSearchSpec struct { IndexManagementPlugin bool `json:"indexManagementPlugin,omitempty"` AlertingPlugin bool `json:"alertingPlugin,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + UserRefs []*Reference `json:"userRefs,omitempty"` //+kubuilder:validation:MaxItems:=1 ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` //+kubuilder:validation:MaxItems:=1 diff --git a/apis/clusters/v1beta1/postgresql_types.go b/apis/clusters/v1beta1/postgresql_types.go index 91924f5d7..bde017933 100644 --- a/apis/clusters/v1beta1/postgresql_types.go +++ b/apis/clusters/v1beta1/postgresql_types.go @@ -78,7 +78,7 @@ type PgSpec struct { DataCentres []*PgDataCentre `json:"dataCentres,omitempty"` ClusterConfigurations map[string]string `json:"clusterConfigurations,omitempty"` SynchronousModeStrict bool `json:"synchronousModeStrict,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + UserRefs []*Reference `json:"userRefs,omitempty"` //+kubebuilder:validate:MaxItems:=1 ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } diff --git a/apis/clusters/v1beta1/redis_types.go b/apis/clusters/v1beta1/redis_types.go index bd384d6b2..0bc202f00 100644 --- a/apis/clusters/v1beta1/redis_types.go +++ b/apis/clusters/v1beta1/redis_types.go @@ -69,7 +69,7 @@ type RedisSpec struct { //+kubebuilder:validation:MaxItems:=2 DataCentres []*RedisDataCentre `json:"dataCentres,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + UserRefs []*Reference `json:"userRefs,omitempty"` //+kubebuilder:validation:MaxItems:=1 ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index 1ee503af5..f8444df55 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -20,6 +20,8 @@ import ( "encoding/json" "net" + "k8s.io/apimachinery/pkg/types" + clusterresource "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/models" ) @@ -711,7 +713,51 @@ func (cs *ClusterStatus) PrivateLinkStatusesEqual(iStatus *ClusterStatus) bool { return true } -type UserReference struct { - Namespace string `json:"namespace"` +type Reference struct { Name string `json:"name"` + Namespace string `json:"namespace"` +} + +func (r *Reference) AsNamespacedName() types.NamespacedName { + return types.NamespacedName{ + Name: r.Name, + Namespace: r.Namespace, + } +} + +type References []*Reference + +// Diff returns difference between two References. +// Added stores elements which are presented in new References, but aren't presented in old. +// Deleted stores elements which aren't presented in new References, but are presented in old. +func (old References) Diff(new References) (added, deleted References) { + // filtering deleted references + for _, oldRef := range old { + var exists bool + for _, newRef := range new { + if *oldRef == *newRef { + exists = true + } + } + + if !exists { + deleted = append(deleted, oldRef) + } + } + + // filtering added references + for _, newRef := range new { + var exists bool + for _, oldRef := range old { + if *newRef == *oldRef { + exists = true + } + } + + if !exists { + added = append(added, newRef) + } + } + + return added, deleted } diff --git a/apis/clusters/v1beta1/structs_test.go b/apis/clusters/v1beta1/structs_test.go new file mode 100644 index 000000000..3f1fd4ef6 --- /dev/null +++ b/apis/clusters/v1beta1/structs_test.go @@ -0,0 +1,218 @@ +package v1beta1_test + +import ( + "encoding/json" + "reflect" + "testing" + + "github.com/instaclustr/operator/apis/clusters/v1beta1" +) + +func TestUserReferencesDiff(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + old v1beta1.References + new v1beta1.References + expectedAdded v1beta1.References + expectedDeleted v1beta1.References + }{ + { + name: "nothing changed", + old: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + new: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + }, + { + name: "added new reference", + old: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + new: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + { + Name: "name2", + Namespace: "namespace2", + }, + }, + expectedAdded: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + }, + }, + { + name: "deleted old reference", + old: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + { + Name: "name2", + Namespace: "namespace2", + }, + }, + new: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + expectedDeleted: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + }, + }, + { + name: "both slices are nil", + old: nil, + new: nil, + }, + { + name: "deleting the first out of 3", + old: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + }, + new: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + }, + expectedDeleted: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + }, + { + name: "deleting the first and adding a new one", + old: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + }, + new: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + { + Name: "name4", + Namespace: "namespace4", + }, + }, + expectedDeleted: v1beta1.References{ + { + Name: "name1", + Namespace: "namespace1", + }, + }, + expectedAdded: v1beta1.References{ + { + Name: "name4", + Namespace: "namespace4", + }, + }, + }, + { + name: "deleting the whole references", + old: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + { + Name: "name4", + Namespace: "namespace4", + }, + }, + expectedDeleted: v1beta1.References{ + { + Name: "name2", + Namespace: "namespace2", + }, + { + Name: "name3", + Namespace: "namespace3", + }, + { + Name: "name4", + Namespace: "namespace4", + }, + }, + }, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + added, deleted := c.old.Diff(c.new) + + if !reflect.DeepEqual(added, c.expectedAdded) || !reflect.DeepEqual(deleted, c.expectedDeleted) { + t.Errorf("expected added %s, got %s; expected deleted %s, got %s", + toJson(c.expectedAdded), toJson(added), + toJson(c.expectedDeleted), toJson(deleted), + ) + } + }) + } +} + +func toJson(obj any) string { + b, _ := json.Marshal(obj) + return string(b) +} diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 8346d5ca2..50328d2aa 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -482,11 +482,11 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { } if in.UserRefs != nil { in, out := &in.UserRefs, &out.UserRefs - *out = make([]*UserReference, len(*in)) + *out = make(References, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(UserReference) + *out = new(Reference) **out = **in } } @@ -518,6 +518,17 @@ func (in *CassandraSpec) DeepCopy() *CassandraSpec { func (in *CassandraStatus) DeepCopyInto(out *CassandraStatus) { *out = *in in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + if in.AvailableUsers != nil { + in, out := &in.AvailableUsers, &out.AvailableUsers + *out = make(References, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Reference) + **out = **in + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CassandraStatus. @@ -1189,11 +1200,11 @@ func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { } if in.UserRefs != nil { in, out := &in.UserRefs, &out.UserRefs - *out = make([]*UserReference, len(*in)) + *out = make([]*Reference, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(UserReference) + *out = new(Reference) **out = **in } } @@ -1535,11 +1546,11 @@ func (in *OpenSearchSpec) DeepCopyInto(out *OpenSearchSpec) { } if in.UserRefs != nil { in, out := &in.UserRefs, &out.UserRefs - *out = make([]*UserReference, len(*in)) + *out = make([]*Reference, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(UserReference) + *out = new(Reference) **out = **in } } @@ -1778,11 +1789,11 @@ func (in *PgSpec) DeepCopyInto(out *PgSpec) { } if in.UserRefs != nil { in, out := &in.UserRefs, &out.UserRefs - *out = make([]*UserReference, len(*in)) + *out = make([]*Reference, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(UserReference) + *out = new(Reference) **out = **in } } @@ -2079,11 +2090,11 @@ func (in *RedisSpec) DeepCopyInto(out *RedisSpec) { } if in.UserRefs != nil { in, out := &in.UserRefs, &out.UserRefs - *out = make([]*UserReference, len(*in)) + *out = make([]*Reference, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(UserReference) + *out = new(Reference) **out = **in } } @@ -2127,6 +2138,46 @@ func (in *RedisStatus) DeepCopy() *RedisStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Reference) DeepCopyInto(out *Reference) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Reference. +func (in *Reference) DeepCopy() *Reference { + if in == nil { + return nil + } + out := new(Reference) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in References) DeepCopyInto(out *References) { + { + in := &in + *out = make(References, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Reference) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new References. +func (in References) DeepCopy() References { + if in == nil { + return nil + } + out := new(References) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplaceOperation) DeepCopyInto(out *ReplaceOperation) { *out = *in @@ -2421,21 +2472,6 @@ func (in *TwoFactorDelete) DeepCopy() *TwoFactorDelete { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *UserReference) DeepCopyInto(out *UserReference) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserReference. -func (in *UserReference) DeepCopy() *UserReference { - if in == nil { - return nil - } - out := new(UserReference) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Zookeeper) DeepCopyInto(out *Zookeeper) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index 862c6bcee..b92d77a37 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -224,6 +224,18 @@ spec: status: description: CassandraStatus defines the observed state of Cassandra properties: + availableUsers: + items: + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + type: array cdcid: type: string currentClusterOperationStatus: diff --git a/config/samples/clusterresources_v1beta1_cassandrauser.yaml b/config/samples/clusterresources_v1beta1_cassandrauser.yaml index f12a6fd0a..cc800df17 100644 --- a/config/samples/clusterresources_v1beta1_cassandrauser.yaml +++ b/config/samples/clusterresources_v1beta1_cassandrauser.yaml @@ -15,3 +15,40 @@ spec: secretRef: name: "cassandra-user-secret" namespace: "default" +--- +apiVersion: v1 +kind: Secret +metadata: + name: cassandra-user-secret2 +data: + password: NDgxMzU5ODM1NzlmMDU0ZTlhY2I4ZjcxMTMzMzQ1MjM3ZQ== + username: b2xvbG8xCg== +--- + +apiVersion: clusterresources.instaclustr.com/v1beta1 +kind: CassandraUser +metadata: + name: cassandrauser-sample2 +spec: + secretRef: + name: "cassandra-user-secret2" + namespace: "default" +--- +apiVersion: v1 +kind: Secret +metadata: + name: cassandra-user-secret3 +data: + password: NDgxMzU5ODM1NzlmMDU0ZTlhY2I4ZjcxMTMzMzQ1MjM3ZQ== + username: b2xvbG8yCg== +--- + +apiVersion: clusterresources.instaclustr.com/v1beta1 +kind: CassandraUser +metadata: + name: cassandrauser-sample3 +spec: + secretRef: + name: "cassandra-user-secret3" + namespace: "default" +--- \ No newline at end of file diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index d1e9be089..b0e1cb1a0 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -42,6 +42,8 @@ spec: # name: cassandrauser-sample # - namespace: default # name: cassandrauser-sample2 +# - namespace: default +# name: cassandrauser-sample3 slaTier: "NON_PRODUCTION" # resizeSettings: # - notifySupportContacts: false diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index de2036fe4..c69128726 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -261,7 +261,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster backups check job is started", ) - if cassandra.Spec.UserRefs != nil { + if cassandra.Spec.UserRefs != nil && cassandra.Status.AvailableUsers == nil { err = r.startUsersCreationJob(cassandra) if err != nil { l.Error(err, "Failed to start user creation job") @@ -385,6 +385,15 @@ func (r *CassandraReconciler) handleUpdateCluster( } } + err = handleUsersChanges(ctx, r.Client, r, cassandra) + if err != nil { + l.Error(err, "Failed to handle users changes") + r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed, + "Handling users changes is failed. Reason: %w", err, + ) + return reconcile.Result{}, err + } + cassandra.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent cassandra.Annotations[models.UpdateQueuedAnnotation] = "" err = r.Patch(ctx, cassandra, patch) @@ -557,11 +566,13 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster backup resources are deleted", ) - for _, ref := range cassandra.Spec.UserRefs { - err = r.handleUsersDetach(ctx, l, cassandra, ref) - if err != nil { - return reconcile.Result{}, err - } + err = detachUsers(ctx, r.Client, r, cassandra) + if err != nil { + l.Error(err, "Failed to detach users from the cluster") + r.EventRecorder.Eventf(cassandra, models.Warning, models.DeletionFailed, + "Detaching users from the cluster is failed. Reason: %w", err, + ) + return reconcile.Result{}, err } controllerutil.RemoveFinalizer(cassandra, models.DeletionFinalizer) @@ -609,227 +620,6 @@ func (r *CassandraReconciler) handleDeleteCluster( return models.ExitReconcile, nil } -func (r *CassandraReconciler) handleUsersCreate( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, - uRef *v1beta1.UserReference, -) error { - req := types.NamespacedName{ - Namespace: uRef.Namespace, - Name: uRef.Name, - } - - u := &clusterresourcesv1beta1.CassandraUser{} - err := r.Get(ctx, req, u) - if err != nil { - if k8serrors.IsNotFound(err) { - l.Error(err, "Cannot create a Cassandra user. The resource is not found", "request", req) - r.EventRecorder.Eventf(c, models.Warning, models.NotFound, - "User is not found, create a new one Cassandra User or provide correct userRef."+ - "Current provided reference: %v", uRef) - return err - } - - l.Error(err, "Cannot get Cassandra user", "user", u.Spec) - r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, - "Cannot get Cassandra user. user reference: %v", uRef) - return err - } - - if _, exist := u.Status.ClustersEvents[c.Status.ID]; exist { - l.Info("User is already existing on the cluster", - "user reference", uRef) - r.EventRecorder.Eventf(c, models.Normal, models.CreationFailed, - "User is already existing on the cluster. User reference: %v", uRef) - - return nil - } - - patch := u.NewPatch() - - if u.Status.ClustersEvents == nil { - u.Status.ClustersEvents = make(map[string]string) - } - - u.Status.ClustersEvents[c.Status.ID] = models.CreatingEvent - - err = r.Status().Patch(ctx, u, patch) - if err != nil { - l.Error(err, "Cannot patch the Cassandra User status with the CreatingEvent", - "cluster name", c.Spec.Name, "cluster ID", c.Status.ID) - r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, - "Cannot add Cassandra User to the cluster. Reason: %v", err) - return err - } - - l.Info("User has been added to the queue for creation", "username", u.Name) - - return nil -} - -func (r *CassandraReconciler) handleUsersDelete( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, - uRef *v1beta1.UserReference, -) error { - req := types.NamespacedName{ - Namespace: uRef.Namespace, - Name: uRef.Name, - } - - u := &clusterresourcesv1beta1.CassandraUser{} - err := r.Get(ctx, req, u) - if err != nil { - if k8serrors.IsNotFound(err) { - l.Error(err, "Cannot delete a Cassandra user, the user is not found", "request", req) - r.EventRecorder.Eventf(c, models.Warning, models.NotFound, - "Cannot delete a Cassandra user, the user %v is not found", req) - return nil - } - - l.Error(err, "Cannot get Cassandra user", "user", req) - r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, - "Cannot get Cassandra user. user reference: %v", req) - return err - } - - if _, exist := u.Status.ClustersEvents[c.Status.ID]; !exist { - l.Info("User is not existing on the cluster", - "user reference", uRef) - r.EventRecorder.Eventf(c, models.Normal, models.DeletionFailed, - "User is not existing on the cluster. User reference: %v", req) - - return nil - } - - patch := u.NewPatch() - - u.Status.ClustersEvents[c.Status.ID] = models.DeletingEvent - - err = r.Status().Patch(ctx, u, patch) - if err != nil { - l.Error(err, "Cannot patch the Cassandra User status with the DeletingEvent", - "cluster name", c.Spec.Name, "cluster ID", c.Status.ID) - r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, - "Cannot patch the Cassandra User status with the DeletingEvent. Reason: %v", err) - return err - } - - l.Info("User has been added to the queue for deletion", - "User resource", u.Namespace+"/"+u.Name, - "Cassandra resource", c.Namespace+"/"+c.Name) - - return nil -} - -func (r *CassandraReconciler) handleUsersDetach( - ctx context.Context, - l logr.Logger, - c *v1beta1.Cassandra, - uRef *v1beta1.UserReference, -) error { - req := types.NamespacedName{ - Namespace: uRef.Namespace, - Name: uRef.Name, - } - - u := &clusterresourcesv1beta1.CassandraUser{} - err := r.Get(ctx, req, u) - if err != nil { - if k8serrors.IsNotFound(err) { - l.Error(err, "Cannot detach a Cassandra user, the user is not found", "request", req) - r.EventRecorder.Eventf(c, models.Warning, models.NotFound, - "Cannot detach a Cassandra user, the user %v is not found", req) - return nil - } - - l.Error(err, "Cannot get Cassandra user", "user", req) - r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, - "Cannot get Cassandra user. user reference: %v", req) - return err - } - - if _, exist := u.Status.ClustersEvents[c.Status.ID]; !exist { - l.Info("User is not existing in the cluster", "user reference", uRef) - r.EventRecorder.Eventf(c, models.Normal, models.DeletionFailed, - "User is not existing in the cluster. User reference: %v", uRef) - return nil - } - - patch := u.NewPatch() - u.Status.ClustersEvents[c.Status.ID] = models.ClusterDeletingEvent - err = r.Status().Patch(ctx, u, patch) - if err != nil { - l.Error(err, "Cannot patch the Cassandra user status with the ClusterDeletingEvent", - "cluster name", c.Spec.Name, "cluster ID", c.Status.ID) - r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, - "Cannot patch the Cassandra user status with the ClusterDeletingEvent. Reason: %v", err) - return err - } - - l.Info("User has been added to the queue for detaching", "username", u.Name) - - return nil -} - -func (r *CassandraReconciler) handleUserEvent( - newObj *v1beta1.Cassandra, - oldUsers []*v1beta1.UserReference, -) { - ctx := context.TODO() - l := log.FromContext(ctx) - - for _, newUser := range newObj.Spec.UserRefs { - var exist bool - - for _, oldUser := range oldUsers { - - if *newUser == *oldUser { - exist = true - break - } - } - - if exist { - continue - } - - err := r.handleUsersCreate(ctx, l, newObj, newUser) - if err != nil { - l.Error(err, "Cannot create Cassandra user in predicate", "user", newUser) - r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, - "Cannot create user. Reason: %v", err) - } - - oldUsers = append(oldUsers, newUser) - } - - for _, oldUser := range oldUsers { - var exist bool - - for _, newUser := range newObj.Spec.UserRefs { - - if *oldUser == *newUser { - exist = true - break - } - } - - if exist { - continue - } - - err := r.handleUsersDelete(ctx, l, newObj, oldUser) - if err != nil { - l.Error(err, "Cannot delete Cassandra user", "user", oldUser) - r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, - "Cannot delete user from cluster. Reason: %v", err) - } - } -} - func (r *CassandraReconciler) startClusterStatusJob(cassandraCluster *v1beta1.Cassandra) error { job := r.newWatchStatusJob(cassandraCluster) @@ -1135,14 +925,12 @@ func (r *CassandraReconciler) newUsersCreationJob(c *v1beta1.Cassandra) schedule return nil } - for _, ref := range c.Spec.UserRefs { - err = r.handleUsersCreate(ctx, l, c, ref) - if err != nil { - l.Error(err, "Failed to create a user for the cluster", "user ref", ref) - r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, - "Failed to create a user for the cluster. Reason: %v", err) - return err - } + err = handleUsersChanges(ctx, r.Client, r, c) + if err != nil { + l.Error(err, "Failed to create users for the cluster") + r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, + "Failed to create users for the cluster. Reason: %v", err) + return err } l.Info("User creation job successfully finished", "resource name", c.Name) @@ -1245,6 +1033,11 @@ func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1bet return nil } +// NewUserResource implements userResourceFactory interface +func (r *CassandraReconciler) NewUserResource() userObject { + return &clusterresourcesv1beta1.CassandraUser{} +} + // SetupWithManager sets up the controller with the Manager. func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). @@ -1278,10 +1071,6 @@ func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { return false } - oldObj := event.ObjectOld.(*v1beta1.Cassandra) - - r.handleUserEvent(newObj, oldObj.Spec.UserRefs) - newObj.Annotations[models.ResourceStateAnnotation] = models.UpdatingEvent return true }, diff --git a/controllers/clusters/helpers.go b/controllers/clusters/helpers.go index 01353baff..6ee2bb09f 100644 --- a/controllers/clusters/helpers.go +++ b/controllers/clusters/helpers.go @@ -216,3 +216,9 @@ func deleteDefaultUserSecret( return client.Delete(ctx, secret) } + +// Object is a general representation of any object the operator works with +type Object interface { + client.Object + NewPatch() client.Patch +} diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index fa58db605..d1db4ac44 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -317,7 +317,7 @@ func (r *KafkaReconciler) handleCreateUser( ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger, - userRef *v1beta1.UserReference, + userRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: userRef.Namespace, @@ -377,7 +377,7 @@ func (r *KafkaReconciler) handleDeleteUser( ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger, - userRef *v1beta1.UserReference, + userRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: userRef.Namespace, @@ -433,7 +433,7 @@ func (r *KafkaReconciler) detachUser( ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger, - userRef *v1beta1.UserReference) error { + userRef *v1beta1.Reference) error { req := types.NamespacedName{ Namespace: userRef.Namespace, Name: userRef.Name, @@ -482,7 +482,7 @@ func (r *KafkaReconciler) detachUser( func (r *KafkaReconciler) handleUserEvent( newObj *v1beta1.Kafka, - oldUsers []*v1beta1.UserReference, + oldUsers []*v1beta1.Reference, ) { ctx := context.TODO() l := log.FromContext(ctx) diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 8d10ea65c..b19a53b27 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -952,7 +952,7 @@ func (r *OpenSearchReconciler) deleteUser( ctx context.Context, l logr.Logger, c *v1beta1.OpenSearch, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -1005,7 +1005,7 @@ func (r *OpenSearchReconciler) createUser( ctx context.Context, logger logr.Logger, c *v1beta1.OpenSearch, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -1064,7 +1064,7 @@ func (r *OpenSearchReconciler) detachUserResource( ctx context.Context, l logr.Logger, c *v1beta1.OpenSearch, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -1110,7 +1110,7 @@ func (r *OpenSearchReconciler) detachUserResource( func (r *OpenSearchReconciler) handleUserEvent( newObj *v1beta1.OpenSearch, - oldUsers []*v1beta1.UserReference, + oldUsers []*v1beta1.Reference, ) { ctx := context.TODO() l := log.FromContext(ctx) diff --git a/controllers/clusters/postgresql_controller.go b/controllers/clusters/postgresql_controller.go index ec7bac8f2..046f3f383 100644 --- a/controllers/clusters/postgresql_controller.go +++ b/controllers/clusters/postgresql_controller.go @@ -498,7 +498,7 @@ func (r *PostgreSQLReconciler) createUser( ctx context.Context, l logr.Logger, c *v1beta1.PostgreSQL, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -577,7 +577,7 @@ func (r *PostgreSQLReconciler) handleUsersDelete( ctx context.Context, l logr.Logger, pg *v1beta1.PostgreSQL, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -641,7 +641,7 @@ func (r *PostgreSQLReconciler) handleUsersDetach( ctx context.Context, l logr.Logger, c *v1beta1.PostgreSQL, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -698,7 +698,7 @@ func (r *PostgreSQLReconciler) handleUsersDetach( func (r *PostgreSQLReconciler) handleUserEvent( newObj *v1beta1.PostgreSQL, - oldUsers []*v1beta1.UserReference, + oldUsers []*v1beta1.Reference, ) { ctx := context.TODO() l := log.FromContext(ctx) diff --git a/controllers/clusters/redis_controller.go b/controllers/clusters/redis_controller.go index f246ac3cf..0be37a0f8 100644 --- a/controllers/clusters/redis_controller.go +++ b/controllers/clusters/redis_controller.go @@ -419,7 +419,7 @@ func (r *RedisReconciler) handleCreateUsers( ctx context.Context, redis *v1beta1.Redis, l logr.Logger, - userRefs *v1beta1.UserReference, + userRefs *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: userRefs.Namespace, @@ -673,7 +673,7 @@ func (r *RedisReconciler) detachUserResource( ctx context.Context, l logr.Logger, redis *v1beta1.Redis, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, @@ -719,7 +719,7 @@ func (r *RedisReconciler) detachUserResource( func (r *RedisReconciler) handleUserEvent( newObj *v1beta1.Redis, - oldUsers []*v1beta1.UserReference, + oldUsers []*v1beta1.Reference, ) { ctx := context.TODO() l := log.FromContext(ctx) @@ -777,7 +777,7 @@ func (r *RedisReconciler) handleUsersDelete( ctx context.Context, l logr.Logger, c *v1beta1.Redis, - uRef *v1beta1.UserReference, + uRef *v1beta1.Reference, ) error { req := types.NamespacedName{ Namespace: uRef.Namespace, diff --git a/controllers/clusters/user.go b/controllers/clusters/user.go new file mode 100644 index 000000000..d4eb3e7b2 --- /dev/null +++ b/controllers/clusters/user.go @@ -0,0 +1,126 @@ +package clusters + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/instaclustr/operator/apis/clusters/v1beta1" + "github.com/instaclustr/operator/pkg/models" +) + +type userObject interface { + Object + GetClusterEvents() map[string]string + SetClusterEvents(events map[string]string) +} + +type clusterObject interface { + Object + GetUserRefs() v1beta1.References + SetUserRefs(refs v1beta1.References) + GetAvailableUsers() v1beta1.References + SetAvailableUsers(users v1beta1.References) + GetClusterID() string + SetClusterID(id string) +} + +// userResourceFactory knows which user should be created. +// It helps client.Client to understand which resource should be got +type userResourceFactory interface { + NewUserResource() userObject +} + +// handleUsersChanges handles changes of user creation or deletion events +func handleUsersChanges( + ctx context.Context, + c client.Client, + userFactory userResourceFactory, + cluster clusterObject, +) error { + l := log.FromContext(ctx).V(1) + + l.Info("users::handleUsersChanges") + + oldUsers := cluster.GetAvailableUsers() + newUsers := cluster.GetUserRefs() + clusterID := cluster.GetClusterID() + + added, deleted := oldUsers.Diff(newUsers) + + for _, ref := range added { + err := setClusterEvent(ctx, c, ref, userFactory, clusterID, models.CreatingEvent) + if err != nil { + return err + } + } + + for _, ref := range deleted { + err := setClusterEvent(ctx, c, ref, userFactory, clusterID, models.DeletingEvent) + if err != nil { + return err + } + } + + if added != nil || deleted != nil { + patch := cluster.NewPatch() + cluster.SetAvailableUsers(newUsers) + err := c.Status().Patch(ctx, cluster, patch) + if err != nil { + return err + } + } + + return nil +} + +// setClusterEvent sets a given event to the resource is got by ref +func setClusterEvent( + ctx context.Context, + c client.Client, + ref *v1beta1.Reference, + userFactory userResourceFactory, + clusterID, + event string, +) error { + user := userFactory.NewUserResource() + err := c.Get(ctx, ref.AsNamespacedName(), user) + if err != nil { + return err + } + + patch := user.NewPatch() + + events := user.GetClusterEvents() + if events == nil { + events = map[string]string{} + user.SetClusterEvents(events) + } + + events[clusterID] = event + return c.Status().Patch(ctx, user, patch) +} + +// detachUsers detaches users from the given cluster +func detachUsers( + ctx context.Context, + c client.Client, + userFactory userResourceFactory, + cluster clusterObject, +) error { + l := log.FromContext(ctx).V(1) + + l.Info("users::detachUsers") + + clusterID := cluster.GetClusterID() + + for _, ref := range cluster.GetAvailableUsers() { + err := setClusterEvent(ctx, c, ref, userFactory, clusterID, models.ClusterDeletingEvent) + if err != nil { + return err + } + } + + return nil +} diff --git a/controllers/tests/cassandra_plus_users_test.go b/controllers/tests/cassandra_plus_users_test.go index 816d41974..4c18f10e8 100644 --- a/controllers/tests/cassandra_plus_users_test.go +++ b/controllers/tests/cassandra_plus_users_test.go @@ -142,11 +142,13 @@ var _ = Describe("Basic Cassandra User controller + Basic Cassandra cluster cont When("add the user to a Cassandra UserReference", func() { It("should create the user for the cluster", func() { - newUsers := []*v1beta1.UserReference{{ + newUsers := []*v1beta1.Reference{{ Namespace: userManifest1.Namespace, Name: userManifest1.Name, }} + doneCh := NewChannelWithTimeout(timeout) + Expect(k8sClient.Get(ctx, cassandraNamespacedName1, &cassandra1)).Should(Succeed()) patch := cassandra1.NewPatch() @@ -166,6 +168,7 @@ var _ = Describe("Basic Cassandra User controller + Basic Cassandra cluster cont return true }, timeout, interval).Should(BeTrue()) + <-doneCh }) }) @@ -175,9 +178,10 @@ var _ = Describe("Basic Cassandra User controller + Basic Cassandra cluster cont patch := cassandra1.NewPatch() // removing user - cassandra1.Spec.UserRefs = []*v1beta1.UserReference{} + cassandra1.Spec.UserRefs = []*v1beta1.Reference{} Expect(k8sClient.Patch(ctx, &cassandra1, patch)).Should(Succeed()) - By("going to Cassandra(cluster) controller predicate and put user entity to deletion state. " + + + By("going to Cassandra(cluster) controller and put user entity to deletion state. " + "Finally deletes the user for the corresponded cluster") Eventually(func() bool { if err := k8sClient.Get(ctx, userNamespacedName1, &user1); err != nil { @@ -221,7 +225,7 @@ var _ = Describe("Basic Cassandra User controller + Basic Cassandra cluster cont Expect(k8sClient.Create(ctx, &userManifest2)).Should(Succeed()) By("adding the batch of users to the cluster, Cassandra(cluster) controller predicate set them creation state") - newUsers := []*v1beta1.UserReference{ + newUsers := []*v1beta1.Reference{ { Namespace: userManifest1.Namespace, Name: userManifest1.Name, @@ -342,7 +346,7 @@ var _ = Describe("Basic Cassandra User controller + Basic Cassandra cluster cont Expect(k8sClient.Get(ctx, userNamespacedName2, &user2)).Should(Succeed()) By("creating another Cassandra cluster manifest with filled user ref, " + "we make sure the user creation job works properly and show us that the user is available for use") - newUsers := []*v1beta1.UserReference{{ + newUsers := []*v1beta1.Reference{{ Namespace: user2.Namespace, Name: user2.Name, }} diff --git a/controllers/tests/helpers.go b/controllers/tests/helpers.go index bf3046f5f..db69a608f 100644 --- a/controllers/tests/helpers.go +++ b/controllers/tests/helpers.go @@ -17,6 +17,6 @@ func NewChannelWithTimeout(timeout time.Duration) chan struct{} { return done } -func removeUserByIndex(s []*v1beta1.UserReference, index int) []*v1beta1.UserReference { +func removeUserByIndex(s []*v1beta1.Reference, index int) []*v1beta1.Reference { return append(s[:index], s[index+1:]...) } diff --git a/controllers/tests/opensearch_plus_users_test.go b/controllers/tests/opensearch_plus_users_test.go index 517d0f8c2..a6fd11740 100644 --- a/controllers/tests/opensearch_plus_users_test.go +++ b/controllers/tests/opensearch_plus_users_test.go @@ -139,9 +139,9 @@ var _ = Describe("Basic openSearch User controller + Basic openSearch cluster co }) }) - When("add the user to a openSearch UserReference", func() { + When("add the user to a openSearch Reference", func() { It("should create the user for the cluster", func() { - newUsers := []*v1beta1.UserReference{{ + newUsers := []*v1beta1.Reference{{ Namespace: userManifest1.Namespace, Name: userManifest1.Name, }} @@ -168,13 +168,13 @@ var _ = Describe("Basic openSearch User controller + Basic openSearch cluster co }) }) - When("remove the user from the openSearch UserReference", func() { + When("remove the user from the openSearch Reference", func() { It("should delete the user for the cluster", func() { Expect(k8sClient.Get(ctx, openSearchNamespacedName1, &openSearch1)).Should(Succeed()) patch := openSearch1.NewPatch() // removing user - openSearch1.Spec.UserRefs = []*v1beta1.UserReference{} + openSearch1.Spec.UserRefs = []*v1beta1.Reference{} Expect(k8sClient.Patch(ctx, &openSearch1, patch)).Should(Succeed()) By("going to openSearch(cluster) controller predicate and put user entity to deletion state. " + "Finally deletes the user for the corresponded cluster") @@ -220,7 +220,7 @@ var _ = Describe("Basic openSearch User controller + Basic openSearch cluster co Expect(k8sClient.Create(ctx, &userManifest2)).Should(Succeed()) By("adding the batch of users to the cluster, openSearch(cluster) controller predicate set them creation state") - newUsers := []*v1beta1.UserReference{ + newUsers := []*v1beta1.Reference{ { Namespace: userManifest1.Namespace, Name: userManifest1.Name, @@ -342,7 +342,7 @@ var _ = Describe("Basic openSearch User controller + Basic openSearch cluster co Expect(k8sClient.Get(ctx, userNamespacedName2, &user2)).Should(Succeed()) By("creating another openSearch cluster manifest with filled user ref, " + "we make sure the user creation job works properly and show us that the user is available for use") - newUsers := []*v1beta1.UserReference{{ + newUsers := []*v1beta1.Reference{{ Namespace: user2.Namespace, Name: user2.Name, }}