Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support multiple MOFED DS #691

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion controllers/hostdevicenetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -43,7 +44,8 @@ import (
// HostDeviceNetworkReconciler reconciles a HostDeviceNetwork object
type HostDeviceNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -59,6 +61,12 @@ type HostDeviceNetworkReconciler struct {
//
//nolint:dupl
func (r *HostDeviceNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling HostDeviceNetwork")

Expand Down
10 changes: 9 additions & 1 deletion controllers/ipoibnetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,7 +42,8 @@ import (
// IPoIBNetworkReconciler reconciles a IPoIBNetwork object
type IPoIBNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -57,6 +59,12 @@ type IPoIBNetworkReconciler struct {
//
//nolint:dupl
func (r *IPoIBNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling IPoIBNetwork")

Expand Down
12 changes: 10 additions & 2 deletions controllers/macvlannetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,8 +42,9 @@ import (
// MacvlanNetworkReconciler reconciles a MacvlanNetwork object
type MacvlanNetworkReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Log logr.Logger
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -58,6 +60,12 @@ type MacvlanNetworkReconciler struct {
//
//nolint:dupl
func (r *MacvlanNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling MacvlanNetwork")

Expand Down
11 changes: 11 additions & 0 deletions controllers/nicclusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NicClusterPolicyReconciler struct {
Scheme *runtime.Scheme
ClusterTypeProvider clustertype.Provider
StaticConfigProvider staticconfig.Provider
MigrationCh chan struct{}

stateManager state.Manager
}
Expand Down Expand Up @@ -87,6 +88,12 @@ type NicClusterPolicyReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NicClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling NicClusterPolicy")

Expand Down Expand Up @@ -179,6 +186,10 @@ func (r *NicClusterPolicyReconciler) handleMOFEDWaitLabels(
_ = r.Client.List(ctx, pods, client.MatchingLabels{"nvidia.com/ofed-driver": ""})
for i := range pods.Items {
pod := pods.Items[i]
if pod.Spec.NodeName == "" {
// In case that Pod is in Pending state
continue
}
labelValue := "true"
// We assume that OFED pod contains only one container to simplify the logic.
// We can revisit this logic in the future if needed
Expand Down
19 changes: 13 additions & 6 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,27 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

err = (&HostDeviceNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&IPoIBNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&MacvlanNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand All @@ -133,6 +139,7 @@ var _ = BeforeSuite(func() {
Scheme: k8sManager.GetScheme(),
ClusterTypeProvider: clusterTypeProvider,
StaticConfigProvider: staticConfigProvider,
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand Down
8 changes: 8 additions & 0 deletions controllers/upgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade"
Expand Down Expand Up @@ -47,6 +48,7 @@ type UpgradeReconciler struct {
client.Client
Scheme *runtime.Scheme
StateManager upgrade.ClusterUpgradeStateManager
MigrationCh chan struct{}
}

const plannedRequeueInterval = time.Minute * 2
Expand All @@ -64,6 +66,12 @@ const UpgradeStateAnnotation = "nvidia.com/ofed-upgrade-state"
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *UpgradeReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
Comment on lines +70 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add this to all controllers, even if it is not strictly needed right now. It might even be worth writing a reconcile wrapper, but I think having this snippet as-is in all controllers is good enough for now. Maybe also add a comment to give folks a clue as to what we're waiting for at this point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be good to make it in a separate commit

reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling Upgrade")

Expand Down
14 changes: 10 additions & 4 deletions controllers/upgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ var _ = Describe("Upgrade Controller", func() {
})
Context("When NicClusterPolicy CR is created", func() {
It("Upgrade policy is disabled", func() {
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)
upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}

req := ctrl.Request{NamespacedName: types.NamespacedName{Name: consts.NicClusterPolicyResourceName}}
Expand All @@ -76,10 +79,13 @@ var _ = Describe("Upgrade Controller", func() {
err := k8sClient.Create(goctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
}
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}
// Call removeNodeUpgradeStateLabels function
err := upgradeReconciler.removeNodeUpgradeStateLabels(goctx.TODO())
Expand Down
74 changes: 47 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func setupWebhookControllers(mgr ctrl.Manager) error {
return nil
}

func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager) error {
func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager, migrationChan chan struct{}) error {
ctrLog := setupLog.WithName("controller")
clusterTypeProvider, err := clustertype.NewProvider(ctx, c)

Expand All @@ -98,27 +98,31 @@ func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager)
Scheme: mgr.GetScheme(),
ClusterTypeProvider: clusterTypeProvider, // we want to cache information about the cluster type
StaticConfigProvider: staticInfoProvider,
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("NicClusterPolicy")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NicClusterPolicy")
return err
}
if err := (&controllers.MacvlanNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("MacVlanNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MacvlanNetwork")
return err
}
if err := (&controllers.HostDeviceNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("HostDeviceNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HostDeviceNetwork")
return err
}
if err := (&controllers.IPoIBNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("IPoIBNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IPoIBNetwork")
return err
Expand Down Expand Up @@ -166,35 +170,26 @@ func main() {
os.Exit(1)
}

// run migration logic before controllers start
if err := migrate.Migrate(stopCtx, setupLog.WithName("migrate"), directClient); err != nil {
setupLog.Error(err, "failed to run migration logic")
os.Exit(1)
migrationCompletionChan := make(chan struct{})
m := migrate.Migrator{
K8sClient: directClient,
MigrationCh: migrationCompletionChan,
LeaderElection: enableLeaderElection,
Logger: ctrl.Log.WithName("Migrator"),
}
Comment on lines -170 to 179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for moving this logic to a runnable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to use the leader election mechanism and make sure the migration runs only once this instance has been elected leader; we do that with mgr.Add(&m), which requires a runnable.
Otherwise an old instance of the operator can recreate the DS after the new instance has deleted it.
In Helm, we have a pre-hook that scales down the deployment, but we won't have it in OpenShift.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to deprecate the helm hooks in favor of this approach?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one thing we would like to do first is stop creating the NicClusterPolicy instance as part of the Helm chart, before we go and remove this hook.

The process should be:

  • upgrade operator
  • wait for operator to be running
  • update NCP with new values

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to get this prioritized IMO - makes a lot of things simpler for deploying network operator.


err = setupCRDControllers(stopCtx, directClient, mgr)
err = mgr.Add(&m)
if err != nil {
setupLog.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

err = setupCRDControllers(stopCtx, directClient, mgr, migrationCompletionChan)
if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
os.Exit(1)
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
err = setupUpgradeController(mgr, migrationCompletionChan)
if err != nil {
os.Exit(1)
}

Expand All @@ -221,3 +216,28 @@ func main() {
os.Exit(1)
}
}

// setupUpgradeController registers the Upgrade controller with the given manager.
//
// It sets the upgrade library's driver name to "ofed", builds a
// ClusterUpgradeStateManager with a logger scoped to
// controllers/Upgrade/clusterUpgradeManager, and wires an UpgradeReconciler
// (including migrationChan as its MigrationCh) into mgr.
//
// Any failure is logged through setupLog and returned to the caller unchanged.
func setupUpgradeController(mgr ctrl.Manager, migrationChan chan struct{}) error {
	upgrade.SetDriverName("ofed")

	controllerLog := ctrl.Log.WithName("controllers").WithName("Upgrade")
	stateManagerLog := controllerLog.WithName("clusterUpgradeManager")

	stateManager, err := upgrade.NewClusterUpgradeStateManager(stateManagerLog, config.GetConfigOrDie(), nil)
	if err != nil {
		setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
		return err
	}

	reconciler := &controllers.UpgradeReconciler{
		Client:       mgr.GetClient(),
		Scheme:       mgr.GetScheme(),
		StateManager: stateManager,
		MigrationCh:  migrationChan,
	}
	err = reconciler.SetupWithManager(mgr)
	if err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
		return err
	}

	return nil
}
14 changes: 10 additions & 4 deletions manifests/state-ofed-driver/0050_ofed-driver-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
nvidia.com/ofed-driver: ""
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-ds
mofed-ds-format-version: "1"
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}-ds
namespace: {{ .RuntimeSpec.Namespace }}
spec:
updateStrategy:
type: OnDelete
selector:
matchLabels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
template:
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
kernel: {{ .RuntimeSpec.Kernel }}
nvidia.com/ofed-driver: ""
spec:
priorityClassName: system-node-critical
Expand Down Expand Up @@ -250,6 +252,10 @@ spec:
feature.node.kubernetes.io/pci-15b3.present: "true"
feature.node.kubernetes.io/system-os_release.ID: {{ .RuntimeSpec.OSName }}
feature.node.kubernetes.io/system-os_release.VERSION_ID: "{{ .RuntimeSpec.OSVer }}"
feature.node.kubernetes.io/kernel-version.full: "{{ .RuntimeSpec.Kernel }}"
{{- if .RuntimeSpec.UseDtk }}
feature.node.kubernetes.io/system-os_release.OSTREE_VERSION: "{{ .RuntimeSpec.RhcosVersion }}"
{{- end }}
{{- if .NodeAffinity }}
affinity:
nodeAffinity:
Expand Down
Loading
Loading