Skip to content

Commit

Permalink
issue-572-2, handling of external deletion for clusterresources was i…
Browse files Browse the repository at this point in the history
…mplemented
  • Loading branch information
Bohdan Siryk authored and testisnullus committed Oct 17, 2023
1 parent 5c49b62 commit 4d5d744
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 14 deletions.
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/awsencryptionkey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type AWSEncryptionKeySpec struct {
type AWSEncryptionKeyStatus struct {
ID string `json:"id,omitempty"`
InUse bool `json:"inUse,omitempty"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type AWSEndpointServicePrincipalStatus struct {

// The Instaclustr ID of the AWS endpoint service
EndPointServiceID string `json:"endPointServiceId,omitempty"`

// State describe current state of the resource
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand All @@ -58,6 +61,10 @@ func (r *AWSEndpointServicePrincipal) NewPatch() client.Patch {
return client.MergeFrom(r.DeepCopy())
}

func (r *AWSEndpointServicePrincipal) GetJobID(job string) string {
return r.Namespace + "/" + r.Name + "/" + job
}

//+kubebuilder:object:root=true

// AWSEndpointServicePrincipalList contains a list of AWSEndpointServicePrincipal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ spec:
type: string
inUse:
type: boolean
state:
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
id:
description: The Instaclustr ID of the IAM Principal ARN
type: string
state:
description: State describe current state of the resource
type: string
type: object
type: object
served: true
Expand Down
43 changes: 42 additions & 1 deletion controllers/clusterresources/awsencryptionkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate(
)

encryptionKey.Status = *encryptionKeyStatus
encryptionKey.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
l.Error(err, "Cannot patch AWS encryption key status ", "ID", encryptionKey.Status.ID)
Expand Down Expand Up @@ -274,8 +275,30 @@ func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey *
func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) scheduler.Job {
l := log.Log.WithValues("component", "EncryptionKeyStatusJob")
return func() error {
ctx := context.Background()

key := client.ObjectKeyFromObject(encryptionKey)
err := r.Get(ctx, key, encryptionKey)
if err != nil {
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", key,
)

r.Scheduler.RemoveJob(encryptionKey.GetJobID(scheduler.StatusChecker))

return nil
}

return err
}

instaEncryptionKeyStatus, err := r.API.GetEncryptionKeyStatus(encryptionKey.Status.ID, instaclustr.AWSEncryptionKeyEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleExternalDelete(ctx, encryptionKey)
}

l.Error(err, "Cannot get AWS encryption key status from Inst API", "encryption key ID", encryptionKey.Status.ID)
return err
}
Expand All @@ -286,7 +309,7 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
"encryption key status", encryptionKey.Status)
patch := encryptionKey.NewPatch()
encryptionKey.Status = *instaEncryptionKeyStatus
err := r.Status().Patch(context.Background(), encryptionKey, patch)
err := r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
return err
}
Expand All @@ -296,6 +319,24 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
}
}

func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSEncryptionKey) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEncryptionKeyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,19 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context
"AWS endpoint service principal resource has been created",
)

err = r.startWatchStatusJob(ctx, principal)
if err != nil {
l.Error(err, "failed to start status checker job")
r.EventRecorder.Eventf(principal, models.Warning, models.CreationFailed,
"Failed to start status checker job. Reason: %w", err,
)

return err
}
r.EventRecorder.Eventf(principal, models.Normal, models.Created,
"Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker),
)

return nil
}

Expand Down Expand Up @@ -175,6 +188,62 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context
return nil
}

func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
job := r.newWatchStatusJob(ctx, resource)
return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
}

func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) scheduler.Job {
l := log.FromContext(ctx, "components", "WatchStatusJob")

return func() error {
key := client.ObjectKeyFromObject(principal)
err := r.Get(ctx, key, principal)
if err != nil {
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", key,
)

r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker))

return nil
}

return err
}

_, err = r.API.GetAWSEndpointServicePrincipal(principal.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleExternalDelete(ctx, principal)
}

return err
}

return nil
}
}

func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
l := log.FromContext(ctx)

patch := principal.NewPatch()
principal.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, principal, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(principal, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEndpointServicePrincipalReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,27 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
l := log.Log.WithValues("component", "FirewallRuleStatusJob")
return func() error {
ctx := context.Background()

key := client.ObjectKeyFromObject(firewallRule)
err := r.Get(ctx, key, firewallRule)
if err != nil {
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", key,
)

r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))

return nil
}

return err
}

instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, firewallRule)
return r.handleExternalDelete(ctx, firewallRule)
}

l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID)
Expand All @@ -309,16 +325,30 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
if err != nil {
return err
}

if instaFirewallRuleStatus.Status == statusDELETED {
r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
}
}

return nil
}
}

func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx context.Context, rule *v1beta1.AWSSecurityGroupFirewallRule) error {
l := log.FromContext(ctx)

patch := rule.NewPatch()
rule.Status.Status = models.DeletedStatus
err := r.Status().Patch(ctx, rule, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(rule, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(rule.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSSecurityGroupFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
37 changes: 32 additions & 5 deletions controllers/clusterresources/awsvpcpeering_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -208,7 +207,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering(
}

if aws.Annotations[models.ExternalChangesAnnotation] == models.True {
if !slices.Equal(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) {
if !subnetsEqual(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) {
l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually",
"AWS VPC ID", aws.Status.ID,
"k8s peerSubnets", aws.Spec.PeerSubnets,
Expand Down Expand Up @@ -404,8 +403,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe
instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, awsPeering)
return r.handleExternalDelete(ctx, awsPeering)
}

l.Error(err, "cannot get AWS VPC Peering Status from Inst API",
Expand All @@ -432,9 +430,20 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe
}
}

if awsPeering.Status.StatusCode == models.AWSVPCPeeringStatusCodeDeleted {
l.Info("The AWSPeering was deleted on AWS, stopping job...")
r.EventRecorder.Event(awsPeering, models.Warning, models.DeletedEvent,
"The AWSPeering was deleted on AWS, stopping job...",
)

r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker))

return nil
}

if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True &&
!slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
!subnetsEqual(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually",
"k8s peerSubnets", instaAWSPeering.PeerSubnets,
"instaclutr peerSubnets", awsPeering.Spec.PeerSubnets,
Expand Down Expand Up @@ -498,3 +507,21 @@ func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
})).Complete(r)
}

func (r *AWSVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSVPCPeering) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.StatusCode = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}
Loading

0 comments on commit 4d5d744

Please sign in to comment.