Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External changes on several failed updates for all clusters #689

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,12 @@ func (cs *CadenceSpec) PrivateLinkFromInstAPI(iPLs []*models.PrivateLink) (pls [
return
}

func (c *Cadence) GetSpec() CadenceSpec { return c.Spec }

func (c *Cadence) IsSpecEqual(spec CadenceSpec) bool {
return c.Spec.IsEqual(spec)
}

func (cs *CadenceSpec) IsEqual(spec CadenceSpec) bool {
return cs.AreDCsEqual(spec.DataCentres)
}
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ func (cs *CassandraSpec) IsEqual(spec CassandraSpec) bool {
cs.BundledUseOnly == spec.BundledUseOnly
}

func (c *Cassandra) GetSpec() CassandraSpec { return c.Spec }

func (c *Cassandra) IsSpecEqual(spec CassandraSpec) bool {
return c.Spec.IsEqual(spec)
}

func (cs *CassandraSpec) AreDCsEqual(dcs []*CassandraDataCentre) bool {
if len(cs.DataCentres) != len(dcs) {
return false
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
a.IsTwoFactorDeleteEqual(b.TwoFactorDelete)
}

func (c *Kafka) GetSpec() KafkaSpec { return c.Spec }

func (c *Kafka) IsSpecEqual(spec KafkaSpec) bool {
return c.Spec.IsEqual(spec)
}

func (rs *KafkaSpec) areDCsEqual(b []*KafkaDataCentre) bool {
a := rs.DataCentres
if len(a) != len(b) {
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/kafkaconnect_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ func (ks *KafkaConnectStatus) DCsFromInstAPI(iDCs []*models.KafkaConnectDataCent
return
}

func (c *KafkaConnect) GetSpec() KafkaConnectSpec { return c.Spec }

func (c *KafkaConnect) IsSpecEqual(spec KafkaConnectSpec) bool {
return c.Spec.IsEqual(spec)
}

func (ks *KafkaConnectSpec) IsEqual(kc KafkaConnectSpec) bool {
return ks.Cluster.IsEqual(kc.Cluster) &&
ks.AreDataCentresEqual(kc.DataCentres) &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/opensearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@ func (oss *OpenSearchStatus) DCsFromInstAPI(iDCs []*models.OpenSearchDataCentre)
return
}

func (c *OpenSearch) GetSpec() OpenSearchSpec { return c.Spec }

func (c *OpenSearch) IsSpecEqual(spec OpenSearchSpec) bool {
return c.Spec.IsEqual(spec)
}

func (a *OpenSearchSpec) IsEqual(b OpenSearchSpec) bool {
return a.Cluster.IsEqual(b.Cluster) &&
a.IsTwoFactorDeleteEqual(b.TwoFactorDelete) &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/postgresql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ func (pdc *PgDataCentre) PGBouncerToInstAPI() (iPgB []*models.PGBouncer) {
return
}

func (c *PostgreSQL) GetSpec() PgSpec { return c.Spec }

func (c *PostgreSQL) IsSpecEqual(spec PgSpec) bool {
return c.Spec.IsEqual(spec)
}

func (pgs *PgSpec) IsEqual(iPG PgSpec) bool {
return pgs.Cluster.IsEqual(iPG.Cluster) &&
pgs.SynchronousModeStrict == iPG.SynchronousModeStrict &&
Expand Down
6 changes: 6 additions & 0 deletions apis/clusters/v1beta1/redis_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (r *Redis) NewBackupSpec(startTimestamp int) *clusterresourcesv1beta1.Clust
}
}

func (c *Redis) GetSpec() RedisSpec { return c.Spec }

func (c *Redis) IsSpecEqual(spec RedisSpec) bool {
return c.Spec.IsEqual(spec)
}

func (rs *RedisSpec) ToInstAPI() *models.RedisCluster {
return &models.RedisCluster{
Name: rs.Name,
Expand Down
4 changes: 4 additions & 0 deletions apis/clusters/v1beta1/zookeeper_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (zdc *ZookeeperDataCentre) ToInstAPI() *models.ZookeeperDataCentre {
}
}

func (z *Zookeeper) GetSpec() ZookeeperSpec { return z.Spec }

func (z *Zookeeper) IsSpecEqual(spec ZookeeperSpec) bool { return z.Spec.IsEqual(spec) }

func (a *ZookeeperSpec) IsEqual(b ZookeeperSpec) bool {
return a.Cluster.IsEqual(b.Cluster) &&
a.areDCsEqual(b.DataCentres)
Expand Down
85 changes: 10 additions & 75 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/instaclustr/operator/apis/clusters/v1beta1"
"github.com/instaclustr/operator/pkg/exposeservice"
"github.com/instaclustr/operator/pkg/instaclustr"
"github.com/instaclustr/operator/pkg/models"
"github.com/instaclustr/operator/pkg/ratelimiter"
rlimiter "github.com/instaclustr/operator/pkg/ratelimiter"
"github.com/instaclustr/operator/pkg/scheduler"
)

Expand All @@ -52,6 +53,7 @@ type CadenceReconciler struct {
API instaclustr.API
Scheduler scheduler.Interface
EventRecorder record.EventRecorder
RateLimiter ratelimiter.RateLimiter
}

//+kubebuilder:rbac:groups=clusters.instaclustr.com,resources=cadences,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -95,7 +97,7 @@ func (r *CadenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
case models.CreatingEvent:
return r.handleCreateCluster(ctx, c, l)
case models.UpdatingEvent:
return r.handleUpdateCluster(ctx, c, l)
return r.handleUpdateCluster(ctx, c, req, l)
case models.DeletingEvent:
return r.handleDeleteCluster(ctx, c, l)
case models.GenericEvent:
Expand Down Expand Up @@ -331,6 +333,7 @@ func (r *CadenceReconciler) handleCreateCluster(
func (r *CadenceReconciler) handleUpdateCluster(
ctx context.Context,
c *v1beta1.Cadence,
req ctrl.Request,
l logr.Logger,
) (ctrl.Result, error) {
iData, err := r.API.GetCadence(c.Status.ID)
Expand Down Expand Up @@ -361,33 +364,9 @@ func (r *CadenceReconciler) handleUpdateCluster(
return ctrl.Result{}, err
}

if iCadence.Status.CurrentClusterOperationStatus != models.NoOperation {
l.Info("Cadence cluster is not ready to update",
"cluster name", iCadence.Spec.Name,
"cluster state", iCadence.Status.State,
"current operation status", iCadence.Status.CurrentClusterOperationStatus,
)

patch := c.NewPatch()
c.Annotations[models.ResourceStateAnnotation] = models.UpdatingEvent
c.Annotations[models.UpdateQueuedAnnotation] = models.True
err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster",
"cluster name", c.Spec.Name,
"patch", patch)

r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return ctrl.Result{}, err
}

return models.ReconcileRequeue, nil
}

if c.Annotations[models.ExternalChangesAnnotation] == models.True {
return r.handleExternalChanges(c, iCadence, l)
if c.Annotations[models.ExternalChangesAnnotation] == models.True ||
r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries {
return handleExternalChanges[v1beta1.CadenceSpec](r.EventRecorder, r.Client, c, iCadence, l)
}

if c.Spec.ClusterSettingsNeedUpdate(iCadence.Spec.Cluster) {
Expand Down Expand Up @@ -425,7 +404,6 @@ func (r *CadenceReconciler) handleUpdateCluster(

patch := c.NewPatch()
c.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent
c.Annotations[models.UpdateQueuedAnnotation] = ""
err = r.Patch(ctx, c, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster",
Expand All @@ -448,44 +426,6 @@ func (r *CadenceReconciler) handleUpdateCluster(
return ctrl.Result{}, nil
}

func (r *CadenceReconciler) handleExternalChanges(c, iCadence *v1beta1.Cadence, l logr.Logger) (ctrl.Result, error) {
if !c.Spec.AreDCsEqual(iCadence.Spec.DataCentres) {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
"k8s resource spec", c.Spec.DataCentres)

msgDiffSpecs, err := createSpecDifferenceMessage(c.Spec.DataCentres, iCadence.Spec.DataCentres)
if err != nil {
l.Error(err, "Cannot create specification difference message",
"instaclustr data", iCadence.Spec, "k8s resource spec", c.Spec)
return ctrl.Result{}, err
}
r.EventRecorder.Eventf(c, models.Warning, models.ExternalChanges, msgDiffSpecs)

return ctrl.Result{}, nil
}

patch := c.NewPatch()

c.Annotations[models.ExternalChangesAnnotation] = ""

err := r.Patch(context.Background(), c, patch)
if err != nil {
l.Error(err, "Cannot patch cluster resource",
"cluster name", c.Spec.Name, "cluster ID", c.Status.ID)

r.EventRecorder.Eventf(c, models.Warning, models.PatchFailed,
"Cluster resource patch is failed. Reason: %v", err)

return ctrl.Result{}, err
}

l.Info("External changes have been reconciled", "resource ID", c.Status.ID)
r.EventRecorder.Event(c, models.Normal, models.ExternalChanges, "External changes have been reconciled")

return ctrl.Result{}, nil
}

func (r *CadenceReconciler) handleDeleteCluster(
ctx context.Context,
c *v1beta1.Cadence,
Expand Down Expand Up @@ -1001,7 +941,7 @@ func (r *CadenceReconciler) newWatchStatusJob(c *v1beta1.Cadence) scheduler.Job
"External changes were automatically reconciled",
)
} else if c.Status.CurrentClusterOperationStatus == models.NoOperation &&
c.Annotations[models.UpdateQueuedAnnotation] != models.True &&
c.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
!equals {
l.Info(msgExternalChanges,
"instaclustr data", iCadence.Spec.DataCentres,
Expand Down Expand Up @@ -1313,12 +1253,7 @@ func areSecondaryCadenceTargetsEqual(k8sTargets, iTargets []*v1beta1.TargetCaden
// SetupWithManager sets up the controller with the Manager.
func (r *CadenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewItemExponentialFailureRateLimiterWithMaxTries(
ratelimiter.DefaultBaseDelay,
ratelimiter.DefaultMaxDelay,
),
}).
WithOptions(controller.Options{RateLimiter: r.RateLimiter}).
For(&v1beta1.Cadence{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
if deleting := confirmDeletion(event.Object); deleting {
Expand Down
Loading
Loading