From e17b5845518f1550ab2619b8463fc9b418928780 Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Mon, 9 Sep 2024 12:24:40 +0300 Subject: [PATCH] WIP Signed-off-by: Sebastian Sch --- Dockerfile | 1 + Dockerfile.rhel7 | 16 + Dockerfile.sriov-network-config-daemon | 2 +- Dockerfile.sriov-network-config-daemon.rhel7 | 16 + Dockerfile.webhook.rhel7 | 13 + bindata/manifests/daemon/daemonset.yaml | 16 +- .../manifests/operator-webhook/server.yaml | 1 + .../plugins/sriov-device-plugin.yaml | 1 + bindata/manifests/webhook/server.yaml | 1 + cmd/sriov-network-config-daemon/start.go | 300 ++++--- controllers/drain_controller_test.go | 2 +- .../sriovoperatorconfig_controller_test.go | 1 + deploy/operator.yaml | 11 + go.mod | 2 +- hack/build-go.sh | 4 +- main.go | 5 +- pkg/daemon/config.go | 61 ++ pkg/daemon/config_test.go | 170 ++++ pkg/daemon/daemon.go | 817 ++++++++---------- pkg/daemon/daemon_suite_test.go | 135 +++ pkg/daemon/daemon_test.go | 451 ++++------ pkg/daemon/event_recorder.go | 10 +- pkg/daemon/status.go | 89 ++ pkg/daemon/writer.go | 282 ------ pkg/host/internal/network/network.go | 2 +- pkg/log/log.go | 4 + pkg/plugins/mellanox/mellanox_plugin.go | 2 +- pkg/vars/vars.go | 12 +- test/conformance/tests/test_sriov_operator.go | 2 +- 29 files changed, 1271 insertions(+), 1158 deletions(-) create mode 100644 Dockerfile.rhel7 create mode 100644 Dockerfile.sriov-network-config-daemon.rhel7 create mode 100644 Dockerfile.webhook.rhel7 create mode 100644 pkg/daemon/config.go create mode 100644 pkg/daemon/config_test.go create mode 100644 pkg/daemon/daemon_suite_test.go create mode 100644 pkg/daemon/status.go delete mode 100644 pkg/daemon/writer.go diff --git a/Dockerfile b/Dockerfile index 2b26247e80..8be58533b4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,7 @@ COPY . . RUN make _build-manager BIN_PATH=build/_output/cmd FROM quay.io/centos/centos:stream9 +RUN yum -y install delve procps-ng COPY --from=builder /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator/build/_output/cmd/manager /usr/bin/sriov-network-operator COPY bindata /bindata ENV OPERATOR_NAME=sriov-network-operator diff --git a/Dockerfile.rhel7 b/Dockerfile.rhel7 new file mode 100644 index 0000000000..4b4762c0e5 --- /dev/null +++ b/Dockerfile.rhel7 @@ -0,0 +1,16 @@ +FROM registry.ci.openshift.org/ocp/builder:rhel-9-golang-1.22-openshift-4.17 AS builder +WORKDIR /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator +COPY . . +RUN make _build-manager BIN_PATH=build/_output/cmd + +FROM registry.ci.openshift.org/ocp/4.17:base-rhel9 +LABEL io.k8s.display-name="OpenShift sriov-network-operator" \ io.k8s.description="This is the component that manages and configures SR-IOV components in an OpenShift cluster" \ io.openshift.tags="openshift,networking,sr-iov" \ com.redhat.delivery.appregistry=true \ maintainer="Multus team " +COPY --from=builder /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator/build/_output/cmd/manager /usr/bin/sriov-network-operator +COPY bindata /bindata +ENV OPERATOR_NAME=sriov-network-operator +ENV CLUSTER_TYPE=openshift +CMD ["/usr/bin/sriov-network-operator"] \ No newline at end of file diff --git a/Dockerfile.sriov-network-config-daemon b/Dockerfile.sriov-network-config-daemon index 35533448fe..44a61a8d87 100644 --- a/Dockerfile.sriov-network-config-daemon +++ b/Dockerfile.sriov-network-config-daemon @@ -7,7 +7,7 @@ FROM quay.io/centos/centos:stream9 ARG MSTFLINT=mstflint # We have to ensure that pciutils is installed. This package is needed for mstfwreset to succeed.
# xref pkg/vendors/mellanox/mellanox.go#L150 -RUN ARCH_DEP_PKGS=$(if [ "$(uname -m)" != "s390x" ]; then echo -n ${MSTFLINT} ; fi) && yum -y install hwdata pciutils $ARCH_DEP_PKGS && yum clean all +RUN ARCH_DEP_PKGS=$(if [ "$(uname -m)" != "s390x" ]; then echo -n ${MSTFLINT} ; fi) && yum -y install delve procps-ng hwdata pciutils $ARCH_DEP_PKGS && yum clean all LABEL io.k8s.display-name="sriov-network-config-daemon" \ io.k8s.description="This is a daemon that manage and config sriov network devices in Kubernetes cluster" COPY --from=builder /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator/build/_output/cmd/sriov-network-config-daemon /usr/bin/ diff --git a/Dockerfile.sriov-network-config-daemon.rhel7 b/Dockerfile.sriov-network-config-daemon.rhel7 new file mode 100644 index 0000000000..bdf9b11874 --- /dev/null +++ b/Dockerfile.sriov-network-config-daemon.rhel7 @@ -0,0 +1,16 @@ +FROM registry.ci.openshift.org/ocp/builder:rhel-9-golang-1.22-openshift-4.17 AS builder +WORKDIR /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator +COPY . . +RUN make _build-sriov-network-config-daemon BIN_PATH=build/_output/cmd + +FROM registry.ci.openshift.org/ocp/4.17:base-rhel9 +RUN yum -y update && ARCH_DEP_PKGS=$(if [ "$(uname -m)" != "s390x" ]; then echo -n mstflint ; fi) && yum -y install pciutils hwdata $ARCH_DEP_PKGS && yum clean all +LABEL io.k8s.display-name="OpenShift sriov-network-config-daemon" \ + io.k8s.description="This is a daemon that manages and configures SR-IOV network devices in an OpenShift cluster" \ + io.openshift.tags="openshift,networking,sr-iov" \ + maintainer="Multus team " +COPY --from=builder /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator/build/_output/cmd/sriov-network-config-daemon /usr/bin/ +COPY bindata /bindata +ENV PLUGINSPATH=/plugins +ENV CLUSTER_TYPE=openshift +CMD ["/usr/bin/sriov-network-config-daemon"] \ No newline at end of file diff --git a/Dockerfile.webhook.rhel7 b/Dockerfile.webhook.rhel7 new file mode 100644 index 0000000000..3bb1a40003 --- /dev/null +++ b/Dockerfile.webhook.rhel7 @@ -0,0 +1,13 @@ +FROM registry.ci.openshift.org/ocp/builder:rhel-9-golang-1.22-openshift-4.17 AS builder +WORKDIR /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator +COPY . . +RUN make _build-webhook BIN_PATH=build/_output/cmd + +FROM registry.ci.openshift.org/ocp/4.17:base-rhel9 +LABEL io.k8s.display-name="OpenShift sriov-network-webhook" \ + io.k8s.description="This is an admission controller webhook that mutates and validates custom resources of the SR-IOV network operator."
\ + io.openshift.tags="openshift,networking,sr-iov" \ + maintainer="Multus team " +COPY --from=builder /go/src/github.com/k8snetworkplumbingwg/sriov-network-operator/build/_output/cmd/webhook /usr/bin/webhook +ENV CLUSTER_TYPE=openshift +CMD ["/usr/bin/webhook"] \ No newline at end of file diff --git a/bindata/manifests/daemon/daemonset.yaml b/bindata/manifests/daemon/daemonset.yaml index c9b1515200..29abb865e3 100644 --- a/bindata/manifests/daemon/daemonset.yaml +++ b/bindata/manifests/daemon/daemonset.yaml @@ -130,10 +130,22 @@ spec: containers: - name: sriov-network-config-daemon image: {{.Image}} - command: - - sriov-network-config-daemon + imagePullPolicy: Always +# command: +# - dlv +# - --listen=:2345 +# - --headless=true +# - --api-version=2 +# - --accept-multiclient +# - --log +# - exec +# - /usr/bin/sriov-network-config-daemon +# - -- +# - start securityContext: privileged: true + command: + - sriov-network-config-daemon args: - "start" {{- if .UsedSystemdMode}} diff --git a/bindata/manifests/operator-webhook/server.yaml b/bindata/manifests/operator-webhook/server.yaml index 29e3696801..1fbdbfd49e 100644 --- a/bindata/manifests/operator-webhook/server.yaml +++ b/bindata/manifests/operator-webhook/server.yaml @@ -64,6 +64,7 @@ spec: containers: - name: webhook-server image: {{.SriovNetworkWebhookImage}} + imagePullPolicy: Always command: - webhook args: diff --git a/bindata/manifests/plugins/sriov-device-plugin.yaml b/bindata/manifests/plugins/sriov-device-plugin.yaml index a0f433a063..3938167135 100644 --- a/bindata/manifests/plugins/sriov-device-plugin.yaml +++ b/bindata/manifests/plugins/sriov-device-plugin.yaml @@ -42,6 +42,7 @@ spec: containers: - name: sriov-device-plugin image: {{.SRIOVDevicePluginImage}} + imagePullPolicy: Always args: - --log-level=10 - --resource-prefix={{.ResourcePrefix}} diff --git a/bindata/manifests/webhook/server.yaml b/bindata/manifests/webhook/server.yaml index 95e020671c..0db3dc5f5d 100644 --- a/bindata/manifests/webhook/server.yaml +++ b/bindata/manifests/webhook/server.yaml @@ -67,6 +67,7 @@ spec: containers: - name: webhook-server image: {{.NetworkResourcesInjectorImage}} + imagePullPolicy: Always command: - webhook args: diff --git a/cmd/sriov-network-config-daemon/start.go b/cmd/sriov-network-config-daemon/start.go index b1ad0f6671..394fbb6309 100644 --- a/cmd/sriov-network-config-daemon/start.go +++ b/cmd/sriov-network-config-daemon/start.go @@ -18,25 +18,27 @@ package main import ( "context" "fmt" - "net" "net/url" "os" "strings" "time" + ocpconfigapi "github.com/openshift/api/config/v1" "github.com/spf13/cobra" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/connrotation" - "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - - configv1 "github.com/openshift/api/config/v1" - mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" 
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" @@ -89,6 +91,8 @@ var ( manageSoftwareBridges bool ovsSocketPath string } + + scheme = runtime.NewScheme() ) func init() { @@ -100,13 +104,17 @@ func init() { startCmd.PersistentFlags().BoolVar(&startOpts.parallelNicConfig, "parallel-nic-config", false, "perform NIC configuration in parallel") startCmd.PersistentFlags().BoolVar(&startOpts.manageSoftwareBridges, "manage-software-bridges", false, "enable management of software bridges") startCmd.PersistentFlags().StringVar(&startOpts.ovsSocketPath, "ovs-socket-path", vars.OVSDBSocketPath, "path for OVSDB socket") -} -func runStartCmd(cmd *cobra.Command, args []string) error { - // init logger + // Init Scheme + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(sriovnetworkv1.AddToScheme(scheme)) + utilruntime.Must(ocpconfigapi.AddToScheme(scheme)) + + // Init logger snolog.InitLog() - setupLog := log.Log.WithName("sriov-network-config-daemon") +} +func configGlobalVariables() error { // Mark that we are running inside a container vars.UsingSystemdMode = false if startOpts.systemd { @@ -132,102 +140,112 @@ func runStartCmd(cmd *cobra.Command, args []string) error { } } - // This channel is used to ensure all spawned goroutines exit when we exit. - stopCh := make(chan struct{}) - defer close(stopCh) + vars.Scheme = scheme - // This channel is used to signal Run() something failed and to jump ship. - // It's purely a chan<- in the Daemon struct for goroutines to write to, and - // a <-chan in Run() for the main thread to listen on. - exitCh := make(chan error) - defer close(exitCh) - - // This channel is to make sure main thread will wait until the writer finish - // to report lastSyncError in SriovNetworkNodeState object. - syncCh := make(chan struct{}) - defer close(syncCh) + return nil +} - refreshCh := make(chan daemon.Message) - defer close(refreshCh) +func UseKubeletKubeConfig() { + fnLogger := log.Log.WithName("sriov-network-config-daemon") - var config *rest.Config - var err error - - // On openshift we use the kubeconfig from kubelet on the node where the daemon is running - // this allow us to improve security as every daemon has access only to its own node - if vars.ClusterType == consts.ClusterTypeOpenshift { - kubeconfig, err := clientcmd.LoadFromFile("/host/etc/kubernetes/kubeconfig") - if err != nil { - setupLog.Error(err, "failed to load kubelet kubeconfig") - } - clusterName := kubeconfig.Contexts[kubeconfig.CurrentContext].Cluster - apiURL := kubeconfig.Clusters[clusterName].Server + kubeconfig, err := clientcmd.LoadFromFile("/host/etc/kubernetes/kubeconfig") + if err != nil { + fnLogger.Error(err, "failed to load kubelet kubeconfig") + } + clusterName := kubeconfig.Contexts[kubeconfig.CurrentContext].Cluster + apiURL := kubeconfig.Clusters[clusterName].Server - urlPath, err := url.Parse(apiURL) - if err != nil { - setupLog.Error(err, "failed to parse api url from kubelet kubeconfig") - } + urlPath, err := url.Parse(apiURL) + if err != nil { + fnLogger.Error(err, "failed to parse api url from kubelet kubeconfig") + } - // The kubernetes in-cluster functions don't let you override the apiserver - // directly; gotta "pass" it via environment vars. 
- setupLog.V(0).Info("overriding kubernetes api", "new-url", apiURL) - err = os.Setenv("KUBERNETES_SERVICE_HOST", urlPath.Hostname()) - if err != nil { - setupLog.Error(err, "failed to set KUBERNETES_SERVICE_HOST environment variable") - } - err = os.Setenv("KUBERNETES_SERVICE_PORT", urlPath.Port()) - if err != nil { - setupLog.Error(err, "failed to set KUBERNETES_SERVICE_PORT environment variable") - } + // The kubernetes in-cluster functions don't let you override the apiserver + // directly; gotta "pass" it via environment vars. + fnLogger.V(0).Info("overriding kubernetes api", "new-url", apiURL) + err = os.Setenv("KUBERNETES_SERVICE_HOST", urlPath.Hostname()) + if err != nil { + fnLogger.Error(err, "failed to set KUBERNETES_SERVICE_HOST environment variable") } + err = os.Setenv("KUBERNETES_SERVICE_PORT", urlPath.Port()) + if err != nil { + fnLogger.Error(err, "failed to set KUBERNETES_SERVICE_PORT environment variable") + } +} - kubeconfig := os.Getenv("KUBECONFIG") - if kubeconfig != "" { - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - } else { - // creates the in-cluster config - config, err = rest.InClusterConfig() +func getOperatorConfig(kClient runtimeclient.Client) (*sriovnetworkv1.SriovOperatorConfig, error) { + // Init feature gates once to prevent race conditions. + defaultConfig := &sriovnetworkv1.SriovOperatorConfig{} + err := kClient.Get(context.Background(), types.NamespacedName{Namespace: vars.Namespace, Name: consts.DefaultConfigName}, defaultConfig) + if err != nil { + return nil, err } + return defaultConfig, nil +} +func initFeatureGates(kClient runtimeclient.Client) (featuregate.FeatureGate, error) { + fnLogger := log.Log.WithName("initFeatureGates") + // Init feature gates once to prevent race conditions. + defaultConfig, err := getOperatorConfig(kClient) if err != nil { - return err + fnLogger.Error(err, "Failed to get default SriovOperatorConfig object") + return nil, err } + featureGates := featuregate.New() + featureGates.Init(defaultConfig.Spec.FeatureGates) + fnLogger.Info("Enabled featureGates", "featureGates", featureGates.String()) - vars.Config = config - vars.Scheme = scheme.Scheme + return featureGates, nil +} - closeAllConns, err := updateDialer(config) +func initLogLevel(kClient runtimeclient.Client) error { + fnLogger := log.Log.WithName("initLogLevel") + // Init feature gates once to prevent race conditions. 
+ defaultConfig, err := getOperatorConfig(kClient) if err != nil { + fnLogger.Error(err, "Failed to get default SriovOperatorConfig object") return err } + fnLogger.V(2).Info("DEBUG", defaultConfig) + snolog.SetLogLevel(defaultConfig.Spec.LogLevel) + fnLogger.V(2).Info("logLevel sets", "logLevel", defaultConfig.Spec.LogLevel) + return nil +} + +func runStartCmd(cmd *cobra.Command, args []string) error { + setupLog := log.Log.WithName("sriov-network-config-daemon") + stopSignalCh := ctrl.SetupSignalHandler() - err = sriovnetworkv1.AddToScheme(scheme.Scheme) + // Load global variables + err := configGlobalVariables() if err != nil { - setupLog.Error(err, "failed to load sriov network CRDs to scheme") + setupLog.Error(err, "unable to config global variables") return err } - err = mcfgv1.AddToScheme(scheme.Scheme) - if err != nil { - setupLog.Error(err, "failed to load machine config CRDs to scheme") - return err + var config *rest.Config + + // On openshift we use the kubeconfig from kubelet on the node where the daemon is running + // this allow us to improve security as every daemon has access only to its own node + if vars.ClusterType == consts.ClusterTypeOpenshift { + UseKubeletKubeConfig() } - err = configv1.Install(scheme.Scheme) - if err != nil { - setupLog.Error(err, "failed to load openshift config CRDs to scheme") - return err + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig != "" { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + // creates the in-cluster config + config, err = rest.InClusterConfig() } - kClient, err := client.New(config, client.Options{Scheme: scheme.Scheme}) if err != nil { - setupLog.Error(err, "couldn't create client") - os.Exit(1) + return err } + vars.Config = config + config.Timeout = 5 * time.Second - snclient := snclientset.NewForConfigOrDie(config) - kubeclient := kubernetes.NewForConfigOrDie(config) - + // create helpers hostHelpers, err := helper.NewDefaultHostHelpers() if err != nil { setupLog.Error(err, "failed to create hostHelpers") @@ -240,88 +258,108 @@ func runStartCmd(cmd *cobra.Command, args []string) error { return err } - config.Timeout = 5 * time.Second - writerclient := snclientset.NewForConfigOrDie(config) + // create clients + snclient := snclientset.NewForConfigOrDie(config) + kubeclient := kubernetes.NewForConfigOrDie(config) + kClient, err := runtimeclient.New( + config, + runtimeclient.Options{ + Scheme: vars.Scheme}) + if err != nil { + setupLog.Error(err, "couldn't create generic client") + os.Exit(1) + } - eventRecorder := daemon.NewEventRecorder(writerclient, kubeclient) + eventRecorder := daemon.NewEventRecorder(snclient, kubeclient, scheme) defer eventRecorder.Shutdown() - setupLog.V(0).Info("starting node writer") - nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, - closeAllConns, - eventRecorder, - hostHelpers, - platformHelper) - nodeInfo, err := kubeclient.CoreV1().Nodes().Get(context.Background(), startOpts.nodeName, v1.GetOptions{}) - if err == nil { - for key, pType := range vars.PlatformsMap { - if strings.Contains(strings.ToLower(nodeInfo.Spec.ProviderID), strings.ToLower(key)) { - vars.PlatformType = pType - } - } - } else { + if err != nil { setupLog.Error(err, "failed to fetch node state, exiting", "node-name", startOpts.nodeName) return err } + + // check for platform + for key, pType := range vars.PlatformsMap { + if strings.Contains(strings.ToLower(nodeInfo.Spec.ProviderID), strings.ToLower(key)) { + vars.PlatformType = pType + } + } setupLog.Info("Running on", 
"platform", vars.PlatformType.String()) + // Initial supported nic IDs if err := sriovnetworkv1.InitNicIDMapFromConfigMap(kubeclient, vars.Namespace); err != nil { setupLog.Error(err, "failed to run init NicIdMap") return err } - eventRecorder.SendEvent("ConfigDaemonStart", "Config Daemon starting") + eventRecorder.SendEvent(stopSignalCh, "ConfigDaemonStart", "Config Daemon starting") - // block the deamon process until nodeWriter finish first its run - err = nodeWriter.RunOnce() + fg, err := initFeatureGates(kClient) if err != nil { - setupLog.Error(err, "failed to run writer") + setupLog.Error(err, "failed to initialize feature gates") return err } - go nodeWriter.Run(stopCh, refreshCh, syncCh) - // Init feature gates once to prevent race conditions. - defaultConfig := &sriovnetworkv1.SriovOperatorConfig{} - err = kClient.Get(context.Background(), types.NamespacedName{Namespace: vars.Namespace, Name: consts.DefaultConfigName}, defaultConfig) + if err := initLogLevel(kClient); err != nil { + setupLog.Error(err, "failed to initialize log level") + return err + } + + // Init manager + setupLog.V(0).Info("Starting SR-IOV Network Config Daemon") + nodeStateSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s,metadata.namespace=%s", vars.NodeName, vars.Namespace)) if err != nil { - log.Log.Error(err, "Failed to get default SriovOperatorConfig object") + setupLog.Error(err, "failed to parse sriovNetworkNodeState name selector") + return err + } + operatorConfigSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s,metadata.namespace=%s", consts.DefaultConfigName, vars.Namespace)) + if err != nil { + setupLog.Error(err, "failed to parse sriovOperatorConfig name selector") return err } - featureGates := featuregate.New() - featureGates.Init(defaultConfig.Spec.FeatureGates) - vars.MlxPluginFwReset = featureGates.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) - log.Log.Info("Enabled featureGates", "featureGates", featureGates.String()) - setupLog.V(0).Info("Starting SriovNetworkConfigDaemon") - err = daemon.New( + mgr, err := ctrl.NewManager(vars.Config, ctrl.Options{ + Scheme: vars.Scheme, + Metrics: server.Options{BindAddress: "0"}, // disable metrics server for now as the daemon runs with hostNetwork + Cache: cache.Options{ // cache only the SriovNetworkNodeState with the node name + ByObject: map[runtimeclient.Object]cache.ByObject{ + &sriovnetworkv1.SriovNetworkNodeState{}: {Field: nodeStateSelector}, + &sriovnetworkv1.SriovOperatorConfig{}: {Field: operatorConfigSelector}}}, + }) + if err != nil { + setupLog.Error(err, "unable to create manager") + os.Exit(1) + } + + dm := daemon.New( kClient, snclient, kubeclient, hostHelpers, platformHelper, - exitCh, - stopCh, - syncCh, - refreshCh, eventRecorder, - featureGates, - startOpts.disabledPlugins, - ).Run(stopCh, exitCh) - if err != nil { - setupLog.Error(err, "failed to run daemon") + fg, + startOpts.disabledPlugins) + + // Init Daemon configuration on the node + if err = dm.DaemonInitialization(); err != nil { + setupLog.Error(err, "unable to initialize daemon") + os.Exit(1) + } + + // Setup reconcile loop with manager + if err = dm.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create setup daemon manager for SriovNetworkNodeState") + os.Exit(1) } - setupLog.V(0).Info("Shutting down SriovNetworkConfigDaemon") - return err -} -// updateDialer instruments a restconfig with a dial. the returned function allows forcefully closing all active connections. 
-func updateDialer(clientConfig *rest.Config) (func(), error) { - if clientConfig.Transport != nil || clientConfig.Dial != nil { - return nil, fmt.Errorf("there is already a transport or dialer configured") + // Setup reconcile loop with manager + if err = daemon.NewOperatorConfigReconcile(kClient).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create setup daemon manager for OperatorConfig") + os.Exit(1) } - f := &net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second} - d := connrotation.NewDialer(f.DialContext) - clientConfig.Dial = d.DialContext - return d.CloseAll, nil + + setupLog.Info("Starting Manager") + return mgr.Start(stopSignalCh) } diff --git a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go index 73062c0efd..b11c299101 100644 --- a/controllers/drain_controller_test.go +++ b/controllers/drain_controller_test.go @@ -520,7 +520,7 @@ func createNodeWithLabel(ctx context.Context, nodeName string, label string) (*c ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Namespace: vars.Namespace, - Labels: map[string]string{ + Annotations: map[string]string{ constants.NodeStateDrainAnnotationCurrent: constants.DrainIdle, }, }, diff --git a/controllers/sriovoperatorconfig_controller_test.go b/controllers/sriovoperatorconfig_controller_test.go index 8bd2830844..1fa01ff4cf 100644 --- a/controllers/sriovoperatorconfig_controller_test.go +++ b/controllers/sriovoperatorconfig_controller_test.go @@ -12,6 +12,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" diff --git a/deploy/operator.yaml b/deploy/operator.yaml index e9fb25de37..d3c5a11bf8 100644 --- a/deploy/operator.yaml +++ b/deploy/operator.yaml @@ -45,6 +45,17 @@ spec: - sriov-network-operator args: - --leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE +# command: +# - dlv +# - --listen=:2345 +# - --headless=true +# - --api-version=2 +# - --accept-multiclient +# - --log +# - exec +# - /usr/bin/sriov-network-operator +# - -- +# - --leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE resources: requests: cpu: 100m diff --git a/go.mod b/go.mod index 0353c7ec18..9aa49db521 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,6 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2.0.20240221172127-ec7bcb248e94 github.com/vishvananda/netns v0.0.4 go.uber.org/zap v1.25.0 - golang.org/x/time v0.3.0 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.3 @@ -150,6 +149,7 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.14.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/hack/build-go.sh b/hack/build-go.sh index bb1cead414..b1eeb50b99 100755 --- a/hack/build-go.sh +++ b/hack/build-go.sh @@ -37,8 +37,8 @@ CGO_ENABLED=${CGO_ENABLED:-0} if [[ ${WHAT} == "manager" ]]; then echo "Building ${WHAT} (${VERSION_OVERRIDE})" -CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build ${GOFLAGS} -ldflags "${GLDFLAGS} -s -w" -o ${BIN_PATH}/${WHAT} main.go +CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build ${GOFLAGS} -gcflags "all=-N -l" -ldflags "${GLDFLAGS}" -o ${BIN_PATH}/${WHAT} main.go else echo "Building 
${REPO}/cmd/${WHAT} (${VERSION_OVERRIDE})" -CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build ${GOFLAGS} -ldflags "${GLDFLAGS} -s -w" -o ${BIN_PATH}/${WHAT} ${REPO}/cmd/${WHAT} +CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build ${GOFLAGS} -gcflags "all=-N -l" -ldflags "${GLDFLAGS}" -o ${BIN_PATH}/${WHAT} ${REPO}/cmd/${WHAT} fi diff --git a/main.go b/main.go index 7195c59c36..d7bf589abc 100644 --- a/main.go +++ b/main.go @@ -114,7 +114,7 @@ func main() { LeaderElectionID: consts.LeaderElectionID, }) if err != nil { - setupLog.Error(err, "unable to start leader election manager") + setupLog.Error(err, "unable to create leader election manager") os.Exit(1) } @@ -134,7 +134,7 @@ func main() { Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}}, }) if err != nil { - setupLog.Error(err, "unable to start manager") + setupLog.Error(err, "unable to create manager") os.Exit(1) } @@ -166,7 +166,6 @@ func main() { err = mgrGlobal.GetCache().IndexField(context.Background(), &sriovnetworkv1.OVSNetwork{}, "spec.networkNamespace", func(o client.Object) []string { return []string{o.(*sriovnetworkv1.OVSNetwork).Spec.NetworkNamespace} }) - if err != nil { setupLog.Error(err, "unable to create index field for cache") os.Exit(1) diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go new file mode 100644 index 0000000000..ddeba84dbe --- /dev/null +++ b/pkg/daemon/config.go @@ -0,0 +1,61 @@ +package daemon + +import ( + "context" + "reflect" + + "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +type OperatorConfigReconcile struct { + client client.Client + latestFeatureGates map[string]bool +} + +func NewOperatorConfigReconcile(client client.Client) *OperatorConfigReconcile { + return &OperatorConfigReconcile{client: client, latestFeatureGates: make(map[string]bool)} +} + +func (oc *OperatorConfigReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Reconcile") + operatorConfig := &sriovnetworkv1.SriovOperatorConfig{} + err := oc.client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, operatorConfig) + if err != nil { + if errors.IsNotFound(err) { + reqLogger.Info("OperatorConfig doesn't exist", "name", req.Name, "namespace", req.Namespace) + return ctrl.Result{}, nil + } + reqLogger.Error(err, "Failed to get operatorConfig", "name", req.Name, "namespace", req.Namespace) + return ctrl.Result{}, err + } + + // update log level + snolog.SetLogLevel(operatorConfig.Spec.LogLevel) + + newDisableDrain := operatorConfig.Spec.DisableDrain + if vars.DisableDrain != newDisableDrain { + vars.DisableDrain = newDisableDrain + log.Log.Info("Set Disable Drain", "value", vars.DisableDrain) + } + + if !reflect.DeepEqual(oc.latestFeatureGates, operatorConfig.Spec.FeatureGates) { + vars.FeatureGate.Init(operatorConfig.Spec.FeatureGates) + oc.latestFeatureGates = operatorConfig.Spec.FeatureGates + log.Log.Info("Updated featureGates", "featureGates", vars.FeatureGate.String()) + } + + return ctrl.Result{}, nil +} + +func (oc *OperatorConfigReconcile) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr).
+ For(&sriovnetworkv1.SriovOperatorConfig{}). + Complete(oc) +} diff --git a/pkg/daemon/config_test.go b/pkg/daemon/config_test.go new file mode 100644 index 0000000000..ad84cd5bfc --- /dev/null +++ b/pkg/daemon/config_test.go @@ -0,0 +1,170 @@ +package daemon_test + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/daemon" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +var _ = Describe("Daemon OperatorConfig Controller", Ordered, func() { + var cancel context.CancelFunc + var ctx context.Context + + BeforeAll(func() { + By("Setup controller manager") + k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, + }) + Expect(err).ToNot(HaveOccurred()) + + configController := daemon.NewOperatorConfigReconcile(k8sClient) + err = configController.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + + ctx, cancel = context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + defer GinkgoRecover() + By("Start controller manager") + err := k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred()) + }() + + DeferCleanup(func() { + By("Shutdown controller manager") + cancel() + wg.Wait() + }) + + err = k8sClient.Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + }) + + BeforeEach(func() { + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovOperatorConfig{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) + }) + + Context("LogLevel", func() { + It("should configure the log level base on sriovOperatorConfig", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 1, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(1) + + }) + + It("should update the log level in runtime", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 1, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(1) + + soc.Spec.LogLevel = 2 + err = k8sClient.Update(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedLogLevel(2) + }) + }) + + Context("Disable Drain", func() { + It("should update the skip drain flag", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + DisableDrain: true, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + validateExpectedDrain(true) + + soc.Spec.DisableDrain = false + err = k8sClient.Update(ctx, soc) + 
Expect(err).ToNot(HaveOccurred()) + validateExpectedDrain(false) + }) + }) + + Context("Feature gates", func() { + It("should update the feature gates struct", func() { + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: consts.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + FeatureGates: map[string]bool{ + "test": true, + "bla": true, + }, + }, + } + + err := k8sClient.Create(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("test")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("bla")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + + soc.Spec.FeatureGates["test"] = false + err = k8sClient.Update(ctx, soc) + Expect(err).ToNot(HaveOccurred()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("test")).To(BeFalse()) + }, "15s", "3s").Should(Succeed()) + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.FeatureGate.IsEnabled("bla")).To(BeTrue()) + }, "15s", "3s").Should(Succeed()) + }) + }) +}) + +func validateExpectedLogLevel(level int) { + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(snolog.GetLogLevel()).To(Equal(level)) + }, "15s", "3s").Should(Succeed()) +} + +func validateExpectedDrain(disableDrain bool) { + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(vars.DisableDrain).To(Equal(disableDrain)) + }, "15s", "3s").Should(Succeed()) +} diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index ee4ac5acb9..a06030ac3f 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,29 +3,21 @@ package daemon import ( "context" "fmt" - "math/rand" "os/exec" "reflect" - "sync" "time" - "golang.org/x/time/rate" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" - snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/systemd" @@ -33,58 +25,27 @@ import ( "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" ) -const ( - // updateDelay is the baseline speed at which we react to changes. We don't - // need to react in milliseconds as any change would involve rebooting the node. - updateDelay = 5 * time.Second - // maxUpdateBackoff is the maximum time to react to a change as we back off - // in the face of errors. 
- maxUpdateBackoff = 60 * time.Second -) - -type Message struct { - syncStatus string - lastSyncError string -} - -type Daemon struct { +type DaemonReconcile struct { client client.Client sriovClient snclientset.Interface // kubeClient allows interaction with Kubernetes, including the node we are running on. kubeClient kubernetes.Interface - desiredNodeState *sriovnetworkv1.SriovNetworkNodeState - currentNodeState *sriovnetworkv1.SriovNetworkNodeState - - // list of disabled plugins - disabledPlugins []string - - loadedPlugins map[string]plugin.VendorPlugin - HostHelpers helper.HostHelpersInterface platformHelpers platforms.Interface - // channel used by callbacks to signal Run() of an error - exitCh chan<- error - - // channel used to ensure all spawned goroutines exit when we exit. - stopCh <-chan struct{} - - syncCh <-chan struct{} - - refreshCh chan<- Message - - mu *sync.Mutex - - disableDrain bool - - workqueue workqueue.RateLimitingInterface - eventRecorder *EventRecorder featureGate featuregate.FeatureGate + + // list of disabled plugins + disabledPlugins []string + + loadedPlugins map[string]plugin.VendorPlugin + lastAppliedGeneration int64 + disableDrain bool } func New( @@ -93,319 +54,111 @@ func New( kubeClient kubernetes.Interface, hostHelpers helper.HostHelpersInterface, platformHelper platforms.Interface, - exitCh chan<- error, - stopCh <-chan struct{}, - syncCh <-chan struct{}, - refreshCh chan<- Message, er *EventRecorder, featureGates featuregate.FeatureGate, disabledPlugins []string, -) *Daemon { - return &Daemon{ - client: client, - sriovClient: sriovClient, - kubeClient: kubeClient, - HostHelpers: hostHelpers, - platformHelpers: platformHelper, - exitCh: exitCh, - stopCh: stopCh, - syncCh: syncCh, - refreshCh: refreshCh, - desiredNodeState: &sriovnetworkv1.SriovNetworkNodeState{}, - currentNodeState: &sriovnetworkv1.SriovNetworkNodeState{}, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)}, - workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"), - eventRecorder: er, - featureGate: featureGates, - disabledPlugins: disabledPlugins, - mu: &sync.Mutex{}, - } -} - -// Run the config daemon -func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { - log.Log.V(0).Info("Run()", "node", vars.NodeName) - - if vars.ClusterType == consts.ClusterTypeOpenshift { - log.Log.V(0).Info("Run(): start daemon.", "openshiftFlavor", dn.platformHelpers.GetFlavor()) - } else { - log.Log.V(0).Info("Run(): start daemon.") - } - - if !vars.UsingSystemdMode { - log.Log.V(0).Info("Run(): daemon running in daemon mode") - dn.HostHelpers.CheckRDMAEnabled() - dn.HostHelpers.TryEnableTun() - dn.HostHelpers.TryEnableVhostNet() - err := systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift) - if err != nil { - log.Log.Error(err, "failed to remove all the systemd sriov files") - } - } else { - log.Log.V(0).Info("Run(): daemon running in systemd mode") - } - - // Only watch own SriovNetworkNodeState CR - defer utilruntime.HandleCrash() - defer dn.workqueue.ShutDown() - - if err := dn.prepareNMUdevRule(); err != nil { - log.Log.Error(err, "failed to prepare udev files to disable network manager on requested VFs") - } - if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil { - log.Log.Error(err, "failed to prepare udev files to rename VF representors for requested VFs") - } - - var timeout 
int64 = 5 - var metadataKey = "metadata.name" - informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient, - time.Second*15, - vars.Namespace, - func(lo *metav1.ListOptions) { - lo.FieldSelector = metadataKey + "=" + vars.NodeName - lo.TimeoutSeconds = &timeout - }, - ) - - informer := informerFactory.Sriovnetwork().V1().SriovNetworkNodeStates().Informer() - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dn.enqueueNodeState, - UpdateFunc: func(old, new interface{}) { - dn.enqueueNodeState(new) - }, - }) - - cfgInformerFactory := sninformer.NewFilteredSharedInformerFactory(dn.sriovClient, - time.Second*30, - vars.Namespace, - func(lo *metav1.ListOptions) { - lo.FieldSelector = metadataKey + "=" + "default" - }, - ) - - cfgInformer := cfgInformerFactory.Sriovnetwork().V1().SriovOperatorConfigs().Informer() - cfgInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dn.operatorConfigAddHandler, - UpdateFunc: dn.operatorConfigChangeHandler, - }) - - rand.Seed(time.Now().UnixNano()) - go cfgInformer.Run(dn.stopCh) - time.Sleep(5 * time.Second) - go informer.Run(dn.stopCh) - if ok := cache.WaitForCacheSync(stopCh, cfgInformer.HasSynced, informer.HasSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - log.Log.Info("Starting workers") - // Launch one worker to process - go wait.Until(dn.runWorker, time.Second, stopCh) - log.Log.Info("Started workers") - - for { - select { - case <-stopCh: - log.Log.V(0).Info("Run(): stop daemon") - return nil - case err, more := <-exitCh: - log.Log.Error(err, "got an error") - if more { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: err.Error(), - } - } - return err - } +) *DaemonReconcile { + return &DaemonReconcile{ + client: client, + sriovClient: sriovClient, + kubeClient: kubeClient, + HostHelpers: hostHelpers, + platformHelpers: platformHelper, + + lastAppliedGeneration: 0, + eventRecorder: er, + featureGate: featureGates, + disabledPlugins: disabledPlugins, } } -func (dn *Daemon) runWorker() { - for dn.processNextWorkItem() { - } -} - -func (dn *Daemon) enqueueNodeState(obj interface{}) { - var ns *sriovnetworkv1.SriovNetworkNodeState - var ok bool - if ns, ok = obj.(*sriovnetworkv1.SriovNetworkNodeState); !ok { - utilruntime.HandleError(fmt.Errorf("expected SriovNetworkNodeState but got %#v", obj)) - return - } - key := ns.GetGeneration() - dn.workqueue.Add(key) -} - -func (dn *Daemon) processNextWorkItem() bool { - log.Log.V(2).Info("processNextWorkItem", "worker-queue-size", dn.workqueue.Len()) - obj, shutdown := dn.workqueue.Get() - if shutdown { - return false - } - - log.Log.V(2).Info("get item from queue", "item", obj.(int64)) - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. - defer dn.workqueue.Done(obj) - var key int64 - var ok bool - if key, ok = obj.(int64); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here. - dn.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj)) - return nil - } - - err := dn.nodeStateSyncHandler() - if err != nil { - // Ereport error message, and put the item back to work queue for retry. 
- dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: err.Error(), - } - <-dn.syncCh - dn.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing: %s, requeuing", err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - dn.workqueue.Forget(obj) - log.Log.Info("Successfully synced") - return nil - }(obj) - +func (dn *DaemonReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Reconcile") + // Get the latest NodeState + sriovResult := &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""} + desiredNodeState := &sriovnetworkv1.SriovNetworkNodeState{} + err := dn.client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, desiredNodeState) if err != nil { - utilruntime.HandleError(err) - } - - return true -} - -func (dn *Daemon) operatorConfigAddHandler(obj interface{}) { - dn.operatorConfigChangeHandler(&sriovnetworkv1.SriovOperatorConfig{}, obj) -} - -func (dn *Daemon) operatorConfigChangeHandler(old, new interface{}) { - oldCfg := old.(*sriovnetworkv1.SriovOperatorConfig) - newCfg := new.(*sriovnetworkv1.SriovOperatorConfig) - if newCfg.Namespace != vars.Namespace || newCfg.Name != consts.DefaultConfigName { - log.Log.V(2).Info("unsupported SriovOperatorConfig", "namespace", newCfg.Namespace, "name", newCfg.Name) - return - } - - snolog.SetLogLevel(newCfg.Spec.LogLevel) - - newDisableDrain := newCfg.Spec.DisableDrain - if dn.disableDrain != newDisableDrain { - dn.disableDrain = newDisableDrain - log.Log.Info("Set Disable Drain", "value", dn.disableDrain) + if errors.IsNotFound(err) { + reqLogger.Info("NodeState doesn't exist") + return ctrl.Result{}, nil + } + reqLogger.Error(err, "Failed to fetch node state", "name", vars.NodeName) + return ctrl.Result{}, err } + originalNodeState := desiredNodeState.DeepCopy() - if !reflect.DeepEqual(oldCfg.Spec.FeatureGates, newCfg.Spec.FeatureGates) { - dn.featureGate.Init(newCfg.Spec.FeatureGates) - log.Log.Info("Updated featureGates", "featureGates", dn.featureGate.String()) - } + latest := desiredNodeState.GetGeneration() + reqLogger.V(0).Info("new generation", "generation", latest) - vars.MlxPluginFwReset = dn.featureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) -} - -func (dn *Daemon) nodeStateSyncHandler() error { - var err error - // Get the latest NodeState - var sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusSucceeded, LastSyncError: ""} - dn.desiredNodeState, err = dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) + // Update the nodeState Status object with the existing network interfaces + err = dn.getHostNetworkStatus(desiredNodeState) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName) - return err + reqLogger.Error(err, "failed to get host network status") + return ctrl.Result{}, err } - latest := dn.desiredNodeState.GetGeneration() - log.Log.V(0).Info("nodeStateSyncHandler(): new generation", "generation", latest) // load plugins if it has not loaded if len(dn.loadedPlugins) == 0 { - dn.loadedPlugins, err = loadPlugins(dn.desiredNodeState, dn.HostHelpers, dn.disabledPlugins) + dn.loadedPlugins, err = loadPlugins(desiredNodeState, dn.HostHelpers, dn.disabledPlugins) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to 
enable vendor plugins") - return err + reqLogger.Error(err, "failed to enable vendor plugins") + return ctrl.Result{}, err } } - skipReconciliation := true - // if the operator complete the drain operator we should continue the configuration - if !dn.isDrainCompleted() { - if vars.UsingSystemdMode && dn.currentNodeState.GetGeneration() == latest { - serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath) + if dn.lastAppliedGeneration == latest { + if vars.UsingSystemdMode && dn.lastAppliedGeneration == latest { + // Check for systemd services and output of the systemd service run + sriovResult, err = dn.CheckSystemdStatus(ctx, desiredNodeState) + //TODO: in the case we need to think what to do if we try to apply again or not + // for now I will leave it like that so we don't enter into a boot loop if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config service exist on host") - return err - } - postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath) - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to check if sriov-config-post-network service exist on host") - return err + reqLogger.Error(err, "failed to check systemd status unexpected error") + return ctrl.Result{}, nil } - // if the service doesn't exist we should continue to let the k8s plugin to create the service files - // this is only for k8s base environments, for openshift the sriov-operator creates a machine config to will apply - // the system service and reboot the node the config-daemon doesn't need to do anything. - if !(serviceEnabled && postNetworkServiceEnabled) { - sriovResult = &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed, - LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+ - "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)} - } else { - sriovResult, err = systemd.ReadSriovResult() + // only if something is not equal we apply of not we continue to check if something change on the node, + // and we need to trigger a reconfiguration + if desiredNodeState.Status.SyncStatus != sriovResult.SyncStatus || + desiredNodeState.Status.LastSyncError != sriovResult.LastSyncError { + err = dn.updateSyncState(ctx, desiredNodeState, sriovResult.SyncStatus, sriovResult.LastSyncError) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to load sriov result file from host") - return err - } - } - if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed { - log.Log.Info("nodeStateSyncHandler(): sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError) - - // add the error but don't requeue - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusFailed, - lastSyncError: sriovResult.LastSyncError, + reqLogger.Error(err, "failed to update sync status") } - <-dn.syncCh - return nil + return ctrl.Result{}, err } } - skipReconciliation, err = dn.shouldSkipReconciliation(dn.desiredNodeState) + // Check if there is a change in the host network interfaces that require a reconfiguration by the daemon + skipReconciliation, err := dn.shouldSkipReconciliation(ctx, desiredNodeState) if err != nil { - return err + return ctrl.Result{}, err } - } - - // we are done with the configuration just return here - if dn.currentNodeState.GetGeneration() == dn.desiredNodeState.GetGeneration() && - dn.desiredNodeState.Status.SyncStatus == 
consts.SyncStatusSucceeded && skipReconciliation { - log.Log.Info("Current state and desire state are equal together with sync status succeeded nothing to do") - return nil - } - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusInProgress, - lastSyncError: "", + if skipReconciliation { + // Check if we need to update the nodeState status + if dn.shouldUpdateStatus(desiredNodeState, originalNodeState) { + err = dn.updateSyncState(ctx, desiredNodeState, desiredNodeState.Status.SyncStatus, desiredNodeState.Status.LastSyncError) + if err != nil { + reqLogger.Error(err, "failed to update NodeState status") + return ctrl.Result{}, err + } + } + // if we didn't update the status we requeue the request to check the interfaces again after the expected time + reqLogger.Info("Current state and desire state are equal together with sync status succeeded nothing to do") + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } } - // wait for writer to refresh status then pull again the latest node state - <-dn.syncCh - // we need to load the latest status to our object - // if we don't do it we can have a race here where the user remove the virtual functions but the operator didn't - // trigger the refresh - updatedState, err := dn.sriovClient.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) - if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to fetch node state", "name", vars.NodeName) - return err + // if the sync status is not inProgress we set it to inProgress and wait for another reconciliation loop + if desiredNodeState.Status.SyncStatus != consts.SyncStatusInProgress { + err = dn.updateSyncState(ctx, desiredNodeState, consts.SyncStatusInProgress, "") + if err != nil { + reqLogger.Error(err, "failed to update sync status to inProgress") + return ctrl.Result{}, err + } } - dn.desiredNodeState.Status = updatedState.Status reqReboot := false reqDrain := false @@ -413,17 +166,15 @@ func (dn *Daemon) nodeStateSyncHandler() error { // check if any of the plugins required to drain or reboot the node for k, p := range dn.loadedPlugins { d, r := false, false - if dn.currentNodeState.GetName() == "" { - log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for a new node state") - } else { - log.Log.V(0).Info("nodeStateSyncHandler(): calling OnNodeStateChange for an updated node state") - } - d, r, err = p.OnNodeStateChange(dn.desiredNodeState) + d, r, err = p.OnNodeStateChange(desiredNodeState) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): OnNodeStateChange plugin error", "plugin-name", k) - return err + reqLogger.Error(err, "OnNodeStateChange plugin error", "plugin-name", k) + return ctrl.Result{}, err } - log.Log.V(0).Info("nodeStateSyncHandler(): OnNodeStateChange result", "plugin", k, "drain-required", d, "reboot-required", r) + reqLogger.V(0).Info("OnNodeStateChange result", + "plugin", k, + "drain-required", d, + "reboot-required", r) reqDrain = reqDrain || d reqReboot = reqReboot || r } @@ -432,80 +183,182 @@ func (dn *Daemon) nodeStateSyncHandler() error { // or there is a new config we need to apply // When using systemd configuration we write the file if vars.UsingSystemdMode { - log.Log.V(0).Info("nodeStateSyncHandler(): writing systemd config file to host") - systemdConfModified, err := systemd.WriteConfFile(dn.desiredNodeState) + reqLogger.V(0).Info("writing systemd config file to host") + systemdConfModified, err := systemd.WriteConfFile(desiredNodeState) if err 
!= nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to write configuration file for systemd mode") - return err + reqLogger.Error(err, "failed to write configuration file for systemd mode") + return ctrl.Result{}, err } if systemdConfModified { // remove existing result file to make sure that we will not use outdated result, e.g. in case if // systemd service was not triggered for some reason err = systemd.RemoveSriovResult() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to remove result file for systemd mode") - return err + reqLogger.Error(err, "failed to remove result file for systemd mode") + return ctrl.Result{}, err } } reqDrain = reqDrain || systemdConfModified // require reboot if drain needed for systemd mode reqReboot = reqReboot || systemdConfModified || reqDrain - log.Log.V(0).Info("nodeStateSyncHandler(): systemd mode WriteConfFile results", + reqLogger.V(0).Info("systemd mode WriteConfFile results", "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain) err = systemd.WriteSriovSupportedNics() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): failed to write supported nic ids file for systemd mode") - return err + reqLogger.Error(err, "failed to write supported nic ids file for systemd mode") + return ctrl.Result{}, err } } - log.Log.V(0).Info("nodeStateSyncHandler(): aggregated daemon", + reqLogger.V(0).Info("aggregated daemon node state requirement", "drain-required", reqDrain, "reboot-required", reqReboot, "disable-drain", dn.disableDrain) - // handle drain only if the plugin request drain, or we are already in a draining request state - if reqDrain || !utils.ObjectHasAnnotation(dn.desiredNodeState, - consts.NodeStateDrainAnnotationCurrent, - consts.DrainIdle) { - drainInProcess, err := dn.handleDrain(reqReboot) + // handle drain only if the plugins request drain, or we are already in a draining request state + if reqDrain || (utils.ObjectHasAnnotationKey(desiredNodeState, consts.NodeStateDrainAnnotationCurrent) && + !utils.ObjectHasAnnotation(desiredNodeState, + consts.NodeStateDrainAnnotationCurrent, + consts.DrainIdle)) { + drainInProcess, err := dn.handleDrain(ctx, desiredNodeState, reqReboot) if err != nil { - log.Log.Error(err, "failed to handle drain") - return err + reqLogger.Error(err, "failed to handle drain") + return ctrl.Result{}, err } + // drain is still in progress we don't need to re-queue the request as the operator will update the annotation if drainInProcess { - return nil + return ctrl.Result{}, nil } } // if we don't need to drain, and we are on idle we need to request the device plugin reset - if !reqDrain && utils.ObjectHasAnnotation(dn.desiredNodeState, + if !reqDrain && utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainIdle) { - log.Log.Info("nodeStateSyncHandler(): apply 'Device_Plugin_Reset_Required' annotation for node") - err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DevicePluginResetRequired, dn.client) + _, err = dn.annotate(ctx, desiredNodeState, consts.DevicePluginResetRequired) if err != nil { - log.Log.Error(err, "handleDrain(): Failed to annotate node") - return err + reqLogger.Error(err, "failed to request device plugin reset") + return ctrl.Result{}, err } + // we return here and wait for another reconcile loop where the operator will finish + // the device plugin removal from the node + return ctrl.Result{}, nil + } + + // if we finish the drain we should 
run apply here
+	if dn.isDrainCompleted(desiredNodeState) {
+		return dn.Apply(ctx, desiredNodeState, reqReboot, sriovResult)
+	}
-		log.Log.Info("handleDrain(): apply 'Device_Plugin_Reset_Required' annotation for nodeState")
-		if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
-			consts.NodeStateDrainAnnotation,
-			consts.DevicePluginResetRequired, dn.client); err != nil {
+	return ctrl.Result{}, err
+}
+
+func (dn *DaemonReconcile) DaemonInitialization() error {
+	funcLog := log.Log.WithName("DaemonInitialization")
+	var err error
+
+	if !vars.UsingSystemdMode {
+		funcLog.V(0).Info("daemon running in daemon mode")
+		_, err = dn.HostHelpers.CheckRDMAEnabled()
+		if err != nil {
+			funcLog.Error(err, "warning, failed to check RDMA state")
+		}
+		dn.HostHelpers.TryEnableTun()
+		dn.HostHelpers.TryEnableVhostNet()
+		err = systemd.CleanSriovFilesFromHost(vars.ClusterType == consts.ClusterTypeOpenshift)
+		if err != nil {
+			funcLog.Error(err, "failed to remove all the systemd sriov files")
+		}
+	} else {
+		funcLog.V(0).Info("daemon running in systemd mode")
+	}
+
+	if err := dn.prepareNMUdevRule(); err != nil {
+		funcLog.Error(err, "failed to prepare udev files to disable network manager on requested VFs")
+	}
+	if err := dn.HostHelpers.PrepareVFRepUdevRule(); err != nil {
+		funcLog.Error(err, "failed to prepare udev files to rename VF representors for requested VFs")
+	}
+
+	ns := &sriovnetworkv1.SriovNetworkNodeState{}
+	// init openstack info
+	if vars.PlatformType == consts.VirtualOpenStack {
+		ns, err = dn.HostHelpers.GetCheckPointNodeState()
+		if err != nil {
 			return err
 		}
-		return nil
+		if ns == nil {
+			err = dn.platformHelpers.CreateOpenstackDevicesInfo()
+			if err != nil {
+				return err
+			}
+		} else {
+			dn.platformHelpers.CreateOpenstackDevicesInfoFromNodeStatus(ns)
+		}
+	}
+
+	err = dn.getHostNetworkStatus(ns)
+	if err != nil {
+		funcLog.Error(err, "failed to get host network status on init")
+		return err
+	}
+
+	// save init state
+	err = dn.HostHelpers.WriteCheckpointFile(ns)
+	if err != nil {
+		funcLog.Error(err, "failed to write checkpoint file on host")
+	}
+	return nil
+}
+
+func (dn *DaemonReconcile) CheckSystemdStatus(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) (*systemd.SriovResult, error) {
+	funcLog := log.Log.WithName("CheckSystemdStatus")
+	serviceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovServicePath)
+	if err != nil {
+		funcLog.Error(err, "failed to check if sriov-config service exists on host")
+		return nil, err
+	}
+	postNetworkServiceEnabled, err := dn.HostHelpers.IsServiceEnabled(systemd.SriovPostNetworkServicePath)
+	if err != nil {
+		funcLog.Error(err, "failed to check if sriov-config-post-network service exists on host")
+		return nil, err
+	}
+
+	// if the services don't exist we should continue and let the k8s plugin create the service files
+	// this is only for k8s-based environments; for openshift the sriov-operator creates a machine config that will apply
+	// the systemd service and reboot the node, so the config-daemon doesn't need to do anything.
+ sriovResult := &systemd.SriovResult{SyncStatus: consts.SyncStatusFailed, + LastSyncError: fmt.Sprintf("some sriov systemd services are not available on node: "+ + "sriov-config available:%t, sriov-config-post-network available:%t", serviceEnabled, postNetworkServiceEnabled)} + if serviceEnabled && postNetworkServiceEnabled { + sriovResult, err = systemd.ReadSriovResult() + if err != nil { + funcLog.Error(err, "failed to load sriov result file from host") + return nil, err + } + } + + if sriovResult.LastSyncError != "" || sriovResult.SyncStatus == consts.SyncStatusFailed { + funcLog.Info("sync failed systemd service error", "last-sync-error", sriovResult.LastSyncError) + err = dn.updateSyncState(ctx, desiredNodeState, consts.SyncStatusFailed, sriovResult.LastSyncError) + if err != nil { + return nil, err + } + dn.lastAppliedGeneration = desiredNodeState.Generation } + return sriovResult, nil +} +func (dn *DaemonReconcile) Apply(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, reqReboot bool, sriovResult *systemd.SriovResult) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx).WithName("Apply") // apply the vendor plugins after we are done with drain if needed for k, p := range dn.loadedPlugins { // Skip both the general and virtual plugin apply them last if k != GenericPluginName && k != VirtualPluginName { err := p.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): plugin Apply failed", "plugin-name", k) - return err + reqLogger.Error(err, "plugin Apply failed", "plugin-name", k) + return ctrl.Result{}, err } } } @@ -517,10 +370,10 @@ func (dn *Daemon) nodeStateSyncHandler() error { selectedPlugin, ok := dn.loadedPlugins[GenericPluginName] if ok { // Apply generic plugin last - err = selectedPlugin.Apply() + err := selectedPlugin.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): generic plugin fail to apply") - return err + reqLogger.Error(err, "generic plugin fail to apply") + return ctrl.Result{}, err } } @@ -528,81 +381,75 @@ func (dn *Daemon) nodeStateSyncHandler() error { selectedPlugin, ok = dn.loadedPlugins[VirtualPluginName] if ok { // Apply virtual plugin last - err = selectedPlugin.Apply() + err := selectedPlugin.Apply() if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): virtual plugin failed to apply") - return err + reqLogger.Error(err, "virtual plugin failed to apply") + return ctrl.Result{}, err } } } if reqReboot { - log.Log.Info("nodeStateSyncHandler(): reboot node") - dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated") - dn.rebootNode() - return nil + reqLogger.Info("reboot node") + dn.eventRecorder.SendEvent(ctx, "RebootNode", "Reboot node has been initiated") + return ctrl.Result{}, dn.rebootNode() } - log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node") - err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client) + _, err := dn.annotate(ctx, desiredNodeState, consts.DrainIdle) if err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node") - return err + reqLogger.Error(err, "failed to request annotation update to idle") + return ctrl.Result{}, err } - log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.DrainIdle, dn.client); err != nil { - return err + reqLogger.Info("sync succeeded") + syncStatus := 
consts.SyncStatusSucceeded + lastSyncError := "" + if vars.UsingSystemdMode { + syncStatus = sriovResult.SyncStatus + lastSyncError = sriovResult.LastSyncError } - log.Log.Info("nodeStateSyncHandler(): sync succeeded") - dn.currentNodeState = dn.desiredNodeState.DeepCopy() - if vars.UsingSystemdMode { - dn.refreshCh <- Message{ - syncStatus: sriovResult.SyncStatus, - lastSyncError: sriovResult.LastSyncError, - } - } else { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusSucceeded, - lastSyncError: "", - } + // Update the nodeState Status object with the existing network interfaces + err = dn.getHostNetworkStatus(desiredNodeState) + if err != nil { + reqLogger.Error(err, "failed to get host network status") + return ctrl.Result{}, err } - // wait for writer to refresh the status - <-dn.syncCh - return nil + + err = dn.updateSyncState(ctx, desiredNodeState, syncStatus, lastSyncError) + if err != nil { + reqLogger.Error(err, "failed to update sync status") + } + dn.lastAppliedGeneration = desiredNodeState.Generation + return ctrl.Result{}, err } -func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { - log.Log.V(0).Info("shouldSkipReconciliation()") +func (dn *DaemonReconcile) shouldSkipReconciliation(ctx context.Context, latestState *sriovnetworkv1.SriovNetworkNodeState) (bool, error) { + funcLog := log.Log.WithName("shouldSkipReconciliation") var err error // Skip when SriovNetworkNodeState object has just been created. if latestState.GetGeneration() == 1 && len(latestState.Spec.Interfaces) == 0 { err = dn.HostHelpers.ClearPCIAddressFolder() if err != nil { - log.Log.Error(err, "failed to clear the PCI address configuration") + funcLog.Error(err, "failed to clear the PCI address configuration") return false, err } - log.Log.V(0).Info( - "shouldSkipReconciliation(): interface policy spec not yet set by controller for sriovNetworkNodeState", + funcLog.V(0).Info("interface policy spec not yet set by controller for sriovNetworkNodeState", "name", latestState.Name) - if latestState.Status.SyncStatus != consts.SyncStatusSucceeded { - dn.refreshCh <- Message{ - syncStatus: consts.SyncStatusSucceeded, - lastSyncError: "", + if latestState.Status.SyncStatus != consts.SyncStatusSucceeded || + latestState.Status.LastSyncError != "" { + err = dn.updateSyncState(ctx, latestState, consts.SyncStatusSucceeded, "") + if err != nil { + return false, err } - // wait for writer to refresh status - <-dn.syncCh } return true, nil } // Verify changes in the status of the SriovNetworkNodeState CR. - if dn.currentNodeState.GetGeneration() == latestState.GetGeneration() { + if dn.lastAppliedGeneration == latestState.Generation { log.Log.V(0).Info("shouldSkipReconciliation() verifying status change") for _, p := range dn.loadedPlugins { // Verify changes in the status of the SriovNetworkNodeState CR. 
@@ -620,82 +467,89 @@ func (dn *Daemon) shouldSkipReconciliation(latestState *sriovnetworkv1.SriovNetw
 			log.Log.V(0).Info("shouldSkipReconciliation(): Interface not changed")
 			if latestState.Status.LastSyncError != "" ||
 				latestState.Status.SyncStatus != consts.SyncStatusSucceeded {
-				dn.refreshCh <- Message{
-					syncStatus:    consts.SyncStatusSucceeded,
-					lastSyncError: "",
+				err = dn.updateSyncState(ctx, latestState, consts.SyncStatusSucceeded, "")
+				if err != nil {
+					return false, err
 				}
-				// wait for writer to refresh the status
-				<-dn.syncCh
 			}
-
 			return true, nil
 		}
 
 	return false, nil
 }
 
