Skip to content

Commit

Permalink
external changes on several failed updates is implemented for all clu…
Browse files Browse the repository at this point in the history
…sters
  • Loading branch information
ribaraka committed Jan 26, 2024
1 parent b0b9418 commit e9e3365
Show file tree
Hide file tree
Showing 21 changed files with 216 additions and 601 deletions.
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,12 @@ func (cs *CadenceSpec) PrivateLinkFromInstAPI(iPLs []*models.PrivateLink) (pls [
return
}

func (c *Cadence) GetSpec() CadenceSpec { return c.Spec }

func (c *Cadence) IsSpecEqual(spec CadenceSpec) bool {
return c.Spec.IsEqual(spec)
}

func (cs *CadenceSpec) IsEqual(spec CadenceSpec) bool {
return cs.AreDCsEqual(spec.DataCentres)
}
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ func (cs *CassandraSpec) IsEqual(spec CassandraSpec) bool {
cs.BundledUseOnly == spec.BundledUseOnly
}

func (c *Cassandra) GetSpec() CassandraSpec { return c.Spec }

func (c *Cassandra) IsSpecEqual(spec CassandraSpec) bool {
return c.Spec.IsEqual(spec)
}

func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool {
if len(cs.DataCentres) != len(dcs) {
return false
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
a.IsTwoFactorDeleteEqual(b.TwoFactorDelete)
}

func (c *Kafka) GetSpec() KafkaSpec { return c.Spec }

func (c *Kafka) IsSpecEqual(spec KafkaSpec) bool {
return c.Spec.IsEqual(spec)
}

func (rs *KafkaSpec) areDCsEqual(b []*KafkaDataCentre) bool {
a := rs.DataCentres
if len(a) != len(b) {
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/kafkaconnect_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ func (ks *KafkaConnectStatus) DCsFromInstAPI(iDCs []*models.KafkaConnectDataCent
return
}

func (c *KafkaConnect) GetSpec() KafkaConnectSpec { return c.Spec }

func (c *KafkaConnect) IsSpecEqual(spec KafkaConnectSpec) bool {
return c.Spec.IsEqual(spec)
}

func (ks *KafkaConnectSpec) IsEqual(kc KafkaConnectSpec) bool {
return ks.Cluster.IsEqual(kc.Cluster) &&
ks.AreDataCentresEqual(kc.DataCentres) &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/opensearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@ func (oss *OpenSearchStatus) DCsFromInstAPI(iDCs []*models.OpenSearchDataCentre)
return
}

func (c *OpenSearch) GetSpec() OpenSearchSpec { return c.Spec }

func (c *OpenSearch) IsSpecEqual(spec OpenSearchSpec) bool {
return c.Spec.IsEqual(spec)
}

func (a *OpenSearchSpec) IsEqual(b OpenSearchSpec) bool {
return a.Cluster.IsEqual(b.Cluster) &&
a.IsTwoFactorDeleteEqual(b.TwoFactorDelete) &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/postgresql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ func (pdc *PgDataCentre) PGBouncerToInstAPI() (iPgB []*models.PGBouncer) {
return
}

func (c *PostgreSQL) GetSpec() PgSpec { return c.Spec }

func (c *PostgreSQL) IsSpecEqual(spec PgSpec) bool {
return c.Spec.IsEqual(spec)
}

func (pgs *PgSpec) IsEqual(iPG PgSpec) bool {
return pgs.Cluster.IsEqual(iPG.Cluster) &&
pgs.SynchronousModeStrict == iPG.SynchronousModeStrict &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/redis_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (r *Redis) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.Clust
}
}

func (c *Redis) GetSpec() RedisSpec { return c.Spec }

func (c *Redis) IsSpecEqual(spec RedisSpec) bool {
return c.Spec.IsEqual(spec)
}

func (rs *RedisSpec) ToInstAPI() *models.RedisCluster {
return &models.RedisCluster{
Name: rs.Name,
Expand Down
4 changes: 4 additions & 0 deletions apis/clusters/v1beta1/zookeeper_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (zdc *ZookeeperDataCentre) ToInstAPI() *models.ZookeeperDataCentre {
}
}

func (z *Zookeeper) GetSpec() ZookeeperSpec { return z.Spec }

func (z *Zookeeper) IsSpecEqual(spec ZookeeperSpec) bool { return z.Spec.IsEqual(spec) }

func (a *ZookeeperSpec) IsEqual(b ZookeeperSpec) bool {
return a.Cluster.IsEqual(b.Cluster) &&
a.areDCsEqual(b.DataCentres)
Expand Down
85 changes: 10 additions & 75 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/instaclustr/operator/apis/clusters/v1beta1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
rlimiter "github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand All @@ -52,6 +53,7 @@ type CadenceReconciler struct {
API instaclustr.API
Scheduler scheduler.Interface
EventRecorder record.EventRecorder
RateLimiter ratelimiter.RateLimiter
}

//+kubebuilder:rbac:groups=clusters.instaclustr.com,resources=cadences,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -95,7 +97,7 @@ func (r *CadenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
case models.CreatingEvent:
return r.handleCreateCluster(ctx, c, l)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, c, l)
return r.handleUpdateCluster(ctx, c, req, l)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, c, l)
case models.GenericEvent:
Expand Down Expand Up @@ -331,6 +333,7 @@ func (r *CadenceReconciler) handleCreateCluster(
func (r *CadenceReconciler) handleUpdateCluster(
ctx context.Context,
c *v1beta1.Cadence,
req ctrl.Request,
l logr.Logger,
) (ctrl.Result, error) {
iData, err := r.API.GetCadence(c.Status.ID)
Expand Down Expand Up @@ -361,33 +364,9 @@ func (r *CadenceReconciler) handleUpdateCluster(
return ctrl.Result{}, err
}

if iCadence.Status.CurrentClusterOperationStatus != models.NoOperation {
l.Info("Cadence cluster is not ready to update",
"cluster name", iCadence.Spec.Name,
"cluster state", iCadence.Status.State,
"current operation status", iCadence.Status.CurrentClusterOperationStatus,
)

patch := c.NewPatch()
c.Annotations[models.ResourceStateAnnotation] = models.UpdatingEvent
c.Annotations[models.UpdateQueuedAnnotation] = models.True
err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster",
"cluster name", c.Spec.Name,
"patch", patch)

r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return ctrl.Result{}, err
}

return models.ReconcileRequeue, nil
}

if c.Annotations[models.ExternalChangesAnnotation] == models.True {
return r.handleExternalChanges(c, iCadence, l)
if c.Annotations[models.ExternalChangesAnnotation] == models.True ||
r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries {
return handleExternalChanges[v1beta1.CadenceSpec](r.EventRecorder, r.Client, c, iCadence, l)
}

if c.Spec.ClusterSettingsNeedUpdate(iCadence.Spec.Cluster) {
Expand Down Expand Up @@ -425,7 +404,6 @@ func (r *CadenceReconciler) handleUpdateCluster(

patch := c.NewPatch()
c.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent
c.Annotations[models.UpdateQueuedAnnotation] = ""
err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster",
Expand All @@ -448,44 +426,6 @@ func (r *CadenceReconciler) handleUpdateCluster(
return ctrl.Result{}, nil
}

func (r *CadenceReconciler) handleExternalChanges(c, iCadence *v1beta1.Cadence, l logr.Logger) (ctrl.Result, error) {
if !c.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
"k8s resource spec", c.Spec.DataCentres)

msgDiffSpecs, err := createSpecDifferenceMessage(c.Spec.DataCentres, iCadence.Spec.DataCentres)
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCadence.Spec, "k8s resource spec", c.Spec)
return ctrl.Result{}, err
}
r.EventRecorder.Eventf(c, models.Warning, models.ExternalChanges, msgDiffSpecs)

return ctrl.Result{}, nil
}

patch := c.NewPatch()

c.Annotations[models.ExternalChangesAnnotation] = ""

err := r.Patch(context.Background(), c, patch)
if err != nil {
l.Error(err, "Cannot patch cluster resource",
"cluster name", c.Spec.Name, "cluster ID", c.Status.ID)

r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return ctrl.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", c.Status.ID)
r.EventRecorder.Event(c, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return ctrl.Result{}, nil
}

func (r *CadenceReconciler) handleDeleteCluster(
ctx context.Context,
c *v1beta1.Cadence,
Expand Down Expand Up @@ -1001,7 +941,7 @@ func (r *CadenceReconciler) newWatchStatusJob(c *v1beta1.Cadence) scheduler.Job
"External changes were automatically reconciled",
)
} else if c.Status.CurrentClusterOperationStatus == models.NoOperation &&
c.Annotations[models.UpdateQueuedAnnotation] != models.True &&
c.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
!equals {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
Expand Down Expand Up @@ -1313,12 +1253,7 @@ func areSecondaryCadenceTargetsEqual(k8sTargets, iTargets []*v1beta1.TargetCaden
// SetupWithManager sets up the controller with the Manager.
func (r *CadenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay,
ratelimiter.DefaultMaxDelay,
),
}).
WithOptions(controller.Options{RateLimiter: r.RateLimiter}).
For(&v1beta1.Cadence{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
Loading

0 comments on commit e9e3365

Please sign in to comment.