Skip to content

Commit

Permalink
This commit introduces a new redesign on how the operator resets the …
Browse files Browse the repository at this point in the history
…device plugin

* use a general nodeSelector to avoid updating the daemonset yaml
* remove the config-daemon removing pod (better security)
* make the operator in charge of resetting the device plugin via annotations
* mark the node as cordon BEFORE we remove the device plugin (without drain) to avoid scheduling new pods until the device plugin is backed up

Signed-off-by: Sebastian Sch <[email protected]>
  • Loading branch information
SchSeba committed Jul 29, 2024
1 parent ee40683 commit a3e8a58
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 582 deletions.
149 changes: 148 additions & 1 deletion controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// check the device plugin exited and enable it again
// only of we have something in the node state spec
if len(nodeNetworkState.Spec.Interfaces) > 0 {
completed, err = dr.enableSriovDevicePlugin(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to enable SriovDevicePlugin")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to enable SriovDevicePlugin")
return ctrl.Result{}, err
}

if !completed {
reqLogger.Info("sriov device plugin enable was not completed")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"sriov device plugin enable was not completed")
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}
}

// move the node state back to idle
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
Expand Down Expand Up @@ -209,7 +232,7 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}
}

// class the drain function that will also call drain to other platform providers like openshift
// call the drain function that will also call drain to other platform providers like openshift
drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
if err != nil {
reqLogger.Error(err, "error trying to drain the node")
Expand All @@ -230,6 +253,17 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

reqLogger.Info("remove Device plugin from node")
err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelDisabled, dr.Client)
if err != nil {
log.Log.Error(err, "failed to label node for device plugin label",
"labelKey",
constants.SriovDevicePluginEnabledLabel,
"labelValue",
constants.SriovDevicePluginEnabledLabelDisabled)
return reconcile.Result{}, err
}

// if we manage to drain we label the node state with drain completed and finish
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
Expand All @@ -243,6 +277,60 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
"DrainController",
"node drain completed")
return ctrl.Result{}, nil
} else if nodeDrainAnnotation == constants.DevicePluginResetRequired {
// nothing to do here we need to wait for the node to move back to idle
if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
return ctrl.Result{}, nil
}

// if we are on idle state we move it to drain
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// This cover a case where we only need to reset the device plugin
// for that we are going to cordon the node, so we don't get new pods allocated
// to the node in the time we remove the device plugin
err = dr.drainer.RunCordonOrUncordon(ctx, node, true)
if err != nil {
log.Log.Error(err, "failed to cordon on node")
return reconcile.Result{}, err
}

// we switch the sriov label to disable and mark the drain as completed
// no need to wait for the device plugin to exist here as we cordon the node,
// and we want to config-daemon to start the configuration in parallel of the kube-controller to remove the pod
// we check the device plugin was removed when the config-daemon moves is desire state to idle
reqLogger.Info("disable Device plugin from node")
err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelDisabled, dr.Client)
if err != nil {
log.Log.Error(err, "failed to label node for device plugin label",
"labelKey",
constants.SriovDevicePluginEnabledLabel,
"labelValue",
constants.SriovDevicePluginEnabledLabelDisabled)
return reconcile.Result{}, err
}

// if we manage to cordon we label the node state with drain completed and finish
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
return ctrl.Result{}, err
}

reqLogger.Info("node cordoned successfully and device plugin removed")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node cordoned and device plugin removed completed")
return ctrl.Result{}, nil
}

reqLogger.Error(nil, "unexpected node drain annotation")
Expand Down Expand Up @@ -436,6 +524,65 @@ func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.N
}
}

// enableSriovDevicePlugin change the device plugin label on the requested node to enable
// if there is a pod still running we will return false
func (dr *DrainReconcile) enableSriovDevicePlugin(ctx context.Context, node *corev1.Node) (bool, error) {
logger := log.FromContext(ctx)
logger.Info("enableSriovDevicePlugin():")

// check if the device plugin is terminating only if the node annotation for device plugin is disabled
if node.Annotations[constants.SriovDevicePluginEnabledLabel] == constants.SriovDevicePluginEnabledLabelDisabled {
pods, err := dr.getDevicePluginPodsOnNode(node.Name)
if err != nil {
logger.Error(err, "failed to list device plugin pods running on node")
return false, err
}

if len(pods.Items) != 0 {
log.Log.V(2).Info("device plugin pod still terminating on node")
return false, nil
}
}

logger.Info("enable Device plugin from node")
err := utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelEnabled, dr.Client)
if err != nil {
log.Log.Error(err, "failed to label node for device plugin label",
"labelKey",
constants.SriovDevicePluginEnabledLabel,
"labelValue",
constants.SriovDevicePluginEnabledLabelEnabled)
return false, err
}

