Skip to content

Commit

Permalink
issue-572-2, handling of external deletion for clusterresources was i…
Browse files Browse the repository at this point in the history
…mplemented
  • Loading branch information
Bohdan Siryk authored and Bohdan Siryk committed Oct 3, 2023
1 parent 3d59a9a commit 659e937
Show file tree
Hide file tree
Showing 23 changed files with 261 additions and 14 deletions.
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/awsencryptionkey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type AWSEncryptionKeySpec struct {
type AWSEncryptionKeyStatus struct {
ID string `json:"id,omitempty"`
InUse bool `json:"inUse,omitempty"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type AWSEndpointServicePrincipalStatus struct {

// The Instaclustr ID of the AWS endpoint service
EndPointServiceID string `json:"endPointServiceId,omitempty"`

// State describe current state of the resource
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand All @@ -58,6 +61,10 @@ func (r *AWSEndpointServicePrincipal) NewPatch() client.Patch {
return client.MergeFrom(r.DeepCopy())
}

func (r *AWSEndpointServicePrincipal) GetJobID(job string) string {
return r.Namespace + "/" + r.Name + "/" + job
}

//+kubebuilder:object:root=true

// AWSEndpointServicePrincipalList contains a list of AWSEndpointServicePrincipal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type AWSSecurityGroupFirewallRuleSpec struct {
// AWSSecurityGroupFirewallRuleStatus defines the observed state of AWSSecurityGroupFirewallRule
type AWSSecurityGroupFirewallRuleStatus struct {
FirewallRuleStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/awsvpcpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AWSVPCPeeringSpec struct {
// AWSVPCPeeringStatus defines the observed state of AWSVPCPeering
type AWSVPCPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/azurevnetpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AzureVNetPeeringSpec struct {
// AzureVNetPeeringStatus defines the observed state of AzureVNetPeering
type AzureVNetPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
1 change: 1 addition & 0 deletions apis/clusterresources/v1beta1/gcpvpcpeering_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GCPVPCPeeringSpec struct {
// GCPVPCPeeringStatus defines the observed state of GCPVPCPeering
type GCPVPCPeeringStatus struct {
PeeringStatus `json:",inline"`
State string `json:"state,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ spec:
type: string
inUse:
type: boolean
state:
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
id:
description: The Instaclustr ID of the IAM Principal ARN
type: string
state:
description: State describe current state of the resource
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
type: string
id:
type: string
state:
type: string
status:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ spec:
type: string
name:
type: string
state:
type: string
statusCode:
type: string
type: object
Expand Down
27 changes: 26 additions & 1 deletion controllers/clusterresources/awsencryptionkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate(
)

encryptionKey.Status = *encryptionKeyStatus
encryptionKey.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
l.Error(err, "Cannot patch AWS encryption key status ", "ID", encryptionKey.Status.ID)
Expand Down Expand Up @@ -274,8 +275,14 @@ func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey *
func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) scheduler.Job {
l := log.Log.WithValues("component", "EncryptionKeyStatusJob")
return func() error {
ctx := context.Background()

instaEncryptionKeyStatus, err := r.API.GetEncryptionKeyStatus(encryptionKey.Status.ID, instaclustr.AWSEncryptionKeyEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
return r.handleExternalDelete(ctx, encryptionKey)
}

l.Error(err, "Cannot get AWS encryption key status from Inst API", "encryption key ID", encryptionKey.Status.ID)
return err
}
Expand All @@ -286,7 +293,7 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
"encryption key status", encryptionKey.Status)
patch := encryptionKey.NewPatch()
encryptionKey.Status = *instaEncryptionKeyStatus
err := r.Status().Patch(context.Background(), encryptionKey, patch)
err := r.Status().Patch(ctx, encryptionKey, patch)
if err != nil {
return err
}
Expand All @@ -296,6 +303,24 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW
}
}

func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSEncryptionKey) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEncryptionKeyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,19 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context
"AWS endpoint service principal resource has been created",
)

