diff --git a/controllers/clusterresources/awsencryptionkey_controller.go b/controllers/clusterresources/awsencryptionkey_controller.go index da4baba0f..9eaadb87f 100644 --- a/controllers/clusterresources/awsencryptionkey_controller.go +++ b/controllers/clusterresources/awsencryptionkey_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -70,18 +71,18 @@ func (r *AWSEncryptionKeyReconciler) Reconcile(ctx context.Context, req ctrl.Req l.Info("AWS encryption key resource is not found", "resource name", req.NamespacedName, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch AWS encryption key") - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch encryptionKey.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreate(ctx, encryptionKey, &l), nil + return r.handleCreate(ctx, encryptionKey, &l) case models.DeletingEvent: - return r.handleDelete(ctx, encryptionKey, &l), nil + return r.handleDelete(ctx, encryptionKey, &l) case models.GenericEvent: l.Info("AWS encryption key event isn't handled", "alias", encryptionKey.Spec.Alias, @@ -89,17 +90,17 @@ func (r *AWSEncryptionKeyReconciler) Reconcile(ctx context.Context, req ctrl.Req "provider account name", encryptionKey.Spec.ProviderAccountName, "request", req, "event", encryptionKey.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *AWSEncryptionKeyReconciler) handleCreate( ctx context.Context, encryptionKey *v1beta1.AWSEncryptionKey, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if encryptionKey.Status.ID == "" { l.Info( "Creating AWS encryption key", @@ -120,7 +121,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -138,7 +139,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } encryptionKey.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -154,7 +155,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -173,7 +174,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( "Resource status job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -181,14 +182,14 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSEncryptionKeyReconciler) handleDelete( ctx context.Context, encryptionKey *v1beta1.AWSEncryptionKey, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { status, err := r.API.GetEncryptionKeyStatus(encryptionKey.Status.ID, instaclustr.AWSEncryptionKeyEndpoint) if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( @@ -202,7 +203,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( "Fetch resource from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil { @@ -219,7 +220,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( "Resource deletion on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( encryptionKey, models.Normal, models.DeletionStarted, @@ -244,7 +245,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("AWS encryption key has been deleted", @@ -258,7 +259,7 @@ func (r *AWSEncryptionKeyReconciler) handleDelete( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) error { @@ -340,6 +341,8 @@ func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, k // SetupWithManager sets up the controller with the Manager. func (r *AWSEncryptionKeyReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.AWSEncryptionKey{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if event.Object.GetDeletionTimestamp() != nil { diff --git a/controllers/clusterresources/awsendpointserviceprincipal_controller.go b/controllers/clusterresources/awsendpointserviceprincipal_controller.go index fb81ddbb3..c28c11b78 100644 --- a/controllers/clusterresources/awsendpointserviceprincipal_controller.go +++ b/controllers/clusterresources/awsendpointserviceprincipal_controller.go @@ -28,12 +28,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -65,38 +67,26 @@ func (r *AWSEndpointServicePrincipalReconciler) Reconcile(ctx context.Context, r }, principal) if err != nil { if k8serrors.IsNotFound(err) { - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch an AWS endpoint service principal resource") - return models.ReconcileRequeue, err + return ctrl.Result{}, err } - // Handle resource deletion if principal.DeletionTimestamp != nil { - err = r.handleDelete(ctx, l, principal) - if err != nil { - return models.ReconcileRequeue, err - } - - return models.ExitReconcile, nil + return r.handleDelete(ctx, l, principal) } - // Handle resource creation if principal.Status.ID == "" { - err = r.handleCreate(ctx, l, principal) - if err != nil { - return models.ReconcileRequeue, nil - } - - return models.ExitReconcile, nil + return r.handleCreate(ctx, l, principal) } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } -func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context, l logr.Logger, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { +func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context, l logr.Logger, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) (ctrl.Result, error) { b, err := r.API.CreateAWSEndpointServicePrincipal(principal.Spec) if err != nil { l.Error(err, "failed to create an AWS endpoint service principal resource on Instaclustr") @@ -104,7 +94,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "Failed to create an AWS endpoint service principal on Instaclustr. Reason: %v", err, ) - return err + return ctrl.Result{}, err } patch := principal.NewPatch() @@ -115,7 +105,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "Failed to parse an AWS endpoint service principal resource response from Instaclustr. Reason: %v", err, ) - return err + return ctrl.Result{}, err } err = r.Status().Patch(ctx, principal, patch) @@ -125,7 +115,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "Failed to patch an AWS endpoint service principal resource with its ID. Reason: %v", err, ) - return err + return ctrl.Result{}, err } controllerutil.AddFinalizer(principal, models.DeletionFinalizer) @@ -136,7 +126,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "Failed to patch an AWS endpoint service principal resource with finalizer. Reason: %v", err, ) - return err + return ctrl.Result{}, err } l.Info("AWS endpoint service principal resource has been created") @@ -151,16 +141,16 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "Failed to start status checker job. Reason: %w", err, ) - return err + return ctrl.Result{}, err } r.EventRecorder.Eventf(principal, models.Normal, models.Created, "Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker), ) - return nil + return ctrl.Result{}, nil } -func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context, logger logr.Logger, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { +func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context, logger logr.Logger, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) (ctrl.Result, error) { err := r.API.DeleteAWSEndpointServicePrincipal(resource.Status.ID) if err != nil && !errors.Is(err, instaclustr.NotFound) { logger.Error(err, "failed to delete an AWS endpoint service principal resource on Instaclustr") @@ -168,7 +158,7 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context "Failed to delete an AWS endpoint service principal on Instaclustr. Reason: %v", err, ) - return err + return ctrl.Result{}, err } patch := resource.NewPatch() @@ -180,12 +170,12 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context "Failed to delete finalizer from an AWS endpoint service principal resource. Reason: %v", err, ) - return err + return ctrl.Result{}, err } logger.Info("AWS endpoint service principal resource has been deleted") - return nil + return ctrl.Result{}, nil } func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { @@ -247,6 +237,8 @@ func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context // SetupWithManager sets up the controller with the Manager. func (r *AWSEndpointServicePrincipalReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&clusterresourcesv1beta1.AWSEndpointServicePrincipal{}). Complete(r) } diff --git a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go index 11da47237..742228611 100644 --- a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go +++ b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -67,37 +68,35 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) Reconcile(ctx context.Context, l.Info("AWS security group firewall rule resource is not found", "resource name", req.NamespacedName, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch AWS security group firewall rule") - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch firewallRule.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - reconcileResult := r.handleCreateFirewallRule(ctx, firewallRule, &l) - return reconcileResult, nil + return r.handleCreateFirewallRule(ctx, firewallRule, &l) case models.DeletingEvent: - reconcileResult := r.handleDeleteFirewallRule(ctx, firewallRule, &l) - return reconcileResult, nil + return r.handleDeleteFirewallRule(ctx, firewallRule, &l) case models.GenericEvent: l.Info("AWS security group firewall rule event isn't handled", "cluster ID", firewallRule.Spec.ClusterID, "type", firewallRule.Spec.Type, "request", req, "event", firewallRule.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( ctx context.Context, firewallRule *v1beta1.AWSSecurityGroupFirewallRule, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if firewallRule.Status.ID == "" { l.Info( "Creating AWS security group firewall rule", @@ -118,7 +117,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -135,7 +134,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } firewallRule.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -151,7 +150,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -170,7 +169,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( "Resource status job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -178,14 +177,14 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( ctx context.Context, firewallRule *v1beta1.AWSSecurityGroupFirewallRule, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { patch := firewallRule.NewPatch() err := r.Patch(ctx, firewallRule, patch) if err != nil { @@ -199,7 +198,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } status, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint) @@ -215,7 +214,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( "Fetch resource from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil && status.Status != statusDELETED { @@ -232,7 +231,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( "Resource deletion on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( firewallRule, models.Normal, models.DeletionStarted, @@ -256,7 +255,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("AWS security group firewall rule has been deleted", @@ -270,7 +269,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleDeleteFirewallRule( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSSecurityGroupFirewallRuleReconciler) startFirewallRuleStatusJob(firewallRule *v1beta1.AWSSecurityGroupFirewallRule) error { @@ -352,6 +351,8 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx contex // SetupWithManager sets up the controller with the Manager. func (r *AWSSecurityGroupFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.AWSSecurityGroupFirewallRule{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if event.Object.GetDeletionTimestamp() != nil { diff --git a/controllers/clusterresources/awsvpcpeering_controller.go b/controllers/clusterresources/awsvpcpeering_controller.go index 7aa4708fd..246b23c84 100644 --- a/controllers/clusterresources/awsvpcpeering_controller.go +++ b/controllers/clusterresources/awsvpcpeering_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -66,19 +67,19 @@ func (r *AWSVPCPeeringReconciler) Reconcile(ctx context.Context, req ctrl.Reques if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "AWS VPC Peering resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "unable to fetch AWS VPC Peering", "request", req) - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch aws.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreatePeering(ctx, aws, l), nil + return r.handleCreatePeering(ctx, aws, l) case models.UpdatingEvent: - return r.handleUpdatePeering(ctx, aws, l), nil + return r.handleUpdatePeering(ctx, aws, l) case models.DeletingEvent: - return r.handleDeletePeering(ctx, aws, l), nil + return r.handleDeletePeering(ctx, aws, l) default: l.Info("event isn't handled", "AWS Account ID", aws.Spec.PeerAWSAccountID, @@ -86,7 +87,7 @@ func (r *AWSVPCPeeringReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Region", aws.Spec.PeerRegion, "Request", req, "event", aws.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } } @@ -94,7 +95,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( ctx context.Context, aws *v1beta1.AWSVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if aws.Status.ID == "" { l.Info( "Creating AWS VPC Peering resource", @@ -114,7 +115,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -139,7 +140,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } controllerutil.AddFinalizer(aws, models.DeletionFinalizer) @@ -158,7 +159,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -178,7 +179,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( "Resource status check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -186,14 +187,14 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSVPCPeeringReconciler) handleUpdatePeering( ctx context.Context, aws *v1beta1.AWSVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { instaAWSPeering, err := r.API.GetAWSVPCPeering(aws.Status.ID) if err != nil { l.Error(err, "Cannot get AWS VPC Peering from Instaclutr", @@ -203,7 +204,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "Cannot get AWS VPC Peering from Instaclutr. Reason: %v", ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if aws.Annotations[models.ExternalChangesAnnotation] == models.True { @@ -217,7 +218,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "The resource specification still differs from the Instaclustr resource specification, please reconcile it manually.", ) - return models.ExitReconcile + return ctrl.Result{}, nil } patch := aws.NewPatch() @@ -233,7 +234,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("External changes of the k8s resource specification was fixed", @@ -243,7 +244,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "External changes of the k8s resource specification was fixed", ) - return models.ExitReconcile + return ctrl.Result{}, nil } err = r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec) @@ -259,7 +260,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "Resource update on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } patch := aws.NewPatch() @@ -278,7 +279,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("AWS VPC Peering resource has been updated", @@ -290,14 +291,14 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( "AWS VPC Peering Status", aws.Status.PeeringStatus, ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSVPCPeeringReconciler) handleDeletePeering( ctx context.Context, aws *v1beta1.AWSVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { status, err := r.API.GetPeeringStatus(aws.Status.ID, instaclustr.AWSPeeringEndpoint) if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( @@ -312,7 +313,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "Resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil { @@ -330,13 +331,13 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "Resource deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( aws, models.Normal, models.DeletionStarted, "Resource deletion request is sent", ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker)) @@ -358,7 +359,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("AWS VPC Peering has been deleted", @@ -369,7 +370,7 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering( "AWS VPC Peering Status", aws.Status.PeeringStatus, ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AWSVPCPeeringReconciler) startAWSVPCPeeringStatusJob(awsPeering *v1beta1.AWSVPCPeering) error { @@ -471,6 +472,8 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe // SetupWithManager sets up the controller with the Manager. func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.AWSVPCPeering{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.SetAnnotations(map[string]string{models.ResourceStateAnnotation: models.CreatingEvent}) diff --git a/controllers/clusterresources/azurevnetpeering_controller.go b/controllers/clusterresources/azurevnetpeering_controller.go index f3871baa9..f6f2f9e78 100644 --- a/controllers/clusterresources/azurevnetpeering_controller.go +++ b/controllers/clusterresources/azurevnetpeering_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -66,21 +67,19 @@ func (r *AzureVNetPeeringReconciler) Reconcile(ctx context.Context, req ctrl.Req if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Azure VNet Peering resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "unable to fetch Azure VNet Peering", "request", req) - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch azure.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreatePeering(ctx, azure, l), nil - + return r.handleCreatePeering(ctx, azure, l) case models.UpdatingEvent: - return r.handleUpdatePeering(ctx, azure, &l), nil - + return r.handleUpdatePeering(ctx, azure, &l) case models.DeletingEvent: - return r.handleDeletePeering(ctx, azure, &l), nil + return r.handleDeletePeering(ctx, azure, &l) default: l.Info("event isn't handled", "Azure Subscription ID", azure.Spec.PeerSubscriptionID, @@ -89,7 +88,7 @@ func (r *AzureVNetPeeringReconciler) Reconcile(ctx context.Context, req ctrl.Req "Vnet Name", azure.Spec.PeerVirtualNetworkName, "Request", req, "event", azure.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } } @@ -97,7 +96,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( ctx context.Context, azure *v1beta1.AzureVNetPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if azure.Status.ID == "" { l.Info( "Creating Azure VNet Peering resource", @@ -118,7 +117,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -142,7 +141,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } controllerutil.AddFinalizer(azure, models.DeletionFinalizer) @@ -161,7 +160,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -182,7 +181,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( "Resource status check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -190,24 +189,24 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AzureVNetPeeringReconciler) handleUpdatePeering( ctx context.Context, azure *v1beta1.AzureVNetPeering, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l.Info("Update is not implemented") - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AzureVNetPeeringReconciler) handleDeletePeering( ctx context.Context, azure *v1beta1.AzureVNetPeering, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { status, err := r.API.GetPeeringStatus(azure.Status.ID, instaclustr.AzurePeeringEndpoint) if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( @@ -222,7 +221,7 @@ func (r *AzureVNetPeeringReconciler) handleDeletePeering( "Resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil { @@ -241,14 +240,14 @@ func (r *AzureVNetPeeringReconciler) handleDeletePeering( "Resource deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( azure, models.Normal, models.DeletionStarted, "Resource deletion request is sent", ) - return models.ReconcileRequeue + return ctrl.Result{}, err } patch := azure.NewPatch() @@ -268,7 +267,7 @@ func (r *AzureVNetPeeringReconciler) handleDeletePeering( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Azure VNet Peering has been deleted", @@ -285,7 +284,7 @@ func (r *AzureVNetPeeringReconciler) handleDeletePeering( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *AzureVNetPeeringReconciler) startAzureVNetPeeringStatusJob(azurePeering *v1beta1.AzureVNetPeering, @@ -370,6 +369,8 @@ func (r *AzureVNetPeeringReconciler) handleExternalDelete(ctx context.Context, k // SetupWithManager sets up the controller with the Manager. func (r *AzureVNetPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.AzureVNetPeering{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.SetAnnotations(map[string]string{models.ResourceStateAnnotation: models.CreatingEvent}) diff --git a/controllers/clusterresources/cassandrauser_controller.go b/controllers/clusterresources/cassandrauser_controller.go index 02991b0e5..48c0184e7 100644 --- a/controllers/clusterresources/cassandrauser_controller.go +++ b/controllers/clusterresources/cassandrauser_controller.go @@ -26,12 +26,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // CassandraUserReconciler reconciles a CassandraUser object @@ -61,11 +63,11 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques if k8sErrors.IsNotFound(err) { l.Info("Cassandra user resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Cannot fetch Cassandra user resource", "request", req) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } s := &k8sCore.Secret{} @@ -78,14 +80,14 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques l.Info("Cassandra user secret is not found", "request", req) r.EventRecorder.Event(u, models.Warning, models.NotFound, "Secret is not found, please create a new secret or set an actual reference") - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Error(err, "Cannot get Cassandra user secret", "request", req) r.EventRecorder.Eventf(u, models.Warning, models.NotFound, "Cannot get user secret. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } username, password, err := getUserCreds(s) @@ -96,7 +98,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, "Cannot get the Cassandra user credentials from the secret. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if controllerutil.AddFinalizer(s, u.GetDeletionFinalizer()) { @@ -106,7 +108,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques "secret name", s.Name, "secret namespace", s.Namespace) r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Update secret with deletion finalizer has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -117,7 +119,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques l.Error(err, "Cannot patch Cassandra user with deletion finalizer") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Patching Cassandra user with deletion finalizer has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -132,7 +134,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques u, models.Warning, models.CreationFailed, "Cannot check if user exists. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if !exists { @@ -144,7 +146,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, "Cannot create user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -157,7 +159,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("User has been created", "username", username, "cluster ID", clusterID) @@ -178,7 +180,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques u, models.Warning, models.CreationFailed, "Cannot check if user exists. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if exists { @@ -188,7 +190,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques r.EventRecorder.Eventf(u, models.Warning, models.DeletingEvent, "Cannot delete user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -206,7 +208,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } continue @@ -220,7 +222,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques "cluster ID", clusterID) r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Detaching clusterID from the Cassandra user resource has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("Cassandra user has been detached from the cluster", "cluster ID", clusterID) @@ -234,7 +236,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques l.Error(models.ErrUserStillExist, instaclustr.MsgDeleteUser) r.EventRecorder.Event(u, models.Warning, models.DeletingEvent, instaclustr.MsgDeleteUser) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } controllerutil.RemoveFinalizer(s, u.GetDeletionFinalizer()) @@ -243,7 +245,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques l.Error(err, "Cannot delete finalizer from the user's secret") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Deleting finalizer from the user's secret has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } controllerutil.RemoveFinalizer(u, u.GetDeletionFinalizer()) @@ -252,19 +254,21 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques l.Error(err, "Cannot delete finalizer from the Cassandra user resource") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Deleting finalizer from the OpenSearch user resource has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("The user resource has been deleted") - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *CassandraUserReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.CassandraUser{}). Complete(r) } diff --git a/controllers/clusterresources/clusterbackup_controller.go b/controllers/clusterresources/clusterbackup_controller.go index cbd72f2c6..aa25574f9 100644 --- a/controllers/clusterresources/clusterbackup_controller.go +++ b/controllers/clusterresources/clusterbackup_controller.go @@ -26,6 +26,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -34,6 +35,7 @@ import ( "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // ClusterBackupReconciler reconciles a ClusterBackup object @@ -65,14 +67,14 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "resource name", req.NamespacedName, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } logger.Error(err, "Cannot get cluster backup", "backup name", req.NamespacedName, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } patch := backup.NewPatch() @@ -94,7 +96,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -110,7 +112,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Fetch resource from the k8s cluster is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } clusterKind := models.ClusterKindsMap[backup.Spec.ClusterKind] @@ -130,7 +132,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Fetch resource from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } iBackupEvents := iBackup.GetBackupEvents(backup.Spec.ClusterKind) @@ -148,7 +150,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -174,7 +176,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Start timestamp annotation convertion to int is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } err = r.Status().Patch(ctx, backup, patch) @@ -188,7 +190,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -209,7 +211,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } logger.Info("Cluster backup resource was reconciled", @@ -217,7 +219,7 @@ func (r *ClusterBackupReconciler) Reconcile(ctx context.Context, req ctrl.Reques "cluster ID", backup.Spec.ClusterID, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *ClusterBackupReconciler) listClusterBackups(ctx context.Context, clusterID, namespace string) (*v1beta1.ClusterBackupList, error) { @@ -237,6 +239,8 @@ func (r *ClusterBackupReconciler) listClusterBackups(ctx context.Context, cluste // SetupWithManager sets up the controller with the Manager. func (r *ClusterBackupReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.ClusterBackup{}, builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(event event.UpdateEvent) bool { return false diff --git a/controllers/clusterresources/clusternetworkfirewallrule_controller.go b/controllers/clusterresources/clusternetworkfirewallrule_controller.go index 2e8afe312..fa145717c 100644 --- a/controllers/clusterresources/clusternetworkfirewallrule_controller.go +++ b/controllers/clusterresources/clusternetworkfirewallrule_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -71,40 +72,37 @@ func (r *ClusterNetworkFirewallRuleReconciler) Reconcile(ctx context.Context, re l.Info("Cluster network firewall rule resource is not found", "resource name", req.NamespacedName, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch cluster network firewall rule") - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch firewallRule.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - reconcileResult := r.HandleCreateFirewallRule(ctx, firewallRule, &l) - return reconcileResult, nil + return r.HandleCreateFirewallRule(ctx, firewallRule, &l) case models.UpdatingEvent: - reconcileResult := r.HandleUpdateFirewallRule(ctx, firewallRule, &l) - return reconcileResult, nil + return r.HandleUpdateFirewallRule(ctx, firewallRule, &l) case models.DeletingEvent: - reconcileResult := r.HandleDeleteFirewallRule(ctx, firewallRule, &l) - return reconcileResult, nil + return r.HandleDeleteFirewallRule(ctx, firewallRule, &l) case models.GenericEvent: l.Info("Cluster network firewall rule event isn't handled", "cluster ID", firewallRule.Spec.ClusterID, "type", firewallRule.Spec.Type, "request", req, "event", firewallRule.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( ctx context.Context, firewallRule *v1beta1.ClusterNetworkFirewallRule, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if firewallRule.Status.ID == "" { l.Info( "Creating cluster network firewall rule", @@ -125,7 +123,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -143,7 +141,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } firewallRule.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent @@ -160,7 +158,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -179,7 +177,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( "Resource status job creation is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -187,27 +185,27 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleCreateFirewallRule( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *ClusterNetworkFirewallRuleReconciler) HandleUpdateFirewallRule( ctx context.Context, firewallRule *v1beta1.ClusterNetworkFirewallRule, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l.Info("Cluster network firewall rule update is not implemented", "firewall rule ID", firewallRule.Spec.ClusterID, "type", firewallRule.Spec.Type, ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( ctx context.Context, firewallRule *v1beta1.ClusterNetworkFirewallRule, l *logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { patch := firewallRule.NewPatch() err := r.Patch(ctx, firewallRule, patch) if err != nil { @@ -220,7 +218,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } status, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.ClusterNetworkFirewallRuleEndpoint) @@ -235,7 +233,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( "Fetch resource from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil && status.Status != statusDELETED { @@ -251,7 +249,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( "Resource deletion on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -275,7 +273,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Cluster network firewall rule has been deleted", @@ -289,7 +287,7 @@ func (r *ClusterNetworkFirewallRuleReconciler) HandleDeleteFirewallRule( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *ClusterNetworkFirewallRuleReconciler) startFirewallRuleStatusJob(firewallRule *v1beta1.ClusterNetworkFirewallRule) error { @@ -330,6 +328,8 @@ func (r *ClusterNetworkFirewallRuleReconciler) newWatchStatusJob(firewallRule *v // SetupWithManager sets up the controller with the Manager. func (r *ClusterNetworkFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.ClusterNetworkFirewallRule{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { if event.Object.GetDeletionTimestamp() != nil { diff --git a/controllers/clusterresources/exclusionwindow_controller.go b/controllers/clusterresources/exclusionwindow_controller.go index d2600c2d2..c46b07f7e 100644 --- a/controllers/clusterresources/exclusionwindow_controller.go +++ b/controllers/clusterresources/exclusionwindow_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // ExclusionWindowReconciler reconciles a ExclusionWindow object @@ -67,25 +68,25 @@ func (r *ExclusionWindowReconciler) Reconcile(ctx context.Context, req ctrl.Requ if err != nil { if k8serrors.IsNotFound(err) { l.Info("Exclusion Window resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Cannot get Exclusion Window resource", "request", req) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } switch ew.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateWindow(ctx, ew, l), nil + return r.handleCreateWindow(ctx, ew, l) case models.DeletingEvent: - return r.handleDeleteWindow(ctx, ew, l), nil + return r.handleDeleteWindow(ctx, ew, l) default: l.Info("event isn't handled", "Cluster ID", ew.Spec.ClusterID, "Exclusion Window Spec", ew.Spec, "Request", req, "event", ew.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } } @@ -93,7 +94,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( ctx context.Context, ew *v1beta1.ExclusionWindow, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if ew.Status.ID == "" { l.Info( "Creating Exclusion Window resource", @@ -112,7 +113,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( ew, models.Normal, models.Created, @@ -134,7 +135,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( "Status patch is failed after resource creation. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } controllerutil.AddFinalizer(ew, models.DeletionFinalizer) @@ -151,7 +152,7 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( "Resource patch is failed after resource creation. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -160,14 +161,15 @@ func (r *ExclusionWindowReconciler) handleCreateWindow( "Exclusion Window Spec", ew.Spec, ) } - return models.ExitReconcile + + return ctrl.Result{}, nil } func (r *ExclusionWindowReconciler) handleDeleteWindow( ctx context.Context, ew *v1beta1.ExclusionWindow, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { status, err := r.API.GetExclusionWindowsStatus(ew.Status.ID) if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( @@ -180,7 +182,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( "Resource fetch from the Instaclustr API is failed while deletion. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != "" { @@ -196,7 +198,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( "Resource deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( ew, models.Normal, models.DeletionStarted, @@ -219,7 +221,7 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( "Resource patch is failed while deletion. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("Exclusion Window has been deleted", @@ -233,12 +235,14 @@ func (r *ExclusionWindowReconciler) handleDeleteWindow( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *ExclusionWindowReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.ExclusionWindow{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.SetAnnotations(map[string]string{models.ResourceStateAnnotation: models.CreatingEvent}) diff --git a/controllers/clusterresources/gcpvpcpeering_controller.go b/controllers/clusterresources/gcpvpcpeering_controller.go index f2ce92a0d..e8c16c5cc 100644 --- a/controllers/clusterresources/gcpvpcpeering_controller.go +++ b/controllers/clusterresources/gcpvpcpeering_controller.go @@ -27,15 +27,16 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -66,28 +67,26 @@ func (r *GCPVPCPeeringReconciler) Reconcile(ctx context.Context, req ctrl.Reques if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "GCP VPC Peering resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch GCP VPC Peering", "request", req) - return models.ReconcileRequeue, err + return ctrl.Result{}, err } switch gcp.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: - return r.handleCreateCluster(ctx, gcp, l), nil - + return r.handleCreateCluster(ctx, gcp, l) case models.UpdatingEvent: - return r.handleUpdateCluster(ctx, gcp, l), nil - + return r.handleUpdateCluster(ctx, gcp, l) case models.DeletingEvent: - return r.handleDeleteCluster(ctx, gcp, l), nil + return r.handleDeleteCluster(ctx, gcp, l) default: l.Info("Event isn't handled", "project ID", gcp.Spec.PeerProjectID, "network name", gcp.Spec.PeerVPCNetworkName, "request", req, "event", gcp.Annotations[models.ResourceStateAnnotation]) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } } @@ -95,7 +94,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( ctx context.Context, gcp *v1beta1.GCPVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { if gcp.Status.ID == "" { l.Info( "Creating GCP VPC Peering resource", @@ -114,7 +113,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -137,7 +136,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( "Resource status patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } controllerutil.AddFinalizer(gcp, models.DeletionFinalizer) @@ -154,7 +153,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info( @@ -173,7 +172,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( "Resource status check job is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -181,24 +180,24 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( "Resource status check job is started", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *GCPVPCPeeringReconciler) handleUpdateCluster( ctx context.Context, gcp *v1beta1.GCPVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { l.Info("Update is not implemented") - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *GCPVPCPeeringReconciler) handleDeleteCluster( ctx context.Context, gcp *v1beta1.GCPVPCPeering, l logr.Logger, -) reconcile.Result { +) (ctrl.Result, error) { status, err := r.API.GetPeeringStatus(gcp.Status.ID, instaclustr.GCPPeeringEndpoint) if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error( @@ -212,7 +211,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( "Resource fetch from the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } if status != nil { @@ -229,7 +228,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( "Resource deletion on the Instaclustr API is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } r.EventRecorder.Eventf( gcp, models.Normal, models.DeletionStarted, @@ -254,7 +253,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue + return ctrl.Result{}, err } l.Info("GCP VPC Peering has been deleted", @@ -270,7 +269,7 @@ func (r *GCPVPCPeeringReconciler) handleDeleteCluster( "Resource is deleted", ) - return models.ExitReconcile + return ctrl.Result{}, nil } func (r *GCPVPCPeeringReconciler) startGCPVPCPeeringStatusJob(gcpPeering *v1beta1.GCPVPCPeering) error { @@ -353,6 +352,8 @@ func (r *GCPVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key // SetupWithManager sets up the controller with the Manager. func (r *GCPVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.GCPVPCPeering{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(event event.CreateEvent) bool { event.Object.SetAnnotations(map[string]string{models.ResourceStateAnnotation: models.CreatingEvent}) diff --git a/controllers/clusterresources/maintenanceevents_controller.go b/controllers/clusterresources/maintenanceevents_controller.go index 01eca05d5..199dfb5a9 100644 --- a/controllers/clusterresources/maintenanceevents_controller.go +++ b/controllers/clusterresources/maintenanceevents_controller.go @@ -25,6 +25,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -32,6 +33,7 @@ import ( "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" "github.com/instaclustr/operator/pkg/scheduler" ) @@ -67,12 +69,12 @@ func (r *MaintenanceEventsReconciler) Reconcile(ctx context.Context, req ctrl.Re l.Info("Maintenance Event resource is not found", "request", req, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Cannot get Maintenance Event resource", "request", req, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if len(me.Spec.MaintenanceEventsReschedules) == 0 { @@ -87,7 +89,7 @@ func (r *MaintenanceEventsReconciler) Reconcile(ctx context.Context, req ctrl.Re "Resource deletion is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( me, models.Normal, models.DeletionStarted, @@ -97,7 +99,7 @@ func (r *MaintenanceEventsReconciler) Reconcile(ctx context.Context, req ctrl.Re "Maintenance Events were rescheduled, resource was deleted", "Maintenance Events status", me.Status, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } if me.Status.CurrentRescheduledEvent.MaintenanceEventID == "" { @@ -117,7 +119,7 @@ func (r *MaintenanceEventsReconciler) Reconcile(ctx context.Context, req ctrl.Re "Resource creation on the Instaclustr is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } r.EventRecorder.Eventf( @@ -138,16 +140,18 @@ func (r *MaintenanceEventsReconciler) Reconcile(ctx context.Context, req ctrl.Re "Resource patch is failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *MaintenanceEventsReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.MaintenanceEvents{}, builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(event event.UpdateEvent) bool { return !(event.ObjectNew.GetGeneration() == event.ObjectOld.GetGeneration()) diff --git a/controllers/clusterresources/nodereload_controller.go b/controllers/clusterresources/nodereload_controller.go index cda6e6d85..2042d627f 100644 --- a/controllers/clusterresources/nodereload_controller.go +++ b/controllers/clusterresources/nodereload_controller.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" @@ -68,10 +67,10 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { if k8serrors.IsNotFound(err) { l.Error(err, "Node Reload resource is not found", "request", req) - return reconcile.Result{}, nil + return ctrl.Result{}, nil } l.Error(err, "Unable to fetch Node Reload", "request", req) - return reconcile.Result{}, err + return ctrl.Result{}, err } patch := nrs.NewPatch() @@ -85,7 +84,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to patch pending nodes to the resource. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } } @@ -100,7 +99,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "failed nodes", nrs.Status.FailedNodes, ) - return reconcile.Result{}, nil + return ctrl.Result{}, nil } if nrs.Status.NodeInProgress == nil { @@ -117,7 +116,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to trigger node reload. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } nrs.Status.NodeInProgress = nodeInProgress @@ -134,7 +133,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to patch node in progress. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } } @@ -151,7 +150,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to fetch node reload status from Instaclustr. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } nrs.Status.CurrentOperationStatus = &v1beta1.Operation{ @@ -171,7 +170,7 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to patch current operation status. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } if nodeReloadStatus.Status != nodeReloadOperationStatusCompleted { @@ -201,13 +200,13 @@ func (r *NodeReloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "Failed to patch completed nodes. Reason: %w", err, ) - return reconcile.Result{}, err + return ctrl.Result{}, err } return models.ImmediatelyRequeue, nil } -func (r *NodeReloadReconciler) handleNodeNotFound(ctx context.Context, node *v1beta1.Node, nrs *v1beta1.NodeReload) (reconcile.Result, error) { +func (r *NodeReloadReconciler) handleNodeNotFound(ctx context.Context, node *v1beta1.Node, nrs *v1beta1.NodeReload) (ctrl.Result, error) { l := log.FromContext(ctx) patch := nrs.NewPatch() @@ -226,7 +225,7 @@ func (r *NodeReloadReconciler) handleNodeNotFound(ctx context.Context, node *v1b "Cannot patch failed node", ) - return reconcile.Result{}, err + return ctrl.Result{}, err } l.Error(err, "Node is not found on the Instaclustr side", diff --git a/controllers/clusterresources/opensearchuser_controller.go b/controllers/clusterresources/opensearchuser_controller.go index 3afcf88bb..19a1dabf2 100644 --- a/controllers/clusterresources/opensearchuser_controller.go +++ b/controllers/clusterresources/opensearchuser_controller.go @@ -28,12 +28,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" clusterresourcesv1beta1 "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // OpenSearchUserReconciler reconciles a OpenSearchUser object @@ -66,12 +68,12 @@ func (r *OpenSearchUserReconciler) Reconcile( logger.Info("OpenSearch user resource is not found", "request", req, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } logger.Error(err, "Cannot fetch OpenSearch user resource", "request", req, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } secret := &k8sCore.Secret{} @@ -88,7 +90,7 @@ func (r *OpenSearchUserReconciler) Reconcile( user, models.Warning, models.NotFound, "Secret is not found, please create a new secret or set an actual reference", ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } logger.Error(err, "Cannot get OpenSearch user's secret") @@ -97,7 +99,7 @@ func (r *OpenSearchUserReconciler) Reconcile( "User's secret fetching has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } patch := client.MergeFrom(secret.DeepCopy()) @@ -109,7 +111,7 @@ func (r *OpenSearchUserReconciler) Reconcile( user, models.Warning, models.PatchFailed, "Patching secret with deletion finalizer has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -122,7 +124,7 @@ func (r *OpenSearchUserReconciler) Reconcile( user, models.Warning, models.PatchFailed, "Patching OpenSearch user with deletion finalizer has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -147,7 +149,7 @@ func (r *OpenSearchUserReconciler) Reconcile( } if errorOccurred { - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if user.DeletionTimestamp != nil { @@ -155,7 +157,7 @@ func (r *OpenSearchUserReconciler) Reconcile( logger.Error(models.ErrUserStillExist, instaclustr.MsgDeleteUser) r.EventRecorder.Event(user, models.Warning, models.DeletingEvent, instaclustr.MsgDeleteUser) - return models.ExitReconcile, nil + return ctrl.Result{}, err } patch = client.MergeFrom(secret.DeepCopy()) @@ -167,7 +169,7 @@ func (r *OpenSearchUserReconciler) Reconcile( user, models.Warning, models.PatchFailed, "Deleting finalizer from the user's secret has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } patch = user.NewPatch() @@ -179,13 +181,13 @@ func (r *OpenSearchUserReconciler) Reconcile( user, models.Warning, models.PatchFailed, "Deleting finalizer from the OpenSearch user resource has been failed. Reason: %v", err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } logger.Info("The user resource has been deleted") } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *OpenSearchUserReconciler) createUser( @@ -366,6 +368,8 @@ func (r *OpenSearchUserReconciler) detachUserFromDeletedCluster( // SetupWithManager sets up the controller with the Manager. func (r *OpenSearchUserReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&clusterresourcesv1beta1.OpenSearchUser{}). Complete(r) } diff --git a/controllers/clusterresources/postgresqluser_controller.go b/controllers/clusterresources/postgresqluser_controller.go index 6269b4e7a..71ac7aaa0 100644 --- a/controllers/clusterresources/postgresqluser_controller.go +++ b/controllers/clusterresources/postgresqluser_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -36,6 +37,7 @@ import ( "github.com/instaclustr/operator/pkg/exposeservice" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // PostgreSQLUserReconciler reconciles a PostgreSQLUser object @@ -63,11 +65,11 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque if k8sErrors.IsNotFound(err) { l.Info("PostgreSQL user resource is not found", "request", req) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Cannot fetch PostgreSQL user resource", "request", req) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } s := &k8sCore.Secret{} @@ -85,7 +87,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Error(err, "Cannot get PostgreSQL user secret", "user", u.Name) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } newUsername, newPassword, err := getUserCreds(s) @@ -96,7 +98,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, "Cannot get the PostgreSQL user credentials from the secret. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if controllerutil.AddFinalizer(s, u.GetDeletionFinalizer()) { @@ -106,7 +108,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque "secret name", s.Name, "secret namespace", s.Namespace) r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Update secret with deletion finalizer has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -117,7 +119,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Error(err, "Cannot patch PostgreSQL user with deletion finalizer") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Patching PostgreSQL user with deletion finalizer has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -132,7 +134,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Info("Expose service or expose service endpoints are not created yet", "username", newUsername) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Error(err, "Cannot create a user for the PostgreSQL cluster", @@ -141,7 +143,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, "Cannot create user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } u.Status.ClustersInfo[clusterID] = clusterresourcesv1beta1.ClusterInfo{ @@ -157,7 +159,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("User has been created", "username", newUsername) @@ -178,7 +180,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.EventRecorder.Eventf(u, models.Warning, models.DeletingEvent, "Cannot delete user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("User has been deleted for cluster", "username", newUsername, @@ -195,7 +197,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } continue @@ -210,7 +212,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque "cluster ID", clusterID) r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Detaching clusterID from the PostgreSQL user resource has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("PostgreSQL user has been detached from the cluster", "cluster ID", clusterID) @@ -224,7 +226,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Error(models.ErrUserStillExist, instaclustr.MsgDeleteUser) r.EventRecorder.Event(u, models.Warning, models.DeletingEvent, instaclustr.MsgDeleteUser) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } controllerutil.RemoveFinalizer(s, u.GetDeletionFinalizer()) @@ -233,7 +235,7 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Error(err, "Cannot delete finalizer from the user's secret") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Deleting finalizer from the user's secret has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } controllerutil.RemoveFinalizer(u, u.GetDeletionFinalizer()) @@ -242,15 +244,15 @@ func (r *PostgreSQLUserReconciler) Reconcile(ctx context.Context, req ctrl.Reque l.Error(err, "Cannot delete finalizer from the PostgreSQL user resource") r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, "Deleting finalizer from the PostgreSQL user resource has been failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("PostgreSQL user resource has been deleted") - return models.ExitReconcile, nil + return ctrl.Result{}, nil } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *PostgreSQLUserReconciler) createUser( @@ -454,6 +456,8 @@ func (r *PostgreSQLUserReconciler) firewallRuleExists(ctx context.Context, firew // SetupWithManager sets up the controller with the Manager. func (r *PostgreSQLUserReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&clusterresourcesv1beta1.PostgreSQLUser{}). Complete(r) } diff --git a/controllers/clusterresources/redisuser_controller.go b/controllers/clusterresources/redisuser_controller.go index 070ccf923..83d11f7f9 100644 --- a/controllers/clusterresources/redisuser_controller.go +++ b/controllers/clusterresources/redisuser_controller.go @@ -29,12 +29,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "github.com/instaclustr/operator/apis/clusterresources/v1beta1" "github.com/instaclustr/operator/pkg/instaclustr" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/ratelimiter" ) // RedisUserReconciler reconciles a RedisUser object @@ -68,13 +70,13 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( "request", req, ) - return models.ExitReconcile, nil + return ctrl.Result{}, nil } l.Error(err, "Cannot fetch Redis user resource", "request", req, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } secret := &k8sCore.Secret{} @@ -96,7 +98,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } username, password, err := getUserCreds(secret) @@ -105,7 +107,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.CreatingEvent, "Cannot get user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if controllerutil.AddFinalizer(secret, user.GetDeletionFinalizer()) { @@ -117,7 +119,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.UpdatedEvent, "Cannot assign k8s secret to a Redis user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -128,7 +130,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( l.Error(err, "Patch is failed. Cannot set finalizer to the user") r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, "Patch is failed. Cannot set finalizer to the user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -144,7 +146,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.CreatingEvent, "Cannot create user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if errors.Is(err, instaclustr.NotFound) { @@ -156,7 +158,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.CreatingEvent, "Cannot create user. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } @@ -167,7 +169,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("User has been created for a Redis cluster", "username", username) @@ -187,7 +189,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.DeletingEvent, "Cannot delete Redis user from the cluster. Cluster ID: %s. Reason: %v", clusterID, err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } l.Info("User has been deleted for cluster", "username", username, @@ -204,7 +206,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, "Resource patch is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } continue @@ -219,7 +221,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.EventRecorder.Eventf(user, models.Warning, models.DeletionFailed, "Resource detach is failed. Reason: %v", err) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } continue } @@ -240,7 +242,7 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( err, ) - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } if errors.Is(err, instaclustr.NotFound) { l.Info("Cannot update redis user password, the user doesn't exist on the given cluster", @@ -267,11 +269,11 @@ func (r *RedisUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if user.DeletionTimestamp != nil { err = r.handleDeleteUser(ctx, l, secret, user) if err != nil { - return models.ReconcileRequeue, nil + return ctrl.Result{}, err } } - return models.ExitReconcile, nil + return ctrl.Result{}, nil } func (r *RedisUserReconciler) detachUserFromDeletedCluster( @@ -375,6 +377,8 @@ func (r *RedisUserReconciler) handleDeleteUser( // SetupWithManager sets up the controller with the Manager. func (r *RedisUserReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(ratelimiter.DefaultBaseDelay, ratelimiter.DefaultMaxDelay)}). For(&v1beta1.RedisUser{}). Complete(r) }