Skip to content

Commit

Permalink
feat: Support multiple MOFED DS
Browse files Browse the repository at this point in the history
Mofed driver precompiled container images
are compiled using a specific Kernel.

As a result, the Mofed Driver DaemonSet should
have the Kernel as part of the Node Selector.

In addition, since there can be Nodes with different
Kernel versions, a DaemonSet for each existing Kernel
in the cluster is created.

In the Migration module, the former DS is deleted
with DeletePropagationOrphan so that MOFED pods will
still exists until manual or auto-upgrade is done.

Signed-off-by: Fred Rolland <[email protected]>
  • Loading branch information
rollandf committed Mar 7, 2024
1 parent e6cfb4d commit 7d70631
Show file tree
Hide file tree
Showing 18 changed files with 769 additions and 120 deletions.
10 changes: 9 additions & 1 deletion controllers/hostdevicenetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -43,7 +44,8 @@ import (
// HostDeviceNetworkReconciler reconciles a HostDeviceNetwork object
type HostDeviceNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -59,6 +61,12 @@ type HostDeviceNetworkReconciler struct {
//
//nolint:dupl
func (r *HostDeviceNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling HostDeviceNetwork")

Expand Down
10 changes: 9 additions & 1 deletion controllers/ipoibnetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,7 +42,8 @@ import (
// IPoIBNetworkReconciler reconciles a IPoIBNetwork object
type IPoIBNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -57,6 +59,12 @@ type IPoIBNetworkReconciler struct {
//
//nolint:dupl
func (r *IPoIBNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling IPoIBNetwork")

Expand Down
12 changes: 10 additions & 2 deletions controllers/macvlannetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,8 +42,9 @@ import (
// MacvlanNetworkReconciler reconciles a MacvlanNetwork object
type MacvlanNetworkReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Log logr.Logger
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -58,6 +60,12 @@ type MacvlanNetworkReconciler struct {
//
//nolint:dupl
func (r *MacvlanNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling MacvlanNetwork")

Expand Down
11 changes: 11 additions & 0 deletions controllers/nicclusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NicClusterPolicyReconciler struct {
Scheme *runtime.Scheme
ClusterTypeProvider clustertype.Provider
StaticConfigProvider staticconfig.Provider
MigrationCh chan struct{}

stateManager state.Manager
}
Expand Down Expand Up @@ -87,6 +88,12 @@ type NicClusterPolicyReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NicClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling NicClusterPolicy")

Expand Down Expand Up @@ -179,6 +186,10 @@ func (r *NicClusterPolicyReconciler) handleMOFEDWaitLabels(
_ = r.Client.List(ctx, pods, client.MatchingLabels{"nvidia.com/ofed-driver": ""})
for i := range pods.Items {
pod := pods.Items[i]
if pod.Spec.NodeName == "" {
// In case that Pod is in Pending state
continue
}
labelValue := "true"
// We assume that OFED pod contains only one container to simplify the logic.
// We can revisit this logic in the future if needed
Expand Down
19 changes: 13 additions & 6 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,27 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

err = (&HostDeviceNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&IPoIBNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&MacvlanNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand All @@ -133,6 +139,7 @@ var _ = BeforeSuite(func() {
Scheme: k8sManager.GetScheme(),
ClusterTypeProvider: clusterTypeProvider,
StaticConfigProvider: staticConfigProvider,
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand Down
8 changes: 8 additions & 0 deletions controllers/upgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade"
Expand Down Expand Up @@ -47,6 +48,7 @@ type UpgradeReconciler struct {
client.Client
Scheme *runtime.Scheme
StateManager upgrade.ClusterUpgradeStateManager
MigrationCh chan struct{}
}

const plannedRequeueInterval = time.Minute * 2
Expand All @@ -64,6 +66,12 @@ const UpgradeStateAnnotation = "nvidia.com/ofed-upgrade-state"
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *UpgradeReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling Upgrade")

Expand Down
14 changes: 10 additions & 4 deletions controllers/upgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ var _ = Describe("Upgrade Controller", func() {
})
Context("When NicClusterPolicy CR is created", func() {
It("Upgrade policy is disabled", func() {
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)
upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}

req := ctrl.Request{NamespacedName: types.NamespacedName{Name: consts.NicClusterPolicyResourceName}}
Expand All @@ -76,10 +79,13 @@ var _ = Describe("Upgrade Controller", func() {
err := k8sClient.Create(goctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
}
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}
// Call removeNodeUpgradeStateLabels function
err := upgradeReconciler.removeNodeUpgradeStateLabels(goctx.TODO())
Expand Down
74 changes: 47 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func setupWebhookControllers(mgr ctrl.Manager) error {
return nil
}

func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager) error {
func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager, migrationChan chan struct{}) error {
ctrLog := setupLog.WithName("controller")
clusterTypeProvider, err := clustertype.NewProvider(ctx, c)

Expand All @@ -98,27 +98,31 @@ func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager)
Scheme: mgr.GetScheme(),
ClusterTypeProvider: clusterTypeProvider, // we want to cache information about the cluster type
StaticConfigProvider: staticInfoProvider,
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("NicClusterPolicy")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NicClusterPolicy")
return err
}
if err := (&controllers.MacvlanNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("MacVlanNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MacvlanNetwork")
return err
}
if err := (&controllers.HostDeviceNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("HostDeviceNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HostDeviceNetwork")
return err
}
if err := (&controllers.IPoIBNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("IPoIBNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IPoIBNetwork")
return err
Expand Down Expand Up @@ -166,35 +170,26 @@ func main() {
os.Exit(1)
}

// run migration logic before controllers start
if err := migrate.Migrate(stopCtx, setupLog.WithName("migrate"), directClient); err != nil {
setupLog.Error(err, "failed to run migration logic")
os.Exit(1)
migrationCompletionChan := make(chan struct{})
m := migrate.Migrator{
K8sClient: directClient,
MigrationCh: migrationCompletionChan,
LeaderElection: enableLeaderElection,
Logger: ctrl.Log.WithName("Migrator"),
}

err = setupCRDControllers(stopCtx, directClient, mgr)
err = mgr.Add(&m)
if err != nil {
setupLog.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

err = setupCRDControllers(stopCtx, directClient, mgr, migrationCompletionChan)
if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
os.Exit(1)
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
err = setupUpgradeController(mgr, migrationCompletionChan)
if err != nil {
os.Exit(1)
}

Expand All @@ -221,3 +216,28 @@ func main() {
os.Exit(1)
}
}

func setupUpgradeController(mgr ctrl.Manager, migrationChan chan struct{}) error {
upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
return err
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
MigrationCh: migrationChan,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
return err
}
return nil
}
14 changes: 10 additions & 4 deletions manifests/state-ofed-driver/0050_ofed-driver-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
nvidia.com/ofed-driver: ""
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-ds
mofed-ds-format-version: "1"
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}-ds
namespace: {{ .RuntimeSpec.Namespace }}
spec:
updateStrategy:
type: OnDelete
selector:
matchLabels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
template:
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
kernel: {{ .RuntimeSpec.Kernel }}
nvidia.com/ofed-driver: ""
spec:
priorityClassName: system-node-critical
Expand Down Expand Up @@ -250,6 +252,10 @@ spec:
feature.node.kubernetes.io/pci-15b3.present: "true"
feature.node.kubernetes.io/system-os_release.ID: {{ .RuntimeSpec.OSName }}
feature.node.kubernetes.io/system-os_release.VERSION_ID: "{{ .RuntimeSpec.OSVer }}"
feature.node.kubernetes.io/kernel-version.full: "{{ .RuntimeSpec.Kernel }}"
{{- if .RuntimeSpec.UseDtk }}
feature.node.kubernetes.io/system-os_release.OSTREE_VERSION: "{{ .RuntimeSpec.RhcosVersion }}"
{{- end }}
{{- if .NodeAffinity }}
affinity:
nodeAffinity:
Expand Down
Loading

0 comments on commit 7d70631

Please sign in to comment.