Skip to content

Commit

Permalink
Update to use RepairStatements
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Nov 8, 2024
1 parent 897855b commit 2338123
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 45 deletions.
4 changes: 2 additions & 2 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object {
return []status.Object{&v1alpha1.KWOKNodeClass{}}
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{}
func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatements {
return []cloudprovider.RepairStatements{}
}

func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider
return c.Drifted, nil
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{
func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatements {
return []cloudprovider.RepairStatements{
{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (

type DriftReason string

type RepairPolicy struct {
type RepairStatements struct {
// Type of unhealthy state that is found on the node
Type corev1.NodeConditionType
// Status condition of when a node is unhealthy
Expand Down Expand Up @@ -76,7 +76,7 @@ type CloudProvider interface {
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
// RepairPolicy is for CloudProviders to define a set of unhealthy conditions for Karpenter
// to monitor on the node.
RepairPolicy() []RepairPolicy
RepairPolicy() []RepairStatements
// Name returns the CloudProvider implementation name.
Name() string
// GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func NewControllers(
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter")),
status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter")),
// status.NewGenericObjectController[*corev1.Node](kubeClient, mgr.GetEventRecorderFor("karpenter")),
// status.NewGenericObjectController[*corev1.Pod](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}

// The cloud provider must define status conditions for the node repair controller to use for detecting unhealthy nodes
Expand Down
61 changes: 22 additions & 39 deletions pkg/controllers/node/health/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -57,45 +54,52 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
}
}

// Register wires this controller into the manager under the controller
// name "node.health", reconciling corev1.Node objects via the
// reconcile.ObjectReconciler adapter.
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
	return controllerruntime.NewControllerManagedBy(m).
		Named("node.health").
		For(&corev1.Node{}).
		Complete(reconcile.AsReconciler(m.GetClient(), c))
}

func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.health")
nodeHealthCondation := corev1.NodeCondition{}
cloudProivderPolicy := cloudprovider.RepairPolicy{}
nodeHealthCondition := corev1.NodeCondition{}
foundCloudProviderPolicy := cloudprovider.RepairStatements{}

if !node.GetDeletionTimestamp().IsZero() {
return reconcile.Result{}, nil
}

for _, policy := range c.cloudProvider.RepairPolicy() {
nodeHealthCondation = nodeutils.GetCondition(node, policy.Type)
if nodeHealthCondation.Status == policy.Status {
cloudProivderPolicy = policy
nodeHealthCondition = nodeutils.GetCondition(node, policy.Type)
if nodeHealthCondition.Status == policy.Status {
// found unhealthy condition on the node
foundCloudProviderPolicy = policy
break
}
}

// From here there are three scenarios to handle:
// 1. If node is healthy, exit node healhty loop
if cloudProivderPolicy.Type == "" {
// 1. If node is healthy, exit node repair loop
if foundCloudProviderPolicy.Type == "" {
return reconcile.Result{}, nil
}

// 2. If the Node is unhealthy, but has not reached it's full toleration disruption, exit the loop
dusruptionTime := nodeHealthCondation.LastTransitionTime.Add(cloudProivderPolicy.TolerationDuration)
if c.clock.Now().Before(dusruptionTime) {
// Use t.Sub(clock.Now()) instead of time.Until() to ensure we're using the injected clock.
return reconcile.Result{RequeueAfter: dusruptionTime.Sub(c.clock.Now())}, nil
disruptionTime := nodeHealthCondition.LastTransitionTime.Add(foundCloudProviderPolicy.TolerationDuration)
if c.clock.Now().Before(disruptionTime) {
return reconcile.Result{RequeueAfter: disruptionTime.Sub(c.clock.Now())}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}
if err := c.annotateTerminationGracePeriodByDefualt(ctx, nodeClaims[0]); err != nil {
return reconcile.Result{}, fmt.Errorf("annotated termination grace peirod on nodeclaim, %w", err)
if err := c.annotateTerminationGracePeriod(ctx, nodeClaims[0]); err != nil {
return reconcile.Result{}, fmt.Errorf("annotated termination grace period on nodeclaim, %w", err)
}

// 3. Otherwise, if the Node is unhealthy we can forcefully remove the node (by deleting it)
// 3. Otherwise, if the Node is unhealthy we can forcefully terminate the node (by deleting it)
if err := c.kubeClient.Delete(ctx, node); err != nil {
return reconcile.Result{}, err
}
Expand All @@ -109,11 +113,7 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nil
}

func (c *Controller) annotateTerminationGracePeriodByDefualt(ctx context.Context, nodeClaim *v1.NodeClaim) error {
if _, ok := nodeClaim.ObjectMeta.Annotations[v1.NodeClaimTerminationTimestampAnnotationKey]; ok {
return nil
}

func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error {
stored := nodeClaim.DeepCopy()
terminationTime := c.clock.Now().Format(time.RFC3339)
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})
Expand All @@ -124,20 +124,3 @@ func (c *Controller) annotateTerminationGracePeriodByDefualt(ctx context.Context

return nil
}

// Register wires this controller into the manager under the controller
// name "node.health", reconciling corev1.Node objects. Reconcile
// throughput is bounded by taking the max of two rate limiters:
//   - per-item exponential backoff from 100ms up to 10s, and
//   - an overall token bucket of 10 qps with a burst of 100,
// with at most 100 reconciles running concurrently.
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
	return controllerruntime.NewControllerManagedBy(m).
		Named("node.health").
		For(&corev1.Node{}).
		WithOptions(
			controller.Options{
				RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
					workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
					// 10 qps, 100 bucket size
					&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
				),
				MaxConcurrentReconciles: 100,
			},
		).
		Complete(reconcile.AsReconciler(m.GetClient(), c))
}

0 comments on commit 2338123

Please sign in to comment.