Skip to content

Commit

Permalink
issue-572-2, handling of external deletion for clusterresources was i…
Browse files Browse the repository at this point in the history
…mplemented
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Oct 3, 2023
1 parent d560e5e commit 3ae4f44
Show file tree
Hide file tree
Showing 23 changed files with 253 additions and 8 deletions.
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/awsencryptionkey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type AWSEncryptionKeySpec struct {
type AWSEncryptionKeyStatus struct {
ID string `json:"id,omitempty"`
InUse bool `json:"inUse,omitempty"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type AWSEndpointServicePrincipalStatus struct {

// The Instaclustr ID of the AWS endpoint service
EndPointServiceID string `json:"endPointServiceId,omitempty"`

// State describe current state of the resource
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand All @@ -58,6 +61,10 @@ func (r *AWSEndpointServicePrincipal) NewPatch() client.Patch {
return client.MergeFrom(r.DeepCopy())
}

func (r *AWSEndpointServicePrincipal) GetJobID(job string) string {
return r.Namespace + "/" + r.Name + "/" + job
}

//+kubebuilder:object:root=true

// AWSEndpointServicePrincipalList contains a list of AWSEndpointServicePrincipal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type AWSSecurityGroupFirewallRuleSpec struct {
// AWSSecurityGroupFirewallRuleStatus defines the observed state of AWSSecurityGroupFirewallRule
type AWSSecurityGroupFirewallRuleStatus struct {
FirewallRuleStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
5 changes: 5 additions & 0 deletions apis/clusterresources/v1beta1/awsvpcpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AWSVPCPeeringSpec struct {
// AWSVPCPeeringStatus defines the observed state of AWSVPCPeering
type AWSVPCPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -71,6 +72,10 @@ func (aws *AWSVPCPeering) NewPatch() client.Patch {
return client.MergeFrom(old)
}

func (r *AWSVPCPeering) SetState(state string) {
r.Status.State = state
}

func init() {
SchemeBuilder.Register(&AWSVPCPeering{}, &AWSVPCPeeringList{})
}
Expand Down
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/azurevnetpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AzureVNetPeeringSpec struct {
// AzureVNetPeeringStatus defines the observed state of AzureVNetPeering
type AzureVNetPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/gcpvpcpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GCPVPCPeeringSpec struct {
// GCPVPCPeeringStatus defines the observed state of GCPVPCPeering
type GCPVPCPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ spec:
type: string
inUse:
type: boolean
state:
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
id:
description: The Instaclustr ID of the IAM Principal ARN
type: string
state:
description: State describe current state of the resource
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
type: string
id:
type: string
state:
type: string
status:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
27 changes: 26 additions & 1 deletion controllers/clusterresources/awsencryptionkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate(
)

encryptionKey.Status = *encryptionKeyStatus
encryptionKey.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
l.Error(err, "Cannot patch AWS encryption key status ", "ID", encryptionKey.Status.ID)
Expand Down Expand Up @@ -274,8 +275,14 @@ func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey *
func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) scheduler.Job {
l := log.Log.WithValues("component", "EncryptionKeyStatusJob")
return func() error {
ctx := context.Background()

instaEncryptionKeyStatus, err := r.API.GetEncryptionKeyStatus(encryptionKey.Status.ID, instaclustr.AWSEncryptionKeyEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleExternalDelete(ctx, encryptionKey)
}

l.Error(err, "Cannot get AWS encryption key status from Inst API", "encryption key ID", encryptionKey.Status.ID)
return err
}
Expand All @@ -286,7 +293,7 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
"encryption key status", encryptionKey.Status)
patch := encryptionKey.NewPatch()
encryptionKey.Status = *instaEncryptionKeyStatus
err := r.Status().Patch(context.Background(), encryptionKey, patch)
err := r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
return err
}
Expand All @@ -296,6 +303,24 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
}
}

func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSEncryptionKey) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEncryptionKeyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,19 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context
"AWS endpoint service principal resource has been created",
)

err = r.startWatchStatusJob(ctx, principal)
if err != nil {
l.Error(err, "failed to start status checker job")
r.EventRecorder.Eventf(principal, models.Warning, models.CreationFailed,
"Failed to start status checker job. Reason: %w", err,
)

return err
}
r.EventRecorder.Eventf(principal, models.Normal, models.Created,
"Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker),
)

return nil
}

Expand Down Expand Up @@ -175,6 +188,44 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context
return nil
}

func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
job := r.newWatchStatusJob(ctx, resource)
return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
}

func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) scheduler.Job {
return func() error {
exists, err := r.API.CheckIfAWSEndpointServicePrincipalExists(principal.Status.ID)
if err != nil {
return err
}

if !exists {
return r.handleExternalDelete(ctx, principal)
}

return nil
}
}

func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
l := log.FromContext(ctx)

patch := principal.NewPatch()
principal.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, principal, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(principal, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEndpointServicePrincipalReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package clusterresources
import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -127,6 +128,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule(
)

firewallRule.Status.FirewallRuleStatus = *firewallRuleStatus
firewallRule.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, firewallRule, patch)
if err != nil {
l.Error(err, "Cannot patch AWS security group firewall rule status ", "ID", firewallRule.Status.ID)
Expand Down Expand Up @@ -291,8 +293,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, firewallRule)
return r.handleExternalDelete(ctx, firewallRule)
}

l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID)
Expand All @@ -311,6 +312,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
}

if instaFirewallRuleStatus.Status == statusDELETED {
fmt.Println("STATUS_DELETED")
r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
}
}
Expand All @@ -319,6 +321,24 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
}
}

func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx context.Context, rule *v1beta1.AWSSecurityGroupFirewallRule) error {
l := log.FromContext(ctx)

patch := rule.NewPatch()
rule.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, rule, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(rule, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(rule.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSSecurityGroupFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
24 changes: 21 additions & 3 deletions controllers/clusterresources/awsvpcpeering_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering(

patch := aws.NewPatch()
aws.Status.PeeringStatus = *awsStatus
aws.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "cannot patch AWS VPC Peering resource status",
Expand Down Expand Up @@ -404,8 +405,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe
instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, awsPeering)
return r.handleExternalDelete(ctx, awsPeering)
}

l.Error(err, "cannot get AWS VPC Peering Status from Inst API",
Expand Down Expand Up @@ -434,7 +434,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe

if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True &&
!slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
!subnetsEqual(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually",
"k8s peerSubnets", instaAWSPeering.PeerSubnets,
"instaclutr peerSubnets", awsPeering.Spec.PeerSubnets,
Expand Down Expand Up @@ -498,3 +498,21 @@ func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
})).Complete(r)
}

func (r *AWSVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSVPCPeering) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}
Loading

0 comments on commit 3ae4f44

Please sign in to comment.