From 9ebdebdedb90956a7ba039863286eaadb1ddf2ef Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Thu, 12 Oct 2023 09:25:38 +0200 Subject: [PATCH] controller: send NetworkFence requests to the leading CSI-Addons sidecar NetworkFence operations should only be sent to a CSI-Addons sidecar that has the CONTROLLER_SERVICE capability. There should be a single leader for the CSI-Addons sidecars that support that, and the leader can be identified by the Lease object for the CSI-drivername. Signed-off-by: Niels de Vos --- .../csiaddons/csiaddonsnode_controller.go | 2 +- .../csiaddons/networkfence_controller.go | 54 ++++++++++++------- internal/connection/connection.go | 15 +++++- internal/connection/connection_pool.go | 17 +++++- 4 files changed, 66 insertions(+), 22 deletions(-) diff --git a/controllers/csiaddons/csiaddonsnode_controller.go b/controllers/csiaddons/csiaddonsnode_controller.go index 4a59356b6..472d2d319 100644 --- a/controllers/csiaddons/csiaddonsnode_controller.go +++ b/controllers/csiaddons/csiaddonsnode_controller.go @@ -118,7 +118,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques } logger.Info("Connecting to sidecar") - newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName) + newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Name) if err != nil { logger.Error(err, "Failed to establish connection with sidecar") diff --git a/controllers/csiaddons/networkfence_controller.go b/controllers/csiaddons/networkfence_controller.go index d859e7567..185dd3c39 100644 --- a/controllers/csiaddons/networkfence_controller.go +++ b/controllers/csiaddons/networkfence_controller.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-logr/logr" + coordination "k8s.io/api/coordination/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -109,7 +110,7 @@ func (r *NetworkFenceReconciler) Reconcile(ctx context.Context, req ctrl.Request logger = logger.WithValues("DriverName", nwFence.Spec.Driver, "CIDRs", nwFence.Spec.Cidrs) - client, err := r.getNetworkFenceClient(nwFence.Spec.Driver, "") + client, err := r.getNetworkFenceClient(ctx, nwFence.Spec.Driver) if err != nil { logger.Error(err, "Failed to get NetworkFenceClient") return ctrl.Result{}, err @@ -298,26 +299,41 @@ func (nf *NetworkFenceInstance) removeFinalizerFromNetworkFence(ctx context.Cont return nil } -// getNetworkFenceClient returns a NetworkFenceClient for the given driver. -func (r *NetworkFenceReconciler) getNetworkFenceClient(drivername, nodeID string) (proto.NetworkFenceClient, error) { - conns := r.Connpool.GetByNodeID(drivername, nodeID) - - // Iterate through the connections and find the one that matches the driver name - // provided in the NetworkFence spec; so that corresponding network fence and - // unfence operations can be performed. - for _, v := range conns { - for _, cap := range v.Capabilities { - // validate if NETWORK_FENCE capability is supported by the driver. - if cap.GetNetworkFence() == nil { - continue - } +// getNetworkFenceClient returns a NetworkFenceClient that is the leader for +// the given driver. +// The NetworkFenceClient should only run on a CONTROLLER_SERVICE capable +// CSI-Addons plugin, there can only be one plugin that holds the lease. +func (r *NetworkFenceReconciler) getNetworkFenceClient(ctx context.Context, drivername string) (proto.NetworkFenceClient, error) { + // get the Lease for the driver + leaseName := util.NormalizeLeaseName(drivername) + "-csi-addons" + var lease coordination.Lease + err := r.Client.Get(ctx, client.ObjectKey{Name: leaseName}, &lease) + if err != nil { + return nil, fmt.Errorf("no leader found for driver %q: %w", drivername, err) + } else if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" { + return nil, fmt.Errorf("lease %q for driver %q does not have a leader", leaseName, drivername) + } - // validate of NETWORK_FENCE capability is enabled by the storage driver. - if cap.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE { - return proto.NewNetworkFenceClient(v.Client), nil - } + // get the connection for the leader + connID := lease.Namespace + "/" + *lease.Spec.HolderIdentity + conn := r.Connpool.Get(connID) + if conn == nil { + return nil, fmt.Errorf("no connection with key %q found for driver %q: %w", connID, drivername, err) + } + + // verify that the CSI-Addons plugin holding the lease supports + // NetworkFence, it probably is a bug if it doesn't + for _, capability := range conn.Capabilities { + // validate if NETWORK_FENCE capability is supported by the driver. + if capability.GetNetworkFence() == nil { + continue + } + + // validate of NETWORK_FENCE capability is enabled by the storage driver. + if capability.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE { + return proto.NewNetworkFenceClient(conn.Client), nil } } - return nil, fmt.Errorf("no connections for driver: %s", drivername) + return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support NetworkFence", conn.Name, drivername) } diff --git a/internal/connection/connection.go b/internal/connection/connection.go index d439ce9ec..7eb8c8319 100644 --- a/internal/connection/connection.go +++ b/internal/connection/connection.go @@ -30,6 +30,7 @@ import ( type Connection struct { Client *grpc.ClientConn Capabilities []*identity.Capability + Name string NodeID string DriverName string Timeout time.Duration @@ -37,7 +38,7 @@ type Connection struct { // NewConnection establishes connection with sidecar, fetches capability and returns Connection object // filled with required information. -func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*Connection, error) { +func NewConnection(ctx context.Context, endpoint, nodeID, driverName, podName string) (*Connection, error) { opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithIdleTimeout(time.Duration(0)), @@ -49,6 +50,7 @@ func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*C conn := &Connection{ Client: cc, + Name: podName, NodeID: nodeID, DriverName: driverName, Timeout: time.Minute, @@ -83,3 +85,14 @@ func (c *Connection) fetchCapabilities(ctx context.Context) error { return nil } + +func (c *Connection) HasControllerService() bool { + for _, capability := range c.Capabilities { + svc := capability.GetService() + if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE { + return true + } + } + + return false +} diff --git a/internal/connection/connection_pool.go b/internal/connection/connection_pool.go index 3265ad47f..e910f5e55 100644 --- a/internal/connection/connection_pool.go +++ b/internal/connection/connection_pool.go @@ -16,7 +16,9 @@ limitations under the License. package connection -import "sync" +import ( + "sync" +) // ConnectionPool consists of map of Connection objects and // methods Put, Get & Delete which operates with required rw locks @@ -49,6 +51,19 @@ func (cp *ConnectionPool) Put(key string, conn *Connection) { cp.pool[key] = conn } +// Get returns the connection object corresponding to given key. +func (cp *ConnectionPool) Get(key string) *Connection { + cp.rwlock.Lock() + defer cp.rwlock.Unlock() + + conn, ok := cp.pool[key] + if !ok { + return nil + } + + return conn +} + // Delete deletes connection object corresponding to given key. func (cp *ConnectionPool) Delete(key string) { cp.rwlock.Lock()