+func (dn *DaemonReconcile) shouldUpdateStatus(latestState, originalState *sriovnetworkv1.SriovNetworkNodeState) bool {
+	funcLog := log.Log.WithName("shouldUpdateStatus")
+	for _, latestInt := range latestState.Status.Interfaces {
+		found := false
+		for _, originalInt := range originalState.Status.Interfaces {
+			if latestInt.PciAddress == originalInt.PciAddress {
+				found = true
+
+				// we remove the VFs list as the info changes based on whether the VF is allocated to a pod or not
+				copyLatestInt := latestInt.DeepCopy()
+				copyLatestInt.VFs = nil
+				copyOriginalInt := originalInt.DeepCopy()
+				copyOriginalInt.VFs = nil
+				if !reflect.DeepEqual(copyLatestInt, copyOriginalInt) {
+					if funcLog.V(2).Enabled() {
+						funcLog.V(2).Info("interface status changed", "originalInterface", copyOriginalInt, "latestInterface", copyLatestInt)
+					} else {
+						funcLog.Info("interface status changed", "pciAddress", latestInt.PciAddress)
+					}
+					return true
+				}
+				funcLog.V(2).Info("DEBUG interface not changed", "latest", latestInt, "original", originalInt)
+				break
+			}
+		}
+		if !found {
+			funcLog.Info("PF doesn't exist in current nodeState status, need to update nodeState on api server",
+				"pciAddress",
+				latestInt.PciAddress)
+			return true
+		}
+	}
+	return false
+}
+
 // handleDrain: adds the right annotation to the node and nodeState object
 // returns true if we need to finish the reconcile loop and wait for a new object
