diff --git a/kwok/charts/templates/clusterrole.yaml b/kwok/charts/templates/clusterrole.yaml
index 0314b734f1..f8594e383b 100644
--- a/kwok/charts/templates/clusterrole.yaml
+++ b/kwok/charts/templates/clusterrole.yaml
@@ -36,7 +36,7 @@ rules:
     resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
     verbs: ["get", "list", "watch"]
   - apiGroups: ["storage.k8s.io"]
-    resources: ["storageclasses", "csinodes"]
+    resources: ["storageclasses", "csinodes", "volumeattachments"]
     verbs: ["get", "watch", "list"]
   - apiGroups: ["apps"]
     resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go
index 86de091b91..a16e4d3b7e 100644
--- a/pkg/controllers/node/termination/controller.go
+++ b/pkg/controllers/node/termination/controller.go
@@ -47,6 +47,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/events"
 	"sigs.k8s.io/karpenter/pkg/metrics"
 	nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
+	volumeattachmentutils "sigs.k8s.io/karpenter/pkg/utils/volumeattachment"
 )

 // Controller for the resource
@@ -107,6 +108,15 @@ func (c *Controller) finalize(ctx context.Context, node *v1.Node) (reconcile.Res
 		}
 		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
 	}
+	// In order for stateful pods to smoothly migrate from the terminating Node, we wait for VolumeAttachments
+	// of drain-able pods to be cleaned up before terminating the node and removing it from the cluster.
+	areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
+	if err != nil {
+		return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
+	}
+	if !areVolumesDetached {
+		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+	}
 	nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
 	if err != nil {
 		return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
@@ -150,6 +160,19 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, node *v1.Node) err
 	return nil
 }

+func (c *Controller) ensureVolumesDetached(ctx context.Context, node *v1.Node) (volumesDetached bool, err error) {
+	volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
+	if err != nil {
+		return false, err
+	}
+	// Filter out volume attachments associated with non-drain-able pods or multi-attachable volumes
+	filteredVolumeAttachments, err := volumeattachmentutils.FilterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments)
+	if err != nil {
+		return false, err
+	}
+	return len(filteredVolumeAttachments) == 0, nil
+}
+
 func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
 	stored := n.DeepCopy()
 	controllerutil.RemoveFinalizer(n, v1beta1.TerminationFinalizer)
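Reviewer note (illustrative, not part of this change): since `ensureVolumesDetached` is unexported, the gate is easiest to exercise end-to-end through the exported helpers it composes. Below is a minimal sketch using controller-runtime's fake client; the object names, the CSI driver string, and the manually registered pod `spec.nodeName` index are assumptions of the sketch, not fixtures from this repository.

```go
package termination_test

import (
	"context"
	"testing"

	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
	volumeattachmentutils "sigs.k8s.io/karpenter/pkg/utils/volumeattachment"
)

// A drain-able pod with a ReadWriteOnce volume still attached should block termination.
func TestVolumeAttachmentBlocksTermination(t *testing.T) {
	ctx := context.Background()
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	p := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "stateful-0", Namespace: "default"},
		Spec: v1.PodSpec{
			NodeName: node.Name,
			Volumes: []v1.Volume{{Name: "data", VolumeSource: v1.VolumeSource{
				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data-pvc"},
			}}},
		},
	}
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "data-pvc", Namespace: "default"},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName:  "pv-1",
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		},
	}
	va := &storagev1.VolumeAttachment{
		ObjectMeta: metav1.ObjectMeta{Name: "va-1"},
		Spec: storagev1.VolumeAttachmentSpec{
			Attacher: "csi.example.com", // hypothetical CSI driver name
			NodeName: node.Name,
			Source:   storagev1.VolumeAttachmentSource{PersistentVolumeName: lo.ToPtr("pv-1")},
		},
	}
	// GetPods lists pods via a "spec.nodeName" field index, so register one here; the
	// VolumeAttachment index is deliberately omitted to exercise the unindexed List fallback.
	kubeClient := fake.NewClientBuilder().
		WithIndex(&v1.Pod{}, "spec.nodeName", func(o client.Object) []string {
			return []string{o.(*v1.Pod).Spec.NodeName}
		}).
		WithObjects(node, p, pvc, va).
		Build()

	// Mirrors ensureVolumesDetached: list the node's attachments, then keep only blockers.
	attachments, err := nodeutils.GetVolumeAttachments(ctx, kubeClient, node)
	if err != nil {
		t.Fatal(err)
	}
	blocking, err := volumeattachmentutils.FilterVolumeAttachments(ctx, kubeClient, node, attachments)
	if err != nil {
		t.Fatal(err)
	}
	if len(blocking) != 1 {
		t.Fatalf("expected 1 blocking attachment, got %d", len(blocking))
	}
}
```

Swapping the PVC's access mode to `ReadWriteMany` in the same setup should leave `blocking` empty, letting finalization proceed on the next reconcile.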
"failed to setup nodepool nodeclassref name indexer") + lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string { + return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName} + }), "failed to setup volumeattachment indexer") lo.Must0(mgr.AddReadyzCheck("manager", func(req *http.Request) error { return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches")) diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go index b6133b9d36..bf6d93f968 100644 --- a/pkg/utils/node/node.go +++ b/pkg/utils/node/node.go @@ -20,12 +20,13 @@ import ( "context" "fmt" - "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + storagev1 "k8s.io/api/storage/v1" "github.com/samber/lo" v1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/utils/pod" ) @@ -75,6 +76,24 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*v1. }), nil } +// GetVolumeAttachments grabs all volumeAttachments of passed node +func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node) ([]*storagev1.VolumeAttachment, error) { + var volumeAttachments []*storagev1.VolumeAttachment + var volumeAttachmentList storagev1.VolumeAttachmentList + if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { + // If there have not been any volumeAttachments, index may not exist. Therefore, fall back to default List + if err = kubeClient.List(ctx, &volumeAttachmentList); err != nil { + return nil, fmt.Errorf("listing volumeattachments, %w", err) + } + } + for i := range volumeAttachmentList.Items { + if volumeAttachmentList.Items[i].Spec.NodeName == node.Name { + volumeAttachments = append(volumeAttachments, &volumeAttachmentList.Items[i]) + } + } + return volumeAttachments, nil +} + func GetCondition(n *v1.Node, match v1.NodeConditionType) v1.NodeCondition { for _, condition := range n.Status.Conditions { if condition.Type == match { diff --git a/pkg/utils/volumeattachment/volumeattachment.go b/pkg/utils/volumeattachment/volumeattachment.go new file mode 100644 index 0000000000..e621b2a72d --- /dev/null +++ b/pkg/utils/volumeattachment/volumeattachment.go @@ -0,0 +1,80 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
diff --git a/pkg/utils/volumeattachment/volumeattachment.go b/pkg/utils/volumeattachment/volumeattachment.go
new file mode 100644
index 0000000000..e621b2a72d
--- /dev/null
+++ b/pkg/utils/volumeattachment/volumeattachment.go
@@ -0,0 +1,80 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package volumeattachment
+
+import (
+	"context"
+
+	"github.com/samber/lo"
+
+	"sigs.k8s.io/karpenter/pkg/utils/pod"
+
+	v1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	nodeutil "sigs.k8s.io/karpenter/pkg/utils/node"
+	volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
+)
+
+// FilterVolumeAttachments filters out volumeAttachments that should not block the termination of the passed node
+func FilterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node, volumeAttachments []*storagev1.VolumeAttachment) ([]*storagev1.VolumeAttachment, error) {
+	var filteredVolumeAttachments []*storagev1.VolumeAttachment
+	// No need to filter empty volumeAttachments list
+	if len(volumeAttachments) == 0 {
+		return volumeAttachments, nil
+	}
+	// Filter out non-drain-able pods
+	pods, err := nodeutil.GetPods(ctx, kubeClient, node)
+	if err != nil {
+		return nil, err
+	}
+	drainablePods := lo.Reject(pods, func(p *v1.Pod, _ int) bool {
+		return pod.ToleratesDisruptionNoScheduleTaint(p)
+	})
+	// Mark the volumes of drain-able pods that cannot be multi-attached; only these block termination
+	blocksTermination := make(map[string]bool)
+	for _, p := range drainablePods {
+		for _, v := range p.Spec.Volumes {
+			pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
+			if err != nil {
+				return nil, err
+			}
+			if pvc != nil {
+				blocksTermination[pvc.Spec.VolumeName] = CannotMultiAttach(*pvc)
+			}
+		}
+	}
+	for i := range volumeAttachments {
+		pvName := volumeAttachments[i].Spec.Source.PersistentVolumeName
+		if pvName != nil && blocksTermination[*pvName] {
+			filteredVolumeAttachments = append(filteredVolumeAttachments, volumeAttachments[i])
+		}
+	}
+	return filteredVolumeAttachments, nil
+}
+
+// CannotMultiAttach returns true if the persistentVolumeClaim's underlying volume cannot be attached to multiple
+// nodes, i.e. its access mode is ReadWriteOnce or ReadWriteOncePod
+func CannotMultiAttach(pvc v1.PersistentVolumeClaim) bool {
+	for _, accessMode := range pvc.Spec.AccessModes {
+		if accessMode == v1.ReadWriteOnce || accessMode == v1.ReadWriteOncePod {
+			return true
+		}
+	}
+	return false
+}
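Reviewer note (illustrative, not part of this change): `CannotMultiAttach` drives the whole filter, so its truth table is worth spelling out. A small sketch, assuming only the access-mode semantics in the file above; the helper and `main` scaffolding are hypothetical.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	volumeattachmentutils "sigs.k8s.io/karpenter/pkg/utils/volumeattachment"
)

// pvcWithModes is a hypothetical helper that builds a minimal PVC for the demo.
func pvcWithModes(modes ...v1.PersistentVolumeAccessMode) v1.PersistentVolumeClaim {
	return v1.PersistentVolumeClaim{Spec: v1.PersistentVolumeClaimSpec{AccessModes: modes}}
}

func main() {
	// RWO/RWOP volumes cannot move until detached, so their attachments block termination.
	fmt.Println(volumeattachmentutils.CannotMultiAttach(pvcWithModes(v1.ReadWriteOnce)))    // true
	fmt.Println(volumeattachmentutils.CannotMultiAttach(pvcWithModes(v1.ReadWriteOncePod))) // true
	// Multi-attachable volumes can already be mounted elsewhere, so their attachments are ignored.
	fmt.Println(volumeattachmentutils.CannotMultiAttach(pvcWithModes(v1.ReadWriteMany))) // false
	fmt.Println(volumeattachmentutils.CannotMultiAttach(pvcWithModes(v1.ReadOnlyMany)))  // false
}
```

Waiting only on RWO/RWOP attachments matches the failure this change avoids: a replacement stateful pod scheduled to a new node would otherwise hit a Multi-Attach error until the old attachment is cleaned up.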