From 0638ce1c88a29f06664ebbc3526480c76390faec Mon Sep 17 00:00:00 2001 From: Bychkov Date: Wed, 27 Sep 2023 17:01:30 +0300 Subject: [PATCH] add rate limiter --- Makefile | 8 ++ controllers/clusters/cassandra_controller.go | 107 ++++++++++--------- pkg/ratelimiter/rate_limiter.go | 69 ++++++++++++ 3 files changed, 133 insertions(+), 51 deletions(-) create mode 100644 pkg/ratelimiter/rate_limiter.go diff --git a/Makefile b/Makefile index b12ed5db3..f384d360f 100644 --- a/Makefile +++ b/Makefile @@ -188,3 +188,11 @@ cert-deploy: ## Deploy cert-manager .PHONY: cert-undeploy cert-undeploy: ## UnDeploy cert-manager kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.10.0/cert-manager.yaml + +.PHONY dev-build: +dev-build: docker-build kind-load deploy ## builds docker-image, loads it to kind cluster and deploys operator + +.PHONY: kind-load +kind-load: ## loads given image to kind cluster + kind load docker-image ${IMG} + diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 8de997e52..72a96a2c4 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -29,6 +29,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -40,6 +41,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -87,11 +89,11 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( switch cassandra.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, l, cassandra), nil + return r.handleCreateCluster(ctx, l, cassandra) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, l, cassandra), nil + return r.handleUpdateCluster(ctx, l, cassandra) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, l, cassandra), nil + return r.handleDeleteCluster(ctx, l, cassandra) case models.GenericEvent: l.Info("Event isn't handled", "cluster name", cassandra.Spec.Name, @@ -99,10 +101,10 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( "event", cassandra.Annotations[models.ResourceStateAnnotation]) return models.ExitReconcile, nil default: - l.Info("Unknown event isn't handled", + l.Info("Event isn't handled", "request", req, - "event", cassandra.Annotations[models.ResourceStateAnnotation], - ) + "event", cassandra.Annotations[models.ResourceStateAnnotation]) + return models.ExitReconcile, nil } } @@ -111,7 +113,7 @@ func (r *CassandraReconciler) handleCreateCluster( ctx context.Context, l logr.Logger, cassandra *v1beta1.Cassandra, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Cassandra creation event") var err error patch := cassandra.NewPatch() @@ -135,7 +137,7 @@ func (r *CassandraReconciler) handleCreateCluster( err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -162,7 +164,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Eventf( @@ -188,7 +190,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer) @@ -208,7 +210,7 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info( @@ -227,13 +229,13 @@ func (r *CassandraReconciler) handleCreateCluster( l.Error(err, "Cannot start cluster status job", "cassandra cluster ID", cassandra.Status.ID) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster status check job is failed. Reason: %v", - err, - ) - return models.ReconcileRequeue - } + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster status check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } r.EventRecorder.Eventf( cassandra, models.Normal, models.Created, @@ -246,13 +248,13 @@ func (r *CassandraReconciler) handleCreateCluster( "cluster ID", cassandra.Status.ID, ) - r.EventRecorder.Eventf( - cassandra, models.Warning, models.CreationFailed, - "Cluster backups check job is failed. Reason: %v", - err, - ) - return models.ReconcileRequeue - } + r.EventRecorder.Eventf( + cassandra, models.Warning, models.CreationFailed, + "Cluster backups check job is failed. Reason: %v", + err, + ) + return reconcile.Result{}, err + } r.EventRecorder.Eventf( cassandra, models.Normal, models.Created, @@ -265,7 +267,7 @@ func (r *CassandraReconciler) handleCreateCluster( l.Error(err, "Failed to start user creation job") r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed, "User creation job is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(cassandra, models.Normal, models.Created, @@ -273,14 +275,14 @@ func (r *CassandraReconciler) handleCreateCluster( } } - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *CassandraReconciler) handleUpdateCluster( ctx context.Context, l logr.Logger, cassandra *v1beta1.Cassandra, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Cassandra update event") iData, err := r.API.GetCassandra(cassandra.Status.ID) @@ -295,7 +297,7 @@ func (r *CassandraReconciler) handleUpdateCluster( "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } iCassandra, err := cassandra.FromInstAPI(iData) @@ -311,7 +313,7 @@ func (r *CassandraReconciler) handleUpdateCluster( "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } if cassandra.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -331,7 +333,7 @@ func (r *CassandraReconciler) handleUpdateCluster( r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = r.API.UpdateClusterSettings(cassandra.Status.ID, settingsToInstAPI) @@ -342,7 +344,7 @@ func (r *CassandraReconciler) handleUpdateCluster( r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed, "Cannot update cluster settings. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -385,10 +387,11 @@ func (r *CassandraReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } } - return models.ReconcileRequeue + + return reconcile.Result{}, err } } @@ -409,7 +412,7 @@ func (r *CassandraReconciler) handleUpdateCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info( @@ -419,10 +422,10 @@ func (r *CassandraReconciler) handleUpdateCluster( "data centres", cassandra.Spec.DataCentres, ) - return models.ExitReconcile + return models.ExitReconcile, nil } -func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) reconcile.Result { +func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) { if !cassandra.Spec.IsEqual(iCassandra.Spec) { l.Info(msgSpecStillNoMatch, "specification of k8s resource", cassandra.Spec, @@ -432,12 +435,12 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet if err != nil { l.Error(err, "Cannot create specification difference message", "instaclustr data", iCassandra.Spec, "k8s resource spec", cassandra.Spec) - return models.ExitReconcile + return models.ExitReconcile, nil } r.EventRecorder.Eventf(cassandra, models.Warning, models.ExternalChanges, msgDiffSpecs) - return models.ExitReconcile + return models.ExitReconcile, nil } patch := cassandra.NewPatch() @@ -452,20 +455,20 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("External changes have been reconciled", "resource ID", cassandra.Status.ID) r.EventRecorder.Event(cassandra, models.Normal, models.ExternalChanges, "External changes have been reconciled") - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *CassandraReconciler) handleDeleteCluster( ctx context.Context, l logr.Logger, cassandra *v1beta1.Cassandra, -) reconcile.Result { +) (reconcile.Result, error) { l = l.WithName("Cassandra deletion event") _, err := r.API.GetCassandra(cassandra.Status.ID) @@ -483,7 +486,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } patch := cassandra.NewPatch() @@ -507,7 +510,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted, @@ -524,7 +527,7 @@ func (r *CassandraReconciler) handleDeleteCluster( r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed, "Cluster resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cassandra.Status.ID) @@ -532,7 +535,7 @@ func (r *CassandraReconciler) handleDeleteCluster( r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted, "Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.") - return models.ExitReconcile + return models.ExitReconcile, nil } } @@ -552,7 +555,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster backups deletion is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Cluster backup resources were deleted", @@ -567,7 +570,7 @@ func (r *CassandraReconciler) handleDeleteCluster( for _, ref := range cassandra.Spec.UserRefs { err = r.handleUsersDetach(ctx, l, cassandra, ref) if err != nil { - return models.ReconcileRequeue + return reconcile.Result{}, err } } @@ -589,7 +592,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } err = exposeservice.Delete(r.Client, cassandra.Name, cassandra.Namespace) @@ -599,7 +602,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "cluster name", cassandra.Spec.Name, ) - return models.ReconcileRequeue + return reconcile.Result{}, err } l.Info("Cluster has been deleted", @@ -613,7 +616,7 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster resource is deleted", ) - return models.ExitReconcile + return models.ExitReconcile, nil } func (r *CassandraReconciler) handleUsersCreate( @@ -1249,6 +1252,8 @@ func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1bet // SetupWithManager sets up the controller with the Manager. func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.Cassandra{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if deleting := confirmDeletion(event.Object); deleting { diff --git a/pkg/ratelimiter/rate_limiter.go b/pkg/ratelimiter/rate_limiter.go new file mode 100644 index 000000000..84b656d34 --- /dev/null +++ b/pkg/ratelimiter/rate_limiter.go @@ -0,0 +1,69 @@ +package ratelimiter + +import ( + "math" + "sync" + "time" + + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" +) + +var ( + DefaultBaseDelay = 1 * time.Minute + DefaultMaxDelay = 10 * time.Minute + defaultMaxTries = 3 +) + +type ItemExponentialFailureRateLimiterWithMaxTries struct { + // failuresLock mutex, which is necessary due to the fact that controller-runtime allows for concurrent reconciler invocations for a single controller + failuresLock sync.Mutex + failures map[interface{}]int + maxTries int + + baseDelay time.Duration + maxDelay time.Duration +} + +func NewItemExponentialFailureRateLimiterWithMaxTries(baseDelay time.Duration, maxDelay time.Duration) ratelimiter.RateLimiter { + return &ItemExponentialFailureRateLimiterWithMaxTries{ + failures: map[interface{}]int{}, + baseDelay: baseDelay, + maxDelay: maxDelay, + maxTries: defaultMaxTries, + } +} + +func (r *ItemExponentialFailureRateLimiterWithMaxTries) When(item interface{}) time.Duration { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(r.failures[item])) + durationBackoff := time.Duration(backoff) + + r.failures[item] = r.failures[item] + 1 + + if r.failures[item] > r.maxTries { + // reminder + return 60 * time.Minute + } + + if durationBackoff > r.maxDelay { + return r.maxDelay + } + + return durationBackoff +} + +func (r *ItemExponentialFailureRateLimiterWithMaxTries) NumRequeues(item interface{}) int { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + return r.failures[item] +} + +func (r *ItemExponentialFailureRateLimiterWithMaxTries) Forget(item interface{}) { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + delete(r.failures, item) +}