-func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) {
+func (dn *DaemonReconcile) handleDrain(ctx context.Context, desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, reqReboot bool) (bool, error) {
+	funcLog := log.Log.WithName("handleDrain")
 	// done with the drain we can continue with the configuration
-	if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
-		log.Log.Info("handleDrain(): the node complete the draining")
+	if utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) {
+		funcLog.Info("the node completed draining")
 		return false, nil
 	}
 
 	// the operator is still draining the node so we reconcile
-	if utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
-		log.Log.Info("handleDrain(): the node is still draining")
+	if utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.Draining) {
+		funcLog.Info("the node is still draining")
 		return true, nil
 	}
 
 	// drain is disabled we continue with the configuration
 	if dn.disableDrain {
-		log.Log.Info("handleDrain(): drain is disabled in sriovOperatorConfig")
+		funcLog.Info("drain is disabled in sriovOperatorConfig")
 		return false, nil
 	}
 
+	// annotate both node and node state with drain or reboot
+	annotation := consts.DrainRequired
 	if reqReboot {
-		log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for node")
-		err := utils.AnnotateNode(context.Background(), 
vars.NodeName, consts.NodeDrainAnnotation, consts.RebootRequired, dn.client) - if err != nil { - log.Log.Error(err, "applyDrainRequired(): Failed to annotate node") - return false, err - } - - log.Log.Info("handleDrain(): apply 'Reboot_Required' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.RebootRequired, dn.client); err != nil { - return false, err - } - - // the node was annotated we need to wait for the operator to finish the drain - return true, nil + annotation = consts.RebootRequired } - log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for node") - err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainRequired, dn.client) - if err != nil { - log.Log.Error(err, "handleDrain(): Failed to annotate node") - return false, err - } - - log.Log.Info("handleDrain(): apply 'Drain_Required' annotation for nodeState") - if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, - consts.NodeStateDrainAnnotation, - consts.DrainRequired, dn.client); err != nil { - return false, err - } - - // the node was annotated we need to wait for the operator to finish the drain - return true, nil + return dn.annotate(ctx, desiredNodeState, annotation) } -func (dn *Daemon) rebootNode() { - log.Log.Info("rebootNode(): trigger node reboot") +func (dn *DaemonReconcile) rebootNode() error { + funcLog := log.Log.WithName("rebootNode") + funcLog.Info("trigger node reboot") exit, err := dn.HostHelpers.Chroot(consts.Host) if err != nil { - log.Log.Error(err, "rebootNode(): chroot command failed") + funcLog.Error(err, "chroot command failed") + return err } defer exit() // creates a new transient systemd unit to reboot the system. 
@@ -709,11 +563,13 @@ func (dn *Daemon) rebootNode() { "--description", "sriov-network-config-daemon reboot node", "/bin/sh", "-c", "systemctl stop kubelet.service; reboot") if err := cmd.Run(); err != nil { - log.Log.Error(err, "failed to reboot node") + funcLog.Error(err, "failed to reboot node") + return err } + return nil } -func (dn *Daemon) prepareNMUdevRule() error { +func (dn *DaemonReconcile) prepareNMUdevRule() error { // we need to remove the Red Hat Virtio network device from the udev rule configuration // if we don't remove it when running the config-daemon on a virtual node it will disconnect the node after a reboot // even that the operator should not be installed on virtual environments that are not openstack @@ -730,6 +586,37 @@ func (dn *Daemon) prepareNMUdevRule() error { } // isDrainCompleted returns true if the current-state annotation is drain completed -func (dn *Daemon) isDrainCompleted() bool { - return utils.ObjectHasAnnotation(dn.desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) +func (dn *DaemonReconcile) isDrainCompleted(desiredNodeState *sriovnetworkv1.SriovNetworkNodeState) bool { + return utils.ObjectHasAnnotation(desiredNodeState, consts.NodeStateDrainAnnotationCurrent, consts.DrainComplete) +} + +func (dn *DaemonReconcile) annotate( + ctx context.Context, + desiredNodeState *sriovnetworkv1.SriovNetworkNodeState, + annotationState string) (bool, error) { + funcLog := log.Log.WithName("annotate") + + funcLog.Info(fmt.Sprintf("apply '%s' annotation for node", annotationState)) + err := utils.AnnotateNode(ctx, desiredNodeState.Name, consts.NodeDrainAnnotation, annotationState, dn.client) + if err != nil { + log.Log.Error(err, "Failed to annotate node") + return false, err + } + + funcLog.Info(fmt.Sprintf("apply '%s' annotation for nodeState", annotationState)) + if err := utils.AnnotateObject(context.Background(), desiredNodeState, + consts.NodeStateDrainAnnotation, + annotationState, dn.client); err != nil { + return false, err + } + + // the node was annotated we need to wait for the operator to finish the drain + return true, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (dn *DaemonReconcile) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&sriovnetworkv1.SriovNetworkNodeState{}). + Complete(dn) } diff --git a/pkg/daemon/daemon_suite_test.go b/pkg/daemon/daemon_suite_test.go new file mode 100644 index 0000000000..89432eae76 --- /dev/null +++ b/pkg/daemon/daemon_suite_test.go @@ -0,0 +1,135 @@ +package daemon_test + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" + openshiftconfigv1 "github.com/openshift/api/config/v1" + mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" + "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var ( + k8sClient client.Client + testEnv *envtest.Environment + cfg *rest.Config +) + +// Define utility constants for object names and testing timeouts/durations and intervals. +const testNamespace = "openshift-sriov-network-operator" + +var _ = BeforeSuite(func() { + var err error + + logf.SetLogger(zap.New( + zap.WriteTo(GinkgoWriter), + zap.UseDevMode(true), + func(o *zap.Options) { + o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder + })) + + // Go to project root directory + err = os.Chdir("../..") + Expect(err).NotTo(HaveOccurred()) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("config", "crd", "bases"), filepath.Join("test", "util", "crds")}, + ErrorIfCRDPathMissing: true, + } + + testEnv.ControlPlane.GetAPIServer().Configure().Set("disable-admission-plugins", "MutatingAdmissionWebhook", "ValidatingAdmissionWebhook") + + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + By("registering schemes") + err = sriovnetworkv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = netattdefv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = mcfgv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = openshiftconfigv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = monitoringv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + vars.Config = cfg + vars.Scheme = scheme.Scheme + vars.Namespace = testNamespace + + By("creating K8s client") + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + By("creating default/common k8s objects for tests") + // Create test namespace + ns := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespace, + }, + Spec: corev1.NamespaceSpec{}, + Status: corev1.NamespaceStatus{}, + } + Expect(k8sClient.Create(context.Background(), ns)).Should(Succeed()) + + sa := &corev1.ServiceAccount{TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Namespace: testNamespace, + }} + Expect(k8sClient.Create(context.Background(), sa)).Should(Succeed()) + + // Create openshift Infrastructure + infra := &openshiftconfigv1.Infrastructure{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + }, + Spec: 
openshiftconfigv1.InfrastructureSpec{}, + Status: openshiftconfigv1.InfrastructureStatus{ + ControlPlaneTopology: openshiftconfigv1.HighlyAvailableTopologyMode, + }, + } + Expect(k8sClient.Create(context.Background(), infra)).Should(Succeed()) +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + if testEnv != nil { + Eventually(func() error { + return testEnv.Stop() + }, util.APITimeout, time.Second).ShouldNot(HaveOccurred()) + } +}) + +func TestDaemon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Daemon Suite") +} diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index 6104dd99cd..50f2b47494 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -1,302 +1,235 @@ -package daemon +package daemon_test import ( "context" - "flag" - "testing" + "sync" + "time" - "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "go.uber.org/zap/zapcore" + + "github.com/golang/mock/gomock" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - fakek8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - kclient "sigs.k8s.io/controller-runtime/pkg/client/fake" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - snclient "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned/fake" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/daemon" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" mock_helper "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper/mock" + snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" mock_platforms "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/mock" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/openshift" - plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins/fake" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins/generic" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" - "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/fakefilesystem" ) -var SriovDevicePluginPod corev1.Pod - -func TestConfigDaemon(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Config Daemon Suite") -} - -var _ = BeforeSuite(func() { - // Increase verbosity to help debugging failures - flag.Set("logtostderr", "true") - flag.Set("stderrthreshold", "WARNING") - flag.Set("v", "2") - - logf.SetLogger(zap.New( - zap.WriteTo(GinkgoWriter), - zap.UseDevMode(true), - func(o *zap.Options) { - o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder - })) -}) - -var _ = 
Describe("Config Daemon", func() { - var stopCh chan struct{} - var syncCh chan struct{} - var exitCh chan error - var refreshCh chan Message - - var cleanFakeFs func() +var ( + cancel context.CancelFunc + ctx context.Context + k8sManager manager.Manager + snclient *snclientset.Clientset + kubeclient *kubernetes.Clientset + eventRecorder *daemon.EventRecorder + wg sync.WaitGroup + startDaemon func(dc *daemon.DaemonReconcile) +) - var sut *Daemon +const ( + waitTime = 10 * time.Minute + retryTime = 5 * time.Second +) - BeforeEach(func() { - stopCh = make(chan struct{}) - refreshCh = make(chan Message) - exitCh = make(chan error) - syncCh = make(chan struct{}, 64) - - // Fill syncCh with values so daemon doesn't wait for a writer - for i := 0; i < 64; i++ { - syncCh <- struct{}{} +var _ = Describe("Daemon Controller", Ordered, func() { + BeforeAll(func() { + ctx, cancel = context.WithCancel(context.Background()) + wg = sync.WaitGroup{} + startDaemon = func(dc *daemon.DaemonReconcile) { + By("start controller manager") + wg.Add(1) + go func() { + defer wg.Done() + defer GinkgoRecover() + By("Start controller manager") + err := k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred()) + }() } - // Create virtual filesystem for Daemon - fakeFs := &fakefilesystem.FS{ - Dirs: []string{ - "bindata/scripts", - "host/etc/sriov-operator", - "host/etc/sriov-operator/pci", - "host/etc/udev/rules.d", - }, - Symlinks: map[string]string{}, - Files: map[string][]byte{ - "/bindata/scripts/enable-rdma.sh": []byte(""), - "/bindata/scripts/load-kmod.sh": []byte(""), + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovOperatorConfig{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) + soc := &sriovnetworkv1.SriovOperatorConfig{ObjectMeta: metav1.ObjectMeta{ + Name: constants.DefaultConfigName, + Namespace: testNamespace, + }, + Spec: sriovnetworkv1.SriovOperatorConfigSpec{ + LogLevel: 2, }, } - - var err error - vars.FilesystemRoot, cleanFakeFs, err = fakeFs.Use() + err := k8sClient.Create(ctx, soc) Expect(err).ToNot(HaveOccurred()) - vars.UsingSystemdMode = false - vars.NodeName = "test-node" - vars.Namespace = "sriov-network-operator" - vars.PlatformType = consts.Baremetal - - FakeSupportedNicIDs := corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: sriovnetworkv1.SupportedNicIDConfigmap, - Namespace: vars.Namespace, - }, - Data: map[string]string{ - "Intel_i40e_XXV710": "8086 158a 154c", - "Nvidia_mlx5_ConnectX-4": "15b3 1013 1014", - }, - } - - err = sriovnetworkv1.AddToScheme(scheme.Scheme) - Expect(err).ToNot(HaveOccurred()) - kClient := kclient.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(&corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "test-node"}}, - &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Namespace: vars.Namespace, - }}).Build() - - kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs) - snclient := snclientset.NewSimpleClientset() - err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, vars.Namespace) - Expect(err).ToNot(HaveOccurred()) + snclient = snclientset.NewForConfigOrDie(cfg) + kubeclient = kubernetes.NewForConfigOrDie(cfg) + eventRecorder = daemon.NewEventRecorder(snclient, kubeclient, scheme.Scheme) + DeferCleanup(func() { + eventRecorder.Shutdown() + }) - er := NewEventRecorder(snclient, kubeClient) - - t := GinkgoT() - mockCtrl := gomock.NewController(t) - platformHelper := mock_platforms.NewMockInterface(mockCtrl) - 
platformHelper.EXPECT().GetFlavor().Return(openshift.OpenshiftFlavorDefault).AnyTimes() - platformHelper.EXPECT().IsOpenshiftCluster().Return(false).AnyTimes() - platformHelper.EXPECT().IsHypershift().Return(false).AnyTimes() - - vendorHelper := mock_helper.NewMockHostHelpersInterface(mockCtrl) - vendorHelper.EXPECT().CheckRDMAEnabled().Return(true, nil).AnyTimes() - vendorHelper.EXPECT().TryEnableVhostNet().AnyTimes() - vendorHelper.EXPECT().TryEnableTun().AnyTimes() - vendorHelper.EXPECT().PrepareNMUdevRule([]string{"0x1014", "0x154c"}).Return(nil).AnyTimes() - vendorHelper.EXPECT().PrepareVFRepUdevRule().Return(nil).AnyTimes() - - featureGates := featuregate.New() - - sut = New( - kClient, - snclient, - kubeClient, - vendorHelper, - platformHelper, - exitCh, - stopCh, - syncCh, - refreshCh, - er, - featureGates, - nil, - ) - - sut.loadedPlugins = map[string]plugin.VendorPlugin{generic.PluginName: &fake.FakePlugin{PluginName: "fake"}} - - go func() { - defer GinkgoRecover() - err := sut.Run(stopCh, exitCh) - Expect(err).ToNot(HaveOccurred()) - }() - - SriovDevicePluginPod = corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "sriov-device-plugin-xxxx", - Namespace: vars.Namespace, - Labels: map[string]string{ - "app": "sriov-device-plugin", - }, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - _, err = sut.kubeClient.CoreV1().Pods(vars.Namespace).Create(context.Background(), &SriovDevicePluginPod, metav1.CreateOptions{}) - Expect(err).ToNot(HaveOccurred()) + snolog.SetLogLevel(2) + vars.ClusterType = constants.ClusterTypeOpenshift + }) + BeforeEach(func() { + Expect(k8sClient.DeleteAllOf(context.Background(), &sriovnetworkv1.SriovNetworkNodeState{}, client.InNamespace(testNamespace))).ToNot(HaveOccurred()) }) AfterEach(func() { - close(stopCh) - close(syncCh) - close(exitCh) - close(refreshCh) - - cleanFakeFs() + By("Shutdown controller manager") + cancel() + wg.Wait() }) - Context("Should", func() { - It("restart sriov-device-plugin pod", func() { - - _, err := sut.kubeClient.CoreV1().Nodes(). - Create(context.Background(), &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "test-node"}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - nodeState := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 123, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, + Context("Config Daemon", func() { + It("Should expose nodeState Status section", func() { + By("Init mock functions") + t := GinkgoT() + mockCtrl := gomock.NewController(t) + hostHelper := mock_helper.NewMockHostHelpersInterface(mockCtrl) + platformHelper := mock_platforms.NewMockInterface(mockCtrl) + hostHelper.EXPECT().DiscoverSriovDevices(hostHelper).Return([]sriovnetworkv1.InterfaceExt{ + { + Name: "eno1", + Driver: "mlx5_core", + PciAddress: "0000:16:00.0", + DeviceID: "1015", + Vendor: "15b3", + EswitchMode: "legacy", + LinkAdminState: "up", + LinkSpeed: "10000 Mb/s", + LinkType: "ETH", + Mac: "aa:bb:cc:dd:ee:ff", + Mtu: 1500, }, - Spec: sriovnetworkv1.SriovNetworkNodeStateSpec{}, - Status: sriovnetworkv1.SriovNetworkNodeStateStatus{ - Interfaces: []sriovnetworkv1.InterfaceExt{ - { - VFs: []sriovnetworkv1.VirtualFunction{ - {}, - }, - DeviceID: "158b", - Driver: "i40e", - Mtu: 1500, - Name: "ens803f0", - PciAddress: "0000:86:00.0", - Vendor: "8086", - NumVfs: 4, - TotalVfs: 64, - }, - }, - }, - } - Expect( - createSriovNetworkNodeState(sut.sriovClient, nodeState)). 
-				To(BeNil())
-
-			var msg Message
-			Eventually(refreshCh, "30s").Should(Receive(&msg))
-			Expect(msg.syncStatus).To(Equal("InProgress"))
-
-			nodeState.Annotations[consts.NodeStateDrainAnnotationCurrent] = consts.DrainComplete
-			err = updateSriovNetworkNodeState(sut.sriovClient, nodeState)
-			Expect(err).ToNot(HaveOccurred())
-
-			Eventually(refreshCh, "30s").Should(Receive(&msg))
-			Expect(msg.syncStatus).To(Equal("InProgress"))
-			Eventually(refreshCh, "30s").Should(Receive(&msg))
-			Expect(msg.syncStatus).To(Equal("Succeeded"))
-
+			}, nil).AnyTimes()
+			hostHelper.EXPECT().IsKernelLockdownMode().Return(false).AnyTimes()
+			hostHelper.EXPECT().LoadPfsStatus("0000:16:00.0").Return(nil, false, nil).AnyTimes()
+			hostHelper.EXPECT().MlxConfigFW(gomock.Any()).Return(nil)
+			hostHelper.EXPECT().Chroot(gomock.Any()).Return(func() error { return nil }, nil)
+			hostHelper.EXPECT().ConfigSriovInterfaces(gomock.Any(), gomock.Any(), gomock.Any(), false).Return(nil)
+			hostHelper.EXPECT().ClearPCIAddressFolder().Return(nil).AnyTimes()
+
+			featureGates := featuregate.New()
+			featureGates.Init(map[string]bool{})
+			dc := CreateDaemon(hostHelper, platformHelper, featureGates, []string{})
+			startDaemon(dc)
+
+			_, nodeState := createNode("node1")
+			// state moves to in progress
+			EventuallyWithOffset(1, func(g Gomega) {
+				g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)).
+					ToNot(HaveOccurred())
+
+				g.Expect(nodeState.Status.SyncStatus).To(Equal(constants.SyncStatusInProgress))
+			}, waitTime, retryTime).Should(Succeed())
+
+			// daemon requests a device plugin reset
+			EventuallyWithOffset(1, func(g Gomega) {
+				g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)).
+					ToNot(HaveOccurred())
+
+				g.Expect(nodeState.Annotations[constants.NodeStateDrainAnnotation]).To(Equal(constants.DevicePluginResetRequired))
+			}, waitTime, retryTime).Should(Succeed())
+
+			// finish drain
+			patchAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete)
+			EventuallyWithOffset(1, func(g Gomega) {
+				g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)).
+					ToNot(HaveOccurred())
+
+				g.Expect(nodeState.Annotations[constants.NodeStateDrainAnnotation]).To(Equal(constants.DrainIdle))
+			}, waitTime, retryTime).Should(Succeed())
+
+			// move current status to idle as well (from the operator)
+			patchAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle)
+
+			// Validate status
+			EventuallyWithOffset(1, func(g Gomega) {
+				g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: nodeState.Namespace, Name: nodeState.Name}, nodeState)).
+ ToNot(HaveOccurred()) + + g.Expect(nodeState.Status.SyncStatus).To(Equal(constants.SyncStatusSucceeded)) + }, waitTime, retryTime).Should(Succeed()) + Expect(nodeState.Status.LastSyncError).To(Equal("")) + Expect(len(nodeState.Status.Interfaces)).To(Equal(1)) }) + }) +}) - It("ignore non latest SriovNetworkNodeState generations", func() { +func patchAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState, key, value string) { + originalNodeState := nodeState.DeepCopy() + nodeState.Annotations[key] = value + err := k8sClient.Patch(ctx, nodeState, client.MergeFrom(originalNodeState)) + Expect(err).ToNot(HaveOccurred()) +} - _, err := sut.kubeClient.CoreV1().Nodes().Create(context.Background(), &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - nodeState1 := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 123, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, - }, - } - Expect( - createSriovNetworkNodeState(sut.sriovClient, nodeState1)). - To(BeNil()) - - nodeState2 := &sriovnetworkv1.SriovNetworkNodeState{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - Generation: 777, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainComplete}, - }, - } - Expect( - updateSriovNetworkNodeState(sut.sriovClient, nodeState2)). - To(BeNil()) +func createNode(nodeName string) (*corev1.Node, *sriovnetworkv1.SriovNetworkNodeState) { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Annotations: map[string]string{ + constants.NodeDrainAnnotation: constants.DrainIdle, + "machineconfiguration.openshift.io/desiredConfig": "worker-1", + }, + Labels: map[string]string{ + "test": "", + }, + }, + } + + nodeState := sriovnetworkv1.SriovNetworkNodeState{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Namespace: testNamespace, + Annotations: map[string]string{ + constants.NodeDrainAnnotation: constants.DrainIdle, + constants.NodeStateDrainAnnotationCurrent: constants.DrainIdle, + }, + }, + } - var msg Message - Eventually(refreshCh, "10s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("InProgress")) + Expect(k8sClient.Create(ctx, &node)).ToNot(HaveOccurred()) + Expect(k8sClient.Create(ctx, &nodeState)).ToNot(HaveOccurred()) - Eventually(refreshCh, "10s").Should(Receive(&msg)) - Expect(msg.syncStatus).To(Equal("Succeeded")) + return &node, &nodeState +} - Expect(sut.desiredNodeState.GetGeneration()).To(BeNumerically("==", 777)) - }) +func CreateDaemon( + hostHelper helper.HostHelpersInterface, + platformHelper platforms.Interface, + featureGates featuregate.FeatureGate, + disablePlugins []string) *daemon.DaemonReconcile { + kClient, err := client.New( + cfg, + client.Options{ + Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred()) + + By("Setup controller manager") + k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, }) -}) + Expect(err).ToNot(HaveOccurred()) -func createSriovNetworkNodeState(c snclient.Interface, nodeState *sriovnetworkv1.SriovNetworkNodeState) error { - _, err := c.SriovnetworkV1(). - SriovNetworkNodeStates(vars.Namespace). 
- Create(context.Background(), nodeState, metav1.CreateOptions{}) - return err -} + configController := daemon.New(kClient, snclient, kubeclient, hostHelper, platformHelper, eventRecorder, featureGates, disablePlugins) + err = configController.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) -func updateSriovNetworkNodeState(c snclient.Interface, nodeState *sriovnetworkv1.SriovNetworkNodeState) error { - _, err := c.SriovnetworkV1(). - SriovNetworkNodeStates(vars.Namespace). - Update(context.Background(), nodeState, metav1.UpdateOptions{}) - return err + return configController } diff --git a/pkg/daemon/event_recorder.go b/pkg/daemon/event_recorder.go index 25b2d2351b..9fe766eb4c 100644 --- a/pkg/daemon/event_recorder.go +++ b/pkg/daemon/event_recorder.go @@ -5,8 +5,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/log" @@ -22,11 +22,11 @@ type EventRecorder struct { } // NewEventRecorder Create a new EventRecorder -func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface) *EventRecorder { +func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface, s *runtime.Scheme) *EventRecorder { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartStructuredLogging(4) eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: kubeclient.CoreV1().Events("")}) - eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "config-daemon"}) + eventRecorder := eventBroadcaster.NewRecorder(s, corev1.EventSource{Component: "config-daemon"}) return &EventRecorder{ client: c, eventRecorder: eventRecorder, @@ -35,8 +35,8 @@ func NewEventRecorder(c snclientset.Interface, kubeclient kubernetes.Interface) } // SendEvent Send an Event on the NodeState object -func (e *EventRecorder) SendEvent(eventType string, msg string) { - nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) +func (e *EventRecorder) SendEvent(ctx context.Context, eventType string, msg string) { + nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(ctx, vars.NodeName, metav1.GetOptions{}) if err != nil { log.Log.V(2).Error(err, "SendEvent(): Failed to fetch node state, skip SendEvent", "name", vars.NodeName) return diff --git a/pkg/daemon/status.go b/pkg/daemon/status.go new file mode 100644 index 0000000000..6234841d11 --- /dev/null +++ b/pkg/daemon/status.go @@ -0,0 +1,89 @@ +package daemon + +import ( + "context" + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +const ( + Unknown = "Unknown" +) + +func (dn *DaemonReconcile) updateSyncState(ctx context.Context, nodeState *sriovnetworkv1.SriovNetworkNodeState, status, failedMessage string) error { + funcLog := log.FromContext(ctx).WithName("updateSyncState") + oldStatus := nodeState.Status.SyncStatus + copyState := nodeState.DeepCopy() + // clear status for patch + copyState.Status = 
sriovnetworkv1.SriovNetworkNodeStateStatus{} + + funcLog.V(2).Info("update nodeState status", + "CurrentSyncStatus", nodeState.Status.SyncStatus, + "CurrentLastSyncError", nodeState.Status.LastSyncError, + "NewSyncStatus", status, + "NewFailedMessage", failedMessage) + nodeState.Status.SyncStatus = status + nodeState.Status.LastSyncError = failedMessage + + err := dn.client.Status().Patch(ctx, nodeState, client.MergeFrom(copyState)) + if err != nil { + funcLog.Error(err, "failed to update node state status", + "SyncStatus", status, + "LastSyncError", failedMessage) + } + + dn.recordStatusChangeEvent(ctx, oldStatus, nodeState.Status.SyncStatus, failedMessage) + return err +} + +func (dn *DaemonReconcile) getHostNetworkStatus(nodeState *sriovnetworkv1.SriovNetworkNodeState) error { + log.Log.WithName("GetHostNetworkStatus").Info("Getting host network status") + var iface []sriovnetworkv1.InterfaceExt + var bridges sriovnetworkv1.Bridges + var err error + + if vars.PlatformType == consts.VirtualOpenStack { + iface, err = dn.platformHelpers.DiscoverSriovDevicesVirtual() + if err != nil { + return err + } + } else { + iface, err = dn.HostHelpers.DiscoverSriovDevices(dn.HostHelpers) + if err != nil { + return err + } + if vars.ManageSoftwareBridges { + bridges, err = dn.HostHelpers.DiscoverBridges() + if err != nil { + return err + } + } + } + + nodeState.Status.Interfaces = iface + nodeState.Status.Bridges = bridges + + return nil +} + +func (dn *DaemonReconcile) recordStatusChangeEvent(ctx context.Context, oldStatus, newStatus, lastError string) { + if oldStatus != newStatus { + if oldStatus == "" { + oldStatus = Unknown + } + if newStatus == "" { + newStatus = Unknown + } + eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus) + if lastError != "" { + eventMsg = fmt.Sprintf("%s. 
Last Error: %s", eventMsg, lastError) + } + dn.eventRecorder.SendEvent(ctx, "SyncStatusChanged", eventMsg) + } +} diff --git a/pkg/daemon/writer.go b/pkg/daemon/writer.go deleted file mode 100644 index 09d06d8f99..0000000000 --- a/pkg/daemon/writer.go +++ /dev/null @@ -1,282 +0,0 @@ -package daemon - -import ( - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/pkg/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/retry" - "sigs.k8s.io/controller-runtime/pkg/log" - - sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/helper" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" -) - -const ( - CheckpointFileName = "sno-initial-node-state.json" - Unknown = "Unknown" -) - -type NodeStateStatusWriter struct { - client snclientset.Interface - status sriovnetworkv1.SriovNetworkNodeStateStatus - OnHeartbeatFailure func() - platformHelper platforms.Interface - hostHelper helper.HostHelpersInterface - eventRecorder *EventRecorder -} - -// NewNodeStateStatusWriter Create a new NodeStateStatusWriter -func NewNodeStateStatusWriter(c snclientset.Interface, - f func(), er *EventRecorder, - hostHelper helper.HostHelpersInterface, - platformHelper platforms.Interface) *NodeStateStatusWriter { - return &NodeStateStatusWriter{ - client: c, - OnHeartbeatFailure: f, - eventRecorder: er, - hostHelper: hostHelper, - platformHelper: platformHelper, - } -} - -// RunOnce initial the interface status for both baremetal and virtual environments -func (w *NodeStateStatusWriter) RunOnce() error { - log.Log.V(0).Info("RunOnce()") - msg := Message{} - - if vars.PlatformType == consts.VirtualOpenStack { - ns, err := w.getCheckPointNodeState() - if err != nil { - return err - } - - if ns == nil { - err = w.platformHelper.CreateOpenstackDevicesInfo() - if err != nil { - return err - } - } else { - w.platformHelper.CreateOpenstackDevicesInfoFromNodeStatus(ns) - } - } - - log.Log.V(0).Info("RunOnce(): first poll for nic status") - if err := w.pollNicStatus(); err != nil { - log.Log.Error(err, "RunOnce(): first poll failed") - } - - ns, err := w.setNodeStateStatus(msg) - if err != nil { - log.Log.Error(err, "RunOnce(): first writing to node status failed") - } - return w.writeCheckpointFile(ns) -} - -// Run reads from the writer channel and sets the interface status. It will -// return if the stop channel is closed. Intended to be run via a goroutine. 
-func (w *NodeStateStatusWriter) Run(stop <-chan struct{}, refresh <-chan Message, syncCh chan<- struct{}) error { - log.Log.V(0).Info("Run(): start writer") - msg := Message{} - - for { - select { - case <-stop: - log.Log.V(0).Info("Run(): stop writer") - return nil - case msg = <-refresh: - log.Log.V(0).Info("Run(): refresh trigger") - if err := w.pollNicStatus(); err != nil { - continue - } - _, err := w.setNodeStateStatus(msg) - if err != nil { - log.Log.Error(err, "Run() refresh: writing to node status failed") - } - syncCh <- struct{}{} - case <-time.After(30 * time.Second): - log.Log.V(2).Info("Run(): period refresh") - if err := w.pollNicStatus(); err != nil { - continue - } - w.setNodeStateStatus(msg) - } - } -} - -func (w *NodeStateStatusWriter) pollNicStatus() error { - log.Log.V(2).Info("pollNicStatus()") - var iface []sriovnetworkv1.InterfaceExt - var bridges sriovnetworkv1.Bridges - var err error - - if vars.PlatformType == consts.VirtualOpenStack { - iface, err = w.platformHelper.DiscoverSriovDevicesVirtual() - if err != nil { - return err - } - } else { - iface, err = w.hostHelper.DiscoverSriovDevices(w.hostHelper) - if err != nil { - return err - } - if vars.ManageSoftwareBridges { - bridges, err = w.hostHelper.DiscoverBridges() - if err != nil { - return err - } - } - } - - w.status.Interfaces = iface - w.status.Bridges = bridges - - return nil -} - -func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv1.SriovNetworkNodeState)) (*sriovnetworkv1.SriovNetworkNodeState, error) { - var nodeState *sriovnetworkv1.SriovNetworkNodeState - var oldStatus, newStatus, lastError string - - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - n, getErr := w.getNodeState() - if getErr != nil { - return getErr - } - oldStatus = n.Status.SyncStatus - - // Call the status modifier. 
- f(n) - - newStatus = n.Status.SyncStatus - lastError = n.Status.LastSyncError - - var err error - nodeState, err = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).UpdateStatus(context.Background(), n, metav1.UpdateOptions{}) - if err != nil { - log.Log.V(0).Error(err, "updateNodeStateStatusRetry(): fail to update the node status") - } - return err - }) - if err != nil { - // may be conflict if max retries were hit - return nil, fmt.Errorf("unable to update node %v: %v", nodeState, err) - } - - w.recordStatusChangeEvent(oldStatus, newStatus, lastError) - - return nodeState, nil -} - -func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) { - nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) { - nodeState.Status.Interfaces = w.status.Interfaces - nodeState.Status.Bridges = w.status.Bridges - if msg.lastSyncError != "" || msg.syncStatus == consts.SyncStatusSucceeded { - // clear lastSyncError when sync Succeeded - nodeState.Status.LastSyncError = msg.lastSyncError - } - nodeState.Status.SyncStatus = msg.syncStatus - - log.Log.V(0).Info("setNodeStateStatus(): status", - "sync-status", nodeState.Status.SyncStatus, - "last-sync-error", nodeState.Status.LastSyncError) - }) - if err != nil { - return nil, err - } - return nodeState, nil -} - -// recordStatusChangeEvent sends event in case oldStatus differs from newStatus -func (w *NodeStateStatusWriter) recordStatusChangeEvent(oldStatus, newStatus, lastError string) { - if oldStatus != newStatus { - if oldStatus == "" { - oldStatus = Unknown - } - if newStatus == "" { - newStatus = Unknown - } - eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus) - if lastError != "" { - eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, lastError) - } - w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg) - } -} - -// getNodeState queries the kube apiserver to get the SriovNetworkNodeState CR -func (w *NodeStateStatusWriter) getNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) { - var lastErr error - var n *sriovnetworkv1.SriovNetworkNodeState - err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) { - n, lastErr = w.client.SriovnetworkV1().SriovNetworkNodeStates(vars.Namespace).Get(context.Background(), vars.NodeName, metav1.GetOptions{}) - if lastErr == nil { - return true, nil - } - log.Log.Error(lastErr, "getNodeState(): Failed to fetch node state, close all connections and retry...", "name", vars.NodeName) - // Use the Get() also as an client-go keepalive indicator for the TCP connection. 
- w.OnHeartbeatFailure() - return false, nil - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return nil, errors.Wrapf(lastErr, "Timed out trying to fetch node %s", vars.NodeName) - } - return nil, err - } - return n, nil -} - -func (w *NodeStateStatusWriter) writeCheckpointFile(ns *sriovnetworkv1.SriovNetworkNodeState) error { - configdir := filepath.Join(vars.Destdir, CheckpointFileName) - file, err := os.OpenFile(configdir, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return err - } - defer file.Close() - log.Log.Info("writeCheckpointFile(): try to decode the checkpoint file") - if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil { - log.Log.V(2).Error(err, "writeCheckpointFile(): fail to decode, writing new file instead") - log.Log.Info("writeCheckpointFile(): write checkpoint file") - if err = file.Truncate(0); err != nil { - return err - } - if _, err = file.Seek(0, 0); err != nil { - return err - } - if err = json.NewEncoder(file).Encode(*ns); err != nil { - return err - } - sriovnetworkv1.InitialState = *ns - } - return nil -} - -func (w *NodeStateStatusWriter) getCheckPointNodeState() (*sriovnetworkv1.SriovNetworkNodeState, error) { - log.Log.Info("getCheckPointNodeState()") - configdir := filepath.Join(vars.Destdir, CheckpointFileName) - file, err := os.OpenFile(configdir, os.O_RDONLY, 0644) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - return nil, err - } - defer file.Close() - if err = json.NewDecoder(file).Decode(&sriovnetworkv1.InitialState); err != nil { - return nil, err - } - - return &sriovnetworkv1.InitialState, nil -} diff --git a/pkg/host/internal/network/network.go b/pkg/host/internal/network/network.go index 2eb40dd690..c73d3dc99e 100644 --- a/pkg/host/internal/network/network.go +++ b/pkg/host/internal/network/network.go @@ -75,7 +75,7 @@ func (n *network) TryToGetVirtualInterfaceName(pciAddr string) string { func (n *network) TryGetInterfaceName(pciAddr string) string { names, err := n.dputilsLib.GetNetNames(pciAddr) if err != nil || len(names) < 1 { - log.Log.Error(err, "TryGetInterfaceName(): failed to get interface name") + log.Log.V(2).Info("TryGetInterfaceName(): failed to get interface name", "err", err) return "" } netDevName := names[0] diff --git a/pkg/log/log.go b/pkg/log/log.go index 1e76facc57..5ecc16893b 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -60,6 +60,10 @@ func SetLogLevel(operatorLevel int) { } } +func GetLogLevel() int { + return zapToOperatorLevel(Options.Level.(zzap.AtomicLevel).Level()) +} + func zapToOperatorLevel(zapLevel zapcore.Level) int { return int(zapLevel) * -1 } diff --git a/pkg/plugins/mellanox/mellanox_plugin.go b/pkg/plugins/mellanox/mellanox_plugin.go index 10b0152bb0..c89e163db1 100644 --- a/pkg/plugins/mellanox/mellanox_plugin.go +++ b/pkg/plugins/mellanox/mellanox_plugin.go @@ -212,7 +212,7 @@ func (p *MellanoxPlugin) Apply() error { if err := p.helpers.MlxConfigFW(attributesToChange); err != nil { return err } - if vars.MlxPluginFwReset { + if vars.FeatureGate.IsEnabled(consts.MellanoxFirmwareResetFeatureGate) { return p.helpers.MlxResetFW(pciAddressesToReset) } return nil diff --git a/pkg/vars/vars.go b/pkg/vars/vars.go index fc7108ed80..44106575c4 100644 --- a/pkg/vars/vars.go +++ b/pkg/vars/vars.go @@ -8,6 +8,7 @@ import ( "k8s.io/client-go/rest" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate" ) var ( @@ -54,9 +55,6 @@ var ( // ManageSoftwareBridges global 
variable which reflects state of manageSoftwareBridges feature ManageSoftwareBridges = false - // MlxPluginFwReset global variable enables mstfwreset before rebooting a node on VF changes - MlxPluginFwReset = false - // FilesystemRoot used by test to mock interactions with filesystem FilesystemRoot = "" @@ -75,6 +73,12 @@ var ( // DisableablePlugins contains which plugins can be disabled in sriov config daemon DisableablePlugins = map[string]struct{}{"mellanox": {}} + + // DisableDrain controls if the daemon will drain the node before configuration + DisableDrain = false + + // FeatureGates interface to interact with feature gates + FeatureGate featuregate.FeatureGate ) func init() { @@ -95,4 +99,6 @@ func init() { } ResourcePrefix = os.Getenv("RESOURCE_PREFIX") + + FeatureGate = featuregate.New() } diff --git a/test/conformance/tests/test_sriov_operator.go b/test/conformance/tests/test_sriov_operator.go index 430770ab60..84841b0bce 100644 --- a/test/conformance/tests/test_sriov_operator.go +++ b/test/conformance/tests/test_sriov_operator.go @@ -1622,7 +1622,7 @@ var _ = Describe("[sriov] operator", func() { intf = getInterfaceFromNodeStateByPciAddress(node, intf.PciAddress) }) - FIt("should reconcile managed VF if status is changed", func() { + It("should reconcile managed VF if status is changed", func() { originalMtu := intf.Mtu lowerMtu := originalMtu - 500