Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send NetworkFence requests to the leading CSI-Addons sidecar #492

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

logger.Info("Connecting to sidecar")
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName)
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Name)
if err != nil {
logger.Error(err, "Failed to establish connection with sidecar")

Expand Down
41 changes: 22 additions & 19 deletions controllers/csiaddons/networkfence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *NetworkFenceReconciler) Reconcile(ctx context.Context, req ctrl.Request

logger = logger.WithValues("DriverName", nwFence.Spec.Driver, "CIDRs", nwFence.Spec.Cidrs)

client, err := r.getNetworkFenceClient(nwFence.Spec.Driver, "")
client, err := r.getNetworkFenceClient(ctx, nwFence.Spec.Driver)
if err != nil {
logger.Error(err, "Failed to get NetworkFenceClient")
return ctrl.Result{}, err
Expand Down Expand Up @@ -298,26 +298,29 @@ func (nf *NetworkFenceInstance) removeFinalizerFromNetworkFence(ctx context.Cont
return nil
}

// getNetworkFenceClient returns a NetworkFenceClient for the given driver.
func (r *NetworkFenceReconciler) getNetworkFenceClient(drivername, nodeID string) (proto.NetworkFenceClient, error) {
conns := r.Connpool.GetByNodeID(drivername, nodeID)

// Iterate through the connections and find the one that matches the driver name
// provided in the NetworkFence spec; so that corresponding network fence and
// unfence operations can be performed.
for _, v := range conns {
for _, cap := range v.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if cap.GetNetworkFence() == nil {
continue
}
// getNetworkFenceClient returns a NetworkFenceClient that is the leader for
// the given driver.
// The NetworkFenceClient should only run on a CONTROLLER_SERVICE capable
// CSI-Addons plugin, there can only be one plugin that holds the lease.
func (r *NetworkFenceReconciler) getNetworkFenceClient(ctx context.Context, drivername string) (proto.NetworkFenceClient, error) {
conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, drivername)
if err != nil {
return nil, err
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if cap.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(v.Client), nil
}
// verify that the CSI-Addons plugin holding the lease supports
// NetworkFence, it probably is a bug if it doesn't
for _, capability := range conn.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if capability.GetNetworkFence() == nil {
continue
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if capability.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(conn.Client), nil
}
}

return nil, fmt.Errorf("no connections for driver: %s", drivername)
return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support NetworkFence", conn.Name, drivername)
}
15 changes: 14 additions & 1 deletion internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
type Connection struct {
Client *grpc.ClientConn
Capabilities []*identity.Capability
Name string
NodeID string
DriverName string
Timeout time.Duration
}

// NewConnection establishes connection with sidecar, fetches capability and returns Connection object
// filled with required information.
func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*Connection, error) {
func NewConnection(ctx context.Context, endpoint, nodeID, driverName, podName string) (*Connection, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithIdleTimeout(time.Duration(0)),
Expand All @@ -49,6 +50,7 @@ func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*C

conn := &Connection{
Client: cc,
Name: podName,
NodeID: nodeID,
DriverName: driverName,
Timeout: time.Minute,
Expand Down Expand Up @@ -83,3 +85,14 @@ func (c *Connection) fetchCapabilities(ctx context.Context) error {

return nil
}

func (c *Connection) HasControllerService() bool {
for _, capability := range c.Capabilities {
svc := capability.GetService()
if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE {
return true
}
}

return false
}
38 changes: 37 additions & 1 deletion internal/connection/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ limitations under the License.

package connection

import "sync"
import (
"context"
"fmt"
"sync"

coordination "k8s.io/api/coordination/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/csi-addons/kubernetes-csi-addons/internal/util"
)

// ConnectionPool consists of map of Connection objects and
// methods Put, Get & Delete which operates with required rw locks
Expand Down Expand Up @@ -93,3 +102,30 @@ func (cp *ConnectionPool) GetByNodeID(driverName, nodeID string) map[string]*Con

return result
}

// GetLeaderByDriver finds the holder of the lease for the driver, and returns
// the connection to that particular CSI-Addons sidecar.
func (cp *ConnectionPool) GetLeaderByDriver(ctx context.Context, reconciler client.Client, driverName string) (*Connection, error) {
// get the Lease for the driver
leaseName := util.NormalizeLeaseName(driverName) + "-csi-addons"
var lease coordination.Lease
err := reconciler.Get(ctx, client.ObjectKey{Name: leaseName}, &lease)
if err != nil {
return nil, fmt.Errorf("no leader found for driver %q: %w", driverName, err)
} else if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" {
return nil, fmt.Errorf("lease %q for driver %q does not have a leader", leaseName, driverName)
}

// get the connection for the leader
key := lease.Namespace + "/" + *lease.Spec.HolderIdentity

cp.rwlock.RLock()
defer cp.rwlock.RUnlock()

conn, ok := cp.pool[key]
if !ok {
return nil, fmt.Errorf("no connection with key %q found for driver %q: %w", key, driverName, err)
}

return conn, nil
}
35 changes: 35 additions & 0 deletions internal/util/normalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2023 The Kubernetes-CSI-Addons Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"regexp"
)

// NormalizeLeaseName sanitizes the provided string so it can be used as name
// for a Lease object.
//
// Taken from csi-lib-utils/leaderelection:sanitizeName()
func NormalizeLeaseName(name string) string {
re := regexp.MustCompile("[^a-zA-Z0-9-]")
name = re.ReplaceAllString(name, "-")
if name[len(name)-1] == '-' {
// name must not end with '-'
name = name + "X"
}
return name
}
53 changes: 53 additions & 0 deletions internal/util/normalize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Kubernetes-CSI-Addons Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNormalizeLeaseName(t *testing.T) {
tests := []struct {
name string
want string
}{
{
name: "some.csi.driver",
want: "some-csi-driver",
},
{
name: "some.csi/driver",
want: "some-csi-driver",
},
{
name: "some.csi.driver...",
want: "some-csi-driver---X",
},
{
name: "#some.csi.driver",
want: "-some-csi-driver",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
name := NormalizeLeaseName(tt.name)
assert.Equal(t, tt.want, name)
})
}
}
29 changes: 29 additions & 0 deletions sidecar/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Client interface {
GetGRPCClient() *grpc.ClientConn
Probe() error
GetDriverName() (string, error)
HasControllerService() (bool, error)
}

