Skip to content

Commit

Permalink
issue-572, handling external deletion of clusters was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Sep 26, 2023
1 parent 3723b60 commit ee6d5c8
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 324 deletions.
60 changes: 21 additions & 39 deletions controllers/clusters/cadence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,45 +790,7 @@ func (r *CadenceReconciler) newWatchStatusJob(cadence *v1beta1.Cadence) schedule
iData, err := r.API.GetCadence(cadence.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(cadence.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", cadence.Status.ClusterStatus.ID,
"cluster name", cadence.Spec.Name,
)

patch := cadence.NewPatch()
cadence.Annotations[models.ClusterDeletionAnnotation] = ""
cadence.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), cadence, patch)
if err != nil {
l.Error(err, "Cannot patch Cadence cluster resource",
"cluster ID", cadence.Status.ID,
"cluster name", cadence.Spec.Name,
"resource name", cadence.Name,
)

return err
}

err = r.Delete(context.TODO(), cadence)
if err != nil {
l.Error(err, "Cannot delete Cadence cluster resource",
"cluster ID", cadence.Status.ID,
"cluster name", cadence.Spec.Name,
"resource name", cadence.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), cadence)
}

l.Error(err, "Cannot get Cadence cluster from the Instaclustr API",
Expand Down Expand Up @@ -1281,3 +1243,23 @@ func (r *CadenceReconciler) reconcileMaintenanceEvents(ctx context.Context, c *v

return nil
}

func (r *CadenceReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cadence) error {
l := log.FromContext(ctx)

patch := c.NewPatch()
c.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, c, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker))

return nil
}
60 changes: 21 additions & 39 deletions controllers/clusters/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,45 +885,7 @@ func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) sc
iData, err := r.API.GetCassandra(cassandra.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(cassandra.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", cassandra.Status.ClusterStatus.ID,
"cluster name", cassandra.Spec.Name,
)

patch := cassandra.NewPatch()
cassandra.Annotations[models.ClusterDeletionAnnotation] = ""
cassandra.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), cassandra, patch)
if err != nil {
l.Error(err, "Cannot patch Cassandra cluster resource",
"cluster ID", cassandra.Status.ID,
"cluster name", cassandra.Spec.Name,
"resource name", cassandra.Name,
)

return err
}

err = r.Delete(context.TODO(), cassandra)
if err != nil {
l.Error(err, "Cannot delete Cassandra cluster resource",
"cluster ID", cassandra.Status.ID,
"cluster name", cassandra.Spec.Name,
"resource name", cassandra.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), cassandra)
}

l.Error(err, "Cannot get cluster from the Instaclustr API",
Expand Down Expand Up @@ -1262,6 +1224,26 @@ func (r *CassandraReconciler) reconcileMaintenanceEvents(ctx context.Context, c
return nil
}

func (r *CassandraReconciler) handleExternalDelete(ctx context.Context, c *v1beta1.Cassandra) error {
l := log.FromContext(ctx)

patch := c.NewPatch()
c.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, c, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(c, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(c.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(c.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
17 changes: 2 additions & 15 deletions controllers/clusters/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"sort"

"github.com/hashicorp/go-version"
"k8s.io/utils/strings/slices"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/instaclustr/operator/apis/clusters/v1beta1"
"github.com/instaclustr/operator/pkg/models"
"k8s.io/utils/strings/slices"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// confirmDeletion confirms if resource is deleting and set appropriate annotation.
Expand Down Expand Up @@ -135,18 +134,6 @@ func isDataCentreNodesEqual(a, b []*v1beta1.Node) bool {
return true
}

func isClusterActive(clusterID string, activeClusters []*models.ActiveClusters) bool {
for _, activeCluster := range activeClusters {
for _, cluster := range activeCluster.Clusters {
if cluster.ID == clusterID {
return true
}
}
}

return false
}

func getSortedAppVersions(versions []*models.AppVersions, appType string) []*version.Version {
for _, apps := range versions {
if apps.Application == appType {
Expand Down
48 changes: 12 additions & 36 deletions controllers/clusters/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job
"namespaced name", namespacedName)
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker))
return nil
}
if err != nil {
Expand All @@ -688,7 +689,7 @@ func (r *KafkaReconciler) newWatchStatusJob(kafka *v1beta1.Kafka) scheduler.Job
iData, err := r.API.GetKafka(kafka.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleDeleteFromInstaclustrUI(kafka, l)
return r.handleExternalDelete(context.Background(), kafka)
}

l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", kafka.Status.ID)
Expand Down Expand Up @@ -868,47 +869,22 @@ func (r *KafkaReconciler) newUsersCreationJob(kafka *v1beta1.Kafka) scheduler.Jo
}
}

func (r *KafkaReconciler) handleDeleteFromInstaclustrUI(kafka *v1beta1.Kafka, l logr.Logger) error {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if isClusterActive(kafka.Status.ID, activeClusters) {
l.Info("Kafka is not found in the Instaclustr but still exist in the Instaclustr list of active cluster",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)

return nil
}

l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", kafka.Status.ClusterStatus.ID,
"cluster name", kafka.Spec.Name)
func (r *KafkaReconciler) handleExternalDelete(ctx context.Context, kafka *v1beta1.Kafka) error {
l := log.FromContext(ctx)

patch := kafka.NewPatch()

kafka.Annotations[models.ClusterDeletionAnnotation] = ""
kafka.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), kafka, patch)
kafka.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, kafka, patch)
if err != nil {
l.Error(err, "Cannot patch Kafka cluster resource",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)
return err
}

err = r.Delete(context.TODO(), kafka)
if err != nil {
l.Error(err, "Cannot delete Kafka cluster resource",
"cluster ID", kafka.Status.ID,
"cluster name", kafka.Spec.Name,
"resource name", kafka.Name)
return err
}
l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(kafka, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kafka.GetJobID(scheduler.StatusChecker))

return nil
}
Expand Down
60 changes: 21 additions & 39 deletions controllers/clusters/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,45 +468,7 @@ func (r *KafkaConnectReconciler) newWatchStatusJob(kc *v1beta1.KafkaConnect) sch
iData, err := r.API.GetKafkaConnect(kc.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(kc.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", kc.Status.ClusterStatus.ID,
"cluster name", kc.Spec.Name,
)

patch := kc.NewPatch()
kc.Annotations[models.ClusterDeletionAnnotation] = ""
kc.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), kc, patch)
if err != nil {
l.Error(err, "Cannot patch KafkaConnect cluster resource",
"cluster ID", kc.Status.ID,
"cluster name", kc.Spec.Name,
"resource name", kc.Name,
)

return err
}

