Skip to content

Commit

Permalink
This commit introduces a new redesign on how the operator resets the …
Browse files Browse the repository at this point in the history
…device plugin

* use a general nodeSelector to avoid updating the daemonset yaml
* remove the config-daemon removing pod (better security)
* make the operator in charge of resetting the device plugin via annotations
* mark the node as cordon BEFORE we remove the device plugin (without drain) to avoid scheduling new pods until the device plugin is backed up

Signed-off-by: Sebastian Sch <[email protected]>
  • Loading branch information
SchSeba committed Oct 7, 2024
1 parent 4bae6ce commit 146b706
Show file tree
Hide file tree
Showing 18 changed files with 758 additions and 853 deletions.
272 changes: 14 additions & 258 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -136,11 +134,11 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
if nodeDrainAnnotation == constants.DrainIdle {
// this cover the case the node is on idle

// node request to be on idle and the currect state is idle
// we don't do anything
// node request to be on idle and the current state is idle
// in this case we check if vf configuration exist in the nodeState
// if not we remove the device plugin node selector label from the node
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
reqLogger.Info("node and nodeState are on idle nothing todo")
return reconcile.Result{}, nil
return dr.handleNodeIdleNodeStateIdle(ctx, &reqLogger, node, nodeNetworkState)
}

// we have two options here:
Expand All @@ -151,98 +149,19 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// doesn't need to drain anymore, so we can stop the drain
if nodeStateDrainAnnotationCurrent == constants.DrainComplete ||
nodeStateDrainAnnotationCurrent == constants.Draining {
completed, err := dr.drainer.CompleteDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to complete drain on node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return ctrl.Result{}, err
}

// if we didn't manage to complete the un drain of the node we retry
if !completed {
reqLogger.Info("complete drain was not completed re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node complete drain was not completed")
// TODO: make this time configurable
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// move the node state back to idle
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
return ctrl.Result{}, err
}

reqLogger.Info("completed the un drain for node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node un drain completed")
return ctrl.Result{}, nil
}
} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
// this cover the case a node request to drain or reboot

// nothing to do here we need to wait for the node to move back to idle
if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
return ctrl.Result{}, nil
}

// we need to start the drain, but first we need to check that we can drain the node
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
result, err := dr.tryDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to check if we can drain the node")
return ctrl.Result{}, err
}

// in case we need to wait because we just to the max number of draining nodes
if result != nil {
return *result, nil
}
}

// class the drain function that will also call drain to other platform providers like openshift
drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
if err != nil {
reqLogger.Error(err, "error trying to drain the node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return reconcile.Result{}, err
}

// if we didn't manage to complete the drain of the node we retry
if !drained {
reqLogger.Info("the nodes was not drained re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain operation was not completed")
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
return dr.handleNodeIdleNodeStateDrainingOrCompleted(ctx, &reqLogger, node, nodeNetworkState)
}
}

// if we manage to drain we label the node state with drain completed and finish
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
return ctrl.Result{}, err
}
// this cover the case a node request to drain or reboot
if nodeDrainAnnotation == constants.DrainRequired ||
nodeDrainAnnotation == constants.RebootRequired {
return dr.handleNodeDrainOrReboot(ctx, &reqLogger, node, nodeNetworkState, nodeDrainAnnotation, nodeStateDrainAnnotationCurrent)
}

reqLogger.Info("node drained successfully")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain completed")
return ctrl.Result{}, nil
// this cover the case a node request to only reset the device plugin
if nodeDrainAnnotation == constants.DevicePluginResetRequired {
return dr.handleNodeDPReset(ctx, &reqLogger, node, nodeNetworkState, nodeStateDrainAnnotationCurrent)
}

reqLogger.Error(nil, "unexpected node drain annotation")
Expand Down Expand Up @@ -273,169 +192,6 @@ func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object cli
return value, nil
}

func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("checkForNodeDrain():")

//critical section we need to check if we can start the draining
dr.drainCheckMutex.Lock()
defer dr.drainCheckMutex.Unlock()

// find the relevant node pool
nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to find the pool for the requested node")
return nil, err
}

// check how many nodes we can drain in parallel for the specific pool
maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
if err != nil {
reqLogger.Error(err, "failed to calculate max unavailable")
return nil, err
}

current := 0
snns := &sriovnetworkv1.SriovNetworkNodeState{}

var currentSnns *sriovnetworkv1.SriovNetworkNodeState
for _, nodeObj := range nodeList {
err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
continue
}
return nil, err
}

if snns.GetName() == node.GetName() {
currentSnns = snns.DeepCopy()
}

if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
current++
}
}
reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
reqLogger.Info("Count of draining", "drainingNodes", current)

// if maxUnv is zero this means we drain all the nodes in parallel without a limit
if maxUnv == -1 {
reqLogger.Info("draining all the nodes in parallel")
} else if current >= maxUnv {
// the node requested to be drained, but we are at the limit so we re-enqueue the request
reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
// TODO: make this time configurable
return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

if currentSnns == nil {
return nil, fmt.Errorf("failed to find sriov network node state for requested node")
}

err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
return nil, err
}

return nil, nil
}

func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
logger := log.FromContext(ctx)
logger.Info("findNodePoolConfig():")
// get all the sriov network pool configs
npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{}
err := dr.List(ctx, npcl)
if err != nil {
logger.Error(err, "failed to list sriovNetworkPoolConfig")
return nil, nil, err
}

selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
nodesInPools := map[string]interface{}{}

for _, npc := range npcl.Items {
// we skip hw offload objects
if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
continue
}

if npc.Spec.NodeSelector == nil {
npc.Spec.NodeSelector = &metav1.LabelSelector{}
}

selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector)
if err != nil {
logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector)
return nil, nil, err
}

if selector.Matches(labels.Set(node.Labels)) {
selectedNpcl = append(selectedNpcl, npc.DeepCopy())
}

nodeList := &corev1.NodeList{}
err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
if err != nil {
logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector",
"machineConfigPoolName", npc,
"nodeSelector", npc.Spec.NodeSelector)
return nil, nil, err
}

for _, nodeName := range nodeList.Items {
nodesInPools[nodeName.Name] = nil
}
}

if len(selectedNpcl) > 1 {
// don't allow the node to be part of multiple pools
err = fmt.Errorf("node is part of more then one pool")
logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl)
return nil, nil, err
} else if len(selectedNpcl) == 1 {
// found one pool for our node
logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0])
selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector)
if err != nil {
logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector)
return nil, nil, err
}

// list all the nodes that are also part of this pool and return them
nodeList := &corev1.NodeList{}
err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
if err != nil {
logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector)
return nil, nil, err
}

return selectedNpcl[0], nodeList.Items, nil
} else {
// in this case we get all the nodes and remove the ones that already part of any pool
logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl)
nodeList := &corev1.NodeList{}
err = dr.List(ctx, nodeList)
if err != nil {
logger.Error(err, "failed to list all the nodes")
return nil, nil, err
}

defaultNodeLists := []corev1.Node{}
for _, nodeObj := range nodeList.Items {
if _, exist := nodesInPools[nodeObj.Name]; !exist {
defaultNodeLists = append(defaultNodeLists, nodeObj)
}
}
return defaultNpcl, defaultNodeLists, nil
}
}

// SetupWithManager sets up the controller with the Manager.
func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error {
createUpdateEnqueue := handler.Funcs{
Expand Down
Loading

0 comments on commit 146b706

Please sign in to comment.