connectivity: Add Control Plane Node Connectivity Tests
The control plane nodes create unique policy selection
contexts that allow us to test that label selection of
host and kube-apiserver entities is correct. This change
adds control plane client pods on every control plane node
when the hidden test variable `--k8s-localhost-test` is enabled.
Control plane components, as well as the control plane host,
are queried with control plane policy selections in place.

Signed-off-by: Nate Sweet <[email protected]>
nathanjsweet committed Oct 19, 2023
1 parent b649f96 commit b0496ba
Showing 13 changed files with 352 additions and 32 deletions.
3 changes: 3 additions & 0 deletions connectivity/check/check.go
@@ -28,6 +28,7 @@ type Parameters struct {
ForceDeploy bool
Hubble bool
HubbleServer string
K8sLocalHostTest bool
MultiCluster string
RunTests []*regexp.Regexp
SkipTests []*regexp.Regexp
@@ -61,6 +62,8 @@ type Parameters struct {
ExternalOtherIP string
PodCIDRs []podCIDRs
NodeCIDRs []string
ControlPlaneCIDRs []string
K8sCIDR string
NodesWithoutCiliumIPs []nodesWithoutCiliumIP
JunitFile string
JunitProperties map[string]string
102 changes: 81 additions & 21 deletions connectivity/check/context.go
@@ -61,11 +61,13 @@ type ConnectivityTest struct {
echoPods map[string]Pod
echoExternalPods map[string]Pod
clientPods map[string]Pod
clientCPPods map[string]Pod
perfClientPods map[string]Pod
perfServerPod map[string]Pod
PerfResults map[PerfTests]PerfResult
echoServices map[string]Service
ingressService map[string]Service
k8sService Service
externalWorkloads map[string]ExternalWorkload

hostNetNSPodsByNode map[string]Pod
@@ -78,6 +80,7 @@ type ConnectivityTest struct {
lastFlowTimestamps map[string]time.Time

nodes map[string]*corev1.Node
controlPlaneNodes map[string]*corev1.Node
nodesWithoutCilium map[string]struct{}

manifests map[string]string
@@ -98,6 +101,29 @@ type PerfResult struct {
Avg float64
}

func netIPToCIDRs(netIPs []netip.Addr) (netCIDRs []netip.Prefix) {
for _, ip := range netIPs {
found := false
for _, cidr := range netCIDRs {
if cidr.Addr().Is4() == ip.Is4() && cidr.Contains(ip) {
found = true
break
}
}
if found {
continue
}

// Generate a /24 or /64 accordingly
bits := 24
if ip.Is6() {
bits = 64
}
netCIDRs = append(netCIDRs, netip.PrefixFrom(ip, bits).Masked())
}
return
}
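
A hypothetical test (not part of this commit, and assumed to live in the same package so the unexported helper is reachable) illustrates what netIPToCIDRs returns for a mixed set of node IPs; the addresses are made up:

package check

import (
	"net/netip"
	"reflect"
	"testing"
)

// TestNetIPToCIDRs is a sketch only: two IPv4 addresses in the same /24
// collapse into one prefix, and the IPv6 address becomes a /64.
func TestNetIPToCIDRs(t *testing.T) {
	ips := []netip.Addr{
		netip.MustParseAddr("192.168.1.10"),
		netip.MustParseAddr("192.168.1.11"), // same /24 as the first IP: no extra prefix
		netip.MustParseAddr("fd00::1"),      // IPv6: collapsed into a /64
	}
	want := []netip.Prefix{
		netip.MustParsePrefix("192.168.1.0/24"),
		netip.MustParsePrefix("fd00::/64"),
	}
	if got := netIPToCIDRs(ips); !reflect.DeepEqual(got, want) {
		t.Fatalf("netIPToCIDRs(%v) = %v, want %v", ips, got, want)
	}
}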

// verbose returns the value of the user-provided verbosity flag.
func (ct *ConnectivityTest) verbose() bool {
return ct.params.Verbose
@@ -192,6 +218,7 @@ func NewConnectivityTest(client *k8s.Client, p Parameters, version string) (*Con
echoPods: make(map[string]Pod),
echoExternalPods: make(map[string]Pod),
clientPods: make(map[string]Pod),
clientCPPods: make(map[string]Pod),
perfClientPods: make(map[string]Pod),
perfServerPod: make(map[string]Pod),
PerfResults: make(map[PerfTests]PerfResult),
@@ -336,6 +363,11 @@ func (ct *ConnectivityTest) SetupAndValidate(ctx context.Context, setupAndValida
return fmt.Errorf("unable to detect node CIDRs: %w", err)
}
}
if ct.params.K8sLocalHostTest {
if err := ct.detectK8sCIDR(ctx); err != nil {
return fmt.Errorf("unable to detect K8s CIDR: %w", err)
}
}
return nil
}

@@ -639,6 +671,7 @@ func (ct *ConnectivityTest) detectNodeCIDRs(ctx context.Context) error {
}

nodeIPs := make([]netip.Addr, 0, len(nodes.Items))
cPIPs := make([]netip.Addr, 0, 1)

for _, node := range nodes.Items {
for _, addr := range node.Status.Addresses {
@@ -651,6 +684,9 @@ func (ct *ConnectivityTest) detectNodeCIDRs(ctx context.Context) error {
continue

}
nodeIPs = append(nodeIPs, ip)
if isControlPlane(&node) {
cPIPs = append(cPIPs, ip)
}
}
}

@@ -659,36 +695,44 @@ func (ct *ConnectivityTest) detectNodeCIDRs(ctx context.Context) error {
}

// collapse set of IPs into CIDRs
nodeCIDRs := []netip.Prefix{}

for _, ip := range nodeIPs {
found := false
for _, cidr := range nodeCIDRs {
if cidr.Addr().Is4() == ip.Is4() && cidr.Contains(ip) {
found = true
break
}
}
if found {
continue
}

// Generate a /24 or /64 accordingly
bits := 24
if ip.Is6() {
bits = 64
}
nodeCIDRs = append(nodeCIDRs, netip.PrefixFrom(ip, bits).Masked())
}
nodeCIDRs := netIPToCIDRs(nodeIPs)
cPCIDRs := netIPToCIDRs(cPIPs)

ct.params.NodeCIDRs = make([]string, 0, len(nodeCIDRs))
for _, cidr := range nodeCIDRs {
ct.params.NodeCIDRs = append(ct.params.NodeCIDRs, cidr.String())
}
ct.params.ControlPlaneCIDRs = make([]string, 0, len(cPCIDRs))
for _, cidr := range cPCIDRs {
ct.params.ControlPlaneCIDRs = append(ct.params.ControlPlaneCIDRs, cidr.String())
}
ct.Debugf("Detected NodeCIDRs: %v", ct.params.NodeCIDRs)
return nil
}

// detectK8sCIDR produces one CIDR that covers the kube-apiserver address.
// An IPv4 address is collapsed into a /24, an IPv6 address into a /64.
func (ct *ConnectivityTest) detectK8sCIDR(ctx context.Context) error {
service, err := ct.client.GetService(ctx, "default", "kubernetes", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get \"kubernetes.default\" service: %w", err)
}
addr, err := netip.ParseAddr(service.Spec.ClusterIP)
if err != nil {
return fmt.Errorf("failed to parse \"kubernetes.default\" service Cluster IP: %w", err)
}

// Generate a /24 or /64 accordingly
bits := 24
if addr.Is6() {
bits = 64
}
ct.params.K8sCIDR = netip.PrefixFrom(addr, bits).Masked().String()
ct.k8sService = Service{Service: service, URLPath: "/healthz"}
ct.Debugf("Detected K8sCIDR: %q", ct.params.K8sCIDR)
return nil
}
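
As a worked example (the ClusterIP below is an assumption, typical for kubeadm clusters but cluster-specific), the masking step above maps the kubernetes Service address to a single prefix:

package main

import (
	"fmt"
	"net/netip"
)

func main() {
	// e.g. the ClusterIP of the kubernetes.default Service
	addr := netip.MustParseAddr("10.96.0.1")
	bits := 24
	if addr.Is6() {
		bits = 64
	}
	fmt.Println(netip.PrefixFrom(addr, bits).Masked()) // 10.96.0.0/24
}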

func (ct *ConnectivityTest) detectNodesWithoutCiliumIPs() error {
for n := range ct.nodesWithoutCilium {
pod := ct.hostNetNSPodsByNode[n]
@@ -812,6 +856,7 @@ func (ct *ConnectivityTest) initCiliumPods(ctx context.Context) error {

func (ct *ConnectivityTest) getNodes(ctx context.Context) error {
ct.nodes = make(map[string]*corev1.Node)
ct.controlPlaneNodes = make(map[string]*corev1.Node)
ct.nodesWithoutCilium = make(map[string]struct{})
nodeList, err := ct.client.ListNodes(ctx, metav1.ListOptions{})
if err != nil {
@@ -821,6 +866,9 @@ func (ct *ConnectivityTest) getNodes(ctx context.Context) error {
for _, node := range nodeList.Items {
node := node
if canNodeRunCilium(&node) {
if isControlPlane(&node) {
ct.controlPlaneNodes[node.ObjectMeta.Name] = node.DeepCopy()
}
ct.nodes[node.ObjectMeta.Name] = node.DeepCopy()
} else {
ct.nodesWithoutCilium[node.ObjectMeta.Name] = struct{}{}
@@ -960,10 +1008,18 @@ func (ct *ConnectivityTest) Nodes() map[string]*corev1.Node {
return ct.nodes
}

func (ct *ConnectivityTest) ControlPlaneNodes() map[string]*corev1.Node {
return ct.controlPlaneNodes
}

func (ct *ConnectivityTest) ClientPods() map[string]Pod {
return ct.clientPods
}

func (ct *ConnectivityTest) ControlPlaneClientPods() map[string]Pod {
return ct.clientCPPods
}

func (ct *ConnectivityTest) HostNetNSPodsByNode() map[string]Pod {
return ct.hostNetNSPodsByNode
}
@@ -1000,6 +1056,10 @@ func (ct *ConnectivityTest) IngressService() map[string]Service {
return ct.ingressService
}

func (ct *ConnectivityTest) K8sService() Service {
return ct.k8sService
}

func (ct *ConnectivityTest) ExternalWorkloads() map[string]ExternalWorkload {
return ct.externalWorkloads
}
52 changes: 48 additions & 4 deletions connectivity/check/deployment.go
@@ -15,6 +15,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -33,6 +34,7 @@ const (

clientDeploymentName = "client"
client2DeploymentName = "client2"
clientCPDeployment = "client-cp"

DNSTestServerContainerName = "dns-test-server"

@@ -852,6 +854,36 @@ func (ct *ConnectivityTest) deploy(ctx context.Context) error {
}
}

// 3rd client scheduled on the control plane
if ct.params.K8sLocalHostTest {
ct.Logf("✨ [%s] Deploying %s deployment...", ct.clients.src.ClusterName(), clientCPDeployment)
clientDeployment := newDeployment(deploymentParameters{
Name: clientCPDeployment,
Kind: kindClientName,
NamedPort: "http-8080",
Port: 8080,
Image: ct.params.CurlImage,
Command: []string{"/bin/ash", "-c", "sleep 10000000"},
Labels: map[string]string{"other": "client"},
Annotations: ct.params.DeploymentAnnotations.Match(client2DeploymentName),
NodeSelector: map[string]string{
"node-role.kubernetes.io/control-plane": "",
},
Replicas: len(ct.ControlPlaneNodes()),
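// Control plane nodes are typically tainted with
// node-role.kubernetes.io/control-plane:NoSchedule; the toleration below
// lets these client pods be scheduled there despite that taint.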
Tolerations: []corev1.Toleration{
{Key: "node-role.kubernetes.io/control-plane"},
},
})
_, err = ct.clients.src.CreateServiceAccount(ctx, ct.params.TestNamespace, k8s.NewServiceAccount(clientCPDeployment), metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create service account %s: %s", clientCPDeployment, err)
}
_, err = ct.clients.src.CreateDeployment(ctx, ct.params.TestNamespace, clientDeployment, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create deployment %s: %s", clientCPDeployment, err)
}
}

if !ct.params.SingleNode || ct.params.MultiCluster != "" {
_, err = ct.clients.dst.GetService(ctx, ct.params.TestNamespace, echoOtherNodeDeploymentName, metav1.GetOptions{})
if err != nil {
@@ -1158,9 +1190,17 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
return err
}

ct.clientPods[pod.Name] = Pod{
K8sClient: ct.client,
Pod: pod.DeepCopy(),
if strings.Contains(pod.Name, clientCPDeployment) {
ct.clientCPPods[pod.Name] = Pod{
K8sClient: ct.client,
Pod: pod.DeepCopy(),
}
} else {
ct.clientPods[pod.Name] = Pod{
K8sClient: ct.client,
Pod: pod.DeepCopy(),
}

}
}

@@ -1222,7 +1262,11 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
return err
}
}

for _, cpp := range ct.clientCPPods {
if err := WaitForCoreDNS(ctx, ct, cpp); err != nil {
return err
}
}
for _, client := range ct.clients.clients() {
echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "kind=" + kindEchoName})
if err != nil {
8 changes: 8 additions & 0 deletions connectivity/check/features.go
@@ -347,3 +347,11 @@ func canNodeRunCilium(node *corev1.Node) bool {
val, ok := node.ObjectMeta.Labels["cilium.io/no-schedule"]
return !ok || val == "false"
}

func isControlPlane(node *corev1.Node) bool {
if node != nil {
_, ok := node.Labels["node-role.kubernetes.io/control-plane"]
return ok
}
return false
}
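
For reference, a small fragment (not from this commit, assuming the usual corev1 and metav1 imports) of the label convention isControlPlane relies on; kubeadm sets an empty-valued node-role.kubernetes.io/control-plane label on control plane nodes, and the node names here are illustrative:

cp := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
	Name:   "kind-control-plane",
	Labels: map[string]string{"node-role.kubernetes.io/control-plane": ""},
}}
worker := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "kind-worker"}}

_ = isControlPlane(cp)     // true: the label key is present (its value is empty)
_ = isControlPlane(worker) // false: the label key is absent
_ = isControlPlane(nil)    // false: the nil check makes the helper safe to call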
16 changes: 12 additions & 4 deletions connectivity/check/peer.go
@@ -4,6 +4,7 @@
package check

import (
"fmt"
"net"
"net/url"
"strconv"
@@ -162,6 +163,8 @@ func (p Pod) FlowFilters() []*flow.FlowFilter {
type Service struct {
// Service is the Kubernetes service resource
Service *corev1.Service

URLPath string
}

// Name returns the absolute name of the service.
@@ -174,23 +177,28 @@ func (s Service) NameWithoutNamespace() string {
return s.Service.Name
}

// Scheme returns the string 'http'.
// Scheme returns the string 'https' if the port is 443 or 6443, otherwise
// it returns 'http'.
func (s Service) Scheme() string {
// We only have http services for now.
switch s.Port() {
case 443, 6443:
return "https"
}
return "http"

}

// Path returns the string '/'.
func (s Service) Path() string {
// No support for paths yet.
return ""
return s.URLPath
}
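
A short fragment (not taken from this commit, and assuming fmt and the features package are imported) showing how these accessors can compose a health-check URL for the check.Service stored in ct.k8sService; svc stands for that value:

// With URLPath "/healthz" and the default kubernetes Service on port 443,
// this composes something like "https://kubernetes.default:443/healthz".
healthURL := fmt.Sprintf("%s://%s:%d%s",
	svc.Scheme(),                      // "https", because the port is 443
	svc.Address(features.IPFamilyAny), // "kubernetes.default" (name.namespace)
	svc.Port(),                        // 443
	svc.Path(),                        // "/healthz" from URLPath
)
_ = healthURL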

// Address returns the network address of the Service.
func (s Service) Address(family features.IPFamily) string {
// If the cluster IP is empty (headless service case) or the IP family is set to any, return the service name
if s.Service.Spec.ClusterIP == "" || family == features.IPFamilyAny {
return s.Service.Name
return fmt.Sprintf("%s.%s", s.Service.Name, s.Service.Namespace)
}

getClusterIPForIPFamily := func(family v1.IPFamily) string {
15 changes: 15 additions & 0 deletions connectivity/manifests/client-egress-to-cidr-cp-host-knp.yaml
@@ -0,0 +1,15 @@
# This policy allows packets to all control plane node IPs
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: client-egress-to-cidr-cp-host
spec:
  podSelector:
    matchLabels:
      kind: client
  egress:
  - to:
    {{- range .ControlPlaneCIDRs }}
    - ipBlock:
        cidr: {{.}}
    {{- end }}
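
For illustration, a standalone sketch (not part of this commit) that renders a copy of this template with text/template; the CIDR values are made up, and the inlined string mirrors the manifest above:

package main

import (
	"os"
	"text/template"
)

const manifest = `apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: client-egress-to-cidr-cp-host
spec:
  podSelector:
    matchLabels:
      kind: client
  egress:
  - to:
    {{- range .ControlPlaneCIDRs }}
    - ipBlock:
        cidr: {{.}}
    {{- end }}
`

func main() {
	// One ipBlock entry is emitted per detected control plane CIDR.
	params := struct{ ControlPlaneCIDRs []string }{
		ControlPlaneCIDRs: []string{"172.18.0.0/24", "fc00:f853:ccd:e793::/64"},
	}
	tmpl := template.Must(template.New("policy").Parse(manifest))
	if err := tmpl.Execute(os.Stdout, params); err != nil {
		panic(err)
	}
}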
