From c9f864df795253a9cd77300d1a0bfff8744aa30f Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Tue, 5 Dec 2023 14:02:00 +0100 Subject: [PATCH] sidecar: use leader election when CONTROLLER_SERVICE is supported When a CSI-driver provides the CONTROLLER_SERVICE capability, the sidecar will try to become the leader by obtaining a Lease based on the name of the CSI-driver. Signed-off-by: Niels de Vos --- sidecar/internal/client/client.go | 29 +++ .../csiaddonsnode/csiaddonsnode_test.go | 8 + sidecar/main.go | 52 +++- .../leaderelection/leader_election.go | 240 ++++++++++++++++++ vendor/modules.txt | 1 + 5 files changed, 327 insertions(+), 3 deletions(-) create mode 100644 vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go diff --git a/sidecar/internal/client/client.go b/sidecar/internal/client/client.go index c00b04e78..d74553d27 100644 --- a/sidecar/internal/client/client.go +++ b/sidecar/internal/client/client.go @@ -39,6 +39,7 @@ type Client interface { GetGRPCClient() *grpc.ClientConn Probe() error GetDriverName() (string, error) + HasControllerService() (bool, error) } // clientImpl holds the GRPC connenction details @@ -123,6 +124,34 @@ func (c *clientImpl) probeOnce() (bool, error) { return r.GetValue(), nil } +// HasControllerService gets the driver name from the driver +func (c *clientImpl) HasControllerService() (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout) + defer cancel() + + identityClient := identity.NewIdentityClient(c.client) + + req := identity.GetCapabilitiesRequest{} + rsp, err := identityClient.GetCapabilities(ctx, &req) + if err != nil { + return false, err + } + + caps := rsp.GetCapabilities() + if len(caps) == 0 { + return false, errors.New("driver does not have any capabilities") + } + + for _, c := range caps { + svc := c.GetService() + if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE { + return true, nil + } + } + + return false, nil +} + // GetDriverName gets the driver name from the driver func (c *clientImpl) GetDriverName() (string, error) { ctx, cancel := context.WithTimeout(context.Background(), c.Timeout) diff --git a/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go b/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go index 2d6db4ad1..4bbf96383 100644 --- a/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go +++ b/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go @@ -32,6 +32,10 @@ import ( type mockClient struct { // Driver contains the drivername obtained with GetDriverName() driver string + + // isController is set to true when the CSI-plugin supports the + // CONTROLLER_SERVICE + isController bool } func NewMockClient(driver string) client.Client { @@ -50,6 +54,10 @@ func (mc *mockClient) GetDriverName() (string, error) { return mc.driver, nil } +func (mc *mockClient) HasControllerService() (bool, error) { + return mc.isController, nil +} + func Test_getCSIAddonsNode(t *testing.T) { var ( podName = "pod" diff --git a/sidecar/main.go b/sidecar/main.go index 53ca2947f..9c2cf1af6 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -17,16 +17,19 @@ limitations under the License. package main import ( + "context" "flag" "time" "github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service" + "github.com/csi-addons/kubernetes-csi-addons/internal/util" "github.com/csi-addons/kubernetes-csi-addons/internal/version" "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/client" "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/csiaddonsnode" "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/server" - "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/util" + sideutil "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/util" + "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -48,6 +51,11 @@ func main() { podNamespace = flag.String("namespace", "", "namespace of the Pod that contains this sidecar") podUID = flag.String("pod-uid", "", "UID of the Pod that contains this sidecar") showVersion = flag.Bool("version", false, "Print Version details") + + leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") + leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") + leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") + leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") ) klog.InitFlags(nil) @@ -62,7 +70,7 @@ func main() { return } - controllerEndpoint, err := util.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace) + controllerEndpoint, err := sideutil.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace) if err != nil { klog.Fatalf("Failed to validate controller endpoint: %v", err) } @@ -107,5 +115,43 @@ func main() { sidecarServer.RegisterService(service.NewNetworkFenceServer(csiClient.GetGRPCClient(), kubeClient)) sidecarServer.RegisterService(service.NewReplicationServer(csiClient.GetGRPCClient(), kubeClient)) - sidecarServer.Start() + isController, err := csiClient.HasControllerService() + if err != nil { + klog.Fatalf("Failed to check if the CSI-plugin supports CONTROLLER_SERVICE: %v", err) + } + + // do not use leaderelection when the CSI-plugin does not have + // CONTROLLER_SERVICE + if !isController { + klog.Info("The CSI-plugin does not have the CSI-Addons CONTROLLER_SERVICE capability, not running leader election") + sidecarServer.Start() + } else { + // start the server in a go-routine so that the controller can + // connect to it, even if this service is not the leaser + go sidecarServer.Start() + + driver, err := csiClient.GetDriverName() + if err != nil { + klog.Fatalf("Failed to get the drivername from the CSI-plugin: %v", err) + } + + leaseName := util.NormalizeLeaseName(driver) + "-csi-addons" + le := leaderelection.NewLeaderElection(kubeClient, leaseName, func(context.Context) { + klog.Info("Yay! I became the leader. Send me the CONTROL_SERVICE requests.") + }) + + if *podName != "" { + le.WithIdentity(*podName) + } + + if *leaderElectionNamespace != "" { + le.WithNamespace(*leaderElectionNamespace) + } + + le.WithLeaseDuration(*leaderElectionLeaseDuration) + le.WithRenewDeadline(*leaderElectionRenewDeadline) + le.WithRetryPeriod(*leaderElectionRetryPeriod) + + le.Run() + } } diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go new file mode 100644 index 000000000..8f5b1b99a --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/leaderelection/leader_election.go @@ -0,0 +1,240 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "fmt" + "net/http" + "os" + "regexp" + "strings" + "time" + + "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" +) + +const ( + defaultLeaseDuration = 15 * time.Second + defaultRenewDeadline = 10 * time.Second + defaultRetryPeriod = 5 * time.Second + + DefaultHealthCheckTimeout = 20 * time.Second + + // HealthCheckerAddress is the address at which the leader election health + // checker reports status. + // The caller sidecar should document this address in appropriate flag + // descriptions. + HealthCheckerAddress = "/healthz/leader-election" +) + +// leaderElection is a convenience wrapper around client-go's leader election library. +type leaderElection struct { + runFunc func(ctx context.Context) + + // the lockName identifies the leader election config and should be shared across all members + lockName string + // the identity is the unique identity of the currently running member + identity string + // the namespace to store the lock resource + namespace string + // resourceLock defines the type of leaderelection that should be used + // Only resourcelock.LeasesResourceLock is valid at the moment. + resourceLock string + // healthCheck reports unhealthy if leader election fails to renew leadership + // within a timeout period. + healthCheck *leaderelection.HealthzAdaptor + + leaseDuration time.Duration + renewDeadline time.Duration + retryPeriod time.Duration + + ctx context.Context + + clientset kubernetes.Interface +} + +// NewLeaderElection returns the default & preferred leader election type +func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { + return NewLeaderElectionWithLeases(clientset, lockName, runFunc) +} + +// NewLeaderElectionWithLeases returns an implementation of leader election using Leases +func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { + return &leaderElection{ + runFunc: runFunc, + lockName: lockName, + resourceLock: resourcelock.LeasesResourceLock, + leaseDuration: defaultLeaseDuration, + renewDeadline: defaultRenewDeadline, + retryPeriod: defaultRetryPeriod, + clientset: clientset, + } +} + +func (l *leaderElection) WithIdentity(identity string) { + l.identity = identity +} + +func (l *leaderElection) WithNamespace(namespace string) { + l.namespace = namespace +} + +func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) { + l.leaseDuration = leaseDuration +} + +func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) { + l.renewDeadline = renewDeadline +} + +func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) { + l.retryPeriod = retryPeriod +} + +// WithContext Add context +func (l *leaderElection) WithContext(ctx context.Context) { + l.ctx = ctx +} + +// Server represents any type that could serve HTTP requests for the leader +// election health check endpoint. +type Server interface { + Handle(pattern string, handler http.Handler) +} + +// PrepareHealthCheck creates a health check for this leader election object +// with the given healthCheckTimeout and registers its HTTP handler to the given +// server at the path specified by the constant "healthCheckerAddress". +// healthCheckTimeout determines the max duration beyond lease expiration +// allowed before reporting unhealthy. +// The caller sidecar should document the handler address in appropriate flag +// descriptions. +func (l *leaderElection) PrepareHealthCheck( + s Server, + healthCheckTimeout time.Duration) { + + l.healthCheck = leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout) + s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check)) +} + +func (l *leaderElection) Run() error { + if l.identity == "" { + id, err := defaultLeaderElectionIdentity() + if err != nil { + return fmt.Errorf("error getting the default leader identity: %v", err) + } + + l.identity = id + } + + if l.namespace == "" { + l.namespace = inClusterNamespace() + } + + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)}) + eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))}) + + rlConfig := resourcelock.ResourceLockConfig{ + Identity: sanitizeName(l.identity), + EventRecorder: eventRecorder, + } + + lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig) + if err != nil { + return err + } + + leaderConfig := leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: l.leaseDuration, + RenewDeadline: l.renewDeadline, + RetryPeriod: l.retryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.V(2).Info("became leader, starting") + l.runFunc(ctx) + }, + OnStoppedLeading: func() { + klog.Fatal("stopped leading") + }, + OnNewLeader: func(identity string) { + klog.V(3).Infof("new leader detected, current leader: %s", identity) + }, + }, + WatchDog: l.healthCheck, + } + + ctx := l.ctx + if ctx == nil { + ctx = context.Background() + } + leaderelection.RunOrDie(ctx, leaderConfig) + return nil // should never reach here +} + +func defaultLeaderElectionIdentity() (string, error) { + return os.Hostname() +} + +// sanitizeName sanitizes the provided string so it can be consumed by leader election library +func sanitizeName(name string) string { + re := regexp.MustCompile("[^a-zA-Z0-9-]") + name = re.ReplaceAllString(name, "-") + if name[len(name)-1] == '-' { + // name must not end with '-' + name = name + "X" + } + return name +} + +// inClusterNamespace returns the namespace in which the pod is running in by checking +// the env var POD_NAMESPACE, then the file /var/run/secrets/kubernetes.io/serviceaccount/namespace. +// if neither returns a valid namespace, the "default" namespace is returned +func inClusterNamespace() string { + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + return ns + } + + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + if ns := strings.TrimSpace(string(data)); len(ns) > 0 { + return ns + } + } + + return "default" +} + +// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks. +func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := c(r) + if err != nil { + http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError) + } else { + fmt.Fprint(w, "ok") + } + }) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b78ae6c51..95d1680c0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -106,6 +106,7 @@ github.com/json-iterator/go ## explicit; go 1.18 github.com/kubernetes-csi/csi-lib-utils/accessmodes github.com/kubernetes-csi/csi-lib-utils/connection +github.com/kubernetes-csi/csi-lib-utils/leaderelection github.com/kubernetes-csi/csi-lib-utils/metrics github.com/kubernetes-csi/csi-lib-utils/protosanitizer # github.com/mailru/easyjson v0.7.7