From d287e11f50141a6cbada07e0e3ac00509472082f Mon Sep 17 00:00:00 2001 From: Kubermatic Bot <41968677+kubermatic-bot@users.noreply.github.com> Date: Fri, 18 Mar 2022 09:16:35 +0100 Subject: [PATCH] [release/v1.42] Waiting for volumeAttachments deletion (#1212) * Waiting for volumeAttachments deletion Signed-off-by: Mattia Lavacca * volumeAttachments check only for vSphere Signed-off-by: Mattia Lavacca * ClusterRole updated Signed-off-by: Mattia Lavacca * yaml linter fixed Signed-off-by: Mattia Lavacca * VolumeAttachments correctly handled Signed-off-by: Mattia Lavacca * Code factorized Signed-off-by: Mattia Lavacca * renaming Signed-off-by: Mattia Lavacca * fix yamllint Signed-off-by: Mattia Lavacca * Logic applied only to vSphere Signed-off-by: Mattia Lavacca Co-authored-by: mlavacca --- examples/machine-controller.yaml | 9 + pkg/controller/machine/machine_controller.go | 141 ++++++++---- pkg/controller/machine/machine_test.go | 53 +++-- pkg/node/eviction/eviction.go | 70 ++---- pkg/node/nodemanager/node_manager.go | 94 ++++++++ pkg/node/poddeletion/pod_deletion.go | 216 +++++++++++++++++++ 6 files changed, 458 insertions(+), 125 deletions(-) create mode 100644 pkg/node/nodemanager/node_manager.go create mode 100644 pkg/node/poddeletion/pod_deletion.go diff --git a/examples/machine-controller.yaml b/examples/machine-controller.yaml index 084d92d20..d3ee84c21 100644 --- a/examples/machine-controller.yaml +++ b/examples/machine-controller.yaml @@ -536,6 +536,15 @@ rules: - "list" - "get" - "watch" +# volumeAttachments permissions are needed by vsphere clusters +- apiGroups: + - "storage.k8s.io" + resources: + - "volumeattachments" + verbs: + - "list" + - "get" + - "watch" - apiGroups: - "" resources: diff --git a/pkg/controller/machine/machine_controller.go b/pkg/controller/machine/machine_controller.go index 7431d2634..d7977bf4d 100644 --- a/pkg/controller/machine/machine_controller.go +++ b/pkg/controller/machine/machine_controller.go @@ -39,6 +39,7 @@ import ( 
"github.com/kubermatic/machine-controller/pkg/containerruntime" kuberneteshelper "github.com/kubermatic/machine-controller/pkg/kubernetes" "github.com/kubermatic/machine-controller/pkg/node/eviction" + "github.com/kubermatic/machine-controller/pkg/node/poddeletion" "github.com/kubermatic/machine-controller/pkg/providerconfig" providerconfigtypes "github.com/kubermatic/machine-controller/pkg/providerconfig/types" "github.com/kubermatic/machine-controller/pkg/rhsm" @@ -408,7 +409,7 @@ func (r *Reconciler) reconcile(ctx context.Context, machine *clusterv1alpha1.Mac // step 2: check if a user requested to delete the machine if machine.DeletionTimestamp != nil { - return r.deleteMachine(ctx, prov, machine) + return r.deleteMachine(ctx, prov, providerConfig.CloudProvider, machine) } // Step 3: Essentially creates an instance for the given machine. @@ -462,6 +463,30 @@ func (r *Reconciler) ensureMachineHasNodeReadyCondition(machine *clusterv1alpha1 }) } +func (r *Reconciler) shouldCleanupVolumes(ctx context.Context, machine *clusterv1alpha1.Machine, providerName providerconfigtypes.CloudProvider) (bool, error) { + // we need to wait for volumeAttachments clean up only for vSphere + if providerName != providerconfigtypes.CloudProviderVsphere { + return false, nil + } + + // No node - No volumeAttachments to be collected + if machine.Status.NodeRef == nil { + klog.V(4).Infof("Skipping eviction for machine %q since it does not have a node", machine.Name) + return false, nil + } + + node := &corev1.Node{} + if err := r.client.Get(ctx, types.NamespacedName{Name: machine.Status.NodeRef.Name}, node); err != nil { + // Node does not exist - No volumeAttachments to be collected + if kerrors.IsNotFound(err) { + klog.V(4).Infof("Skipping eviction for machine %q since it does not have a node", machine.Name) + return false, nil + } + return false, fmt.Errorf("failed to get node %q", machine.Status.NodeRef.Name) + } + return true, nil +} + // evictIfNecessary checks if the machine has 
a node and evicts it if necessary func (r *Reconciler) shouldEvict(ctx context.Context, machine *clusterv1alpha1.Machine) (bool, error) { // If the deletion got triggered a few hours ago, skip eviction. @@ -521,22 +546,35 @@ func (r *Reconciler) shouldEvict(ctx context.Context, machine *clusterv1alpha1.M } // deleteMachine makes sure that an instance has gone in a series of steps. -func (r *Reconciler) deleteMachine(ctx context.Context, prov cloudprovidertypes.Provider, machine *clusterv1alpha1.Machine) (*reconcile.Result, error) { +func (r *Reconciler) deleteMachine(ctx context.Context, prov cloudprovidertypes.Provider, providerName providerconfigtypes.CloudProvider, machine *clusterv1alpha1.Machine) (*reconcile.Result, error) { shouldEvict, err := r.shouldEvict(ctx, machine) if err != nil { return nil, err } + shouldCleanUpVolumes, err := r.shouldCleanupVolumes(ctx, machine, providerName) + if err != nil { + return nil, err + } + var evictedSomething, deletedSomething bool + var volumesFree = true if shouldEvict { - evictedSomething, err := eviction.New(ctx, machine.Status.NodeRef.Name, r.client, r.kubeClient).Run() + evictedSomething, err = eviction.New(ctx, machine.Status.NodeRef.Name, r.client, r.kubeClient).Run() if err != nil { return nil, fmt.Errorf("failed to evict node %s: %v", machine.Status.NodeRef.Name, err) } - if evictedSomething { - return &reconcile.Result{RequeueAfter: 10 * time.Second}, nil + } + if shouldCleanUpVolumes { + deletedSomething, volumesFree, err = poddeletion.New(ctx, machine.Status.NodeRef.Name, r.client, r.kubeClient).Run() + if err != nil { + return nil, fmt.Errorf("failed to delete pods bound to volumes running on node %s: %v", machine.Status.NodeRef.Name, err) } } + if evictedSomething || deletedSomething || !volumesFree { + return &reconcile.Result{RequeueAfter: 10 * time.Second}, nil + } + if result, err := r.deleteCloudProviderInstance(prov, machine); result != nil || err != nil { return result, err } @@ -550,7 +588,52 @@ 
func (r *Reconciler) deleteMachine(ctx context.Context, prov cloudprovidertypes. return nil, nil } - return nil, r.deleteNodeForMachine(ctx, machine) + nodes, err := r.retrieveNodesRelatedToMachine(ctx, machine) + if err != nil { + return nil, err + } + + return nil, r.deleteNodeForMachine(ctx, nodes, machine) +} + +func (r *Reconciler) retrieveNodesRelatedToMachine(ctx context.Context, machine *clusterv1alpha1.Machine) ([]*corev1.Node, error) { + nodes := make([]*corev1.Node, 0) + + // If there's a NodeRef on the Machine object, retrieve the node by using the + // value of the NodeRef. If there's no NodeRef, try to find the Node by + // listing nodes using the NodeOwner label selector. + if machine.Status.NodeRef != nil { + objKey := ctrlruntimeclient.ObjectKey{Name: machine.Status.NodeRef.Name} + node := &corev1.Node{} + if err := r.client.Get(ctx, objKey, node); err != nil { + if !kerrors.IsNotFound(err) { + return nil, fmt.Errorf("failed to get node %s: %v", machine.Status.NodeRef.Name, err) + } + klog.V(2).Infof("node %q does not longer exist for machine %q", machine.Status.NodeRef.Name, machine.Spec.Name) + } else { + nodes = append(nodes, node) + } + } else { + selector, err := labels.Parse(NodeOwnerLabelName + "=" + string(machine.UID)) + if err != nil { + return nil, fmt.Errorf("failed to parse label selector: %v", err) + } + listOpts := &ctrlruntimeclient.ListOptions{LabelSelector: selector} + nodeList := &corev1.NodeList{} + if err := r.client.List(ctx, nodeList, listOpts); err != nil { + return nil, fmt.Errorf("failed to list nodes: %v", err) + } + if len(nodeList.Items) == 0 { + // We just want to log that we didn't find the node.
+ klog.V(3).Infof("No node found for the machine %s", machine.Spec.Name) + } + + for _, node := range nodeList.Items { + nodes = append(nodes, &node) + } + } + + return nodes, nil } func (r *Reconciler) deleteCloudProviderInstance(prov cloudprovidertypes.Provider, machine *clusterv1alpha1.Machine) (*reconcile.Result, error) { @@ -623,50 +706,14 @@ func (r *Reconciler) deleteCloudProviderInstance(prov cloudprovidertypes.Provide }) } -func (r *Reconciler) deleteNodeForMachine(ctx context.Context, machine *clusterv1alpha1.Machine) error { - // If there's NodeRef on the Machine object, remove the Node by using the - // value of the NodeRef. If there's no NodeRef, try to find the Node by - // listing nodes using the NodeOwner label selector. - if machine.Status.NodeRef != nil { - objKey := ctrlruntimeclient.ObjectKey{Name: machine.Status.NodeRef.Name} - node := &corev1.Node{} - nodeFound := true - if err := r.client.Get(ctx, objKey, node); err != nil { +func (r *Reconciler) deleteNodeForMachine(ctx context.Context, nodes []*corev1.Node, machine *clusterv1alpha1.Machine) error { + // iterates on all nodes and delete them. 
Finally, remove the finalizer on the machine + for _, node := range nodes { + if err := r.client.Delete(ctx, node); err != nil { if !kerrors.IsNotFound(err) { - return fmt.Errorf("failed to get node %s: %v", machine.Status.NodeRef.Name, err) - } - nodeFound = false - klog.V(2).Infof("node %q does not longer exist for machine %q", machine.Status.NodeRef.Name, machine.Spec.Name) - } - - if nodeFound { - if err := r.client.Delete(ctx, node); err != nil { - if !kerrors.IsNotFound(err) { - return err - } - klog.V(2).Infof("node %q does not longer exist for machine %q", machine.Status.NodeRef.Name, machine.Spec.Name) - } - } - } else { - selector, err := labels.Parse(NodeOwnerLabelName + "=" + string(machine.UID)) - if err != nil { - return fmt.Errorf("failed to parse label selector: %v", err) - } - listOpts := &ctrlruntimeclient.ListOptions{LabelSelector: selector} - nodes := &corev1.NodeList{} - if err := r.client.List(ctx, nodes, listOpts); err != nil { - return fmt.Errorf("failed to list nodes: %v", err) - } - if len(nodes.Items) == 0 { - // We just want log that we didn't found the node. We don't want to - // return here, as we want to remove finalizers at the end. 
- klog.V(3).Infof("No node found for the machine %s", machine.Spec.Name) - } - - for _, node := range nodes.Items { - if err := r.client.Delete(ctx, &node); err != nil { return err } + klog.V(2).Infof("node %q does not longer exist for machine %q", machine.Status.NodeRef.Name, machine.Spec.Name) } } diff --git a/pkg/controller/machine/machine_test.go b/pkg/controller/machine/machine_test.go index 7f4a65d91..a9322ca32 100644 --- a/pkg/controller/machine/machine_test.go +++ b/pkg/controller/machine/machine_test.go @@ -474,7 +474,7 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { tests := []struct { name string machine *clusterv1alpha1.Machine - nodes []runtime.Object + nodes []*corev1.Node err error shouldDeleteNode string }{ @@ -489,13 +489,17 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { NodeRef: &corev1.ObjectReference{Name: "node-1"}, }, }, - nodes: []runtime.Object{&corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node-0", - }}, &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node-1", - }}, + nodes: []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, }, err: nil, shouldDeleteNode: "node-1", @@ -510,8 +514,8 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { }, Status: clusterv1alpha1.MachineStatus{}, }, - nodes: []runtime.Object{ - &corev1.Node{ + nodes: []*corev1.Node{ + { ObjectMeta: metav1.ObjectMeta{ Name: "node-0", Labels: map[string]string{ @@ -519,7 +523,7 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { }, }, }, - &corev1.Node{ + { ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, @@ -538,13 +542,13 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { }, Status: clusterv1alpha1.MachineStatus{}, }, - nodes: []runtime.Object{ - &corev1.Node{ + nodes: []*corev1.Node{ + { ObjectMeta: metav1.ObjectMeta{ Name: "node-0", }, }, - &corev1.Node{ + { ObjectMeta: metav1.ObjectMeta{ Name: "node-1", }, @@ -564,10 
+568,12 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { NodeRef: &corev1.ObjectReference{Name: "node-1"}, }, }, - nodes: []runtime.Object{&corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node-0", - }}, + nodes: []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, }, err: nil, shouldDeleteNode: "", @@ -579,7 +585,9 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { ctx := context.Background() objects := []runtime.Object{test.machine} - objects = append(objects, test.nodes...) + for _, n := range test.nodes { + objects = append(objects, n) + } client := ctrlruntimefake.NewFakeClient(objects...) @@ -595,7 +603,12 @@ func TestControllerDeleteNodeForMachine(t *testing.T) { providerData: providerData, } - err := reconciler.deleteNodeForMachine(ctx, test.machine) + nodes, err := reconciler.retrieveNodesRelatedToMachine(ctx, test.machine) + if err != nil { + return + } + + err = reconciler.deleteNodeForMachine(ctx, nodes, test.machine) if diff := deep.Equal(err, test.err); diff != nil { t.Errorf("expected to get %v instead got: %v", test.err, err) } diff --git a/pkg/node/eviction/eviction.go b/pkg/node/eviction/eviction.go index 4962770ec..e6d1a2024 100644 --- a/pkg/node/eviction/eviction.go +++ b/pkg/node/eviction/eviction.go @@ -20,44 +20,41 @@ import ( "context" "fmt" "sync" - "time" evictiontypes "github.com/kubermatic/machine-controller/pkg/node/eviction/types" + "github.com/kubermatic/machine-controller/pkg/node/nodemanager" corev1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/util/retry" "k8s.io/klog" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) type NodeEviction struct { - ctx context.Context - nodeName string - client 
ctrlruntimeclient.Client - kubeClient kubernetes.Interface + nodeManager *nodemanager.NodeManager + ctx context.Context + nodeName string + kubeClient kubernetes.Interface } // New returns a new NodeEviction func New(ctx context.Context, nodeName string, client ctrlruntimeclient.Client, kubeClient kubernetes.Interface) *NodeEviction { return &NodeEviction{ - ctx: ctx, - nodeName: nodeName, - client: client, - kubeClient: kubeClient, + nodeManager: nodemanager.New(ctx, client, nodeName), + ctx: ctx, + nodeName: nodeName, + kubeClient: kubeClient, } } // Run executes the eviction func (ne *NodeEviction) Run() (bool, error) { - node := &corev1.Node{} - if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, node); err != nil { + node, err := ne.nodeManager.GetNode() + if err != nil { return false, fmt.Errorf("failed to get node from lister: %v", err) } if _, exists := node.Annotations[evictiontypes.SkipEvictionAnnotationKey]; exists { @@ -66,7 +63,7 @@ func (ne *NodeEviction) Run() (bool, error) { } klog.V(3).Infof("Starting to evict node %s", ne.nodeName) - if err := ne.cordonNode(node); err != nil { + if err := ne.nodeManager.CordonNode(node); err != nil { return false, fmt.Errorf("failed to cordon node %s: %v", ne.nodeName, err) } klog.V(6).Infof("Successfully cordoned node %s", ne.nodeName) @@ -90,34 +87,6 @@ func (ne *NodeEviction) Run() (bool, error) { return true, nil } -func (ne *NodeEviction) cordonNode(node *corev1.Node) error { - if !node.Spec.Unschedulable { - _, err := ne.updateNode(func(n *corev1.Node) { - n.Spec.Unschedulable = true - }) - if err != nil { - return err - } - } - - // Be paranoid and wait until the change got propagated to the lister - // This assumes that the delay between our lister and the APIserver - // is smaller or equal to the delay the schedulers lister has - If - // that is not the case, there is a small chance the scheduler schedules - // pods in between, those will then get deleted upon node deletion and - // 
not evicted - return wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) { - node := &corev1.Node{} - if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, node); err != nil { - return false, err - } - if node.Spec.Unschedulable { - return true, nil - } - return false, nil - }) -} - func (ne *NodeEviction) getFilteredPods() ([]corev1.Pod, error) { // The lister-backed client from the mgr automatically creates a lister for all objects requested through it. // We explicitly do not want that for pods, hence we have to use the kubernetes core client @@ -202,18 +171,3 @@ func (ne *NodeEviction) evictPod(pod *corev1.Pod) error { } return ne.kubeClient.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ne.ctx, eviction) } - -func (ne *NodeEviction) updateNode(modify func(*corev1.Node)) (*corev1.Node, error) { - node := &corev1.Node{} - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, node); err != nil { - return err - } - // Apply modifications - modify(node) - // Update the node - return ne.client.Update(ne.ctx, node) - }) - - return node, err -} diff --git a/pkg/node/nodemanager/node_manager.go b/pkg/node/nodemanager/node_manager.go new file mode 100644 index 000000000..342a2bf57 --- /dev/null +++ b/pkg/node/nodemanager/node_manager.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Machine Controller Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodemanager + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type NodeManager struct { + ctx context.Context + client ctrlruntimeclient.Client + nodeName string +} + +func New(ctx context.Context, client ctrlruntimeclient.Client, nodeName string) *NodeManager { + return &NodeManager{ + ctx: ctx, + client: client, + nodeName: nodeName, + } +} + +func (nm *NodeManager) GetNode() (*corev1.Node, error) { + node := &corev1.Node{} + if err := nm.client.Get(nm.ctx, types.NamespacedName{Name: nm.nodeName}, node); err != nil { + return nil, fmt.Errorf("failed to get node from lister: %v", err) + } + return node, nil +} + +func (nm *NodeManager) CordonNode(node *corev1.Node) error { + if !node.Spec.Unschedulable { + _, err := nm.updateNode(func(n *corev1.Node) { + n.Spec.Unschedulable = true + }) + if err != nil { + return err + } + } + + // Be paranoid and wait until the change got propagated to the lister + // This assumes that the delay between our lister and the APIserver + // is smaller or equal to the delay the schedulers lister has - If + // that is not the case, there is a small chance the scheduler schedules + // pods in between, those will then get deleted upon node deletion and + // not evicted + return wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) { + node := &corev1.Node{} + if err := nm.client.Get(nm.ctx, types.NamespacedName{Name: nm.nodeName}, node); err != nil { + return false, err + } + if node.Spec.Unschedulable { + return true, nil + } + return false, nil + }) +} + +func (nm *NodeManager) updateNode(modify func(*corev1.Node)) (*corev1.Node, error) { + node := &corev1.Node{} + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := nm.client.Get(nm.ctx, types.NamespacedName{Name: nm.nodeName}, node); err != 
nil { + return err + } + // Apply modifications + modify(node) + // Update the node + return nm.client.Update(nm.ctx, node) + }) + + return node, err +} diff --git a/pkg/node/poddeletion/pod_deletion.go b/pkg/node/poddeletion/pod_deletion.go new file mode 100644 index 000000000..1b9874aa8 --- /dev/null +++ b/pkg/node/poddeletion/pod_deletion.go @@ -0,0 +1,216 @@ +/* +Copyright 2019 The Machine Controller Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "context" + "fmt" + "sync" + + "github.com/kubermatic/machine-controller/pkg/node/nodemanager" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + errorQueueLen = 100 +) + +type NodeVolumeAttachmentsCleanup struct { + nodeManager *nodemanager.NodeManager + ctx context.Context + nodeName string + kubeClient kubernetes.Interface +} + +// New returns a new NodeVolumeAttachmentsCleanup +func New(ctx context.Context, nodeName string, client ctrlruntimeclient.Client, kubeClient kubernetes.Interface) *NodeVolumeAttachmentsCleanup { + return &NodeVolumeAttachmentsCleanup{ + nodeManager: nodemanager.New(ctx, client, nodeName), + ctx: ctx, + nodeName: nodeName, + kubeClient: kubeClient, + } +} + +// Run executes the pod deletion +func (vc *NodeVolumeAttachmentsCleanup) Run() (bool, bool, error) { + node, 
err := vc.nodeManager.GetNode() + if err != nil { + return false, false, fmt.Errorf("failed to get node from lister: %v", err) + } + klog.V(3).Infof("Starting to cleanup node %s", vc.nodeName) + + // if there are no more volumeAttachments related to the node, then it can be deleted + volumeAttachmentsDeleted, err := vc.nodeCanBeDeleted() + if err != nil { + return false, false, fmt.Errorf("failed to check volumeAttachments deletion: %v", err) + } + if volumeAttachmentsDeleted { + return false, true, nil + } + + // cordon the node to be sure that the deleted pods are not re-scheduled on the same node + if err := vc.nodeManager.CordonNode(node); err != nil { + return false, false, fmt.Errorf("failed to cordon node %s: %v", vc.nodeName, err) + } + klog.V(6).Infof("Successfully cordoned node %s", vc.nodeName) + + // get all the pods that need to be deleted (i.e. those mounting volumes attached to the node that is going to be deleted) + podsToDelete, errors := vc.getFilteredPods() + if len(errors) > 0 { + return false, false, fmt.Errorf("failed to get Pods to delete for node %s, errors encountered: %v", vc.nodeName, err) + } + klog.V(6).Infof("Found %v pods to delete for node %s", len(podsToDelete), vc.nodeName) + + if len(podsToDelete) == 0 { + return false, false, nil + } + + // delete the previously filtered pods, then tell the controller to retry later + if errs := vc.deletePods(podsToDelete); len(errs) > 0 { + return false, false, fmt.Errorf("failed to delete pods, errors encountered: %v", errs) + } + klog.V(6).Infof("Successfully deleted all pods mounting persistent volumes attached on node %s", vc.nodeName) + return true, false, err +} + +func (vc *NodeVolumeAttachmentsCleanup) getFilteredPods() ([]corev1.Pod, []error) { + filteredPods := []corev1.Pod{} + lock := sync.Mutex{} + retErrs := []error{} + + volumeAttachments, err := vc.kubeClient.StorageV1().VolumeAttachments().List(vc.ctx, metav1.ListOptions{}) + if err != nil { + retErrs = append(retErrs, 
fmt.Errorf("failed to list pods: %v", err)) + return nil, retErrs + } + + persistentVolumeClaims, err := vc.kubeClient.CoreV1().PersistentVolumeClaims(metav1.NamespaceAll).List(vc.ctx, metav1.ListOptions{}) + if err != nil { + retErrs = append(retErrs, fmt.Errorf("failed to list persistent volumes: %v", err)) + return nil, retErrs + } + + errCh := make(chan error, errorQueueLen) + wg := sync.WaitGroup{} + for _, va := range volumeAttachments.Items { + if va.Spec.NodeName == vc.nodeName { + for _, pvc := range persistentVolumeClaims.Items { + if va.Spec.Source.PersistentVolumeName != nil && *va.Spec.Source.PersistentVolumeName == pvc.Spec.VolumeName { + wg.Add(1) + go func(pvc corev1.PersistentVolumeClaim) { + defer wg.Done() + pods, err := vc.kubeClient.CoreV1().Pods(pvc.Namespace).List(vc.ctx, metav1.ListOptions{}) + switch { + case kerrors.IsTooManyRequests(err): + return + case err != nil: + errCh <- fmt.Errorf("failed to list pod: %v", err) + default: + for _, pod := range pods.Items { + if doesPodClaimVolume(pod, pvc.Name) && pod.Spec.NodeName == vc.nodeName { + lock.Lock() + filteredPods = append(filteredPods, pod) + lock.Unlock() + } + } + } + }(pvc) + } + } + } + } + wg.Wait() + close(errCh) + + for err := range errCh { + retErrs = append(retErrs, err) + } + + return filteredPods, nil +} + +// nodeCanBeDeleted checks if all the volumeAttachments related to the node have already been collected by the external CSI driver +func (vc *NodeVolumeAttachmentsCleanup) nodeCanBeDeleted() (bool, error) { + volumeAttachments, err := vc.kubeClient.StorageV1().VolumeAttachments().List(vc.ctx, metav1.ListOptions{}) + if err != nil { + return false, fmt.Errorf("error while listing volumeAttachments: %v", err) + } + for _, va := range volumeAttachments.Items { + if va.Spec.NodeName == vc.nodeName { + klog.V(3).Infof("waiting for the volumeAttachment %s to be deleted before deleting node %s", va.Name, vc.nodeName) + return false, nil + } + } + return true, nil +} + +func (vc 
*NodeVolumeAttachmentsCleanup) deletePods(pods []corev1.Pod) []error { + + errCh := make(chan error, len(pods)) + retErrs := []error{} + + var wg sync.WaitGroup + var isDone bool + defer func() { isDone = true }() + + wg.Add(len(pods)) + for _, pod := range pods { + go func(p corev1.Pod) { + defer wg.Done() + for { + if isDone { + return + } + err := vc.kubeClient.CoreV1().Pods(p.Namespace).Delete(vc.ctx, p.Name, metav1.DeleteOptions{}) + if err == nil || kerrors.IsNotFound(err) { + klog.V(6).Infof("Successfully deleted pod %s/%s on node %s", p.Namespace, p.Name, vc.nodeName) + return + } else if kerrors.IsTooManyRequests(err) { + // PDB prevents pod deletion, return and make the controller retry later + return + } else { + errCh <- fmt.Errorf("error deleting pod %s/%s on node %s: %v", p.Namespace, p.Name, vc.nodeName, err) + return + } + } + }(pod) + } + wg.Wait() + close(errCh) + + for err := range errCh { + retErrs = append(retErrs, err) + } + + return retErrs +} + +// doesPodClaimVolume checks if the volume is mounted by the pod +func doesPodClaimVolume(pod corev1.Pod, pvcName string) bool { + for _, volumeMount := range pod.Spec.Volumes { + if volumeMount.PersistentVolumeClaim != nil && volumeMount.PersistentVolumeClaim.ClaimName == pvcName { + return true + } + } + return false +}