// clientImpl holds the GRPC connenction details
Expand Down Expand Up @@ -123,6 +124,34 @@ func (c *clientImpl) probeOnce() (bool, error) {
return r.GetValue(), nil
}

// HasControllerService gets the driver name from the driver
func (c *clientImpl) HasControllerService() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()

identityClient := identity.NewIdentityClient(c.client)

req := identity.GetCapabilitiesRequest{}
rsp, err := identityClient.GetCapabilities(ctx, &req)
if err != nil {
return false, err
}

caps := rsp.GetCapabilities()
if len(caps) == 0 {
return false, errors.New("driver does not have any capabilities")
}

for _, c := range caps {
svc := c.GetService()
if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE {
return true, nil
}
}

return false, nil
}

// GetDriverName gets the driver name from the driver
func (c *clientImpl) GetDriverName() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
Expand Down
8 changes: 8 additions & 0 deletions sidecar/internal/csiaddonsnode/csiaddonsnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
type mockClient struct {
// Driver contains the drivername obtained with GetDriverName()
driver string

// isController is set to true when the CSI-plugin supports the
// CONTROLLER_SERVICE
isController bool
}

func NewMockClient(driver string) client.Client {
Expand All @@ -50,6 +54,10 @@ func (mc *mockClient) GetDriverName() (string, error) {
return mc.driver, nil
}

func (mc *mockClient) HasControllerService() (bool, error) {
return mc.isController, nil
}

func Test_getCSIAddonsNode(t *testing.T) {
var (
podName = "pod"
Expand Down
Loading