diff --git a/apis/clusterresources/v1beta1/awsencryptionkey_types.go b/apis/clusterresources/v1beta1/awsencryptionkey_types.go index ec11cb02e..428aea855 100644 --- a/apis/clusterresources/v1beta1/awsencryptionkey_types.go +++ b/apis/clusterresources/v1beta1/awsencryptionkey_types.go @@ -34,6 +34,7 @@ type AWSEncryptionKeySpec struct { type AWSEncryptionKeyStatus struct { ID string `json:"id,omitempty"` InUse bool `json:"inUse,omitempty"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go index 7f1f92309..4d08db9b3 100644 --- a/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go +++ b/apis/clusterresources/v1beta1/awsendpointserviceprincipal_types.go @@ -40,6 +40,9 @@ type AWSEndpointServicePrincipalStatus struct { // The Instaclustr ID of the AWS endpoint service EndPointServiceID string `json:"endPointServiceId,omitempty"` + + // State describe current state of the resource + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true @@ -58,6 +61,10 @@ func (r *AWSEndpointServicePrincipal) NewPatch() client.Patch { return client.MergeFrom(r.DeepCopy()) } +func (r *AWSEndpointServicePrincipal) GetJobID(job string) string { + return r.Namespace + "/" + r.Name + "/" + job +} + //+kubebuilder:object:root=true // AWSEndpointServicePrincipalList contains a list of AWSEndpointServicePrincipal diff --git a/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_types.go b/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_types.go index 063af3fee..1362746e5 100644 --- a/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_types.go +++ b/apis/clusterresources/v1beta1/awssecuritygroupfirewallrule_types.go @@ -32,6 +32,7 @@ type AWSSecurityGroupFirewallRuleSpec struct { // AWSSecurityGroupFirewallRuleStatus defines the observed state of AWSSecurityGroupFirewallRule type AWSSecurityGroupFirewallRuleStatus struct { FirewallRuleStatus `json:",inline"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/awsvpcpeering_types.go b/apis/clusterresources/v1beta1/awsvpcpeering_types.go index bc9083fb9..76e24d216 100644 --- a/apis/clusterresources/v1beta1/awsvpcpeering_types.go +++ b/apis/clusterresources/v1beta1/awsvpcpeering_types.go @@ -38,6 +38,7 @@ type AWSVPCPeeringSpec struct { // AWSVPCPeeringStatus defines the observed state of AWSVPCPeering type AWSVPCPeeringStatus struct { PeeringStatus `json:",inline"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/azurevnetpeering_types.go b/apis/clusterresources/v1beta1/azurevnetpeering_types.go index 4de680fb0..8ab24acdb 100644 --- a/apis/clusterresources/v1beta1/azurevnetpeering_types.go +++ b/apis/clusterresources/v1beta1/azurevnetpeering_types.go @@ -38,6 +38,7 @@ type AzureVNetPeeringSpec struct { // AzureVNetPeeringStatus defines the observed state of AzureVNetPeering type AzureVNetPeeringStatus struct { PeeringStatus `json:",inline"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/apis/clusterresources/v1beta1/gcpvpcpeering_types.go b/apis/clusterresources/v1beta1/gcpvpcpeering_types.go index 64e41d2b9..cbb634113 100644 --- a/apis/clusterresources/v1beta1/gcpvpcpeering_types.go +++ b/apis/clusterresources/v1beta1/gcpvpcpeering_types.go @@ -36,6 +36,7 @@ type GCPVPCPeeringSpec struct { // GCPVPCPeeringStatus defines the observed state of GCPVPCPeering type GCPVPCPeeringStatus struct { PeeringStatus `json:",inline"` + State string `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/config/crd/bases/clusterresources.instaclustr.com_awsencryptionkeys.yaml b/config/crd/bases/clusterresources.instaclustr.com_awsencryptionkeys.yaml index 781499026..5a578a20d 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awsencryptionkeys.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awsencryptionkeys.yaml @@ -52,6 +52,8 @@ spec: type: string inUse: type: boolean + state: + type: string type: object type: object served: true diff --git a/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml b/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml index dc93be7ee..7c7ea2823 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awsendpointserviceprincipals.yaml @@ -60,6 +60,9 @@ spec: id: description: The Instaclustr ID of the IAM Principal ARN type: string + state: + description: State describe current state of the resource + type: string type: object type: object served: true diff --git a/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml b/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml index 66567ac8c..1fb2aa453 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awssecuritygroupfirewallrules.yaml @@ -56,6 +56,8 @@ spec: type: string id: type: string + state: + type: string status: type: string type: object diff --git a/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml b/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml index 4bde626a6..fa041902d 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_awsvpcpeerings.yaml @@ -62,6 +62,8 @@ spec: type: string name: type: string + state: + type: string statusCode: type: string type: object diff --git a/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml b/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml index b29ca2c93..22c524207 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_azurevnetpeerings.yaml @@ -65,6 +65,8 @@ spec: type: string name: type: string + state: + type: string statusCode: type: string type: object diff --git a/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml b/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml index 1fbad4150..7dbd3b11e 100644 --- a/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml +++ b/config/crd/bases/clusterresources.instaclustr.com_gcpvpcpeerings.yaml @@ -60,6 +60,8 @@ spec: type: string name: type: string + state: + type: string statusCode: type: string type: object diff --git a/controllers/clusterresources/awsencryptionkey_controller.go b/controllers/clusterresources/awsencryptionkey_controller.go index 73d6d54a0..fc456f4a9 100644 --- a/controllers/clusterresources/awsencryptionkey_controller.go +++ b/controllers/clusterresources/awsencryptionkey_controller.go @@ -129,6 +129,7 @@ func (r *AWSEncryptionKeyReconciler) handleCreate( ) encryptionKey.Status = *encryptionKeyStatus + encryptionKey.Status.State = models.CreatedStatus err = r.Status().Patch(ctx, encryptionKey, patch) if err != nil { l.Error(err, "Cannot patch AWS encryption key status ", "ID", encryptionKey.Status.ID) @@ -274,8 +275,14 @@ func (r *AWSEncryptionKeyReconciler) startEncryptionKeyStatusJob(encryptionKey * func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AWSEncryptionKey) scheduler.Job { l := log.Log.WithValues("component", "EncryptionKeyStatusJob") return func() error { + ctx := context.Background() + instaEncryptionKeyStatus, err := r.API.GetEncryptionKeyStatus(encryptionKey.Status.ID, instaclustr.AWSEncryptionKeyEndpoint) if err != nil { + if errors.Is(err, instaclustr.NotFound) { + return r.handleExternalDelete(ctx, encryptionKey) + } + l.Error(err, "Cannot get AWS encryption key status from Inst API", "encryption key ID", encryptionKey.Status.ID) return err } @@ -286,7 +293,7 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW "encryption key status", encryptionKey.Status) patch := encryptionKey.NewPatch() encryptionKey.Status = *instaEncryptionKeyStatus - err := r.Status().Patch(context.Background(), encryptionKey, patch) + err := r.Status().Patch(ctx, encryptionKey, patch) if err != nil { return err } @@ -296,6 +303,24 @@ func (r *AWSEncryptionKeyReconciler) newWatchStatusJob(encryptionKey *v1beta1.AW } } +func (r *AWSEncryptionKeyReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSEncryptionKey) error { + l := log.FromContext(ctx) + + patch := key.NewPatch() + key.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, key, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *AWSEncryptionKeyReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusterresources/awsendpointserviceprincipal_controller.go b/controllers/clusterresources/awsendpointserviceprincipal_controller.go index d7659ce11..c24d2c1ef 100644 --- a/controllers/clusterresources/awsendpointserviceprincipal_controller.go +++ b/controllers/clusterresources/awsendpointserviceprincipal_controller.go @@ -144,6 +144,19 @@ func (r *AWSEndpointServicePrincipalReconciler) handleCreate(ctx context.Context "AWS endpoint service principal resource has been created", ) + err = r.startWatchStatusJob(ctx, principal) + if err != nil { + l.Error(err, "failed to start status checker job") + r.EventRecorder.Eventf(principal, models.Warning, models.CreationFailed, + "Failed to start status checker job. Reason: %w", err, + ) + + return err + } + r.EventRecorder.Eventf(principal, models.Normal, models.Created, + "Status check job %s has been started", principal.GetJobID(scheduler.StatusChecker), + ) + return nil } @@ -175,6 +188,44 @@ func (r *AWSEndpointServicePrincipalReconciler) handleDelete(ctx context.Context return nil } +func (r *AWSEndpointServicePrincipalReconciler) startWatchStatusJob(ctx context.Context, resource *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { + job := r.newWatchStatusJob(ctx, resource) + return r.Scheduler.ScheduleJob(resource.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) +} + +func (r *AWSEndpointServicePrincipalReconciler) newWatchStatusJob(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) scheduler.Job { + return func() error { + exists, err := r.API.CheckIfAWSEndpointServicePrincipalExists(principal.Status.ID) + if err != nil { + return err + } + + if !exists { + return r.handleExternalDelete(ctx, principal) + } + + return nil + } +} + +func (r *AWSEndpointServicePrincipalReconciler) handleExternalDelete(ctx context.Context, principal *clusterresourcesv1beta1.AWSEndpointServicePrincipal) error { + l := log.FromContext(ctx) + + patch := principal.NewPatch() + principal.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, principal, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(principal, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(principal.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *AWSEndpointServicePrincipalReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go index 1c1f454d0..8adfaec1e 100644 --- a/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go +++ b/controllers/clusterresources/awssecuritygroupfirewallrule_controller.go @@ -127,6 +127,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) handleCreateFirewallRule( ) firewallRule.Status.FirewallRuleStatus = *firewallRuleStatus + firewallRule.Status.State = models.CreatedStatus err = r.Status().Patch(ctx, firewallRule, patch) if err != nil { l.Error(err, "Cannot patch AWS security group firewall rule status ", "ID", firewallRule.Status.ID) @@ -291,8 +292,7 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint) if err != nil { if errors.Is(err, instaclustr.NotFound) { - l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...") - return r.Delete(ctx, firewallRule) + return r.handleExternalDelete(ctx, firewallRule) } l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID) @@ -309,16 +309,30 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule if err != nil { return err } - - if instaFirewallRuleStatus.Status == statusDELETED { - r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker)) - } } return nil } } +func (r *AWSSecurityGroupFirewallRuleReconciler) handleExternalDelete(ctx context.Context, rule *v1beta1.AWSSecurityGroupFirewallRule) error { + l := log.FromContext(ctx) + + patch := rule.NewPatch() + rule.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, rule, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(rule, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(rule.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *AWSSecurityGroupFirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusterresources/awsvpcpeering_controller.go b/controllers/clusterresources/awsvpcpeering_controller.go index a3cef3aaf..ec9def98b 100644 --- a/controllers/clusterresources/awsvpcpeering_controller.go +++ b/controllers/clusterresources/awsvpcpeering_controller.go @@ -24,7 +24,6 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" - "k8s.io/utils/strings/slices" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,6 +48,8 @@ type AWSVPCPeeringReconciler struct { EventRecorder record.EventRecorder } +const awsVPCPeeringStatusCodeDeleted = "deleted" + //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings/status,verbs=get;update;patch //+kubebuilder:rbac:groups=clusterresources.instaclustr.com,resources=awsvpcpeerings/finalizers,verbs=update @@ -126,6 +127,7 @@ func (r *AWSVPCPeeringReconciler) handleCreatePeering( patch := aws.NewPatch() aws.Status.PeeringStatus = *awsStatus + aws.Status.State = models.CreatedStatus err = r.Status().Patch(ctx, aws, patch) if err != nil { l.Error(err, "cannot patch AWS VPC Peering resource status", @@ -208,7 +210,7 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering( } if aws.Annotations[models.ExternalChangesAnnotation] == models.True { - if !slices.Equal(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) { + if !subnetsEqual(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) { l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually", "AWS VPC ID", aws.Status.ID, "k8s peerSubnets", aws.Spec.PeerSubnets, @@ -404,8 +406,7 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { - l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...") - return r.Delete(ctx, awsPeering) + return r.handleExternalDelete(ctx, awsPeering) } l.Error(err, "cannot get AWS VPC Peering Status from Inst API", @@ -432,9 +433,20 @@ func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPe } } + if awsPeering.Status.StatusCode == awsVPCPeeringStatusCodeDeleted { + l.Info("The AWSPeering was deleted on AWS, stopping job...") + r.EventRecorder.Event(awsPeering, models.Warning, models.DeletedEvent, + "The AWSPeering was deleted on AWS, stopping job...", + ) + + r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker)) + + return nil + } + if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True && - !slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) { + !subnetsEqual(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) { l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually", "k8s peerSubnets", instaAWSPeering.PeerSubnets, "instaclutr peerSubnets", awsPeering.Spec.PeerSubnets, @@ -498,3 +510,21 @@ func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { }, })).Complete(r) } + +func (r *AWSVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AWSVPCPeering) error { + l := log.FromContext(ctx) + + patch := key.NewPatch() + key.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, key, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + + return nil +} diff --git a/controllers/clusterresources/azurevnetpeering_controller.go b/controllers/clusterresources/azurevnetpeering_controller.go index a8415eb6e..2fd4f9f79 100644 --- a/controllers/clusterresources/azurevnetpeering_controller.go +++ b/controllers/clusterresources/azurevnetpeering_controller.go @@ -129,6 +129,7 @@ func (r *AzureVNetPeeringReconciler) handleCreatePeering( patch := azure.NewPatch() azure.Status.PeeringStatus = *azureStatus + azure.Status.State = models.CreatedStatus err = r.Status().Patch(ctx, azure, patch) if err != nil { l.Error(err, "cannot patch Azure VNet Peering resource status", @@ -304,8 +305,14 @@ func (r *AzureVNetPeeringReconciler) newWatchStatusJob(azureVNetPeering *v1beta1 ) scheduler.Job { l := log.Log.WithValues("component", "AzureVNetPeeringStatusJob") return func() error { + ctx := context.Background() + instaPeeringStatus, err := r.API.GetPeeringStatus(azureVNetPeering.Status.ID, instaclustr.AzurePeeringEndpoint) if err != nil { + if errors.Is(err, instaclustr.NotFound) { + return r.handleExternalDelete(ctx, azureVNetPeering) + } + l.Error(err, "cannot get Azure VNet Peering Status from Inst API", "Azure VNet Peering ID", azureVNetPeering.Status.ID) return err } @@ -317,7 +324,7 @@ func (r *AzureVNetPeeringReconciler) newWatchStatusJob(azureVNetPeering *v1beta1 patch := azureVNetPeering.NewPatch() azureVNetPeering.Status.PeeringStatus = *instaPeeringStatus - err := r.Status().Patch(context.Background(), azureVNetPeering, patch) + err := r.Status().Patch(ctx, azureVNetPeering, patch) if err != nil { return err } @@ -327,6 +334,24 @@ func (r *AzureVNetPeeringReconciler) newWatchStatusJob(azureVNetPeering *v1beta1 } } +func (r *AzureVNetPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.AzureVNetPeering) error { + l := log.FromContext(ctx) + + patch := key.NewPatch() + key.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, key, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *AzureVNetPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusterresources/gcpvpcpeering_controller.go b/controllers/clusterresources/gcpvpcpeering_controller.go index e11f67306..e86a03d4b 100644 --- a/controllers/clusterresources/gcpvpcpeering_controller.go +++ b/controllers/clusterresources/gcpvpcpeering_controller.go @@ -125,6 +125,7 @@ func (r *GCPVPCPeeringReconciler) handleCreateCluster( patch := gcp.NewPatch() gcp.Status.PeeringStatus = *gcpStatus + gcp.Status.State = models.CreatedStatus err = r.Status().Patch(ctx, gcp, patch) if err != nil { l.Error(err, "Cannot patch GCP VPC Peering resource status", @@ -287,8 +288,14 @@ func (r *GCPVPCPeeringReconciler) startGCPVPCPeeringStatusJob(gcpPeering *v1beta func (r *GCPVPCPeeringReconciler) newWatchStatusJob(gcpPeering *v1beta1.GCPVPCPeering) scheduler.Job { l := log.Log.WithValues("component", "GCPVPCPeeringStatusJob") return func() error { + ctx := context.Background() + instaPeeringStatus, err := r.API.GetPeeringStatus(gcpPeering.Status.ID, instaclustr.GCPPeeringEndpoint) if err != nil { + if errors.Is(err, instaclustr.NotFound) { + return r.handleExternalDelete(ctx, gcpPeering) + } + l.Error(err, "Cannot get GCP VPC Peering Status from Inst API", "id", gcpPeering.Status.ID) return err } @@ -300,7 +307,7 @@ func (r *GCPVPCPeeringReconciler) newWatchStatusJob(gcpPeering *v1beta1.GCPVPCPe patch := gcpPeering.NewPatch() gcpPeering.Status.PeeringStatus = *instaPeeringStatus - err := r.Status().Patch(context.Background(), gcpPeering, patch) + err := r.Status().Patch(ctx, gcpPeering, patch) if err != nil { return err } @@ -310,6 +317,24 @@ func (r *GCPVPCPeeringReconciler) newWatchStatusJob(gcpPeering *v1beta1.GCPVPCPe } } +func (r *GCPVPCPeeringReconciler) handleExternalDelete(ctx context.Context, key *v1beta1.GCPVPCPeering) error { + l := log.FromContext(ctx) + + patch := key.NewPatch() + key.Status.State = models.DeletedStatus + err := r.Status().Patch(ctx, key, patch) + if err != nil { + return err + } + + l.Info(instaclustr.MsgInstaclustrResourceNotFound) + r.EventRecorder.Eventf(key, models.Warning, models.ExternalDeleted, instaclustr.MsgInstaclustrResourceNotFound) + + r.Scheduler.RemoveJob(key.GetJobID(scheduler.StatusChecker)) + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *GCPVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/controllers/clusterresources/helpers.go b/controllers/clusterresources/helpers.go index 1735d8c59..799ae9bae 100644 --- a/controllers/clusterresources/helpers.go +++ b/controllers/clusterresources/helpers.go @@ -98,3 +98,24 @@ func getUserCreds(secret *k8sCore.Secret) (username, password string, err error) return username, password, nil } + +func subnetsEqual(subnets1, subnets2 []string) bool { + if len(subnets1) != len(subnets2) { + return false + } + + for _, s1 := range subnets1 { + var equal bool + for _, s2 := range subnets2 { + if s1 == s2 { + equal = true + } + } + + if !equal { + return false + } + } + + return true +} diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index bfa9ac122..e1d0db40b 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -2215,6 +2215,31 @@ func (c *Client) UpdateClusterSettings(clusterID string, settings *models.Cluste return nil } +func (c *Client) CheckIfAWSEndpointServicePrincipalExists(id string) (bool, error) { + url := c.serverHostname + AWSEndpointServicePrincipalEndpoint + "/" + id + + resp, err := c.DoRequest(url, http.MethodGet, nil) + if err != nil { + return false, err + } + + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return false, err + } + + if resp.StatusCode == http.StatusNotFound { + return false, nil + } + + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b) + } + + return true, nil +} + func (c *Client) CreateAWSEndpointServicePrincipal(spec any) ([]byte, error) { url := c.serverHostname + AWSEndpointServicePrincipalEndpoint diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 8cf37aa3c..e160c85f6 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -101,6 +101,7 @@ type API interface { ListAppVersions(app string) ([]*models.AppVersions, error) GetDefaultCredentialsV1(clusterID string) (string, string, error) UpdateClusterSettings(clusterID string, settings *models.ClusterSettings) error + CheckIfAWSEndpointServicePrincipalExists(id string) (bool, error) CreateAWSEndpointServicePrincipal(spec any) ([]byte, error) DeleteAWSEndpointServicePrincipal(principalID string) error GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*v1beta1.ResizeOperation, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 92da8f4c6..7d0e81d90 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -375,3 +375,7 @@ func (c *mockClient) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]* func (c *mockClient) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) { panic("GetAWSVPCPeering: is not implemented") } + +func (c *mockClient) CheckIfAWSEndpointServicePrincipalExists(id string) (bool, error) { + panic("CheckIfAWSEndpointServicePrincipalExists: is not implemented") +} diff --git a/pkg/models/apiv2.go b/pkg/models/apiv2.go index a2e11dfa1..5fdd3028f 100644 --- a/pkg/models/apiv2.go +++ b/pkg/models/apiv2.go @@ -20,6 +20,7 @@ const ( NoOperation = "NO_OPERATION" OperationInProgress = "OPERATION_IN_PROGRESS" + CreatedStatus = "CREATED" DeletedStatus = "DELETED" DefaultAccountName = "INSTACLUSTR"