diff --git a/.gitignore b/.gitignore
index 95f28cff37..5dac84ced1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ coverage.html
 *.test
 *.cpuprofile
 *.heapprofile
+*.swp
 
 # Common in OSs and IDEs
 .idea
diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml
index 05bafa01f9..8925c7a524 100644
--- a/kwok/charts/crds/karpenter.sh_nodepools.yaml
+++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -155,6 +155,13 @@ spec:
                     - WhenEmpty
                     - WhenEmptyOrUnderutilized
                     type: string
+                  utilizationThreshold:
+                    description: |-
+                      UtilizationThreshold is defined as the sum of requested resources divided by the node's allocatable capacity,
+                      below which a node can be considered for disruption.
+                    maximum: 100
+                    minimum: 1
+                    type: integer
                 required:
                 - consolidateAfter
                 type: object
diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml
index 6bfc0532a3..2f7826dd01 100644
--- a/pkg/apis/crds/karpenter.sh_nodepools.yaml
+++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -155,6 +155,13 @@ spec:
                     - WhenEmpty
                     - WhenEmptyOrUnderutilized
                     type: string
+                  utilizationThreshold:
+                    description: |-
+                      UtilizationThreshold is defined as the sum of requested resources divided by the node's allocatable capacity,
+                      below which a node can be considered for disruption.
+                    maximum: 100
+                    minimum: 1
+                    type: integer
                 required:
                 - consolidateAfter
                 type: object
diff --git a/pkg/apis/v1/nodepool.go b/pkg/apis/v1/nodepool.go
index 681d933344..03f7229a04 100644
--- a/pkg/apis/v1/nodepool.go
+++ b/pkg/apis/v1/nodepool.go
@@ -74,6 +74,12 @@ type Disruption struct {
 	// +kubebuilder:validation:Enum:={WhenEmpty,WhenEmptyOrUnderutilized}
 	// +optional
 	ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
+	// UtilizationThreshold is defined as the sum of requested resources divided by the node's allocatable capacity,
+	// below which a node can be considered for disruption.
+	// +kubebuilder:validation:Minimum:=1
+	// +kubebuilder:validation:Maximum:=100
+	// +optional
+	UtilizationThreshold *int `json:"utilizationThreshold,omitempty"`
 	// Budgets is a list of Budgets.
 	// If there are multiple active budgets, Karpenter uses
 	// the most restrictive value. If left undefined,
diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go
index 4ccbab50db..7c9b34182b 100644
--- a/pkg/apis/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/v1/zz_generated.deepcopy.go
@@ -62,6 +62,11 @@ func (in *Budget) DeepCopy() *Budget {
 func (in *Disruption) DeepCopyInto(out *Disruption) {
 	*out = *in
 	in.ConsolidateAfter.DeepCopyInto(&out.ConsolidateAfter)
+	if in.UtilizationThreshold != nil {
+		in, out := &in.UtilizationThreshold, &out.UtilizationThreshold
+		*out = new(int)
+		**out = **in
+	}
 	if in.Budgets != nil {
 		in, out := &in.Budgets, &out.Budgets
 		*out = make([]Budget, len(*in))
diff --git a/pkg/controllers/nodeclaim/disruption/consolidation.go b/pkg/controllers/nodeclaim/disruption/consolidation.go
index 1e993129fd..bd39b13a78 100644
--- a/pkg/controllers/nodeclaim/disruption/consolidation.go
+++ b/pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -18,14 +18,21 @@ package disruption
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/samber/lo"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/karpenter/pkg/utils/node"
+
+	corev1 "k8s.io/api/core/v1"
+	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
+	nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
 )
 
 // Consolidation is a nodeclaim sub-controller that adds or removes status conditions on empty nodeclaims based on consolidateAfter
@@ -69,6 +76,38 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, no
 		return reconcile.Result{RequeueAfter: consolidatableTime.Sub(c.clock.Now())}, nil
 	}
 
+	// Get the node to check utilization
+	n, err := nodeclaimutil.NodeForNodeClaim(ctx, c.kubeClient, nodeClaim)
+	if err != nil {
+		if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
+			return reconcile.Result{}, nil
+		}
+		return reconcile.Result{}, err
+	}
+	// Check node utilization: if a utilizationThreshold is specified, the node can be disrupted only when its utilization is below the threshold.
+	threshold := nodePool.Spec.Disruption.UtilizationThreshold
+	if threshold != nil {
+		pods, err := node.GetPods(ctx, c.kubeClient, n)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("retrieving node pods, %w", err)
+		}
+		cpu, err := calculateUtilizationOfResource(n, corev1.ResourceCPU, pods)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("failed to calculate CPU utilization, %w", err)
+		}
+		memory, err := calculateUtilizationOfResource(n, corev1.ResourceMemory, pods)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("failed to calculate memory utilization, %w", err)
+		}
+		if cpu > float64(*threshold)/100 || memory > float64(*threshold)/100 {
+			if hasConsolidatableCondition {
+				_ = nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)
+				log.FromContext(ctx).V(1).Info("removing consolidatable status condition due to high utilization")
+			}
+			return reconcile.Result{}, nil
+		}
+	}
+
 	// 6. Otherwise, add the consolidatable status condition
 	nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
 	if !hasConsolidatableCondition {
@@ -76,3 +115,23 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, no
 	}
 	return reconcile.Result{}, nil
 }
+
+// calculateUtilizationOfResource calculates the utilization of a given resource for a node.
+func calculateUtilizationOfResource(node *corev1.Node, resourceName corev1.ResourceName, pods []*corev1.Pod) (float64, error) {
+	allocatable, found := node.Status.Allocatable[resourceName]
+	if !found {
+		return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name)
+	}
+	if allocatable.MilliValue() == 0 {
+		return 0, fmt.Errorf("%v is 0 at %s", resourceName, node.Name)
+	}
+	podsRequest := resource.MustParse("0")
+	for _, pod := range pods {
+		for _, container := range pod.Spec.Containers {
+			if resourceValue, found := container.Resources.Requests[resourceName]; found {
+				podsRequest.Add(resourceValue)
+			}
+		}
+	}
+	return float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()), nil
+}
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 5bf01d1616..8c73d2d783 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -340,11 +340,18 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
 		return scheduler.Results{}, err
 	}
 	pods := append(pendingPods, deletingNodePods...)
+	// Filter out pods that were already handled by Karpenter within the last 3 minutes.
+	targetPods := lo.FilterMap(pods, func(pod *corev1.Pod, _ int) (*corev1.Pod, bool) {
+		if p.isPodHandled(ctx, pod) {
+			return nil, false
+		}
+		return pod, true
+	})
 	// nothing to schedule, so just return success
-	if len(pods) == 0 {
+	if len(targetPods) == 0 {
 		return scheduler.Results{}, nil
 	}
-	s, err := p.NewScheduler(ctx, pods, nodes.Active())
+	s, err := p.NewScheduler(ctx, targetPods, nodes.Active())
 	if err != nil {
 		if errors.Is(err, ErrNodePoolsNotFound) {
 			log.FromContext(ctx).Info("no nodepools found")
@@ -352,14 +359,37 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
 		}
 		return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
 	}
-	results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
+	results := s.Solve(ctx, targetPods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
 	if len(results.NewNodeClaims) > 0 {
-		log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
+		log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(targetPods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
 	}
 	results.Record(ctx, p.recorder, p.cluster)
 	return results, nil
 }
 
+func (p *Provisioner) isPodHandled(ctx context.Context, pod *corev1.Pod) bool {
+	var events corev1.EventList
+	filter := client.MatchingFields{
+		"namespace":           pod.Namespace,
+		"involvedObject.kind": "Pod",
+		"involvedObject.name": pod.Name,
+		"reason":              "HandledByKarpenter",
+	}
+	if err := p.kubeClient.List(ctx, &events, filter); err == nil {
+		for _, event := range events.Items {
+			// Ignore the pod if it was already handled within the last 3 minutes.
+			if time.Now().Before(event.LastTimestamp.Time.Add(3 * time.Minute)) {
+				log.FromContext(ctx).Info(fmt.Sprintf("pod %s/%s is already handled", pod.Namespace, pod.Name))
+				return true
+			}
+		}
+	} else {
+		log.FromContext(ctx).Error(err, fmt.Sprintf("failed to list events for %s/%s", pod.Namespace, pod.Name))
+	}
+	p.recorder.Publish(scheduler.PodHandledEvent(pod))
+	return false
+}
+
 func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...option.Function[LaunchOptions]) (string, error) {
 	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodePool", klog.KRef("", n.NodePoolName)))
 	options := option.Resolve(opts...)
diff --git a/pkg/controllers/provisioning/scheduling/events.go b/pkg/controllers/provisioning/scheduling/events.go
index de25d4532e..e44b730140 100644
--- a/pkg/controllers/provisioning/scheduling/events.go
+++ b/pkg/controllers/provisioning/scheduling/events.go
@@ -59,3 +59,14 @@ func PodFailedToScheduleEvent(pod *corev1.Pod, err error) events.Event {
 		DedupeTimeout: 5 * time.Minute,
 	}
 }
+
+func PodHandledEvent(pod *corev1.Pod) events.Event {
+	return events.Event{
+		InvolvedObject: pod,
+		Type:           corev1.EventTypeNormal,
+		Reason:         "HandledByKarpenter",
+		Message:        "Pod is handled by Karpenter",
+		DedupeValues:   []string{string(pod.UID)},
+		DedupeTimeout:  5 * time.Minute,
+	}
+}
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index e238a3d200..48159698cc 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -198,6 +198,18 @@ func NewOperator() (context.Context, *Operator) {
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Node{}, "spec.providerID", func(o client.Object) []string {
 		return []string{o.(*corev1.Node).Spec.ProviderID}
 	}), "failed to setup node provider id indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
+		return []string{o.(*corev1.Event).InvolvedObject.Kind}
+	}), "failed to setup event kind indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(o client.Object) []string {
+		return []string{o.(*corev1.Event).InvolvedObject.Name}
+	}), "failed to setup event name indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "namespace", func(o client.Object) []string {
+		return []string{o.(*corev1.Event).Namespace}
+	}), "failed to setup event namespace indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "reason", func(o client.Object) []string {
+		return []string{o.(*corev1.Event).Reason}
+	}), "failed to setup event reason indexer")
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
 		return []string{o.(*v1.NodeClaim).Status.ProviderID}
 	}), "failed to setup nodeclaim provider id indexer")
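
For illustration, a minimal NodePool manifest exercising the new utilizationThreshold field could look like the sketch below. Only the utilizationThreshold entry under spec.disruption comes from the schema change above; the metadata name, the consolidateAfter value, and the threshold of 70 are assumptions chosen for the example, and required fields such as spec.template are omitted.

apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: default                # illustrative name, not part of this change
spec:
  # spec.template (node class reference, requirements, ...) is required in a real
  # NodePool but omitted here for brevity.
  disruption:
    consolidationPolicy: WhenEmptyOrUnderutilized
    consolidateAfter: 1m       # assumed value for the example
    # New in this change: the node is only marked consolidatable when both its CPU
    # and memory utilization (pod requests divided by allocatable capacity) are at
    # or below this percentage.
    utilizationThreshold: 70   # assumed value; the schema allows 1 to 100

Worked example under these assumptions: on a node with 4000m allocatable CPU whose pods request 2500m, calculateUtilizationOfResource returns 2500/4000 = 0.625; with utilizationThreshold: 70 (compared as 0.70), the node remains consolidatable as long as its memory utilization is also at or below 0.70.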