err = r.Delete(context.TODO(), kc)
if err != nil {
l.Error(err, "Cannot delete KafkaConnect cluster resource",
"cluster ID", kc.Status.ID,
"cluster name", kc.Spec.Name,
"resource name", kc.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), kc)
}

l.Error(err, "Cannot get Kafka Connect from Instaclustr",
Expand Down Expand Up @@ -657,3 +619,23 @@ func (r *KafkaConnectReconciler) reconcileMaintenanceEvents(ctx context.Context,

return nil
}

func (r *KafkaConnectReconciler) handleExternalDelete(ctx context.Context, kc *v1beta1.KafkaConnect) error {
l := log.FromContext(ctx)

patch := kc.NewPatch()
kc.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, kc, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(kc, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(kc.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(kc.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(kc.GetJobID(scheduler.StatusChecker))

return nil
}
60 changes: 21 additions & 39 deletions controllers/clusters/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,45 +597,7 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule
iData, err := r.API.GetOpenSearch(o.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
activeClusters, err := r.API.ListClusters()
if err != nil {
l.Error(err, "Cannot list account active clusters")
return err
}

if !isClusterActive(o.Status.ID, activeClusters) {
l.Info("Cluster is not found in Instaclustr. Deleting resource.",
"cluster ID", o.Status.ClusterStatus.ID,
"cluster name", o.Spec.Name,
)

patch := o.NewPatch()
o.Annotations[models.ClusterDeletionAnnotation] = ""
o.Annotations[models.ResourceStateAnnotation] = models.DeletingEvent
err = r.Patch(context.TODO(), o, patch)
if err != nil {
l.Error(err, "Cannot patch OpenSearch cluster resource",
"cluster ID", o.Status.ID,
"cluster name", o.Spec.Name,
"resource name", o.Name,
)

return err
}

err = r.Delete(context.TODO(), o)
if err != nil {
l.Error(err, "Cannot delete OpenSearch cluster resource",
"cluster ID", o.Status.ID,
"cluster name", o.Spec.Name,
"resource name", o.Name,
)

return err
}

return nil
}
return r.handleExternalDelete(context.Background(), o)
}

l.Error(err, "Cannot get OpenSearch cluster from the Instaclustr API",
Expand Down Expand Up @@ -1248,3 +1210,23 @@ func (r *OpenSearchReconciler) reconcileMaintenanceEvents(ctx context.Context, o

return nil
}

func (r *OpenSearchReconciler) handleExternalDelete(ctx context.Context, o *v1beta1.OpenSearch) error {
l := log.FromContext(ctx)

patch := o.NewPatch()
o.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, o, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(o, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(o.GetJobID(scheduler.BackupsChecker))
r.Scheduler.RemoveJob(o.GetJobID(scheduler.UserCreator))
r.Scheduler.RemoveJob(o.GetJobID(scheduler.StatusChecker))

return nil
}
Loading

0 comments on commit ee6d5c8

Please sign in to comment.