Skip to content

Commit

Permalink
issue-559, cadence controller integration of the rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Oct 16, 2023
1 parent 5ed2ebd commit 3b49455
Show file tree
Hide file tree
Showing 12 changed files with 597 additions and 73 deletions.
94 changes: 52 additions & 42 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand All @@ -65,52 +67,52 @@ type CadenceReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *CadenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

cadenceCluster := &v1beta1.Cadence{}
err := r.Client.Get(ctx, req.NamespacedName, cadenceCluster)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Cadence resource is not found",
"resource name", req.NamespacedName,
)
return models.ExitReconcile, nil
return reconcile.Result{}, nil
}

logger.Error(err, "Unable to fetch Cadence resource",
"resource name", req.NamespacedName,
)
return models.ReconcileRequeue, nil
return reconcile.Result{}, err
}

switch cadenceCluster.Annotations[models.ResourceStateAnnotation] {
case models.CreatingEvent:
return r.HandleCreateCluster(ctx, cadenceCluster, logger), nil
return r.HandleCreateCluster(ctx, cadenceCluster, logger)
case models.UpdatingEvent:
return r.HandleUpdateCluster(ctx, cadenceCluster, logger), nil
return r.HandleUpdateCluster(ctx, cadenceCluster, logger)
case models.DeletingEvent:
return r.HandleDeleteCluster(ctx, cadenceCluster, logger), nil
return r.HandleDeleteCluster(ctx, cadenceCluster, logger)
case models.GenericEvent:
logger.Info("Generic event isn't handled",
"request", req,
"event", cadenceCluster.Annotations[models.ResourceStateAnnotation],
)

return models.ExitReconcile, nil
return reconcile.Result{}, nil
default:
logger.Info("Unknown event isn't handled",
"request", req,
"event", cadenceCluster.Annotations[models.ResourceStateAnnotation],
)

return models.ExitReconcile, nil
return reconcile.Result{}, nil
}
}

func (r *CadenceReconciler) HandleCreateCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
fmt.Println("CREATING")
if cadence.Status.ID == "" {
patch := cadence.NewPatch()

Expand All @@ -124,7 +126,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cannot prepare packaged solution for Cadence cluster. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if requeueNeeded {
Expand All @@ -134,7 +136,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Event(cadence, models.Normal, "Waiting",
"Waiting for bundled clusters to be created")

return models.ReconcileRequeue
return models.ReconcileRequeue, nil
}
}

Expand All @@ -152,7 +154,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.ConvertionFailed,
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

id, err := r.API.CreateCluster(instaclustr.CadenceEndpoint, cadenceAPISpec)
Expand All @@ -164,8 +166,9 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cluster creation on the Instaclustr is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
fmt.Println("r.API.CreateCluster", id)

cadence.Status.ID = id
err = r.Status().Patch(ctx, cadence, patch)
Expand All @@ -178,7 +181,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if cadence.Spec.Description != "" {
Expand All @@ -192,6 +195,8 @@ func (r *CadenceReconciler) HandleCreateCluster(

r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cluster description and TwoFactoDelete update is failed. Reason: %v", err)

return reconcile.Result{}, err
}
}