// check if the device plugin pod is running on the node
pods, err := dr.getDevicePluginPodsOnNode(node.Name)
if err != nil {
logger.Error(err, "failed to list device plugin pods running on node")
return false, err
}

if len(pods.Items) == 1 && pods.Items[0].Status.Phase == corev1.PodRunning {
logger.Info("Device plugin pod running on node")
return true, nil
}

logger.V(2).Info("Device plugin pod still not running on node")
return false, nil
}

func (dr *DrainReconcile) getDevicePluginPodsOnNode(nodeName string) (*corev1.PodList, error) {
pods := &corev1.PodList{}
err := dr.List(context.Background(), pods, &client.ListOptions{
Raw: &metav1.ListOptions{
LabelSelector: "app=sriov-device-plugin",
FieldSelector: "spec.nodeName=" + nodeName,
ResourceVersion: "0"},
})

return pods, err
}

// SetupWithManager sets up the controller with the Manager.
func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error {
createUpdateEnqueue := handler.Funcs{
Expand Down
106 changes: 16 additions & 90 deletions controllers/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"encoding/json"
"fmt"
"os"
"sort"
"strings"

errs "github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -152,29 +150,25 @@ func formatJSON(str string) (string, error) {
return prettyJSON.String(), nil
}

// GetDefaultNodeSelector return a nodeSelector with worker and linux os
func GetDefaultNodeSelector() map[string]string {
return map[string]string{"node-role.kubernetes.io/worker": "",
"kubernetes.io/os": "linux"}
}

// hasNoValidPolicy returns true if no SriovNetworkNodePolicy
// or only the (deprecated) "default" policy is present
func hasNoValidPolicy(pl []sriovnetworkv1.SriovNetworkNodePolicy) bool {
switch len(pl) {
case 0:
return true
case 1:
return pl[0].Name == constants.DefaultPolicyName
default:
return false
}
// GetDefaultNodeSelectorForDevicePlugin return a nodeSelector with worker linux os
// and the enabled sriov device plugin
func GetDefaultNodeSelectorForDevicePlugin() map[string]string {
return map[string]string{
"node-role.kubernetes.io/worker": "",
"kubernetes.io/os": "linux",
constants.SriovDevicePluginEnabledLabel: constants.SriovDevicePluginEnabledLabelEnabled}
}

func syncPluginDaemonObjs(ctx context.Context,
client k8sclient.Client,
scheme *runtime.Scheme,
dc *sriovnetworkv1.SriovOperatorConfig,
pl *sriovnetworkv1.SriovNetworkNodePolicyList) error {
dc *sriovnetworkv1.SriovOperatorConfig) error {
logger := log.Log.WithName("syncPluginDaemonObjs")
logger.V(1).Info("Start to sync sriov daemons objects")

Expand All @@ -185,24 +179,14 @@ func syncPluginDaemonObjs(ctx context.Context,
data.Data["ReleaseVersion"] = os.Getenv("RELEASEVERSION")
data.Data["ResourcePrefix"] = vars.ResourcePrefix
data.Data["ImagePullSecrets"] = GetImagePullSecrets()
data.Data["NodeSelectorField"] = GetDefaultNodeSelector()
data.Data["NodeSelectorField"] = GetDefaultNodeSelectorForDevicePlugin()
data.Data["UseCDI"] = dc.Spec.UseCDI
objs, err := renderDsForCR(constants.PluginPath, &data)
if err != nil {
logger.Error(err, "Fail to render SR-IoV manifests")
return err
}

if hasNoValidPolicy(pl.Items) {
for _, obj := range objs {
err := deleteK8sResource(ctx, client, obj)
if err != nil {
return err
}
}
return nil
}

// Sync DaemonSets
for _, obj := range objs {
if obj.GetKind() == constants.DaemonSet && len(dc.Spec.ConfigDaemonNodeSelector) > 0 {
Expand All @@ -214,13 +198,15 @@ func syncPluginDaemonObjs(ctx context.Context,
return err
}
ds.Spec.Template.Spec.NodeSelector = dc.Spec.ConfigDaemonNodeSelector
// add the special node selector for the device plugin
ds.Spec.Template.Spec.NodeSelector[constants.SriovDevicePluginEnabledLabel] = constants.SriovDevicePluginEnabledLabelEnabled
err = scheme.Convert(ds, obj, nil)
if err != nil {
logger.Error(err, "Fail to convert to Unstructured")
return err
}
}
err = syncDsObject(ctx, client, scheme, dc, pl, obj)
err = syncDsObject(ctx, client, scheme, dc, obj)
if err != nil {
logger.Error(err, "Couldn't sync SR-IoV daemons objects")
return err
Expand All @@ -230,14 +216,7 @@ func syncPluginDaemonObjs(ctx context.Context,
return nil
}

func deleteK8sResource(ctx context.Context, client k8sclient.Client, in *uns.Unstructured) error {
if err := apply.DeleteObject(ctx, client, in); err != nil {
return fmt.Errorf("failed to delete object %v with err: %v", in, err)
}
return nil
}

func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, obj *uns.Unstructured) error {
func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, obj *uns.Unstructured) error {
logger := log.Log.WithName("syncDsObject")
kind := obj.GetKind()
logger.V(1).Info("Start to sync Objects", "Kind", kind)
Expand All @@ -257,7 +236,7 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.
logger.Error(err, "Fail to convert to DaemonSet")
return err
}
err = syncDaemonSet(ctx, client, scheme, dc, pl, ds)
err = syncDaemonSet(ctx, client, scheme, dc, ds)
if err != nil {
logger.Error(err, "Fail to sync DaemonSet", "Namespace", ds.Namespace, "Name", ds.Name)
return err
Expand All @@ -266,54 +245,6 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.
return nil
}

func setDsNodeAffinity(pl *sriovnetworkv1.SriovNetworkNodePolicyList, ds *appsv1.DaemonSet) error {
terms := nodeSelectorTermsForPolicyList(pl.Items)
if len(terms) > 0 {
ds.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: terms,
},
},
}
}
return nil
}

func nodeSelectorTermsForPolicyList(policies []sriovnetworkv1.SriovNetworkNodePolicy) []corev1.NodeSelectorTerm {
terms := []corev1.NodeSelectorTerm{}
for _, p := range policies {
// Note(adrianc): default policy is deprecated and ignored.
if p.Name == constants.DefaultPolicyName {
continue
}

if len(p.Spec.NodeSelector) == 0 {
continue
}
expressions := []corev1.NodeSelectorRequirement{}
for k, v := range p.Spec.NodeSelector {
exp := corev1.NodeSelectorRequirement{
Operator: corev1.NodeSelectorOpIn,
Key: k,
Values: []string{v},
}
expressions = append(expressions, exp)
}
// sorting is needed to keep the daemon spec stable.
// the items are popped in a random order from the map
sort.Slice(expressions, func(i, j int) bool {
return expressions[i].Key < expressions[j].Key
})
nodeSelector := corev1.NodeSelectorTerm{
MatchExpressions: expressions,
}
terms = append(terms, nodeSelector)
}

return terms
}

// renderDsForCR returns a busybox pod with the same name/namespace as the cr
func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, error) {
logger := log.Log.WithName("renderDsForCR")
Expand All @@ -326,16 +257,11 @@ func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, e
return objs, nil
}

func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, in *appsv1.DaemonSet) error {
func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, in *appsv1.DaemonSet) error {
logger := log.Log.WithName("syncDaemonSet")
logger.V(1).Info("Start to sync DaemonSet", "Namespace", in.Namespace, "Name", in.Name)
var err error

if pl != nil {
if err = setDsNodeAffinity(pl, in); err != nil {
return err
}
}
if err = controllerutil.SetControllerReference(dc, in, scheme); err != nil {
return err
}
Expand Down
Loading

0 comments on commit a3e8a58

Please sign in to comment.