Skip to content

Commit

Permalink
add rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
ribaraka committed Oct 4, 2023
1 parent a2dd930 commit 0638ce1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 51 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ cert-deploy: ## Deploy cert-manager
.PHONY: cert-undeploy
cert-undeploy: ## UnDeploy cert-manager
kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.10.0/cert-manager.yaml

.PHONY dev-build:
dev-build: docker-build kind-load deploy ## builds docker-image, loads it to kind cluster and deploys operator

.PHONY: kind-load
kind-load: ## loads given image to kind cluster
kind load docker-image ${IMG}

107 changes: 56 additions & 51 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -87,22 +89,22 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

switch cassandra.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateCluster(ctx, l, cassandra), nil
return r.handleCreateCluster(ctx, l, cassandra)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, l, cassandra), nil
return r.handleUpdateCluster(ctx, l, cassandra)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, l, cassandra), nil
return r.handleDeleteCluster(ctx, l, cassandra)
case models.GenericEvent:
l.Info("Event isn't handled",
"cluster name", cassandra.Spec.Name,
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
default:
l.Info("Unknown event isn't handled",
l.Info("Event isn't handled",
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation],
)
"event", cassandra.Annotations[models.ResourceStateAnnotation])

return models.ExitReconcile, nil
}
}
Expand All @@ -111,7 +113,7 @@ func (r *CassandraReconciler) handleCreateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra creation event")
var err error
patch := cassandra.NewPatch()
Expand All @@ -135,7 +137,7 @@ func (r *CassandraReconciler) handleCreateCluster(
err,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -162,7 +164,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -188,7 +190,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer)
Expand All @@ -208,7 +210,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -227,13 +229,13 @@ func (r *CassandraReconciler) handleCreateCluster(
l.Error(err, "Cannot start cluster status job",
"cassandra cluster ID", cassandra.Status.ID)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
}
r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
Expand All @@ -246,13 +248,13 @@ func (r *CassandraReconciler) handleCreateCluster(
"cluster ID", cassandra.Status.ID,
)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster backups check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
}
r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster backups check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
Expand All @@ -265,22 +267,22 @@ func (r *CassandraReconciler) handleCreateCluster(
l.Error(err, "Failed to start user creation job")
r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.Created,
"Cluster user creation job is started")
}
}

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUpdateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra update event")

iData, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -295,7 +297,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

iCassandra, err := cassandra.FromInstAPI(iData)
Expand All @@ -311,7 +313,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

if cassandra.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -331,7 +333,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = r.API.UpdateClusterSettings(cassandra.Status.ID, settingsToInstAPI)
Expand All @@ -342,7 +344,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand Down Expand Up @@ -385,10 +387,11 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}
return models.ReconcileRequeue

return reconcile.Result{}, err
}
}

Expand All @@ -409,7 +412,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -419,10 +422,10 @@ func (r *CassandraReconciler) handleUpdateCluster(
"data centres", cassandra.Spec.DataCentres,
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) reconcile.Result {
func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) {
if !cassandra.Spec.IsEqual(iCassandra.Spec) {
l.Info(msgSpecStillNoMatch,
"specification of k8s resource", cassandra.Spec,
Expand All @@ -432,12 +435,12 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCassandra.Spec, "k8s resource spec", cassandra.Spec)
return models.ExitReconcile
return models.ExitReconcile, nil
}

r.EventRecorder.Eventf(cassandra, models.Warning, models.ExternalChanges, msgDiffSpecs)

return models.ExitReconcile
return models.ExitReconcile, nil
}

patch := cassandra.NewPatch()
Expand All @@ -452,20 +455,20 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", cassandra.Status.ID)
r.EventRecorder.Event(cassandra, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleDeleteCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra deletion event")

_, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -483,7 +486,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := cassandra.NewPatch()
Expand All @@ -507,7 +510,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster deletion on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
Expand All @@ -524,15 +527,15 @@ func (r *CassandraReconciler) handleDeleteCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cassandra.Status.ID)

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return models.ExitReconcile, nil
}
}

Expand All @@ -552,7 +555,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster backups deletion is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster backup resources were deleted",
Expand All @@ -567,7 +570,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
for _, ref := range cassandra.Spec.UserRefs {
err = r.handleUsersDetach(ctx, l, cassandra, ref)
if err != nil {
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -589,7 +592,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, cassandra.Name, cassandra.Namespace)
Expand All @@ -599,7 +602,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"cluster name", cassandra.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster has been deleted",
Expand All @@ -613,7 +616,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource is deleted",
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUsersCreate(
Expand Down Expand Up @@ -1249,6 +1252,8 @@ func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1bet
// SetupWithManager sets up the controller with the Manager.
func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}).
For(&v1beta1.Cassandra{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
Loading

0 comments on commit 0638ce1

Please sign in to comment.