err = r.startWatchStatusJob(ctx, principal)
if err != nil {
l.Error(err, "failed to start status checker job")
r.EventRecorder.Eventf(principal, models.Warning, models.CreationFailed,
"Failed to start status checker job. Reason: %w", err,
)

return err
}
r.EventRecorder.Eventf(principal, models.Normal, models.Created,
"Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker),
)

return nil
}

Expand Down Expand Up @@ -175,6 +188,44 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context
return nil
}

func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
job := r.newWatchStatusJob(ctx, resource)
return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job)
}

func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) scheduler.Job {
return func() error {
exists, err := r.API.CheckIfAWSEndpointServicePrincipalExists(principal.Status.ID)
if err != nil {
return err
}

if !exists {
return r.handleExternalDelete(ctx, principal)
}

return nil
}
}

func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error {
l := log.FromContext(ctx)

patch := principal.NewPatch()
principal.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, principal, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(principal, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSEndpointServicePrincipalReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule(
)

firewallRule.Status.FirewallRuleStatus = *firewallRuleStatus
firewallRule.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, firewallRule, patch)
if err != nil {
l.Error(err, "Cannot patch AWS security group firewall rule status ", "ID", firewallRule.Status.ID)
Expand Down Expand Up @@ -291,8 +292,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, firewallRule)
return r.handleExternalDelete(ctx, firewallRule)
}

l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID)
Expand All @@ -309,16 +309,30 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
if err != nil {
return err
}

if instaFirewallRuleStatus.Status == statusDELETED {
r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
}
}

return nil
}
}

func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx context.Context, rule *v1beta1.AWSSecurityGroupFirewallRule) error {
l := log.FromContext(ctx)

patch := rule.NewPatch()
rule.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, rule, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(rule, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(rule.GetJobID(scheduler.StatusChecker))

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSSecurityGroupFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
40 changes: 35 additions & 5 deletions controllers/clusterresources/awsvpcpeering_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -49,6 +48,8 @@ type AWSVPCPeeringReconciler struct {
EventRecorder record.EventRecorder
}

const awsVPCPeeringStatusCodeDeleted = "deleted"

//+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings/finalizers,verbs=update
Expand Down Expand Up @@ -126,6 +127,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering(

patch := aws.NewPatch()
aws.Status.PeeringStatus = *awsStatus
aws.Status.State = models.CreatedStatus
err = r.Status().Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "cannot patch AWS VPC Peering resource status",
Expand Down Expand Up @@ -208,7 +210,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering(
}

if aws.Annotations[models.ExternalChangesAnnotation] == models.True {
if !slices.Equal(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) {
if !subnetsEqual(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) {
l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually",
"AWS VPC ID", aws.Status.ID,
"k8s peerSubnets", aws.Spec.PeerSubnets,
Expand Down Expand Up @@ -404,8 +406,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe
instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, awsPeering)
return r.handleExternalDelete(ctx, awsPeering)
}

l.Error(err, "cannot get AWS VPC Peering Status from Inst API",
Expand All @@ -432,9 +433,20 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe
}
}

if awsPeering.Status.StatusCode == awsVPCPeeringStatusCodeDeleted {
l.Info("The AWSPeering was deleted on AWS, stopping job...")
r.EventRecorder.Event(awsPeering, models.Warning, models.DeletedEvent,
"The AWSPeering was deleted on AWS, stopping job...",
)

r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker))

return nil
}

if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True &&
!slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
!subnetsEqual(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually",
"k8s peerSubnets", instaAWSPeering.PeerSubnets,
"instaclutr peerSubnets", awsPeering.Spec.PeerSubnets,
Expand Down Expand Up @@ -498,3 +510,21 @@ func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
})).Complete(r)
}

func (r *AWSVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSVPCPeering) error {
l := log.FromContext(ctx)

patch := key.NewPatch()
key.Status.State = models.DeletedStatus
err := r.Status().Patch(ctx, key, patch)
if err != nil {
return err
}

l.Info(instaclustr.MsgInstaclustrResourceNotFound)
r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound)

r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker))

return nil
}
Loading

0 comments on commit 659e937

Please sign in to comment.