Skip to content

Commit

Permalink
add rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Sep 27, 2023
1 parent e765c81 commit de281c0
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 39 deletions.
83 changes: 44 additions & 39 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -87,22 +89,22 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

switch cassandra.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateCluster(ctx, l, cassandra), nil
return r.handleCreateCluster(ctx, l, cassandra)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, l, cassandra), nil
return r.handleUpdateCluster(ctx, l, cassandra)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, l, cassandra), nil
return r.handleDeleteCluster(ctx, l, cassandra)
case models.GenericEvent:
l.Info("Event isn't handled",
"cluster name", cassandra.Spec.Name,
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
default:
l.Info("Unknown event isn't handled",
l.Info("Event isn't handled",
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation],
)
"event", cassandra.Annotations[models.ResourceStateAnnotation])

return models.ExitReconcile, nil
}
}
Expand All @@ -111,7 +113,7 @@ func (r *CassandraReconciler) handleCreateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra creation event")
var err error
patch := cassandra.NewPatch()
Expand All @@ -135,7 +137,7 @@ func (r *CassandraReconciler) handleCreateCluster(
err,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -162,7 +164,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -188,7 +190,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer)
Expand All @@ -208,7 +210,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -231,7 +233,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster status check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -250,7 +252,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster backups check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -264,21 +266,21 @@ func (r *CassandraReconciler) handleCreateCluster(
l.Error(err, "Failed to start user creation job")
r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.Created,
"Cluster user creation job is started")
}

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUpdateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra update event")

iData, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -293,7 +295,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

iCassandra, err := cassandra.FromInstAPI(iData)
Expand All @@ -309,7 +311,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

if cassandra.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -329,7 +331,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = r.API.UpdateClusterSettings(cassandra.Status.ID, settingsToInstAPI)
Expand All @@ -340,7 +342,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand Down Expand Up @@ -383,10 +385,11 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}
return models.ReconcileRequeue

return reconcile.Result{}, err
}
}

Expand All @@ -407,7 +410,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -417,10 +420,10 @@ func (r *CassandraReconciler) handleUpdateCluster(
"data centres", cassandra.Spec.DataCentres,
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) reconcile.Result {
func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) {
if !cassandra.Spec.IsEqual(iCassandra.Spec) {
l.Info(msgSpecStillNoMatch,
"specification of k8s resource", cassandra.Spec,
Expand All @@ -430,12 +433,12 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCassandra.Spec, "k8s resource spec", cassandra.Spec)
return models.ExitReconcile
return models.ExitReconcile, nil
}

r.EventRecorder.Eventf(cassandra, models.Warning, models.ExternalChanges, msgDiffSpecs)

return models.ExitReconcile
return models.ExitReconcile, nil
}

patch := cassandra.NewPatch()
Expand All @@ -450,20 +453,20 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", cassandra.Status.ID)
r.EventRecorder.Event(cassandra, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleDeleteCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra deletion event")

_, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -481,7 +484,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := cassandra.NewPatch()
Expand All @@ -505,7 +508,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster deletion on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
Expand All @@ -522,15 +525,15 @@ func (r *CassandraReconciler) handleDeleteCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cassandra.Status.ID)

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return models.ExitReconcile, nil
}
}

Expand All @@ -550,7 +553,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster backups deletion is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster backup resources were deleted",
Expand All @@ -565,7 +568,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
for _, ref := range cassandra.Spec.UserRefs {
err = r.handleUsersDetach(ctx, l, cassandra, ref)
if err != nil {
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -587,7 +590,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, cassandra.Name, cassandra.Namespace)
Expand All @@ -597,7 +600,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"cluster name", cassandra.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster has been deleted",
Expand All @@ -611,7 +614,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource is deleted",
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUsersCreate(
Expand Down Expand Up @@ -1265,6 +1268,8 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c
// SetupWithManager sets up the controller with the Manager.
func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}).
For(&v1beta1.Cassandra{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
13 changes: 13 additions & 0 deletions pkg/ratelimiter/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ratelimiter

import "time"

type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
Loading

0 comments on commit de281c0

Please sign in to comment.