Skip to content

Commit

Permalink
feat: Support multiple MOFED DS
Browse files Browse the repository at this point in the history
Mofed driver precompiled container images
are compiled using a specific Kernel.

As a result, the Mofed Driver DaemonSet should
have the Kernel as part of the Node Selector.

In addition, since there can be Nodes with different
Kernel versions, a DaemonSet for each existing Kernel
in the cluster is created.

In the Migration module, the former DS is deleted
with DeletePropagationOrphan so that MOFED pods will
still exists until manual or auto-upgrade is done.

Signed-off-by: Fred Rolland <[email protected]>
  • Loading branch information
rollandf committed Dec 31, 2023
1 parent c7d9a93 commit 329b06a
Show file tree
Hide file tree
Showing 17 changed files with 606 additions and 129 deletions.
10 changes: 10 additions & 0 deletions controllers/nicclusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NicClusterPolicyReconciler struct {
Scheme *runtime.Scheme
ClusterTypeProvider clustertype.Provider
StaticConfigProvider staticconfig.Provider
MigrationCh chan struct{}

stateManager state.Manager
}
Expand Down Expand Up @@ -86,6 +87,11 @@ type NicClusterPolicyReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NicClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling NicClusterPolicy")

Expand Down Expand Up @@ -179,6 +185,10 @@ func (r *NicClusterPolicyReconciler) handleMOFEDWaitLabels(
_ = r.Client.List(ctx, pods, client.MatchingLabels{"driver-pod": podLabel})
for i := range pods.Items {
pod := pods.Items[i]
if pod.Spec.NodeName == "" {
// In case that Pod is in Pending state
continue
}
labelValue := "true"
// We assume that OFED pod contains only one container to simplify the logic.
// We can revisit this logic in the future if needed
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,15 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
staticConfigProvider := staticconfig.NewProvider(staticconfig.StaticConfig{CniBinDirectory: "/opt/cni/bin"})

migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

err = (&NicClusterPolicyReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ClusterTypeProvider: clusterTypeProvider,
StaticConfigProvider: staticConfigProvider,
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand Down
7 changes: 7 additions & 0 deletions controllers/upgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade"
Expand Down Expand Up @@ -47,6 +48,7 @@ type UpgradeReconciler struct {
client.Client
Scheme *runtime.Scheme
StateManager upgrade.ClusterUpgradeStateManager
MigrationCh chan struct{}
}

const plannedRequeueInterval = time.Minute * 2
Expand All @@ -64,6 +66,11 @@ const UpgradeStateAnnotation = "nvidia.com/ofed-upgrade-state"
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *UpgradeReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling Upgrade")

Expand Down
14 changes: 10 additions & 4 deletions controllers/upgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ var _ = Describe("Upgrade Controller", func() {
})
Context("When NicClusterPolicy CR is created", func() {
It("Upgrade policy is disabled", func() {
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)
upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}

req := ctrl.Request{NamespacedName: types.NamespacedName{Name: consts.NicClusterPolicyResourceName}}
Expand All @@ -76,10 +79,13 @@ var _ = Describe("Upgrade Controller", func() {
err := k8sClient.Create(goctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
}
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}
// Call removeNodeUpgradeStateLabels function
err := upgradeReconciler.removeNodeUpgradeStateLabels(goctx.TODO())
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/xeipuuv/gojsonschema v1.2.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.4
Expand Down Expand Up @@ -83,7 +85,6 @@ require (
go.starlark.net v0.0.0-20231101134539-556fd59b42f6 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
Expand All @@ -98,7 +99,6 @@ require (
gopkg.in/evanphx/json-patch.v5 v5.7.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/cli-runtime v0.28.4 // indirect
k8s.io/component-base v0.28.4 // indirect
Expand Down
59 changes: 38 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func setupWebhookControllers(mgr ctrl.Manager) error {
return nil
}

func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager) error {
func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager, migrationChan chan struct{}) error {
ctrLog := setupLog.WithName("controller")
clusterTypeProvider, err := clustertype.NewProvider(ctx, c)

Expand All @@ -95,6 +95,7 @@ func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager)
Scheme: mgr.GetScheme(),
ClusterTypeProvider: clusterTypeProvider, // we want to cache information about the cluster type
StaticConfigProvider: staticInfoProvider,
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("NicClusterPolicy")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NicClusterPolicy")
return err
Expand Down Expand Up @@ -163,35 +164,26 @@ func main() {
os.Exit(1)
}

// run migration logic before controllers start
if err := migrate.Migrate(stopCtx, setupLog.WithName("migrate"), directClient); err != nil {
setupLog.Error(err, "failed to run migration logic")
os.Exit(1)
migrationCompletionChan := make(chan struct{})
m := migrate.Migrator{
K8sClient: directClient,
MigrationCh: migrationCompletionChan,
LeaderElection: enableLeaderElection,
Logger: ctrl.Log.WithName("Migrator"),
}

err = setupCRDControllers(stopCtx, directClient, mgr)
err = mgr.Add(&m)
if err != nil {
setupLog.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

err = setupCRDControllers(stopCtx, directClient, mgr, migrationCompletionChan)
if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
os.Exit(1)
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
err = setupUpgradeController(mgr, migrationCompletionChan)
if err != nil {
os.Exit(1)
}

Expand All @@ -218,3 +210,28 @@ func main() {
os.Exit(1)
}
}

func setupUpgradeController(mgr ctrl.Manager, migrationChan chan struct{}) error {
upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
return err
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
MigrationCh: migrationChan,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
return err
}
return nil
}
11 changes: 7 additions & 4 deletions manifests/state-ofed-driver/0050_ofed-driver-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
nvidia.com/ofed-driver: ""
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-ds
mofed-ds-format-version: "1"
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}-ds
namespace: {{ .RuntimeSpec.Namespace }}
spec:
updateStrategy:
type: OnDelete
selector:
matchLabels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
template:
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
driver-pod: mofed-{{ .CrSpec.Version }}
nvidia.com/ofed-driver: ""
spec:
Expand Down Expand Up @@ -180,6 +181,8 @@ spec:
feature.node.kubernetes.io/pci-15b3.present: "true"
feature.node.kubernetes.io/system-os_release.ID: {{ .RuntimeSpec.OSName }}
feature.node.kubernetes.io/system-os_release.VERSION_ID: "{{ .RuntimeSpec.OSVer }}"
feature.node.kubernetes.io/kernel-version.major: "{{ .RuntimeSpec.KernelMajor }}"
feature.node.kubernetes.io/kernel-version.minor: "{{ .RuntimeSpec.KernelMinor }}"
{{- if .NodeAffinity }}
affinity:
nodeAffinity:
Expand Down
76 changes: 76 additions & 0 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,8 +32,34 @@ import (

"github.com/Mellanox/network-operator/pkg/config"
"github.com/Mellanox/network-operator/pkg/consts"

mellanoxv1alpha1 "github.com/Mellanox/network-operator/api/v1alpha1"
)

// Migrator migrates from previous versions
type Migrator struct {
K8sClient client.Client
MigrationCh chan struct{}
LeaderElection bool
Logger logr.Logger
}

// NeedLeaderElection implements manager.NeedLeaderElection
func (m *Migrator) NeedLeaderElection() bool {
return m.LeaderElection
}

// Start implements manager.Runnable
func (m *Migrator) Start(ctx context.Context) error {
err := Migrate(ctx, m.Logger, m.K8sClient)
if err != nil {
m.Logger.Error(err, "failed to migrate Network Operator")
return err
}
close(m.MigrationCh)
return nil
}

// Migrate contains logic which should run once during controller start.
// The main use case for this handler is network-operator upgrade
// for example, the handler can contain logic to change old data format to a new one or
Expand All @@ -55,6 +82,11 @@ func migrate(ctx context.Context, log logr.Logger, c client.Client) error {
log.V(consts.LogLevelError).Error(err, "error trying to remove state label on NV IPAM configmap")
return err
}
if err := handleSingleMofedDS(ctx, log, c); err != nil {
// critical for the operator operation, will fail Mofed migration
log.V(consts.LogLevelError).Error(err, "error trying to remove state label on NV IPAM configmap")
return err
}
return nil
}

Expand Down Expand Up @@ -115,3 +147,47 @@ func removeStateLabelFromNVIpamConfigMap(ctx context.Context, log logr.Logger, c
}
return nil
}

func handleSingleMofedDS(ctx context.Context, log logr.Logger, c client.Client) error {
ncp := &mellanoxv1alpha1.NicClusterPolicy{}
key := types.NamespacedName{
Name: consts.NicClusterPolicyResourceName,
}
err := c.Get(ctx, key, ncp)
if apiErrors.IsNotFound(err) {
log.V(consts.LogLevelDebug).Info("NIC ClusterPolicy not found, skip handling single MOFED DS")
return nil
} else if err != nil {
log.V(consts.LogLevelError).Error(err, "fail to get NIC ClusterPolicy")
return err
}
if ncp.Spec.OFEDDriver == nil {
return nil
}
log.V(consts.LogLevelDebug).Info("Searching for single MOFED DS")
dsList := &appsv1.DaemonSetList{}
err = c.List(ctx, dsList, client.MatchingLabels{"nvidia.com/ofed-driver": ""})
if err != nil {
log.V(consts.LogLevelError).Error(err, "fail to list MOFED DS")
return err
}
for i := range dsList.Items {
mofedDs := &dsList.Items[i]
// The single MOFED DS does not contain the label "mofed-ds-format-version"
_, ok := mofedDs.Labels["mofed-ds-format-version"]
if ok {
continue
}
log.V(consts.LogLevelDebug).Info("Found single MOFED DS", "name", mofedDs.Name)
policy := metav1.DeletePropagationOrphan
err = c.Delete(ctx, mofedDs, &client.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
log.V(consts.LogLevelError).Error(err, "fail delete single MOFED DS")
return err
}
log.V(consts.LogLevelDebug).Info("Deleted single MOFED DS with orphaned", "name", mofedDs.Name)
return nil
}
log.V(consts.LogLevelDebug).Info("Single MOFED DS not found")
return nil
}
Loading

0 comments on commit 329b06a

Please sign in to comment.