Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration of the rate limiter to the kafka management group #600

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/clusters/zookeeper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if k8serrors.IsNotFound(err) {
l.Info("Zookeeper resource is not found",
"request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "unable to fetch Zookeeper",
Expand Down
50 changes: 28 additions & 22 deletions controllers/kafkamanagement/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/instaclustr/operator/apis/kafkamanagement/v1beta1"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
)

// KafkaACLReconciler reconciles a KafkaACL object
Expand Down Expand Up @@ -64,37 +65,37 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err != nil {
if k8serrors.IsNotFound(err) {
l.Error(err, "Kafka ACL resource is not found", "request", req)
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}

l.Error(err, "Unable to fetch kafka ACL", "request", req)
return models.ReconcileRequeue, nil
return ctrl.Result{}, err
}

switch kafkaACL.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.handleCreateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleCreateKafkaACL(ctx, &kafkaACL, l)

case models.UpdatingEvent:
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l), nil
return r.handleUpdateKafkaACL(ctx, &kafkaACL, l)

case models.DeletingEvent:
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l), nil
return r.handleDeleteKafkaACL(ctx, &kafkaACL, l)
default:
l.Info("Event isn't handled",
"cluster ID", kafkaACL.Spec.ClusterID,
"user query", kafkaACL.Spec.UserQuery,
"request", req,
"event", kafkaACL.Annotations[models.ResourceStateAnnotation])
return models.ExitReconcile, nil
return ctrl.Result{}, nil
}
}

func (r *KafkaACLReconciler) handleCreateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
if acl.Status.ID == "" {
l.Info("Creating kafka ACL",
"cluster ID", acl.Spec.ClusterID,
Expand All @@ -111,7 +112,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Resource creation on the Instaclustr is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

r.EventRecorder.Eventf(
Expand All @@ -133,7 +134,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource status patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

controllerutil.AddFinalizer(acl, models.DeletionFinalizer)
Expand All @@ -149,7 +150,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info(
Expand All @@ -159,14 +160,14 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL(
)
}

return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleUpdateKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
err := r.API.UpdateKafkaACL(acl.Status.ID, instaclustr.KafkaACLEndpoint, &acl.Spec)
if err != nil {
l.Error(err, "Cannot update kafka ACL",
Expand All @@ -178,7 +179,7 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Resource update on the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

patch := acl.NewPatch()
Expand All @@ -194,21 +195,21 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been updated",
"cluster ID", acl.Spec.ClusterID,
"user query", acl.Spec.UserQuery,
)
return models.ExitReconcile
return ctrl.Result{}, nil
}

func (r *KafkaACLReconciler) handleDeleteKafkaACL(
ctx context.Context,
acl *v1beta1.KafkaACL,
l logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
patch := acl.NewPatch()
err := r.Patch(ctx, acl, patch)
if err != nil {
Expand All @@ -221,7 +222,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

status, err := r.API.GetKafkaACLStatus(acl.Status.ID, instaclustr.KafkaACLEndpoint)
Expand All @@ -236,7 +237,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource fetch from the Instaclustr API is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

if status != nil {
Expand All @@ -251,7 +252,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource deletion is failed on the Instaclustr. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}
r.EventRecorder.Eventf(
acl, models.Normal, models.DeletionStarted,
Expand All @@ -272,7 +273,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Cluster resource patch is failed. Reason: %v",
err,
)
return models.ReconcileRequeue
return ctrl.Result{}, err
}

l.Info("Kafka ACL has been deleted",
Expand All @@ -285,12 +286,17 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL(
"Resource is deleted",
)

return models.ExitReconcile
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *KafkaACLReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.KafkaACL{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if event.Object.GetDeletionTimestamp() != nil {
Expand Down
Loading