Merge pull request #1 from smartnews/sn-main
Improve scaling-in behavior
Luke-Smartnews authored Feb 27, 2024
2 parents e0dc627 + 0943800 commit 01697c1
Showing 10 changed files with 186 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ coverage.html
*.test
*.cpuprofile
*.heapprofile
*.swp

# Common in OSs and IDEs
.idea
9 changes: 7 additions & 2 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -135,10 +135,15 @@ spec:
memory leak protection, and disruption testing.
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
utilizationThreshold:
description: |-
UtilizationThreshold is defined as the sum of requested resources divided by capacity,
below which a node can be considered for disruption.
maximum: 100
minimum: 1
type: integer
type: object
x-kubernetes-validations:
- message: consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized
rule: 'has(self.consolidateAfter) ? self.consolidationPolicy != ''WhenUnderutilized'' || self.consolidateAfter == ''Never'' : true'
- message: consolidateAfter must be specified with consolidationPolicy=WhenEmpty
rule: 'self.consolidationPolicy == ''WhenEmpty'' ? has(self.consolidateAfter) : true'
limits:
13 changes: 7 additions & 6 deletions pkg/apis/v1beta1/nodeclaim_status.go
@@ -58,12 +58,13 @@ var LivingConditions = []apis.ConditionType{
}

var (
Launched apis.ConditionType = "Launched"
Registered apis.ConditionType = "Registered"
Initialized apis.ConditionType = "Initialized"
Empty apis.ConditionType = "Empty"
Drifted apis.ConditionType = "Drifted"
Expired apis.ConditionType = "Expired"
Launched apis.ConditionType = "Launched"
Registered apis.ConditionType = "Registered"
Initialized apis.ConditionType = "Initialized"
Empty apis.ConditionType = "Empty"
Underutilized apis.ConditionType = "Underutilized"
Drifted apis.ConditionType = "Drifted"
Expired apis.ConditionType = "Expired"
)

func (in *NodeClaim) GetConditions() apis.Conditions {
7 changes: 6 additions & 1 deletion pkg/apis/v1beta1/nodepool.go
@@ -45,7 +45,6 @@ type NodePoolSpec struct {
Template NodeClaimTemplate `json:"template"`
// Disruption contains the parameters that relate to Karpenter's disruption logic
// +kubebuilder:default={"consolidationPolicy": "WhenUnderutilized", "expireAfter": "720h"}
// +kubebuilder:validation:XValidation:message="consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized",rule="has(self.consolidateAfter) ? self.consolidationPolicy != 'WhenUnderutilized' || self.consolidateAfter == 'Never' : true"
// +kubebuilder:validation:XValidation:message="consolidateAfter must be specified with consolidationPolicy=WhenEmpty",rule="self.consolidationPolicy == 'WhenEmpty' ? has(self.consolidateAfter) : true"
// +optional
Disruption Disruption `json:"disruption"`
@@ -77,6 +76,12 @@ type Disruption struct {
// +kubebuilder:validation:Enum:={WhenEmpty,WhenUnderutilized}
// +optional
ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
// UtilizationThreshold is defined as the sum of requested resources divided by capacity,
// below which a node can be considered for disruption.
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=100
// +optional
UtilizationThreshold *int `json:"utilizationThreshold,omitempty"`
// ExpireAfter is the duration the controller will wait
// before terminating a node, measured from when the node is created. This
// is useful to implement features like eventually consistent node upgrade,
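
A minimal NodePool sketch of how the new field could be used (values and names such as the nodeClassRef are illustrative, not taken from this commit): utilization-based consolidation is enabled, nodes become candidates only when both CPU and memory requests are below 50% of allocatable capacity, and consolidateAfter acts as a grace period after a node is marked Underutilized (see ShouldDisrupt below). Combining a consolidateAfter duration with consolidationPolicy=WhenUnderutilized is only possible because this commit removes the validation that previously rejected that combination.

apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: example
spec:
  template:
    spec:
      nodeClassRef:
        name: default          # illustrative NodeClass
      requirements:
        - key: kubernetes.io/arch
          operator: In
          values: ["amd64"]
  disruption:
    consolidationPolicy: WhenUnderutilized
    # Node is a candidate only when CPU and memory requests are both below 50% of allocatable.
    utilizationThreshold: 50
    # Grace period after the Underutilized condition is set before the node may be consolidated.
    consolidateAfter: 5m
    expireAfter: 720h
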
3 changes: 0 additions & 3 deletions pkg/apis/v1beta1/nodepool_validation.go
@@ -98,9 +98,6 @@ func (in *NodeClaimTemplate) validateRequirementsNodePoolKeyDoesNotExist() (errs

//nolint:gocyclo
func (in *Disruption) validate() (errs *apis.FieldError) {
if in.ConsolidateAfter != nil && in.ConsolidateAfter.Duration != nil && in.ConsolidationPolicy == ConsolidationPolicyWhenUnderutilized {
return errs.Also(apis.ErrGeneric("consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized"))
}
if in.ConsolidateAfter == nil && in.ConsolidationPolicy == ConsolidationPolicyWhenEmpty {
return errs.Also(apis.ErrGeneric("consolidateAfter must be specified with consolidationPolicy=WhenEmpty"))
}
5 changes: 0 additions & 5 deletions pkg/apis/v1beta1/nodepool_validation_cel_test.go
@@ -92,11 +92,6 @@ var _ = Describe("CEL/Validation", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should fail when setting consolidateAfter with consolidationPolicy=WhenUnderutilized", func() {
nodePool.Spec.Disruption.ConsolidateAfter = &NillableDuration{Duration: lo.ToPtr(lo.Must(time.ParseDuration("30s")))}
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should succeed when not setting consolidateAfter to 'Never' with consolidationPolicy=WhenUnderutilized", func() {
nodePool.Spec.Disruption.ConsolidateAfter = &NillableDuration{Duration: nil}
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
5 changes: 5 additions & 0 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/controllers/disruption/consolidation.go
@@ -101,6 +101,14 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...)
return false
}
	// Only apply this check when UtilizationThreshold is specified, so NodePools that don't set it keep the previous behavior
	if cn.nodePool.Spec.Disruption.UtilizationThreshold != nil {
		underutilized := cn.NodeClaim.StatusConditions().GetCondition(v1beta1.Underutilized)
		if !underutilized.IsTrue() {
			return false
		}
		// When a consolidateAfter duration is set, require the condition to have held for at least that long
		if ca := cn.nodePool.Spec.Disruption.ConsolidateAfter; ca != nil && ca.Duration != nil &&
			c.clock.Now().Before(underutilized.LastTransitionTime.Inner.Add(*ca.Duration)) {
			return false
		}
	}

return true
}

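
In effect, when utilizationThreshold is set, a candidate under WhenUnderutilized becomes eligible for consolidation only once its NodeClaim carries the Underutilized condition and, if a consolidateAfter duration is configured, has carried it for at least that long. As an illustrative example, with consolidateAfter: 5m a node whose Underutilized condition last transitioned at 10:00 cannot be consolidated before 10:05. NodePools that leave utilizationThreshold unset skip this check and keep the previous consolidation behavior.
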
142 changes: 142 additions & 0 deletions pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -0,0 +1,142 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/clock"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Consolidation is a nodeclaim sub-controller that adds or removes the Underutilized status condition on nodeclaims when the WhenUnderutilized consolidation policy is used.
type Consolidation struct {
kubeClient client.Client
cluster *state.Cluster
clock clock.Clock
}

//nolint:gocyclo
func (e *Consolidation) Reconcile(ctx context.Context, nodePool *v1beta1.NodePool, nodeClaim *v1beta1.NodeClaim) (reconcile.Result, error) {
hasCondition := nodeClaim.StatusConditions().GetCondition(v1beta1.Underutilized) != nil
if nodePool.Spec.Disruption.ConsolidationPolicy != v1beta1.ConsolidationPolicyWhenUnderutilized {
if hasCondition {
_ = nodeClaim.StatusConditions().ClearCondition(v1beta1.Underutilized)
}
return reconcile.Result{}, nil
}
if initCond := nodeClaim.StatusConditions().GetCondition(v1beta1.Initialized); initCond == nil || initCond.IsFalse() {
if hasCondition {
_ = nodeClaim.StatusConditions().ClearCondition(v1beta1.Underutilized)
logging.FromContext(ctx).Debugf("removing consolidated status condition, isn't initialized")
}
return reconcile.Result{}, nil
}
	// Get the node to check utilization
	n, err := nodeclaimutil.NodeForNodeClaim(ctx, e.kubeClient, nodeClaim)
	if err != nil {
		if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
			_ = nodeClaim.StatusConditions().ClearCondition(v1beta1.Underutilized)
			if hasCondition {
				logging.FromContext(ctx).Debugf("removing underutilized status condition, doesn't have a single node mapping")
			}
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}
pods, err := node.GetPods(ctx, e.kubeClient, n)
if err != nil {
return reconcile.Result{}, fmt.Errorf("retrieving node pods, %w", err)
}
	// Check node utilization when utilizationThreshold is specified; the node can be disrupted only if utilization is below the threshold.
threshold := nodePool.Spec.Disruption.UtilizationThreshold
if threshold != nil {
cpu, err := calculateUtilizationOfResource(n, v1.ResourceCPU, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate CPU, %w", err)
}
memory, err := calculateUtilizationOfResource(n, v1.ResourceMemory, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate memory, %w", err)
}
if cpu < float64(*threshold)/100 && memory < float64(*threshold)/100 {
if !hasCondition {
nodeClaim.StatusConditions().SetCondition(apis.Condition{
Type: v1beta1.Underutilized,
Status: v1.ConditionTrue,
Severity: apis.ConditionSeverityWarning,
})
logging.FromContext(ctx).Debugf("marking underutilizate")
metrics.NodeClaimsDisruptedCounter.With(prometheus.Labels{
metrics.TypeLabel: metrics.ConsolidationReason,
metrics.NodePoolLabel: nodeClaim.Labels[v1beta1.NodePoolLabelKey],
}).Inc()
}
} else {
if hasCondition {
_ = nodeClaim.StatusConditions().ClearCondition(v1beta1.Underutilized)
logging.FromContext(ctx).Debugf("removing underutilized status condition, utilization increased")
}
}
}
return reconcile.Result{}, nil
}

// calculateUtilizationOfResource calculates the utilization of a given resource on a node as the ratio of summed container requests to the node's allocatable capacity.
func calculateUtilizationOfResource(node *v1.Node, resourceName v1.ResourceName, pods []*v1.Pod) (float64, error) {
allocatable, found := node.Status.Allocatable[resourceName]
if !found {
return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name)
}
if allocatable.MilliValue() == 0 {
return 0, fmt.Errorf("%v is 0 at %s", resourceName, node.Name)
}
podsRequest := resource.MustParse("0")
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if resourceValue, found := container.Resources.Requests[resourceName]; found {
podsRequest.Add(resourceValue)
}
}
}
return float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()), nil
}
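
A worked example with illustrative numbers: on a node with 4 allocatable CPUs (4000m) and 16Gi of allocatable memory, pods whose containers request 1500m of CPU and 6Gi of memory in total give utilizations of 1500/4000 = 0.375 for CPU and 6/16 = 0.375 for memory. With utilizationThreshold: 50, both ratios are below 50/100 = 0.5, so the Reconcile loop above sets the Underutilized condition; if either ratio later reaches 0.5 or more, the condition is cleared. Note that only requests from regular containers are summed; init containers and pod overhead are not counted.
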
17 changes: 10 additions & 7 deletions pkg/controllers/nodeclaim/disruption/controller.go
@@ -50,18 +50,20 @@ type nodeClaimReconciler interface {
type Controller struct {
kubeClient client.Client

drift *Drift
expiration *Expiration
emptiness *Emptiness
drift *Drift
expiration *Expiration
emptiness *Emptiness
consolidation *Consolidation
}

// NewController constructs a nodeclaim disruption controller
func NewController(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider) operatorcontroller.Controller {
return operatorcontroller.Typed[*v1beta1.NodeClaim](kubeClient, &Controller{
kubeClient: kubeClient,
drift: &Drift{cloudProvider: cloudProvider},
expiration: &Expiration{kubeClient: kubeClient, clock: clk},
emptiness: &Emptiness{kubeClient: kubeClient, cluster: cluster, clock: clk},
kubeClient: kubeClient,
drift: &Drift{cloudProvider: cloudProvider},
expiration: &Expiration{kubeClient: kubeClient, clock: clk},
emptiness: &Emptiness{kubeClient: kubeClient, cluster: cluster, clock: clk},
consolidation: &Consolidation{kubeClient: kubeClient, cluster: cluster, clock: clk},
})
}

@@ -86,6 +88,7 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1beta1.NodeClaim
c.expiration,
c.drift,
c.emptiness,
c.consolidation,
}
for _, reconciler := range reconcilers {
res, err := reconciler.Reconcile(ctx, nodePool, nodeClaim)
