Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom rate limiter for all clusters #601

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *CassandraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

l.Error(err, "Unable to fetch Cassandra cluster",
"request", req)
return models.ReconcileRequeue, err
return reconcile.Result{}, err
}

switch cassandra.Annotations[models.ResourceStateAnnotation] {
Expand Down
79 changes: 40 additions & 39 deletions controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand Down Expand Up @@ -73,19 +75,16 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

l.Error(err, "Unable to fetch Kafka", "request", req)
return models.ReconcileRequeue, err
return reconcile.Result{}, err
}

switch kafka.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateCluster(ctx, &kafka, l), nil

return r.handleCreateCluster(ctx, &kafka, l)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, &kafka, l), nil

return r.handleUpdateCluster(ctx, &kafka, l)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, &kafka, l), nil

return r.handleDeleteCluster(ctx, &kafka, l)
case models.GenericEvent:
l.Info("Event isn't handled", "cluster name", kafka.Spec.Name, "request", req,
"event", kafka.Annotations[models.ResourceStateAnnotation])
Expand All @@ -95,7 +94,7 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
l = l.WithName("Kafka creation Event")

var err error
Expand All @@ -115,7 +114,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -134,7 +133,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

kafka.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent
Expand All @@ -149,7 +148,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster has been created",
Expand All @@ -167,7 +166,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
"Cluster status check job creation is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -182,7 +181,7 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
r.EventRecorder.Eventf(kafka, models.Warning, models.CreationFailed,
"User creation job is failed. Reason: %v", err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(kafka, models.Normal, models.Created,
Expand All @@ -191,26 +190,26 @@ func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, kafka *v1beta
}
}

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleUpdateCluster(
ctx context.Context,
k *v1beta1.Kafka,
l logr.Logger,
) reconcile.Result {
) (reconcile.Result, error) {
l = l.WithName("Kafka update Event")

iData, err := r.API.GetKafka(k.Status.ID)
if err != nil {
l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", k.Status.ID)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

iKafka, err := k.FromInstAPI(iData)
if err != nil {
l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", k.Status.ID)
return models.ExitReconcile
return reconcile.Result{}, err
}

if iKafka.Status.ClusterStatus.State != StatusRUNNING {
Expand All @@ -230,9 +229,9 @@ func (r *KafkaReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}
return models.ReconcileRequeue
return reconcile.Result{}, err
}

if k.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -251,12 +250,12 @@ func (r *KafkaReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(k, models.Warning, models.UpdateFailed,
"Cannot update cluster settings. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

if k.Spec.IsEqual(iKafka.Spec) {
return models.ExitReconcile
return models.ExitReconcile, nil
}

l.Info("Update request to Instaclustr API has been sent",
Expand Down Expand Up @@ -284,10 +283,10 @@ func (r *KafkaReconciler) handleUpdateCluster(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := k.NewPatch()
Expand All @@ -301,7 +300,7 @@ func (r *KafkaReconciler) handleUpdateCluster(
r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(
Expand All @@ -311,7 +310,7 @@ func (r *KafkaReconciler) handleUpdateCluster(
"data centres", k.Spec.DataCentres,
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleCreateUser(
Expand Down Expand Up @@ -531,7 +530,7 @@ func (r *KafkaReconciler) handleUserEvent(
}
}

func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
if !k.Spec.IsEqual(ik.Spec) {
l.Info("The k8s specification is different from Instaclustr Console. Update operations are blocked.",
"specification of k8s resource", k.Spec,
Expand All @@ -541,10 +540,10 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", ik.Spec, "k8s resource spec", k.Spec)
return models.ExitReconcile
return models.ExitReconcile, nil
}
r.EventRecorder.Eventf(k, models.Warning, models.ExternalChanges, msgDiffSpecs)
return models.ExitReconcile
return models.ExitReconcile, nil
}

patch := k.NewPatch()
Expand All @@ -559,16 +558,16 @@ func (r *KafkaReconciler) handleExternalChanges(k, ik *v1beta1.Kafka, l logr.Log
r.EventRecorder.Eventf(k, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "kafka ID", k.Status.ID)
r.EventRecorder.Event(k, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) reconcile.Result {
func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) {
l = l.WithName("Kafka deletion Event")

_, err := r.API.GetKafka(kafka.Status.ID)
Expand All @@ -581,7 +580,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := kafka.NewPatch()
Expand All @@ -600,7 +599,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster deletion is failed on the Instaclustr. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -621,23 +620,23 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", kafka.Status.ID)

r.EventRecorder.Event(kafka, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return models.ExitReconcile, nil
}
}

for _, ref := range kafka.Spec.UserRefs {
err = r.detachUser(ctx, kafka, l, ref)
if err != nil {

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -654,7 +653,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, kafka.Name, kafka.Namespace)
Expand All @@ -664,7 +663,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"cluster name", kafka.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("Cluster was deleted",
Expand All @@ -676,7 +675,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, kafka *v1beta
"Cluster resource is deleted",
)

return models.ExitReconcile
return models.ExitReconcile, nil
}

func (r *KafkaReconciler) startClusterStatusJob(kafka *v1beta1.Kafka) error {
Expand Down Expand Up @@ -915,6 +914,8 @@ func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1bet
// SetupWithManager sets up the controller with the Manager.
func (r *KafkaReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}).
For(&v1beta1.Kafka{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
annots := event.Object.GetAnnotations()
Expand Down
Loading