Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rate limiter #579

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ cert-deploy: ## Deploy cert-manager
.PHONY: cert-undeploy
cert-undeploy: ## UnDeploy cert-manager
kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.10.0/cert-manager.yaml

.PHONY dev-build:
dev-build: docker-build kind-load deploy ## builds docker-image, loads it to kind cluster and deploys operator

.PHONY: kind-load
kind-load: ## loads given image to kind cluster
kind load docker-image ${IMG}

107 changes: 56 additions & 51 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -87,22 +89,22 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

switch cassandra.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateCluster(ctx, l, cassandra), nil
return r.handleCreateCluster(ctx, l, cassandra)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, l, cassandra), nil
return r.handleUpdateCluster(ctx, l, cassandra)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, l, cassandra), nil
return r.handleDeleteCluster(ctx, l, cassandra)
case models.GenericEvent:
l.Info("Event isn't handled",
"cluster name", cassandra.Spec.Name,
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
default:
l.Info("Unknown event isn't handled",
l.Info("Event isn't handled",
"request", req,
"event", cassandra.Annotations[models.ResourceStateAnnotation],
)
"event", cassandra.Annotations[models.ResourceStateAnnotation])

return models.ExitReconcile, nil
}
}
Expand All @@ -111,7 +113,7 @@ func (r *CassandraReconciler) handleCreateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra creation event")
var err error
patch := cassandra.NewPatch()
Expand All @@ -135,7 +137,7 @@ func (r *CassandraReconciler) handleCreateCluster(
err,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -162,7 +164,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -188,7 +190,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

controllerutil.AddFinalizer(cassandra, models.DeletionFinalizer)
Expand All @@ -208,7 +210,7 @@ func (r *CassandraReconciler) handleCreateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -227,13 +229,13 @@ func (r *CassandraReconciler) handleCreateCluster(
l.Error(err, "Cannot start cluster status job",
"cassandra cluster ID", cassandra.Status.ID)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
}
r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
Expand All @@ -246,13 +248,13 @@ func (r *CassandraReconciler) handleCreateCluster(
"cluster ID", cassandra.Status.ID,
)

r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster backups check job is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
}
r.EventRecorder.Eventf(
cassandra, models.Warning, models.CreationFailed,
"Cluster backups check job is failed. Reason: %v",
err,
)
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
cassandra, models.Normal, models.Created,
Expand All @@ -265,22 +267,22 @@ func (r *CassandraReconciler) handleCreateCluster(
l.Error(err, "Failed to start user creation job")
r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.Created,
"Cluster user creation job is started")
}
}

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUpdateCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra update event")

iData, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -295,7 +297,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

iCassandra, err := cassandra.FromInstAPI(iData)
Expand All @@ -311,7 +313,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

if cassandra.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -331,7 +333,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = r.API.UpdateClusterSettings(cassandra.Status.ID, settingsToInstAPI)
Expand All @@ -342,7 +344,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand Down Expand Up @@ -385,10 +387,11 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}
return models.ReconcileRequeue

return reconcile.Result{}, err
}
}

Expand All @@ -409,7 +412,7 @@ func (r *CassandraReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -419,10 +422,10 @@ func (r *CassandraReconciler) handleUpdateCluster(
"data centres", cassandra.Spec.DataCentres,
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) reconcile.Result {
func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1beta1.Cassandra, l logr.Logger) (reconcile.Result, error) {
if !cassandra.Spec.IsEqual(iCassandra.Spec) {
l.Info(msgSpecStillNoMatch,
"specification of k8s resource", cassandra.Spec,
Expand All @@ -432,12 +435,12 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCassandra.Spec, "k8s resource spec", cassandra.Spec)
return models.ExitReconcile
return models.ExitReconcile, nil
}

r.EventRecorder.Eventf(cassandra, models.Warning, models.ExternalChanges, msgDiffSpecs)

return models.ExitReconcile
return models.ExitReconcile, nil
}

patch := cassandra.NewPatch()
Expand All @@ -452,20 +455,20 @@ func (r *CassandraReconciler) handleExternalChanges(cassandra, iCassandra *v1bet
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", cassandra.Status.ID)
r.EventRecorder.Event(cassandra, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleDeleteCluster(
ctx context.Context,
l logr.Logger,
cassandra *v1beta1.Cassandra,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Cassandra deletion event")

_, err := r.API.GetCassandra(cassandra.Status.ID)
Expand All @@ -483,7 +486,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := cassandra.NewPatch()
Expand All @@ -507,7 +510,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster deletion on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
Expand All @@ -524,15 +527,15 @@ func (r *CassandraReconciler) handleDeleteCluster(
r.EventRecorder.Eventf(cassandra, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cassandra.Status.ID)

r.EventRecorder.Event(cassandra, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return models.ExitReconcile, nil
}
}

Expand All @@ -552,7 +555,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster backups deletion is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster backup resources were deleted",
Expand All @@ -567,7 +570,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
for _, ref := range cassandra.Spec.UserRefs {
err = r.handleUsersDetach(ctx, l, cassandra, ref)
if err != nil {
return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -589,7 +592,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, cassandra.Name, cassandra.Namespace)
Expand All @@ -599,7 +602,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"cluster name", cassandra.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster has been deleted",
Expand All @@ -613,7 +616,7 @@ func (r *CassandraReconciler) handleDeleteCluster(
"Cluster resource is deleted",
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *CassandraReconciler) handleUsersCreate(
Expand Down Expand Up @@ -1249,6 +1252,8 @@ func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1bet
// SetupWithManager sets up the controller with the Manager.
func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}).
For(&v1beta1.Cassandra{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-version v1.6.0
github.com/jackc/pgx/v5 v5.4.3
github.com/lib/pq v1.10.9
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.18.1
go.uber.org/zap v1.19.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down
Loading