From af21b12d8a7bbb56973c6ea92b2f97ff33e12ffe Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Wed, 11 Oct 2023 10:11:57 +0300 Subject: [PATCH] NodeReload controller was refactored --- .../v1beta1/maintenanceevents_types.go | 6 +- .../v1beta1/nodereload_types.go | 24 +- .../v1beta1/zz_generated.deepcopy.go | 39 +++- ...resources.instaclustr.com_nodereloads.yaml | 27 +++ .../clusters.instaclustr.com_cadences.yaml | 4 - .../clusters.instaclustr.com_cassandras.yaml | 4 - ...lusters.instaclustr.com_kafkaconnects.yaml | 4 - .../clusters.instaclustr.com_kafkas.yaml | 4 - ...clusters.instaclustr.com_opensearches.yaml | 4 - .../clusters.instaclustr.com_postgresqls.yaml | 4 - .../bases/clusters.instaclustr.com_redis.yaml | 4 - .../clusters.instaclustr.com_zookeepers.yaml | 4 - .../clusterresources/nodereload_controller.go | 217 +++++++++++------- .../nodereload_controller_test.go | 131 +++++++++++ controllers/clusterresources/suite_test.go | 11 + controllers/clusters/cassandra_controller.go | 28 +-- pkg/instaclustr/client.go | 4 + pkg/instaclustr/mock/client.go | 36 ++- pkg/models/operator.go | 5 +- 19 files changed, 401 insertions(+), 159 deletions(-) create mode 100644 controllers/clusterresources/nodereload_controller_test.go diff --git a/apis/clusterresources/v1beta1/maintenanceevents_types.go b/apis/clusterresources/v1beta1/maintenanceevents_types.go index 2c59bd541..819552baa 100644 --- a/apis/clusterresources/v1beta1/maintenanceevents_types.go +++ b/apis/clusterresources/v1beta1/maintenanceevents_types.go @@ -51,9 +51,9 @@ type MaintenanceEventStatus struct { } type ClusteredMaintenanceEventStatus struct { - InProgress []*MaintenanceEventStatus `json:"inProgress"` - Past []*MaintenanceEventStatus `json:"past"` - Upcoming []*MaintenanceEventStatus `json:"upcoming"` + InProgress []*MaintenanceEventStatus `json:"inProgress,omitempty"` + Past []*MaintenanceEventStatus `json:"past,omitempty"` + Upcoming []*MaintenanceEventStatus `json:"upcoming,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/nodereload_types.go b/apis/clusterresources/v1beta1/nodereload_types.go index fcbb47bcf..e5f2f491c 100644 --- a/apis/clusterresources/v1beta1/nodereload_types.go +++ b/apis/clusterresources/v1beta1/nodereload_types.go @@ -19,8 +19,6 @@ package v1beta1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/instaclustr/operator/pkg/models" ) // NodeReloadSpec defines the desired state of NodeReload @@ -30,8 +28,11 @@ type NodeReloadSpec struct { // NodeReloadStatus defines the observed state of NodeReload type NodeReloadStatus struct { - NodeInProgress Node `json:"nodeInProgress,omitempty"` + NodeInProgress *Node `json:"nodeInProgress,omitempty"` CurrentOperationStatus *Operation `json:"currentOperationStatus,omitempty"` + PendingNodes []*Node `json:"pendingNodes,omitempty"` + CompletedNodes []*Node `json:"completedNodes,omitempty"` + FailedNodes []*Node `json:"failedNodes,omitempty"` } type Node struct { @@ -75,20 +76,3 @@ func (nr *NodeReload) NewPatch() client.Patch { func init() { SchemeBuilder.Register(&NodeReload{}, &NodeReloadList{}) } - -func (nr *NodeReloadStatus) FromInstAPI(status *models.NodeReloadStatus) *NodeReloadStatus { - var nrStatus = &NodeReloadStatus{ - NodeInProgress: Node{ - ID: status.NodeID, - }, - CurrentOperationStatus: &Operation{ - OperationID: status.OperationID, - TimeCreated: status.TimeCreated, - TimeModified: status.TimeModified, - Status: status.Status, - Message: status.Message, - }, - } - - return nrStatus -} diff --git a/apis/clusterresources/v1beta1/zz_generated.deepcopy.go b/apis/clusterresources/v1beta1/zz_generated.deepcopy.go index db442acec..c9c23a015 100644 --- a/apis/clusterresources/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusterresources/v1beta1/zz_generated.deepcopy.go @@ -1281,12 +1281,49 @@ func (in *NodeReloadSpec) DeepCopy() *NodeReloadSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodeReloadStatus) DeepCopyInto(out *NodeReloadStatus) { *out = *in - out.NodeInProgress = in.NodeInProgress + if in.NodeInProgress != nil { + in, out := &in.NodeInProgress, &out.NodeInProgress + *out = new(Node) + **out = **in + } if in.CurrentOperationStatus != nil { in, out := &in.CurrentOperationStatus, &out.CurrentOperationStatus *out = new(Operation) **out = **in } + if in.PendingNodes != nil { + in, out := &in.PendingNodes, &out.PendingNodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + **out = **in + } + } + } + if in.CompletedNodes != nil { + in, out := &in.CompletedNodes, &out.CompletedNodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + **out = **in + } + } + } + if in.FailedNodes != nil { + in, out := &in.FailedNodes, &out.FailedNodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + **out = **in + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeReloadStatus. diff --git a/config/crd/bases/clusterresources.instaclustr.com_nodereloads.yaml b/config/crd/bases/clusterresources.instaclustr.com_nodereloads.yaml index 49e44db04..141679c0a 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_nodereloads.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_nodereloads.yaml @@ -50,6 +50,15 @@ spec: status: description: NodeReloadStatus defines the observed state of NodeReload properties: + completedNodes: + items: + properties: + nodeID: + type: string + required: + - nodeID + type: object + type: array currentOperationStatus: properties: message: @@ -68,6 +77,15 @@ spec: - timeCreated - timeModified type: object + failedNodes: + items: + properties: + nodeID: + type: string + required: + - nodeID + type: object + type: array nodeInProgress: properties: nodeID: @@ -75,6 +93,15 @@ spec: required: - nodeID type: object + pendingNodes: + items: + properties: + nodeID: + type: string + required: + - nodeID + type: object + type: array type: object type: object served: true diff --git a/config/crd/bases/clusters.instaclustr.com_cadences.yaml b/config/crd/bases/clusters.instaclustr.com_cadences.yaml index 430055b50..1fd0ba456 100644 --- a/config/crd/bases/clusters.instaclustr.com_cadences.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cadences.yaml @@ -493,10 +493,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index a28675bc9..862c6bcee 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -415,10 +415,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml index fa695dab9..87209b6b0 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkaconnects.yaml @@ -451,10 +451,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index fa81af863..a6e3de764 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -455,10 +455,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml index 628ea13f4..f5e0a111a 100644 --- a/config/crd/bases/clusters.instaclustr.com_opensearches.yaml +++ b/config/crd/bases/clusters.instaclustr.com_opensearches.yaml @@ -445,10 +445,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml b/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml index 86fbce8df..5ec93a122 100644 --- a/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml +++ b/config/crd/bases/clusters.instaclustr.com_postgresqls.yaml @@ -421,10 +421,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_redis.yaml b/config/crd/bases/clusters.instaclustr.com_redis.yaml index f37da108c..af22bf018 100644 --- a/config/crd/bases/clusters.instaclustr.com_redis.yaml +++ b/config/crd/bases/clusters.instaclustr.com_redis.yaml @@ -409,10 +409,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml index 061a0bc49..412c538db 100644 --- a/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml +++ b/config/crd/bases/clusters.instaclustr.com_zookeepers.yaml @@ -328,10 +328,6 @@ spec: - isFinalized type: object type: array - required: - - inProgress - - past - - upcoming type: object type: array options: diff --git a/controllers/clusterresources/nodereload_controller.go b/controllers/clusterresources/nodereload_controller.go index 43865122d..a3fd56a53 100644 --- a/controllers/clusterresources/nodereload_controller.go +++ b/controllers/clusterresources/nodereload_controller.go @@ -18,6 +18,7 @@ package clusterresources import ( "context" + "errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -25,13 +26,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // NodeReloadReconciler reconciles a NodeReload object @@ -42,6 +46,10 @@ type NodeReloadReconciler struct { EventRecorder record.EventRecorder } +const ( + nodeReloadOperationStatusCompleted = "COMPLETED" +) + //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=nodereloads,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=nodereloads/status,verbs=get;update;patch //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=nodereloads/finalizers,verbs=update @@ -60,151 +68,188 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Node Reload resource is not found", "request", req) - return models.ExitReconcile, nil + return reconcile.Result{}, err } l.Error(err, "Unable to fetch Node Reload", "request", req) - return models.ReconcileRequeue, err + return reconcile.Result{}, err } - if len(nrs.Spec.Nodes) == 0 { - err = r.Client.Delete(ctx, nrs) + patch := nrs.NewPatch() + + if len(nrs.Status.PendingNodes)+len(nrs.Status.CompletedNodes)+len(nrs.Status.FailedNodes) == 0 { + nrs.Status.PendingNodes = nrs.Spec.Nodes + err := r.Status().Patch(ctx, nrs, patch) if err != nil { - l.Error(err, - "Cannot delete Node Reload resource from K8s cluster", - "Node Reload spec", nrs.Spec, + l.Error(err, "Failed to patch pending nodes to the resource") + r.EventRecorder.Eventf(nrs, models.Warning, models.PatchFailed, + "Failed to patch pending nodes to the resource. Reason: %w", err, ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.DeletionFailed, - "Resource deletion is failed. Reason: %v", - err, - ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err } + } + + if len(nrs.Status.PendingNodes) == 0 && nrs.Status.NodeInProgress == nil { r.EventRecorder.Eventf( - nrs, models.Normal, models.DeletionStarted, - "Resource is deleted.", + nrs, models.Normal, models.UpdatedEvent, + "The controller has finished working", ) l.Info( - "Nodes were reloaded, resource was deleted", - "Node Reload spec", nrs.Spec, + "The controller has finished working", + "completed nodes", nrs.Status.CompletedNodes, + "failed nodes", nrs.Status.FailedNodes, ) - return models.ExitReconcile, nil + + return reconcile.Result{}, nil } - patch := nrs.NewPatch() - if nrs.Status.NodeInProgress.ID == "" { - nodeInProgress := &v1beta1.Node{ - ID: nrs.Spec.Nodes[len(nrs.Spec.Nodes)-1].ID, - } - nrs.Status.NodeInProgress.ID = nodeInProgress.ID + if nrs.Status.NodeInProgress == nil { + nodeInProgress := nrs.Status.PendingNodes[0] - err = r.API.CreateNodeReload(nodeInProgress) + err := r.API.CreateNodeReload(nodeInProgress) if err != nil { - l.Error(err, - "Cannot start Node Reload process", - "nodeID", nodeInProgress.ID, - ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.CreationFailed, - "Resource creation on the Instaclustr is failed. Reason: %v", - err, + if errors.Is(err, instaclustr.NotFound) { + return r.handleNodeNotFound(ctx, nodeInProgress, nrs) + } + + l.Error(err, "Failed to trigger node reload", "node ID", nodeInProgress.ID) + r.EventRecorder.Eventf(nrs, models.Warning, models.CreationFailed, + "Failed to trigger node reload. Reason: %w", err, ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err } - r.EventRecorder.Eventf( - nrs, models.Normal, models.Created, - "Resource creation request is sent. Node ID: %s", - nrs.Status.NodeInProgress.ID, - ) + nrs.Status.NodeInProgress = nodeInProgress + if len(nrs.Status.PendingNodes) > 0 { + nrs.Status.PendingNodes = nrs.Status.PendingNodes[1:] + } err = r.Status().Patch(ctx, nrs, patch) if err != nil { - l.Error(err, - "Cannot patch Node Reload status", - "nodeID", nrs.Status.NodeInProgress, + l.Error(err, "Failed to patch node in progress", + "node ID", nodeInProgress.ID, ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.PatchFailed, - "Resource status patch is failed. Reason: %v", - err, + r.EventRecorder.Eventf(nrs, models.Warning, models.PatchFailed, + "Failed to patch node in progress. Reason: %w", err, ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err } } nodeReloadStatus, err := r.API.GetNodeReloadStatus(nrs.Status.NodeInProgress.ID) if err != nil { - l.Error(err, - "Cannot get Node Reload status", - "nodeID", nrs.Status.NodeInProgress, + if errors.Is(err, instaclustr.NotFound) { + return r.handleNodeNotFound(ctx, nrs.Status.NodeInProgress, nrs) + } + + l.Error(err, "Failed to fetch node reload status from Instaclustr", + "node ID", nrs.Status.NodeInProgress.ID, ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.FetchFailed, - "Fetch resource from the Instaclustr API is failed. Reason: %v", - err, + r.EventRecorder.Eventf(nrs, models.Warning, models.FetchFailed, + "Failed to fetch node reload status from Instaclustr. Reason: %w", err, ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err + } + + nrs.Status.CurrentOperationStatus = &v1beta1.Operation{ + OperationID: nodeReloadStatus.OperationID, + TimeCreated: nodeReloadStatus.TimeCreated, + TimeModified: nodeReloadStatus.TimeModified, + Status: nodeReloadStatus.Status, + Message: nodeReloadStatus.Message, } - nrs.Status = *nrs.Status.FromInstAPI(nodeReloadStatus) err = r.Status().Patch(ctx, nrs, patch) if err != nil { - l.Error(err, - "Cannot patch Node Reload status", - "nodeID", nrs.Status.NodeInProgress, + l.Error(err, "Failed to patch current operation status", + "node ID", nrs.Status.NodeInProgress.ID, + "currentOperationStatus", nodeReloadStatus, ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.PatchFailed, - "Resource status patch is failed. Reason: %v", - err, + r.EventRecorder.Eventf(nrs, models.Warning, models.FetchFailed, + "Failed to patch current operation status. Reason: %w", err, ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err } - if nrs.Status.CurrentOperationStatus.Status != "COMPLETED" { + if nodeReloadStatus.Status != nodeReloadOperationStatusCompleted { l.Info("Node Reload operation is not completed yet, please wait a few minutes", "nodeID", nrs.Status.NodeInProgress, "status", nrs.Status, ) + return models.ReconcileRequeue, nil } - nrs.Status.NodeInProgress.ID = "" + l.Info("The node has been successfully reloaded", + "Node ID", nrs.Status.NodeInProgress.ID, + ) + r.EventRecorder.Eventf(nrs, models.Normal, models.UpdatedEvent, + "Node %s has been successfully reloaded", nrs.Status.NodeInProgress.ID, + ) + + patch = nrs.NewPatch() + + nrs.Status.CompletedNodes = append(nrs.Status.CompletedNodes, nrs.Status.NodeInProgress) + nrs.Status.CurrentOperationStatus, nrs.Status.NodeInProgress = nil, nil + err = r.Status().Patch(ctx, nrs, patch) if err != nil { - l.Error(err, - "Cannot patch Node Reload status", - "nodeID", nrs.Status.NodeInProgress, - ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.PatchFailed, - "Resource status patch is failed. Reason: %v", - err, + l.Error(err, "Failed to patch completed nodes") + r.EventRecorder.Eventf(nrs, models.Warning, models.PatchFailed, + "Failed to patch completed nodes. Reason: %w", err, ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err + } + + return models.ImmediatelyRequeue, nil +} + +func (r *NodeReloadReconciler) handleNodeNotFound(ctx context.Context, node *v1beta1.Node, nrs *v1beta1.NodeReload) (reconcile.Result, error) { + l := log.FromContext(ctx) + + patch := nrs.NewPatch() + + if nrs.Status.NodeInProgress == nil && len(nrs.Status.PendingNodes) > 0 { + nrs.Status.PendingNodes = nrs.Status.PendingNodes[1:] } - nrs.Spec.Nodes = nrs.Spec.Nodes[:len(nrs.Spec.Nodes)-1] - err = r.Patch(ctx, nrs, patch) + nrs.Status.FailedNodes = append(nrs.Status.FailedNodes, node) + nrs.Status.CurrentOperationStatus, nrs.Status.NodeInProgress = nil, nil + + err := r.Status().Patch(ctx, nrs, patch) if err != nil { - l.Error(err, "Cannot patch Node Reload cluster", - "spec", nrs.Spec, - ) - r.EventRecorder.Eventf( - nrs, models.Warning, models.PatchFailed, - "Resource patch is failed. Reason: %v", - err, + l.Error(err, "Cannot patch failed node") + r.EventRecorder.Event(nrs, models.Warning, models.PatchFailed, + "Cannot patch failed node", ) - return models.ReconcileRequeue, nil + + return reconcile.Result{}, err } - return models.ExitReconcile, nil + l.Error(err, "Node is not found on Instaclustr", + "nodeID", node.ID, + ) + r.EventRecorder.Eventf(nrs, models.Warning, models.FetchFailed, + "Node %s is not found on Instaclustr", node.ID, + ) + + return models.ImmediatelyRequeue, nil } // SetupWithManager sets up the controller with the Manager. func (r *NodeReloadReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, + ratelimiter.DefaultMaxDelay, + ), + }). For(&v1beta1.NodeReload{}, builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(event event.UpdateEvent) bool { return event.ObjectNew.GetGeneration() != event.ObjectOld.GetGeneration() diff --git a/controllers/clusterresources/nodereload_controller_test.go b/controllers/clusterresources/nodereload_controller_test.go new file mode 100644 index 000000000..827dcee9e --- /dev/null +++ b/controllers/clusterresources/nodereload_controller_test.go @@ -0,0 +1,131 @@ +package clusterresources + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/instaclustr/operator/apis/clusterresources/v1beta1" +) + +var _ = Describe("Successful creation of NodeReload resource", func() { + var ( + ctx = context.Background() + ) + + When("apply NodeReload manifest", func() { + It("should create the NodeReload resource and successfully reload all the PostgreSQL nodes", func() { + manifest := &v1beta1.NodeReload{ + TypeMeta: metav1.TypeMeta{ + Kind: "NodeReload", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodereload-resource-test", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1beta1.NodeReloadSpec{ + Nodes: []*v1beta1.Node{ + {ID: "mock-node-id-1"}, + {ID: "mock-node-id-2"}, + }, + }, + } + + Expect(k8sClient.Create(ctx, manifest)).Should(Succeed()) + + key := client.ObjectKeyFromObject(manifest) + nrs := &v1beta1.NodeReload{} + + By("sending API calls to Instaclustr to trigger PostgreSQL node reload") + + Eventually(func() ([]*v1beta1.Node, error) { + if err := k8sClient.Get(ctx, key, nrs); err != nil { + return nil, err + } + + return nrs.Status.CompletedNodes, nil + }, timeout, interval).Should(Equal(nrs.Spec.Nodes)) + }) + }) + + When("apply NodeReload manifest with unknown nodeID", func() { + It("should create the NodeReload resource and successfully reload only the first node", func() { + manifest := &v1beta1.NodeReload{ + TypeMeta: metav1.TypeMeta{ + Kind: "NodeReload", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodereload-resource-test-wrong-1", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1beta1.NodeReloadSpec{ + Nodes: []*v1beta1.Node{ + {ID: "mock-node-id-1"}, + {ID: "mock-node-id-2-wrong"}, + }, + }, + } + + Expect(k8sClient.Create(ctx, manifest)).Should(Succeed()) + + key := client.ObjectKeyFromObject(manifest) + nrs := &v1beta1.NodeReload{} + + type result struct { + CompletedNodes []*v1beta1.Node + FailedNodes []*v1beta1.Node + } + + Eventually(func() (*result, error) { + if err := k8sClient.Get(ctx, key, nrs); err != nil { + return nil, err + } + + return &result{ + CompletedNodes: nrs.Status.CompletedNodes, + FailedNodes: nrs.Status.FailedNodes, + }, nil + }, timeout, interval).Should(Equal(&result{ + CompletedNodes: manifest.Spec.Nodes[:1], + FailedNodes: manifest.Spec.Nodes[1:], + })) + }) + }) + + When("apply NodeReload manifest with only unknown nodeIDs", func() { + It("should create the NodeReload resource and fail to reload all the nodes", func() { + manifest := &v1beta1.NodeReload{ + TypeMeta: metav1.TypeMeta{ + Kind: "NodeReload", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodereload-resource-test-wrong-2", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1beta1.NodeReloadSpec{ + Nodes: []*v1beta1.Node{ + {ID: "mock-node-id-1-wrong"}, + {ID: "mock-node-id-2-wrong"}, + }, + }, + } + + Expect(k8sClient.Create(ctx, manifest)).Should(Succeed()) + + key := client.ObjectKeyFromObject(manifest) + nrs := &v1beta1.NodeReload{} + + Eventually(func() ([]*v1beta1.Node, error) { + if err := k8sClient.Get(ctx, key, nrs); err != nil { + return nil, err + } + + return nrs.Status.FailedNodes, nil + }, timeout, interval).Should(Equal(nrs.Spec.Nodes)) + }) + }) + +}) diff --git a/controllers/clusterresources/suite_test.go b/controllers/clusterresources/suite_test.go index 6ef5c30a3..da975aedb 100644 --- a/controllers/clusterresources/suite_test.go +++ b/controllers/clusterresources/suite_test.go @@ -20,6 +20,7 @@ import ( "context" "path/filepath" "testing" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -34,6 +35,8 @@ import ( "github.com/instaclustr/operator/apis/clusterresources/v1beta1" clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr/mock" + "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" //+kubebuilder:scaffold:imports ) @@ -45,6 +48,9 @@ var ( ctx context.Context cancel context.CancelFunc MockInstAPI = mock.NewInstAPI() + + timeout = time.Second * 10 + interval = time.Millisecond * 20 ) func TestAPIs(t *testing.T) { @@ -63,6 +69,11 @@ var _ = BeforeSuite(func() { ErrorIfCRDPathMissing: true, } + ratelimiter.DefaultBaseDelay = interval + ratelimiter.DefaultMaxDelay = interval * 3 + + models.ReconcileRequeue.RequeueAfter = time.Millisecond * 20 + var err error // cfg is defined in this file globally. cfg, err = testEnv.Start() diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 02de8700c..fb8ddd35e 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -229,13 +229,13 @@ func (r *CassandraReconciler) handleCreateCluster( l.Error(err, "Cannot start cluster status job", "cassandra cluster ID", cassandra.Status.ID) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } r.EventRecorder.Eventf( cassandra, models.Normal, models.Created, @@ -248,13 +248,13 @@ func (r *CassandraReconciler) handleCreateCluster( "cluster ID", cassandra.Status.ID, ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster backups check job is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster backups check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } r.EventRecorder.Eventf( cassandra, models.Normal, models.Created, diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index fbfd87979..c901e2026 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -1442,6 +1442,10 @@ func (c *Client) CreateNodeReload(nr *clusterresourcesv1beta1.Node) error { return err } + if resp.StatusCode == http.StatusNotFound { + return NotFound + } + if resp.StatusCode != http.StatusAccepted { return fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 92da8f4c6..8ab32d4f0 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -18,10 +18,12 @@ package mock import ( "net/http" + "time" clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" clustersv1beta1 "github.com/instaclustr/operator/apis/clusters/v1beta1" kafkamanagementv1beta1 "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" + "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" ) @@ -174,11 +176,43 @@ func (c *mockClient) TriggerClusterBackup(clusterID, clusterKind string) error { } func (c *mockClient) CreateNodeReload(nr *clusterresourcesv1beta1.Node) error { + _, exists := nodes[nr.ID] + if !exists { + return instaclustr.NotFound + } + return nil } +var nodes = map[string]*models.NodeReloadStatus{ + "mock-node-id-1": nil, + "mock-node-id-2": nil, +} + func (c *mockClient) GetNodeReloadStatus(nodeID string) (*models.NodeReloadStatus, error) { - return nil, nil + op, exists := nodes[nodeID] + if !exists { + return nil, instaclustr.NotFound + } + + if op == nil { + op = &models.NodeReloadStatus{ + NodeID: nodeID, + OperationID: nodeID + "-operation", + TimeCreated: time.Now().String(), + TimeModified: time.Now().String(), + Status: "RUNNING", + } + + nodes[nodeID] = op + + _ = time.AfterFunc(time.Millisecond*10, func() { + op.TimeModified = time.Now().String() + op.Status = "COMPLETED" + }) + } + + return op, nil } func (c *mockClient) CreateKafkaACL(url string, kafkaACL *kafkamanagementv1beta1.KafkaACLSpec) (*kafkamanagementv1beta1.KafkaACLStatus, error) { diff --git a/pkg/models/operator.go b/pkg/models/operator.go index a5950fe77..2cec5f422 100644 --- a/pkg/models/operator.go +++ b/pkg/models/operator.go @@ -160,8 +160,9 @@ const ( const Requeue60 = time.Second * 60 var ( - ReconcileRequeue = reconcile.Result{RequeueAfter: Requeue60} - ExitReconcile = reconcile.Result{} + ReconcileRequeue = reconcile.Result{RequeueAfter: Requeue60} + ImmediatelyRequeue = reconcile.Result{RequeueAfter: 1} + ExitReconcile = reconcile.Result{} ) type Credentials struct {