Commit 8eb4714

Merge pull request #12 from smartnews/v1.0.4-sn-main (V1.0.4 sn main)
Luke-Smartnews authored Nov 1, 2024
2 parents 008f465 + c627b7d
Showing 9 changed files with 141 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ coverage.html
*.test
*.cpuprofile
*.heapprofile
*.swp

# Common in OSs and IDEs
.idea
7 changes: 7 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -155,6 +155,13 @@ spec:
- WhenEmpty
- WhenEmptyOrUnderutilized
type: string
utilizationThreshold:
description: |-
UtilizationThreshold is the sum of requested resources divided by capacity, expressed as a percentage,
below which a node can be considered for disruption.
maximum: 100
minimum: 1
type: integer
required:
- consolidateAfter
type: object
7 changes: 7 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -155,6 +155,13 @@ spec:
- WhenEmpty
- WhenEmptyOrUnderutilized
type: string
utilizationThreshold:
description: |-
UtilizationThreshold is the sum of requested resources divided by capacity, expressed as a percentage,
below which a node can be considered for disruption.
maximum: 100
minimum: 1
type: integer
required:
- consolidateAfter
type: object
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool.go
@@ -74,6 +74,12 @@ type Disruption struct {
// +kubebuilder:validation:Enum:={WhenEmpty,WhenEmptyOrUnderutilized}
// +optional
ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
// UtilizationThreshold is the sum of requested resources divided by capacity, expressed as a percentage,
// below which a node can be considered for disruption.
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=100
// +optional
UtilizationThreshold *int `json:"utilizationThreshold,omitempty"`
// Budgets is a list of Budgets.
// If there are multiple active budgets, Karpenter uses
// the most restrictive value. If left undefined,
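For orientation, here is a minimal, hypothetical sketch of setting the new field through the Go API. The 70% value, the use of lo.ToPtr, and the ConsolidationPolicyWhenEmptyOrUnderutilized constant name (inferred from the enum above) are assumptions for illustration, not part of this diff.

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

func main() {
	// Allow consolidation of a node only while its CPU and memory utilization stay below 70%.
	d := v1.Disruption{
		ConsolidationPolicy:  v1.ConsolidationPolicyWhenEmptyOrUnderutilized, // assumed constant name backing the enum above
		UtilizationThreshold: lo.ToPtr(70),                                   // percentage, 1-100
	}
	fmt.Printf("utilizationThreshold=%d%%\n", *d.UtilizationThreshold)
}
```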
5 changes: 5 additions & 0 deletions pkg/apis/v1/zz_generated.deepcopy.go

Generated file; diff not rendered.

58 changes: 58 additions & 0 deletions pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -18,14 +18,21 @@ package disruption

import (
"context"
"fmt"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/node"

corev1 "k8s.io/api/core/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Consolidation is a nodeclaim sub-controller that adds or removes status conditions on empty nodeclaims based on consolidateAfter
@@ -69,10 +76,61 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, no
return reconcile.Result{RequeueAfter: consolidatableTime.Sub(c.clock.Now())}, nil
}

// Get the node to check utilization
n, err := nodeclaimutil.NodeForNodeClaim(ctx, c.kubeClient, nodeClaim)
if err != nil {
if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// If utilizationThreshold is specified, check the node's utilization; the node can be disrupted only if its utilization is below the threshold.
threshold := nodePool.Spec.Disruption.UtilizationThreshold
if threshold != nil {
pods, err := node.GetPods(ctx, c.kubeClient, n)
if err != nil {
return reconcile.Result{}, fmt.Errorf("retrieving node pods, %w", err)
}
cpu, err := calculateUtilizationOfResource(n, corev1.ResourceCPU, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate CPU, %w", err)
}
memory, err := calculateUtilizationOfResource(n, corev1.ResourceMemory, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate memory, %w", err)
}
if cpu > float64(*threshold)/100 || memory > float64(*threshold)/100 {
if hasConsolidatableCondition {
_ = nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)
log.FromContext(ctx).V(1).Info("removing consolidatable status condition due to high utilization")
}
// Utilization is above the threshold; return early so the condition is not re-added below.
return reconcile.Result{}, nil
}
}

// 6. Otherwise, add the consolidatable status condition
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
if !hasConsolidatableCondition {
log.FromContext(ctx).V(1).Info("marking consolidatable")
}
return reconcile.Result{}, nil
}

// calculateUtilizationOfResource calculates the utilization of a given resource on a node as the ratio of summed pod container requests to the node's allocatable capacity.
func calculateUtilizationOfResource(node *corev1.Node, resourceName corev1.ResourceName, pods []*corev1.Pod) (float64, error) {
allocatable, found := node.Status.Allocatable[resourceName]
if !found {
return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name)
}
if allocatable.MilliValue() == 0 {
return 0, fmt.Errorf("%v is 0 at %s", resourceName, node.Name)
}
podsRequest := resource.MustParse("0")
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if resourceValue, found := container.Resources.Requests[resourceName]; found {
podsRequest.Add(resourceValue)
}
}
}
return float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()), nil
}
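As a sanity check of the arithmetic above, a small standalone sketch; the 4-core node, the per-container requests, and the 70% threshold are illustrative assumptions (the controller applies the same comparison to both CPU and memory).

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	allocatable := resource.MustParse("4") // node with 4 allocatable CPU cores (4000m)
	requests := resource.MustParse("0")
	for _, r := range []string{"500m", "250m", "250m"} { // container CPU requests across the node's pods
		q := resource.MustParse(r)
		requests.Add(q)
	}
	utilization := float64(requests.MilliValue()) / float64(allocatable.MilliValue())
	threshold := 70 // utilizationThreshold as a percentage
	fmt.Printf("utilization=%.2f, below threshold=%t\n", utilization, utilization <= float64(threshold)/100)
	// Output: utilization=0.25, below threshold=true
}
```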
38 changes: 34 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
@@ -340,26 +340,56 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, err
}
pods := append(pendingPods, deletingNodePods...)
// filter out pods that were already handled within the last 3 minutes
targetPods := lo.FilterMap(pods, func(pod *corev1.Pod, _ int) (*corev1.Pod, bool) {
if p.isPodHandled(ctx, pod) {
return nil, false
}
return pod, true
})
// nothing to schedule, so just return success
if len(pods) == 0 {
if len(targetPods) == 0 {
return scheduler.Results{}, nil
}
s, err := p.NewScheduler(ctx, pods, nodes.Active())
s, err := p.NewScheduler(ctx, targetPods, nodes.Active())
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
return scheduler.Results{}, nil
}
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
results := s.Solve(ctx, targetPods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
if len(results.NewNodeClaims) > 0 {
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(targetPods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
}
results.Record(ctx, p.recorder, p.cluster)
return results, nil
}

func (p *Provisioner) isPodHandled(ctx context.Context, pod *corev1.Pod) bool {
var events corev1.EventList
filter := client.MatchingFields{
"namespace": pod.Namespace,
"involvedObject.kind": "Pod",
"involvedObject.name": pod.Name,
"reason": "HandledByKarpenter",
}
if err := p.kubeClient.List(ctx, &events, filter); err == nil {
for _, event := range events.Items {
// ignore the pod if it was already handled within the last 3 minutes
if time.Now().Before(event.LastTimestamp.Time.Add(3 * time.Minute)) {
log.FromContext(ctx).Info(fmt.Sprintf("pod %s/%s is handled", pod.Namespace, pod.Name))
return true
}
}
} else {
log.FromContext(ctx).Error(err, fmt.Sprintf("failed to get event for %s/%s", pod.Namespace, pod.Name))
}
// Record that this pod has been handled so that scheduling loops within the next 3 minutes skip it.
p.recorder.Publish(scheduler.PodHandledEvent(pod))
return false
}

func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...option.Function[LaunchOptions]) (string, error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodePool", klog.KRef("", n.NodePoolName)))
options := option.Resolve(opts...)
11 changes: 11 additions & 0 deletions pkg/controllers/provisioning/scheduling/events.go
@@ -59,3 +59,14 @@ func PodFailedToScheduleEvent(pod *corev1.Pod, err error) events.Event {
DedupeTimeout: 5 * time.Minute,
}
}

func PodHandledEvent(pod *corev1.Pod) events.Event {
return events.Event{
InvolvedObject: pod,
Type: corev1.EventTypeNormal,
Reason: "HandledByKarpenter",
Message: "Pod is handled by karpenter",
DedupeValues: []string{string(pod.UID)},
DedupeTimeout: 5 * time.Minute,
}
}
12 changes: 12 additions & 0 deletions pkg/operator/operator.go
@@ -198,6 +198,18 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Node{}, "spec.providerID", func(o client.Object) []string {
return []string{o.(*corev1.Node).Spec.ProviderID}
}), "failed to setup node provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
return []string{o.(*corev1.Event).InvolvedObject.Kind}
}), "failed to setup event kind indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(o client.Object) []string {
return []string{o.(*corev1.Event).InvolvedObject.Name}
}), "failed to setup event name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "namespace", func(o client.Object) []string {
return []string{o.(*corev1.Event).Namespace}
}), "failed to setup event namespace indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "reason", func(o client.Object) []string {
return []string{o.(*corev1.Event).Reason}
}), "failed to setup event reason indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
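These indexers are what let the cached controller-runtime client serve the client.MatchingFields query issued by isPodHandled above. Below is a minimal sketch of that kind of indexed lookup against a fake client; the object names are placeholders and only the "reason" index is reproduced here.

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	ctx := context.Background()
	// Fake client registering the same "reason" index as the operator above.
	kubeClient := fake.NewClientBuilder().
		WithIndex(&corev1.Event{}, "reason", func(o client.Object) []string {
			return []string{o.(*corev1.Event).Reason}
		}).
		WithObjects(&corev1.Event{
			ObjectMeta: metav1.ObjectMeta{Name: "example-event", Namespace: "default"}, // placeholder event
			Reason:     "HandledByKarpenter",
		}).
		Build()

	// Field-selector style lookup served from the index, as in isPodHandled.
	var events corev1.EventList
	if err := kubeClient.List(ctx, &events, client.MatchingFields{"reason": "HandledByKarpenter"}); err != nil {
		panic(err)
	}
	fmt.Println("matching events:", len(events.Items))
}
```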