Expand All @@ -206,7 +211,7 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource status patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(
Expand All @@ -232,21 +237,21 @@ func (r *CadenceReconciler) HandleCreateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.CreationFailed,
"Cluster status check job is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cadence, models.Normal, models.Created,
"Cluster status check job is started")
}

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) HandleUpdateCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (ctrl.Result, error) {
iData, err := r.API.GetCadence(cadence.Status.ID)
if err != nil {
logger.Error(
Expand All @@ -258,7 +263,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.FetchFailed,
"Cluster fetch from the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

iCadence, err := cadence.FromInstAPI(iData)
Expand All @@ -272,7 +277,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.ConvertionFailed,
"Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if iCadence.Status.CurrentClusterOperationStatus != models.NoOperation {
Expand All @@ -294,10 +299,10 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

return models.ReconcileRequeue
return models.ReconcileRequeue, nil
}

if cadence.Annotations[models.ExternalChangesAnnotation] == models.True {
Expand All @@ -315,7 +320,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.UpdateFailed,
"Cluster description and TwoFactoDelete update is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info("Update request to Instaclustr API has been sent",
Expand All @@ -332,7 +337,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.UpdateFailed,
"Cluster update on the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

patch := cadence.NewPatch()
Expand All @@ -347,7 +352,7 @@ func (r *CadenceReconciler) HandleUpdateCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(
Expand All @@ -357,10 +362,10 @@ func (r *CadenceReconciler) HandleUpdateCluster(
"data centres", cadence.Spec.DataCentres,
)

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cadence, l logr.Logger) reconcile.Result {
func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cadence, l logr.Logger) (reconcile.Result, error) {
if !cadence.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
Expand All @@ -370,11 +375,11 @@ func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cad
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCadence.Spec, "k8s resource spec", cadence.Spec)
return models.ExitReconcile
return reconcile.Result{}, err
}
r.EventRecorder.Eventf(cadence, models.Warning, models.ExternalChanges, msgDiffSpecs)

return models.ExitReconcile
return reconcile.Result{}, nil
}

patch := cadence.NewPatch()
Expand All @@ -389,20 +394,20 @@ func (r *CadenceReconciler) handleExternalChanges(cadence, iCadence *v1beta1.Cad
r.EventRecorder.Eventf(cadence, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", cadence.Status.ID)
r.EventRecorder.Event(cadence, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) HandleDeleteCluster(
ctx context.Context,
cadence *v1beta1.Cadence,
logger logr.Logger,
) reconcile.Result {
) (reconcile.Result, error) {
_, err := r.API.GetCadence(cadence.Status.ID)
if err != nil && !errors.Is(err, instaclustr.NotFound) {
logger.Error(
Expand All @@ -414,7 +419,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.FetchFailed,
"Cluster resource fetch from the Instaclustr API is failed. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

if !errors.Is(err, instaclustr.NotFound) {
Expand All @@ -432,7 +437,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.DeletionFailed,
"Cluster deletion is failed on the Instaclustr. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

r.EventRecorder.Event(cadence, models.Normal, models.DeletionStarted,
Expand All @@ -452,18 +457,16 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"Cluster resource patch is failed. Reason: %v",
err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info(msgDeleteClusterWithTwoFactorDelete, "cluster ID", cadence.Status.ID)

r.EventRecorder.Event(cadence, models.Normal, models.DeletionStarted,
"Two-Factor Delete is enabled, please confirm cluster deletion via email or phone.")

return models.ExitReconcile
return reconcile.Result{}, nil
}

return models.ReconcileRequeue
}

logger.Info("Cadence cluster is being deleted",
Expand All @@ -482,7 +485,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
r.EventRecorder.Eventf(cadence, models.Warning, models.DeletionFailed,
"Cannot delete Cadence packaged resources. Reason: %v", err)

return models.ReconcileRequeue
return reconcile.Result{}, err
}
}

Expand All @@ -497,7 +500,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"cluster name", cadence.Spec.Name,
"patch", patch,
)
return models.ReconcileRequeue
return reconcile.Result{}, err
}

err = exposeservice.Delete(r.Client, cadence.Name, cadence.Namespace)
Expand All @@ -507,7 +510,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(
"cluster name", cadence.Spec.Name,
)

return models.ReconcileRequeue
return reconcile.Result{}, err
}

logger.Info("Cadence cluster was deleted",
Expand All @@ -517,7 +520,7 @@ func (r *CadenceReconciler) HandleDeleteCluster(

r.EventRecorder.Event(cadence, models.Normal, models.Deleted, "Cluster resource is deleted")

return models.ExitReconcile
return reconcile.Result{}, nil
}

func (r *CadenceReconciler) preparePackagedSolution(
Expand Down Expand Up @@ -849,6 +852,7 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *v1beta1.Cadence) schedule
}

if iCadence.Status.CurrentClusterOperationStatus == models.NoOperation &&
cadence.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
cadence.Annotations[models.UpdateQueuedAnnotation] != models.True &&
!cadence.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
Expand Down Expand Up @@ -1173,6 +1177,12 @@ func areSecondaryCadenceTargetsEqual(k8sTargets, iTargets []*v1beta1.TargetCaden
// SetupWithManager sets up the controller with the Manager.
func (r *CadenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay,
ratelimiter.DefaultMaxDelay,
),
}).
For(&v1beta1.Cadence{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
Loading

0 comments on commit 3b49455

Please sign in to comment.