From 097d4c3d1f77ba7fcda8aba74423a37ab8fff0dd Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Tue, 17 Oct 2023 16:04:19 +0300 Subject: [PATCH] the rate limiter was integrated into resourses of kafkamanagement group --- controllers/clusters/zookeeper_controller.go | 2 +- .../kafkamanagement/kafkaacl_controller.go | 50 ++++++++------- .../kafkamanagement/kafkauser_controller.go | 55 ++++++++-------- .../kafkamanagement/mirror_controller.go | 62 ++++++++++--------- .../kafkamanagement/topic_controller.go | 60 ++++++++++-------- 5 files changed, 126 insertions(+), 103 deletions(-) diff --git a/controllers/clusters/zookeeper_controller.go b/controllers/clusters/zookeeper_controller.go index d2252ce7b..2ff9d6282 100644 --- a/controllers/clusters/zookeeper_controller.go +++ b/controllers/clusters/zookeeper_controller.go @@ -68,7 +68,7 @@ func (r *ZookeeperReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if k8serrors.IsNotFound(err) { l.Info("Zookeeper resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "unable to fetch Zookeeper", diff --git a/controllers/kafkamanagement/kafkaacl_controller.go b/controllers/kafkamanagement/kafkaacl_controller.go index d23969f5b..823033f58 100644 --- a/controllers/kafkamanagement/kafkaacl_controller.go +++ b/controllers/kafkamanagement/kafkaacl_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // KafkaACLReconciler reconciles a KafkaACL object @@ -64,29 +65,29 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Kafka ACL resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch kafka ACL", "request", req) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } switch kafkaACL.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateKafkaACL(ctx, &kafkaACL, l), nil + return r.handleCreateKafkaACL(ctx, &kafkaACL, l) case models.UpdatingEvent: - return r.handleUpdateKafkaACL(ctx, &kafkaACL, l), nil + return r.handleUpdateKafkaACL(ctx, &kafkaACL, l) case models.DeletingEvent: - return r.handleDeleteKafkaACL(ctx, &kafkaACL, l), nil + return r.handleDeleteKafkaACL(ctx, &kafkaACL, l) default: l.Info("Event isn't handled", "cluster ID", kafkaACL.Spec.ClusterID, "user query", kafkaACL.Spec.UserQuery, "request", req, "event", kafkaACL.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } } @@ -94,7 +95,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL( ctx context.Context, acl *v1beta1.KafkaACL, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if acl.Status.ID == "" { l.Info("Creating kafka ACL", "cluster ID", acl.Spec.ClusterID, @@ -111,7 +112,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -133,7 +134,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL( "Cluster resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } controllerutil.AddFinalizer(acl, models.DeletionFinalizer) @@ -149,7 +150,7 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -159,14 +160,14 @@ func (r *KafkaACLReconciler) handleCreateKafkaACL( ) } - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *KafkaACLReconciler) handleUpdateKafkaACL( ctx context.Context, acl *v1beta1.KafkaACL, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { err := r.API.UpdateKafkaACL(acl.Status.ID, instaclustr.KafkaACLEndpoint, &acl.Spec) if err != nil { l.Error(err, "Cannot update kafka ACL", @@ -178,7 +179,7 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL( "Resource update on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } patch := acl.NewPatch() @@ -194,21 +195,21 @@ func (r *KafkaACLReconciler) handleUpdateKafkaACL( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka ACL has been updated", "cluster ID", acl.Spec.ClusterID, "user query", acl.Spec.UserQuery, ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *KafkaACLReconciler) handleDeleteKafkaACL( ctx context.Context, acl *v1beta1.KafkaACL, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { patch := acl.NewPatch() err := r.Patch(ctx, acl, patch) if err != nil { @@ -221,7 +222,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } status, err := r.API.GetKafkaACLStatus(acl.Status.ID, instaclustr.KafkaACLEndpoint) @@ -236,7 +237,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL( "Resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil { @@ -251,7 +252,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL( "Resource deletion is failed on the Instaclustr. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( acl, models.Normal, models.DeletionStarted, @@ -272,7 +273,7 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL( "Cluster resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka ACL has been deleted", @@ -285,12 +286,17 @@ func (r *KafkaACLReconciler) handleDeleteKafkaACL( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *KafkaACLReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay, + ), + }). For(&v1beta1.KafkaACL{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if event.Object.GetDeletionTimestamp() != nil { diff --git a/controllers/kafkamanagement/kafkauser_controller.go b/controllers/kafkamanagement/kafkauser_controller.go index 9659b7758..2c3c72ccc 100644 --- a/controllers/kafkamanagement/kafkauser_controller.go +++ b/controllers/kafkamanagement/kafkauser_controller.go @@ -35,17 +35,18 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) const ( @@ -78,7 +79,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err != nil { if k8serrors.IsNotFound(err) { l.Info("Kafka user resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch Kafka user", "request", req) r.EventRecorder.Eventf( @@ -87,7 +88,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } secret := &v1.Secret{} @@ -101,7 +102,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.FetchFailed, "Fetch user credentials secret is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } username, password, err := r.getKafkaUserCredsFromSecret(user.Spec) @@ -110,7 +111,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.FetchFailed, "Fetch user credentials from secret is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if controllerutil.AddFinalizer(secret, user.GetDeletionFinalizer()) { @@ -122,7 +123,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.UpdatedEvent, "Cannot assign Kafka user to a k8s secret. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -135,7 +136,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( user, models.Warning, models.PatchFailed, "Patching Kafka user with finalizer has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -158,7 +159,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -185,7 +186,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info( @@ -210,7 +211,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -232,7 +233,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("Kafka user has been deleted", @@ -260,7 +261,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("Kafka user has been detached", @@ -272,7 +273,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( ) continue - } iKafkaUser := user.Spec.ToInstAPI(clusterID, username, password) @@ -288,7 +288,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } user.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent @@ -304,7 +304,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } user.Status.ClustersEvents[clusterID] = models.UpdatedEvent @@ -320,7 +320,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("Kafka user resource has been updated", @@ -337,7 +337,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( "username", username, "cluster ID", clusterID) r.EventRecorder.Event(user, models.Warning, models.DeletingEvent, instaclustr.MsgDeleteUser) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -350,7 +350,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( "Deleting finalizer from the Kafka user resource has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } controllerutil.RemoveFinalizer(secret, user.GetDeletionFinalizer()) @@ -361,13 +361,13 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("Kafka user resource has been deleted", "username", username) } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. @@ -391,6 +391,11 @@ func (r *KafkaUserReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay, + ), + }). For(&v1beta1.KafkaUser{}, builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(event event.UpdateEvent) bool { newObj := event.ObjectNew.(*v1beta1.KafkaUser) @@ -409,7 +414,7 @@ func (r *KafkaUserReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *KafkaUserReconciler) findSecretObjects(secret client.Object) []reconcile.Request { +func (r *KafkaUserReconciler) findSecretObjects(secret client.Object) []ctrl.Request { kafkaUserList := &v1beta1.KafkaUserList{} listOps := &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector(kafkaUserField, secret.GetName()), @@ -417,18 +422,18 @@ func (r *KafkaUserReconciler) findSecretObjects(secret client.Object) []reconcil } err := r.List(context.TODO(), kafkaUserList, listOps) if err != nil { - return []reconcile.Request{} + return []ctrl.Request{} } - requests := make([]reconcile.Request, len(kafkaUserList.Items)) + requests := make([]ctrl.Request, len(kafkaUserList.Items)) for i, item := range kafkaUserList.Items { patch := item.NewPatch() item.GetAnnotations()[models.ResourceStateAnnotation] = models.SecretEvent err = r.Patch(context.TODO(), &item, patch) if err != nil { - return []reconcile.Request{} + return []ctrl.Request{} } - requests[i] = reconcile.Request{ + requests[i] = ctrl.Request{ NamespacedName: types.NamespacedName{ Name: item.GetName(), Namespace: item.GetNamespace(), diff --git a/controllers/kafkamanagement/mirror_controller.go b/controllers/kafkamanagement/mirror_controller.go index 9122be99c..4b5677de0 100644 --- a/controllers/kafkamanagement/mirror_controller.go +++ b/controllers/kafkamanagement/mirror_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -66,35 +67,35 @@ func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Kafka mirror is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch Kafka mirror", "request", req) - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch mirror.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, mirror, l), nil + return r.handleCreateMirror(ctx, mirror, l) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, mirror, l), nil + return r.handleUpdateMirror(ctx, mirror, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, mirror, l), nil + return r.handleDeleteMirror(ctx, mirror, l) case models.GenericEvent: l.Info("Event isn't handled", "kafka Connect ID to mirror", mirror.Spec.KafkaConnectClusterID, "request", req, "event", mirror.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } -func (r *MirrorReconciler) handleCreateCluster( +func (r *MirrorReconciler) handleCreateMirror( ctx context.Context, mirror *v1beta1.Mirror, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Creation Event") if mirror.Status.ID == "" { @@ -104,7 +105,7 @@ func (r *MirrorReconciler) handleCreateCluster( l.Error(err, "Cannot create Kafka mirror", "spec", mirror.Spec) r.EventRecorder.Eventf(mirror, models.Warning, models.CreationFailed, "Resource creation on the Instaclustr is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka mirror has been created", "mirror ID", iStatus.ID) r.EventRecorder.Eventf(mirror, models.Normal, models.Created, @@ -119,7 +120,7 @@ func (r *MirrorReconciler) handleCreateCluster( "kafka mirror connector name", iStatus.ConnectorName) r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, "Resource patch is failed after request to Instaclustr was sent. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } patch = mirror.NewPatch() @@ -130,7 +131,7 @@ func (r *MirrorReconciler) handleCreateCluster( "spec", mirror.Spec, "status", iStatus) r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, "Resource status patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } } @@ -140,19 +141,19 @@ func (r *MirrorReconciler) handleCreateCluster( "mirror cluster ID", mirror.Status.ID) r.EventRecorder.Eventf(mirror, models.Warning, models.CreationFailed, "Mirror status job creation is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Event(mirror, models.Normal, models.Created, "mirror status check job is started") - return models.ExitReconcile + return ctrl.Result{}, nil } -func (r *MirrorReconciler) handleUpdateCluster( +func (r *MirrorReconciler) handleUpdateMirror( ctx context.Context, mirror *v1beta1.Mirror, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Update Event") iMirror, err := r.API.GetMirrorStatus(mirror.Status.ID) @@ -160,7 +161,7 @@ func (r *MirrorReconciler) handleUpdateCluster( l.Error(err, "Cannot get Kafka mirror from Instaclustr", "mirror ID", mirror.Status.ID) r.EventRecorder.Eventf(mirror, models.Warning, models.UpdateFailed, "Resource update on the Instaclustr API is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } if mirror.Spec.TargetLatency != iMirror.TargetLatency { @@ -170,7 +171,7 @@ func (r *MirrorReconciler) handleUpdateCluster( "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "mirror ID", mirror.Status.ID) r.EventRecorder.Eventf(mirror, models.Warning, models.UpdateFailed, "Failed to sent a update request to the Instaclustr API. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka mirror has been updated", @@ -179,7 +180,7 @@ func (r *MirrorReconciler) handleUpdateCluster( "target latency", mirror.Spec.TargetLatency, "connector name", mirror.Status.ConnectorName) - return models.ExitReconcile + return ctrl.Result{}, nil } patch := mirror.NewPatch() @@ -191,17 +192,17 @@ func (r *MirrorReconciler) handleUpdateCluster( "kafka connect", mirror.Spec.KafkaConnectClusterID) r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, "Resource status patch is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } - return models.ExitReconcile + return ctrl.Result{}, nil } -func (r *MirrorReconciler) handleDeleteCluster( +func (r *MirrorReconciler) handleDeleteMirror( ctx context.Context, mirror *v1beta1.Mirror, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Deletion Event") iMirror, err := r.API.GetMirrorStatus(mirror.Status.ID) @@ -211,7 +212,7 @@ func (r *MirrorReconciler) handleDeleteCluster( "mirror id", mirror.Status.ID) r.EventRecorder.Eventf(mirror, models.Warning, models.FetchFailed, "Fetch resource from the Instaclustr API is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } if iMirror != nil { @@ -222,7 +223,7 @@ func (r *MirrorReconciler) handleDeleteCluster( "mirror ID", mirror.Status.ID) r.EventRecorder.Eventf(mirror, models.Warning, models.DeletionFailed, "Resource deletion on the Instaclustr is failed. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf(mirror, models.Normal, models.DeletionStarted, @@ -239,7 +240,7 @@ func (r *MirrorReconciler) handleDeleteCluster( "cluster name", mirror.Spec.KafkaConnectClusterID) r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, "Mirror patch is failed after deletion. Reason: %v", err) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka mirror has been deleted", @@ -249,7 +250,7 @@ func (r *MirrorReconciler) handleDeleteCluster( r.EventRecorder.Eventf(mirror, models.Normal, models.Deleted, "Resource is deleted") - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *MirrorReconciler) startClusterStatusJob(mirror *v1beta1.Mirror) error { @@ -346,6 +347,11 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J // SetupWithManager sets up the controller with the Manager. func (r *MirrorReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay, + ), + }). For(&v1beta1.Mirror{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.GetAnnotations()[models.ResourceStateAnnotation] = models.CreatingEvent diff --git a/controllers/kafkamanagement/topic_controller.go b/controllers/kafkamanagement/topic_controller.go index 4f2d061a1..5f2290ba2 100644 --- a/controllers/kafkamanagement/topic_controller.go +++ b/controllers/kafkamanagement/topic_controller.go @@ -28,15 +28,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // TopicReconciler reconciles a TopicName object @@ -60,42 +61,42 @@ type TopicReconciler struct { func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { l := log.FromContext(ctx) - var topic v1beta1.Topic - err := r.Client.Get(ctx, req.NamespacedName, &topic) + topic := &v1beta1.Topic{} + err := r.Client.Get(ctx, req.NamespacedName, topic) if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Kafka topic is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch Kafka topic", "request", req) - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch topic.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, &topic, l), nil + return r.handleCreateTopic(ctx, topic, l) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, &topic, l), nil + return r.handleUpdateTopic(ctx, topic, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, &topic, l), nil + return r.handleDeleteTopic(ctx, topic, l) case models.GenericEvent: l.Info("Event isn't handled", "topic name", topic.Spec.TopicName, "request", req, "event", topic.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } -func (r *TopicReconciler) handleCreateCluster( +func (r *TopicReconciler) handleCreateTopic( ctx context.Context, topic *v1beta1.Topic, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Creation Event") if topic.Status.ID == "" { @@ -113,7 +114,7 @@ func (r *TopicReconciler) handleCreateCluster( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka topic has been created", "cluster ID", topic.Status.ID) @@ -132,7 +133,7 @@ func (r *TopicReconciler) handleCreateCluster( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } topic.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -146,18 +147,18 @@ func (r *TopicReconciler) handleCreateCluster( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } } - return models.ExitReconcile + return ctrl.Result{}, nil } -func (r *TopicReconciler) handleUpdateCluster( +func (r *TopicReconciler) handleUpdateTopic( ctx context.Context, t *v1beta1.Topic, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Update Event") patch := t.NewPatch() @@ -174,7 +175,7 @@ func (r *TopicReconciler) handleUpdateCluster( "Resource update on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka topic has been updated", @@ -190,17 +191,17 @@ func (r *TopicReconciler) handleUpdateCluster( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } - return models.ExitReconcile + return ctrl.Result{}, nil } -func (r *TopicReconciler) handleDeleteCluster( +func (r *TopicReconciler) handleDeleteTopic( ctx context.Context, topic *v1beta1.Topic, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l = l.WithName("Deletion Event") _, err := r.API.GetTopicStatus(topic.Status.ID) @@ -213,7 +214,7 @@ func (r *TopicReconciler) handleDeleteCluster( "Fetch resource from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if !errors.Is(err, instaclustr.NotFound) { @@ -227,7 +228,7 @@ func (r *TopicReconciler) handleDeleteCluster( "Resource deletion on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( topic, models.Normal, models.DeletionStarted, @@ -247,7 +248,7 @@ func (r *TopicReconciler) handleDeleteCluster( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Kafka topic has been deleted", @@ -258,7 +259,7 @@ func (r *TopicReconciler) handleDeleteCluster( "Cluster resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } // confirmDeletion confirms if resource is deleting and set appropriate annotation. @@ -272,6 +273,11 @@ func confirmDeletion(obj client.Object) { // SetupWithManager sets up the controller with the Manager. func (r *TopicReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries( + ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay, + ), + }). For(&v1beta1.Topic{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.GetAnnotations()[models.ResourceStateAnnotation] = models.CreatingEvent