Skip to content

Commit

Permalink
controller: send NetworkFence requests to the leading CSI-Addons sidecar
Browse files Browse the repository at this point in the history
NetworkFence operations should only be sent to a CSI-Addons sidecar that
has the CONTROLLER_SERVICE capability. There should be a single leader
for the CSI-Addons sidecars that support that, and the leader can be
identified by the Lease object for the CSI-drivername.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Dec 7, 2023
1 parent c9f864d commit 9ebdebd
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 22 deletions.
2 changes: 1 addition & 1 deletion controllers/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

logger.Info("Connecting to sidecar")
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName)
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Name)
if err != nil {
logger.Error(err, "Failed to establish connection with sidecar")

Expand Down
54 changes: 35 additions & 19 deletions controllers/csiaddons/networkfence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/go-logr/logr"
coordination "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (r *NetworkFenceReconciler) Reconcile(ctx context.Context, req ctrl.Request

logger = logger.WithValues("DriverName", nwFence.Spec.Driver, "CIDRs", nwFence.Spec.Cidrs)

client, err := r.getNetworkFenceClient(nwFence.Spec.Driver, "")
client, err := r.getNetworkFenceClient(ctx, nwFence.Spec.Driver)
if err != nil {
logger.Error(err, "Failed to get NetworkFenceClient")
return ctrl.Result{}, err
Expand Down Expand Up @@ -298,26 +299,41 @@ func (nf *NetworkFenceInstance) removeFinalizerFromNetworkFence(ctx context.Cont
return nil
}

// getNetworkFenceClient returns a NetworkFenceClient for the given driver.
func (r *NetworkFenceReconciler) getNetworkFenceClient(drivername, nodeID string) (proto.NetworkFenceClient, error) {
conns := r.Connpool.GetByNodeID(drivername, nodeID)

// Iterate through the connections and find the one that matches the driver name
// provided in the NetworkFence spec; so that corresponding network fence and
// unfence operations can be performed.
for _, v := range conns {
for _, cap := range v.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if cap.GetNetworkFence() == nil {
continue
}
// getNetworkFenceClient returns a NetworkFenceClient that is the leader for
// the given driver.
// The NetworkFenceClient should only run on a CONTROLLER_SERVICE capable
// CSI-Addons plugin, there can only be one plugin that holds the lease.
func (r *NetworkFenceReconciler) getNetworkFenceClient(ctx context.Context, drivername string) (proto.NetworkFenceClient, error) {
// get the Lease for the driver
leaseName := util.NormalizeLeaseName(drivername) + "-csi-addons"
var lease coordination.Lease
err := r.Client.Get(ctx, client.ObjectKey{Name: leaseName}, &lease)
if err != nil {
return nil, fmt.Errorf("no leader found for driver %q: %w", drivername, err)
} else if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" {
return nil, fmt.Errorf("lease %q for driver %q does not have a leader", leaseName, drivername)
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if cap.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(v.Client), nil
}
// get the connection for the leader
connID := lease.Namespace + "/" + *lease.Spec.HolderIdentity
conn := r.Connpool.Get(connID)
if conn == nil {
return nil, fmt.Errorf("no connection with key %q found for driver %q: %w", connID, drivername, err)
}

// verify that the CSI-Addons plugin holding the lease supports
// NetworkFence, it probably is a bug if it doesn't
for _, capability := range conn.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if capability.GetNetworkFence() == nil {
continue
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if capability.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(conn.Client), nil
}
}

return nil, fmt.Errorf("no connections for driver: %s", drivername)
return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support NetworkFence", conn.Name, drivername)
}
15 changes: 14 additions & 1 deletion internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
type Connection struct {
Client *grpc.ClientConn
Capabilities []*identity.Capability
Name string
NodeID string
DriverName string
Timeout time.Duration
}

// NewConnection establishes connection with sidecar, fetches capability and returns Connection object
// filled with required information.
func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*Connection, error) {
func NewConnection(ctx context.Context, endpoint, nodeID, driverName, podName string) (*Connection, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithIdleTimeout(time.Duration(0)),
Expand All @@ -49,6 +50,7 @@ func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*C

conn := &Connection{
Client: cc,
Name: podName,
NodeID: nodeID,
DriverName: driverName,
Timeout: time.Minute,
Expand Down Expand Up @@ -83,3 +85,14 @@ func (c *Connection) fetchCapabilities(ctx context.Context) error {

return nil
}

func (c *Connection) HasControllerService() bool {
for _, capability := range c.Capabilities {
svc := capability.GetService()
if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE {
return true
}
}

return false
}
17 changes: 16 additions & 1 deletion internal/connection/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package connection

import "sync"
import (
"sync"
)

// ConnectionPool consists of map of Connection objects and
// methods Put, Get & Delete which operates with required rw locks
Expand Down Expand Up @@ -49,6 +51,19 @@ func (cp *ConnectionPool) Put(key string, conn *Connection) {
cp.pool[key] = conn
}

// Get returns the connection object corresponding to given key.
func (cp *ConnectionPool) Get(key string) *Connection {
cp.rwlock.Lock()
defer cp.rwlock.Unlock()

conn, ok := cp.pool[key]
if !ok {
return nil
}

return conn
}

// Delete deletes connection object corresponding to given key.
func (cp *ConnectionPool) Delete(key string) {
cp.rwlock.Lock()
Expand Down

0 comments on commit 9ebdebd

Please sign in to comment.