From 583a637158fbdb55309e8e0ee1f15de797658d84 Mon Sep 17 00:00:00 2001
From: Paul Miller
Date: Mon, 18 Nov 2024 07:54:23 +0000
Subject: [PATCH] interceptor builds

fix logging regressions and slim down interface
fix test
put nfqueue in a separate package
have to default config first
duplicate metrics
don't clean up rules till the end
more to shutdown
keep trying
thin down interface
some type verdict
shave out things that are unnecessary for interception
---
 cmd/main.go                                |  31 +-
 pkg/networkpolicy/controller.go            | 488 +++------------------
 pkg/networkpolicy/controller_test.go       |  13 +-
 pkg/networkpolicy/metrics.go               | 114 -----
 pkg/networkpolicy/networkpolicy_test.go    |  38 +-
 pkg/networkpolicy/networkpolicyapi_test.go |  34 +-
 pkg/networkpolicy/packet.go                |  17 +-
 pkg/networkpolicy/packet_test.go           |  38 +-
 pkg/networkpolicy/verdict_string.go        |  24 +
 pkg/nfqinterceptor/metrics.go              | 134 ++++++
 pkg/nfqinterceptor/nfqinterceptor.go       | 401 +++++++++++++++++
 11 files changed, 720 insertions(+), 612 deletions(-)
 create mode 100644 pkg/networkpolicy/verdict_string.go
 create mode 100644 pkg/nfqinterceptor/metrics.go
 create mode 100644 pkg/nfqinterceptor/nfqinterceptor.go

diff --git a/cmd/main.go b/cmd/main.go
index 1fb9a3f..c6a0429 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -8,10 +8,10 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"time"

 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"sigs.k8s.io/kube-network-policies/pkg/networkpolicy"
+	"sigs.k8s.io/kube-network-policies/pkg/nfqinterceptor"
 	npaclient "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned"
 	npainformers "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions"
 	"sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions/apis/v1alpha1"
@@ -148,6 +148,20 @@ func run() int {
 		utilruntime.HandleError(err)
 	}()

+	err = cfg.Defaults()
+	if err != nil {
+		logger.Error(err, "could not default config")
+		return 1
+	}
+
+	// TODO: log config?
+
+	interceptor, err := nfqinterceptor.New(cfg)
+	if err != nil {
+		logger.Error(err, "could not start nfq interceptor")
+		return 1
+	}
+
 	networkPolicyController, err := networkpolicy.NewController(
 		clientset,
 		informersFactory.Networking().V1().NetworkPolicies(),
@@ -155,6 +169,7 @@ func run() int {
 		informersFactory.Core().V1().Pods(),
 		nodeInformer,
 		npaClient,
+		interceptor,
 		anpInformer,
 		banpInformer,
 		cfg,
@@ -163,19 +178,19 @@ func run() int {
 		logger.Error(err, "Can not start network policy controller")
 		return 1
 	}
-	go func() {
-		err := networkPolicyController.Run(ctx)
-		utilruntime.HandleError(err)
-	}()
+	err = networkPolicyController.Run(ctx)
+	if err != nil {
+		logger.Error(err, "Can not start network policy controller")
+		return 1
+	}

 	informersFactory.Start(ctx.Done())
 	if adminNetworkPolicy || baselineAdminNetworkPolicy {
 		npaInformerFactory.Start(ctx.Done())
 	}

-	<-ctx.Done()
+	// should block until its resources are cleaned up
+ interceptor.Run(ctx, networkPolicyController.EvaluatePacket) - // grace period to cleanup resources - time.Sleep(5 * time.Second) return 0 } diff --git a/pkg/networkpolicy/controller.go b/pkg/networkpolicy/controller.go index c55c0ae..3f27c87 100644 --- a/pkg/networkpolicy/controller.go +++ b/pkg/networkpolicy/controller.go @@ -6,9 +6,6 @@ import ( "os" "time" - nfqueue "github.com/florianl/go-nfqueue" - "github.com/mdlayher/netlink" - v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -28,9 +25,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" netutils "k8s.io/utils/net" - "k8s.io/utils/ptr" - "sigs.k8s.io/knftables" npav1alpha1 "sigs.k8s.io/network-policy-api/apis/v1alpha1" npaclient "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned" policyinformers "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions/apis/v1alpha1" @@ -95,41 +90,7 @@ func NewController(client clientset.Interface, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, npaClient npaclient.Interface, - adminNetworkPolicyInformer policyinformers.AdminNetworkPolicyInformer, - baselineAdminNetworkPolicyInformer policyinformers.BaselineAdminNetworkPolicyInformer, - config Config, -) (*Controller, error) { - err := config.Defaults() - if err != nil { - return nil, err - } - klog.V(2).Info("Initializing nftables") - nft, err := knftables.New(knftables.InetFamily, config.NFTableName) - if err != nil { - return nil, err - } - - return newController( - client, - nft, - networkpolicyInformer, - namespaceInformer, - podInformer, - nodeInformer, - npaClient, - adminNetworkPolicyInformer, - baselineAdminNetworkPolicyInformer, - config, - ) -} - -func newController(client clientset.Interface, - nft knftables.Interface, - networkpolicyInformer networkinginformers.NetworkPolicyInformer, - namespaceInformer coreinformers.NamespaceInformer, - podInformer coreinformers.PodInformer, - nodeInformer coreinformers.NodeInformer, - npaClient npaclient.Interface, + interceptor interceptor, adminNetworkPolicyInformer policyinformers.AdminNetworkPolicyInformer, baselineAdminNetworkPolicyInformer policyinformers.BaselineAdminNetworkPolicyInformer, config Config, @@ -144,11 +105,11 @@ func newController(client clientset.Interface, c := &Controller{ client: client, config: config, - nft: nft, queue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: controllerName}, ), + interceptor: interceptor, } err := podInformer.Informer().AddIndexers(cache.Indexers{ @@ -347,13 +308,23 @@ type Controller struct { // if an error or not found it returns nil getPodAssignedToIP func(podIP string) *v1.Pod - nft knftables.Interface // install the necessary nftables rules - nfq *nfqueue.Nfqueue - flushed bool + interceptor interceptor } -// Run will not return until stopCh is closed. workers determines how many -// endpoints will be handled in parallel. 
+//go:generate stringer -type=Verdict +type Verdict int + +// Verdicts +const ( + Drop Verdict = iota + Accept +) + +type interceptor interface { + Sync(ctx context.Context, podV4IPs, podV6IPs sets.Set[string]) error +} + +// Run will return after caches are synced but otherwise does not block func (c *Controller) Run(ctx context.Context) error { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -380,131 +351,37 @@ func (c *Controller) Run(ctx context.Context) error { // add metrics registerMetrics(ctx) - // collect metrics periodically - go wait.UntilWithContext(ctx, func(ctx context.Context) { - logger := klog.FromContext(ctx) - queues, err := readNfnetlinkQueueStats() - if err != nil { - logger.Error(err, "reading nfqueue stats") - return - } - logger.V(4).Info("Obtained metrics for queues", "nqueues", len(queues)) - for _, q := range queues { - logger.V(4).Info("Updating metrics", "queue", q.id_sequence) - nfqueueQueueTotal.WithLabelValues(q.queue_number).Set(float64(q.queue_total)) - nfqueueQueueDropped.WithLabelValues(q.queue_number).Set(float64(q.queue_dropped)) - nfqueueUserDropped.WithLabelValues(q.queue_number).Set(float64(q.user_dropped)) - nfqueuePacketID.WithLabelValues(q.queue_number).Set(float64(q.id_sequence)) - } - - }, 30*time.Second) // Start the workers after the repair loop to avoid races - logger.Info("Syncing nftables rules") - _ = c.syncNFTablesRules(ctx) - defer c.cleanNFTablesRules(ctx) go wait.Until(c.runWorker, time.Second, ctx.Done()) - var flags uint32 - // https://netfilter.org/projects/libnetfilter_queue/doxygen/html/group__Queue.html - // the kernel will not normalize offload packets, - // i.e. your application will need to be able to handle packets larger than the mtu. - // Normalization is expensive, so this flag should always be set. 
- // This also solves a bug with SCTP - // https://github.com/aojea/kube-netpol/issues/8 - // https://bugzilla.netfilter.org/show_bug.cgi?id=1742 - flags = nfqueue.NfQaCfgFlagGSO - if c.config.FailOpen { - flags += nfqueue.NfQaCfgFlagFailOpen - } - - // Set configuration options for nfqueue - config := nfqueue.Config{ - NfQueue: uint16(c.config.QueueID), - Flags: flags, - MaxPacketLen: 128, // only interested in the headers - MaxQueueLen: 1024, - Copymode: nfqueue.NfQnlCopyPacket, // headers - WriteTimeout: 100 * time.Millisecond, - } - - nf, err := nfqueue.Open(&config) - if err != nil { - logger.Info("could not open nfqueue socket", "error", err) - return err - } - defer nf.Close() - - c.nfq = nf - - // Parse the packet and check if it should be accepted - // Packets should be evaludated independently in each direction - fn := func(a nfqueue.Attribute) int { - verdict := nfqueue.NfDrop - if c.config.FailOpen { - verdict = nfqueue.NfAccept - } - - startTime := time.Now() - logger.V(2).Info("Processing sync for packet", "id", *a.PacketID) - - packet, err := parsePacket(*a.Payload) - if err != nil { - logger.Error(err, "Can not process packet, applying default policy", "id", *a.PacketID, "failOpen", c.config.FailOpen) - c.nfq.SetVerdict(*a.PacketID, verdict) //nolint:errcheck - return 0 - } - packet.id = *a.PacketID - - defer func() { - processingTime := float64(time.Since(startTime).Microseconds()) - packetProcessingHist.WithLabelValues(string(packet.proto), string(packet.family)).Observe(processingTime) - packetProcessingSum.Observe(processingTime) - verdictStr := verdictString(verdict) - packetCounterVec.WithLabelValues(string(packet.proto), string(packet.family), verdictStr).Inc() - logger.V(2).Info("Finished syncing packet", "id", *a.PacketID, "duration", time.Since(startTime), "verdict", verdictStr) - }() - - if c.evaluatePacket(ctx, packet) { - verdict = nfqueue.NfAccept - } else { - verdict = nfqueue.NfDrop - } - c.nfq.SetVerdict(*a.PacketID, verdict) //nolint:errcheck - return 0 - } - - // Register your function to listen on nflog group 100 - err = nf.RegisterWithErrorFunc(ctx, fn, func(err error) int { - if opError, ok := err.(*netlink.OpError); ok { - if opError.Timeout() || opError.Temporary() { - return 0 - } - } - logger.Info("Could not receive message", "error", err) - return 0 - }) - if err != nil { - logger.Info("could not open nfqueue socket", "error", err) - return err - } - - <-ctx.Done() - return nil } -// verifctString coverts nfqueue int vericts to strings for metrics/logging -// it does not cover all of them because we should only use a subset. 
-func verdictString(verdict int) string { - switch verdict { - case nfqueue.NfDrop: - return "drop" - case nfqueue.NfAccept: - return "accept" - default: - return "unknown" +// Parse the packet and check if it should be accepted +// Packets should be evaludated independently in each direction +func (c *Controller) EvaluatePacket(ctx context.Context, packet Packet) Verdict { + + startTime := time.Now() + logger := klog.FromContext(ctx) + + logger.V(2).Info("Processing sync for packet", "id", packet.Id) + verdict := Accept + defer func() { + processingTime := float64(time.Since(startTime).Microseconds()) + packetProcessingHist.WithLabelValues(string(packet.proto), string(packet.family)).Observe(processingTime) + packetProcessingSum.Observe(processingTime) + verdictStr := verdict.String() + packetCounterVec.WithLabelValues(string(packet.proto), string(packet.family), verdictStr).Inc() + logger.V(2).Info("Finished syncing packet", "id", packet.Id, "duration", time.Since(startTime), "verdict", verdictStr) + }() + + if c.evaluatePacket(ctx, packet) { + verdict = Accept + } else { + verdict = Drop } + return verdict } // evaluatePacket evalute the network policies using the following order: @@ -514,7 +391,7 @@ func verdictString(verdict int) string { // 4. AdminNetworkPolicies in Ingress for the destination Pod/IP // 5. NetworkPolicies in Ingress (if needed) for the destination Pod/IP // 6. BaselineAdminNetworkPolicies in Ingress (if needed) for the destination Pod/IP -func (c *Controller) evaluatePacket(ctx context.Context, p packet) bool { +func (c *Controller) evaluatePacket(ctx context.Context, p Packet) bool { logger := klog.FromContext(ctx) srcIP := p.srcIP srcPod := c.getPodAssignedToIP(srcIP.String()) @@ -530,7 +407,7 @@ func (c *Controller) evaluatePacket(ctx context.Context, p packet) bool { tlogger := logger.V(2) if tlogger.Enabled() { tlogger.Info("Evaluating packet", "packet", p) - tlogger = tlogger.WithValues("id", p.id) + tlogger = tlogger.WithValues("id", p.Id) } // Evalute Egress Policies @@ -637,7 +514,28 @@ func (c *Controller) processNextItem() bool { defer c.queue.Done(key) // Invoke the method containing the business logic - err := c.syncNFTablesRules(context.Background()) + + networkPolicies, err := c.networkpolicyLister.List(labels.Everything()) + if err != nil { + c.handleErr(err, key) + return true + } + podV4IPs := sets.New[string]() + podV6IPs := sets.New[string]() + for _, networkPolicy := range networkPolicies { + pods := c.getLocalPodsForNetworkPolicy(networkPolicy) + for _, pod := range pods { + for _, ip := range pod.Status.PodIPs { + if netutils.IsIPv4String(ip.IP) { + podV4IPs.Insert(ip.IP) + } else { + podV6IPs.Insert(ip.IP) + } + } + } + } + + err = c.interceptor.Sync(context.Background(), podV4IPs, podV6IPs) // Handle the error if something went wrong during the execution of the business logic c.handleErr(err, key) return true @@ -668,261 +566,3 @@ func (c *Controller) handleErr(err error, key string) { utilruntime.HandleError(err) klog.InfoS("Dropping out of the queue", "error", err, "key", key) } - -// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace -// and check if network policies must apply. -// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset. 
-func (c *Controller) syncNFTablesRules(ctx context.Context) error { - table := &knftables.Table{ - Comment: knftables.PtrTo("rules for kubernetes NetworkPolicy"), - } - tx := c.nft.NewTransaction() - // do it once to delete the existing table - if !c.flushed { - tx.Add(table) - tx.Delete(table) - c.flushed = true - } - tx.Add(table) - - // only if no admin network policies are used - if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy { - // add set with Local Pod IPs impacted by network policies - tx.Add(&knftables.Set{ - Name: podV4IPsSet, - Type: "ipv4_addr", - Comment: ptr.To("Local V4 Pod IPs with Network Policies"), - }) - tx.Flush(&knftables.Set{ - Name: podV4IPsSet, - }) - tx.Add(&knftables.Set{ - Name: podV6IPsSet, - Type: "ipv6_addr", - Comment: ptr.To("Local V6 Pod IPs with Network Policies"), - }) - tx.Flush(&knftables.Set{ - Name: podV6IPsSet, - }) - - networkPolicies, err := c.networkpolicyLister.List(labels.Everything()) - if err != nil { - return err - } - podV4IPs := sets.New[string]() - podV6IPs := sets.New[string]() - for _, networkPolicy := range networkPolicies { - pods := c.getLocalPodsForNetworkPolicy(networkPolicy) - for _, pod := range pods { - for _, ip := range pod.Status.PodIPs { - if netutils.IsIPv4String(ip.IP) { - podV4IPs.Insert(ip.IP) - } else { - podV6IPs.Insert(ip.IP) - } - } - } - } - - for _, ip := range podV4IPs.UnsortedList() { - tx.Add(&knftables.Element{ - Set: podV4IPsSet, - Key: []string{ip}, - }) - } - for _, ip := range podV6IPs.UnsortedList() { - tx.Add(&knftables.Element{ - Set: podV6IPsSet, - Key: []string{ip}, - }) - } - } - // Process the packets that are, usually on the FORWARD hook, but - // IPVS packets follow a different path in netfilter, so we process - // everything in the POSTROUTING hook before SNAT happens. - // Ref: https://github.com/kubernetes-sigs/kube-network-policies/issues/46 - hook := knftables.PostroutingHook - chainName := string(hook) - tx.Add(&knftables.Chain{ - Name: chainName, - Type: knftables.PtrTo(knftables.FilterType), - Hook: knftables.PtrTo(hook), - Priority: knftables.PtrTo(knftables.SNATPriority + "-5"), - }) - tx.Flush(&knftables.Chain{ - Name: chainName, - }) - - // DNS is processed by addDNSRacersWorkaroundRules() - // TODO: remove once kernel fix is on most distros - if c.config.NetfilterBug1766Fix { - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: "udp dport 53 accept", - Comment: ptr.To("process DNS traffic on PREROUTING hook with network policy enforcement to avoid netfilter race condition bug"), - }) - } - - // IPv6 needs ICMP Neighbor Discovery to work - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "icmpv6", "type", "{", "nd-neighbor-solicit, nd-neighbor-advert", "}", "accept"), - }) - // Don't process traffic generated from the root user in the Node, it can block kubelet probes - // or system daemons that depend on the internal node traffic to not be blocked. 
- // Ref: https://github.com/kubernetes-sigs/kube-network-policies/issues/65 - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: "meta skuid 0 accept", - }) - // instead of aggregating all the expresion in one rule, use two different - // rules to understand if is causing issues with UDP packets with the same - // tuple (https://github.com/kubernetes-sigs/kube-network-policies/issues/12) - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ct", "state", "established,related", "accept"), - }) - - action := fmt.Sprintf("queue num %d", c.config.QueueID) - if c.config.FailOpen { - action += " bypass" - } - - // only if no admin network policies are used - if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy { - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip", "saddr", "@", podV4IPsSet, action, - ), - Comment: ptr.To("process IPv4 traffic with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip", "daddr", "@", podV4IPsSet, action, - ), - Comment: ptr.To("process IPv4 traffic with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip6", "saddr", "@", podV6IPsSet, action, - ), - Comment: ptr.To("process IPv6 traffic with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip6", "daddr", "@", podV6IPsSet, action, - ), - Comment: ptr.To("process IPv6 traffic with network policy enforcement"), - }) - } else { - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: action, - }) - } - - if c.config.NetfilterBug1766Fix { - c.addDNSRacersWorkaroundRules(ctx, tx) - } - - if err := c.nft.Run(ctx, tx); err != nil { - klog.FromContext(ctx).Info("syncing nftables rules", "error", err) - return err - } - return nil -} - -// To avoid a kernel bug caused by UDP DNS request racing with conntrack -// process the DNS packets only on the PREROUTING hook after DNAT happens -// so we can see the resolved destination IPs, typically the ones of the Pods -// that are used for the Kubernetes DNS Service. 
-// xref: https://github.com/kubernetes-sigs/kube-network-policies/issues/12 -// This can be removed once all kernels contain the fix in -// https://github.com/torvalds/linux/commit/8af79d3edb5fd2dce35ea0a71595b6d4f9962350 -// TODO: remove once kernel fix is on most distros -func (c *Controller) addDNSRacersWorkaroundRules(ctx context.Context, tx *knftables.Transaction) { - hook := knftables.PreroutingHook - chainName := string(hook) - tx.Add(&knftables.Chain{ - Name: chainName, - Type: knftables.PtrTo(knftables.FilterType), - Hook: knftables.PtrTo(hook), - Priority: knftables.PtrTo(knftables.DNATPriority + "+5"), - }) - tx.Flush(&knftables.Chain{ - Name: chainName, - }) - - action := fmt.Sprintf("queue num %d", c.config.QueueID) - if c.config.FailOpen { - action += " bypass" - } - - if !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy { - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip", "saddr", "@", podV4IPsSet, "udp dport 53", action, - ), - Comment: ptr.To("process IPv4 traffic destined to a DNS server with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip", "daddr", "@", podV4IPsSet, "udp dport 53", action, - ), - Comment: ptr.To("process IPv4 traffic destined to a DNS server with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip6", "saddr", "@", podV6IPsSet, "udp dport 53", action, - ), - Comment: ptr.To("process IPv6 traffic destined to a DNS server with network policy enforcement"), - }) - - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "ip6", "daddr", "@", podV6IPsSet, "udp dport 53", action, - ), - Comment: ptr.To("process IPv6 traffic destined to a DNS server with network policy enforcement"), - }) - } else { - tx.Add(&knftables.Rule{ - Chain: chainName, - Rule: knftables.Concat( - "udp dport 53", action, - ), - }) - } -} - -func (c *Controller) cleanNFTablesRules(ctx context.Context) { - tx := c.nft.NewTransaction() - // Add+Delete is idempotent and won't return an error if the table doesn't already - // exist. - tx.Add(&knftables.Table{}) - tx.Delete(&knftables.Table{}) - - // When this function is called, the ctx is likely cancelled. So - // we only use it for logging, and create a context with timeout - // for nft.Run. 
There is a grace period of 5s in main, so we keep - // this timeout shorter - nctx, cancel := context.WithTimeout(context.Background(), time.Second*4) - defer cancel() - if err := c.nft.Run(nctx, tx); err != nil { - klog.FromContext(ctx).Error(err, "deleting nftables rules") - } -} diff --git a/pkg/networkpolicy/controller_test.go b/pkg/networkpolicy/controller_test.go index aa310e5..e3594f3 100644 --- a/pkg/networkpolicy/controller_test.go +++ b/pkg/networkpolicy/controller_test.go @@ -1,14 +1,15 @@ package networkpolicy import ( + "context" "testing" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/knftables" npaclientfake "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned/fake" npainformers "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions" ) @@ -87,6 +88,12 @@ type networkpolicyController struct { nodeStore cache.Store } +type mockInterceptor struct{} + +func (mockInterceptor) Sync(ctx context.Context, podV4IPs sets.Set[string], podV6IPs sets.Set[string]) (_ error) { + return nil +} + func newTestController() *networkpolicyController { client := fake.NewSimpleClientset() informersFactory := informers.NewSharedInformerFactory(client, 0) @@ -94,14 +101,14 @@ func newTestController() *networkpolicyController { npaClient := npaclientfake.NewSimpleClientset() npaInformerFactory := npainformers.NewSharedInformerFactory(npaClient, 0) - controller, err := newController( + controller, err := NewController( client, - knftables.NewFake(knftables.InetFamily, "kube-network-policies"), informersFactory.Networking().V1().NetworkPolicies(), informersFactory.Core().V1().Namespaces(), informersFactory.Core().V1().Pods(), informersFactory.Core().V1().Nodes(), npaClient, + mockInterceptor{}, npaInformerFactory.Policy().V1alpha1().AdminNetworkPolicies(), npaInformerFactory.Policy().V1alpha1().BaselineAdminNetworkPolicies(), Config{ diff --git a/pkg/networkpolicy/metrics.go b/pkg/networkpolicy/metrics.go index 8815489..cc432d7 100644 --- a/pkg/networkpolicy/metrics.go +++ b/pkg/networkpolicy/metrics.go @@ -1,13 +1,7 @@ package networkpolicy import ( - "bufio" "context" - "fmt" - "io" - "os" - "strconv" - "strings" "sync" "github.com/prometheus/client_golang/prometheus" @@ -34,23 +28,6 @@ var ( Name: "packet_count", Help: "Number of packets", }, []string{"protocol", "family", "verdict"}) - - nfqueueQueueTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "nfqueue_queue_total", - Help: "The number of packets currently queued and waiting to be processed by the application", - }, []string{"queue"}) - nfqueueQueueDropped = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "nfqueue_queue_dropped", - Help: "Number of packets that had to be dropped by the kernel because too many packets are already waiting for user space to send back the mandatory accept/drop verdicts", - }, []string{"queue"}) - nfqueueUserDropped = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "nfqueue_user_dropped", - Help: "Number of packets that were dropped within the netlink subsystem. 
Such drops usually happen when the corresponding socket buffer is full; that is, user space is not able to read messages fast enough", - }, []string{"queue"}) - nfqueuePacketID = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "nfqueue_packet_id", - Help: "ID of the most recent packet queued.", - }, []string{"queue"}) ) var registerMetricsOnce sync.Once @@ -62,96 +39,5 @@ func registerMetrics(ctx context.Context) { prometheus.MustRegister(packetProcessingHist) prometheus.MustRegister(packetProcessingSum) prometheus.MustRegister(packetCounterVec) - prometheus.MustRegister(nfqueueQueueTotal) - prometheus.MustRegister(nfqueueQueueDropped) - prometheus.MustRegister(nfqueueUserDropped) - prometheus.MustRegister(nfqueuePacketID) }) } - -// https://man7.org/linux/man-pages/man5/proc.5.html -type nfnetlinkQueue struct { - queue_number string // The ID of the queue. This matches what is specified in the --queue-num or --queue-balance options. - peer_portid int // The netlink port ID subscribed to the queue. - queue_total int // The number of packets currently queued and waiting to be processed by the application. - copy_mode int // The copy mode of the queue. It is either 1 (metadata only) or 2 (also copy payload data to user space). - copy_range int // Copy range; that is, how many bytes of packet payload should be copied to user space at most. - queue_dropped int // Number of packets that had to be dropped by the kernel because too many packets are already waiting for user space to send back the mandatory accept/drop verdicts. - user_dropped int // Number of packets that were dropped within the netlink subsystem. Such drops usually happen when the corresponding socket buffer is full; that is, user space is not able to read messages fast enough. - id_sequence int // sequence number. Every queued packet is associated with a (32-bit) monotonically increasing sequence number. This shows the ID of the most recent packet queued. - // dummy int // Field is always ‘1’ and is ignored, only kept for compatibility reasons. 
-} - -func readNfnetlinkQueueStats() ([]nfnetlinkQueue, error) { - const maxBufferSize = 1024 * 1024 - - f, err := os.Open("/proc/net/netfilter/nfnetlink_queue") - if err != nil { - return nil, err - } - defer f.Close() - - entries := []nfnetlinkQueue{} - reader := io.LimitReader(f, maxBufferSize) - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - fields := strings.Fields(scanner.Text()) - if len(fields) != 9 { - return nil, fmt.Errorf("unexpected number of entries, got %d expected %d", len(fields), 9) - } - - queue_number := fields[0] - - peer_portid, err := parseNfqueueField(fields[1]) - if err != nil { - return nil, err - } - queue_total, err := parseNfqueueField(fields[2]) - if err != nil { - return nil, err - } - copy_mode, err := parseNfqueueField(fields[3]) - if err != nil { - return nil, err - } - copy_range, err := parseNfqueueField(fields[4]) - if err != nil { - return nil, err - } - queue_dropped, err := parseNfqueueField(fields[5]) - if err != nil { - return nil, err - } - user_dropped, err := parseNfqueueField(fields[6]) - if err != nil { - return nil, err - } - id_sequence, err := parseNfqueueField(fields[7]) - if err != nil { - return nil, err - } - - nfqueueEntry := nfnetlinkQueue{ - queue_number: queue_number, - peer_portid: peer_portid, - queue_total: queue_total, - copy_mode: copy_mode, - copy_range: copy_range, - queue_dropped: queue_dropped, - user_dropped: user_dropped, - id_sequence: id_sequence, - } - - entries = append(entries, nfqueueEntry) - } - return entries, nil -} - -func parseNfqueueField(field string) (int, error) { - val, err := strconv.Atoi(field) - if err != nil { - return 0, fmt.Errorf("couldn't parse %q field: %w", field, err) - } - return val, err -} diff --git a/pkg/networkpolicy/networkpolicy_test.go b/pkg/networkpolicy/networkpolicy_test.go index e63fa82..74467b1 100644 --- a/pkg/networkpolicy/networkpolicy_test.go +++ b/pkg/networkpolicy/networkpolicy_test.go @@ -160,7 +160,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy []*networkingv1.NetworkPolicy namespace []*v1.Namespace pod []*v1.Pod - p packet + p Packet expect bool }{ { @@ -168,7 +168,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -182,7 +182,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyIngress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -196,7 +196,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.2.22"), srcPort: 52345, dstIP: net.ParseIP("192.168.1.11"), @@ -210,7 +210,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -224,7 +224,7 @@ func TestSyncPacket(t 
*testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyIngress, npAllowAllIngress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -238,7 +238,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyIngress, npAllowAllIngressIPBlock}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -252,7 +252,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npAllowAllIngressIPBlock}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("10.0.0.1"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -266,7 +266,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -280,7 +280,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -294,7 +294,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressIPBlock}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -308,7 +308,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressPodSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -322,7 +322,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressPodSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.3.33"), @@ -336,7 +336,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -350,7 +350,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressNsSelector}, 
namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.3.33"), @@ -364,7 +364,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -378,7 +378,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyEgress, npMultiPortEgress, npMultiPortEgressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.3.33"), @@ -392,7 +392,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyIngress, npMultiPortIngressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -406,7 +406,7 @@ func TestSyncPacket(t *testing.T) { networkpolicy: []*networkingv1.NetworkPolicy{npDefaultDenyIngress, npMultiPortIngressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.4.44"), diff --git a/pkg/networkpolicy/networkpolicyapi_test.go b/pkg/networkpolicy/networkpolicyapi_test.go index a3cd94f..4b56c0b 100644 --- a/pkg/networkpolicy/networkpolicyapi_test.go +++ b/pkg/networkpolicy/networkpolicyapi_test.go @@ -225,7 +225,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { namespace []*v1.Namespace pod []*v1.Pod node []*v1.Node - p packet + p Packet expect bool }{ { @@ -233,7 +233,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -247,7 +247,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyIngress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -261,7 +261,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.2.22"), srcPort: 52345, dstIP: net.ParseIP("192.168.1.11"), @@ -275,7 +275,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyIngress, npaAllowAllIngress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - 
p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -289,7 +289,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaAllowAllIngressPod}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("10.0.0.1"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -303,7 +303,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaAllowMultiPortEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -317,7 +317,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaAllowMultiPortEgressNode}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -331,7 +331,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaAllowMultiPortEgress}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -345,7 +345,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaAllowMultiPortEgressCIDR}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -359,7 +359,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaAllowMultiPortEgressPodSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -373,7 +373,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaMultiPortEgressNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -387,7 +387,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaMultiPortEgressNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.3.33"), @@ -401,7 +401,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaMultiPortEgressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - 
p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -415,7 +415,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyEgress, npaMultiPortEgressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.3.33"), @@ -429,7 +429,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyIngress, npaMultiPortIngressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.2.22"), @@ -443,7 +443,7 @@ func Test_adminNetworkPolicyAction(t *testing.T) { networkpolicy: []*npav1alpha1.AdminNetworkPolicy{npaDefaultDenyIngress, npaMultiPortIngressPodNsSelector}, namespace: []*v1.Namespace{makeNamespace("foo"), makeNamespace("bar")}, pod: []*v1.Pod{podA, podB, podC, podD}, - p: packet{ + p: Packet{ srcIP: net.ParseIP("192.168.1.11"), srcPort: 52345, dstIP: net.ParseIP("192.168.4.44"), diff --git a/pkg/networkpolicy/packet.go b/pkg/networkpolicy/packet.go index 082a1b5..4480d64 100644 --- a/pkg/networkpolicy/packet.go +++ b/pkg/networkpolicy/packet.go @@ -10,8 +10,9 @@ import ( v1 "k8s.io/api/core/v1" ) -type packet struct { - id uint32 +// what parts of this need to be public? +type Packet struct { + Id uint32 family v1.IPFamily srcIP net.IP dstIP net.IP @@ -24,12 +25,12 @@ type packet struct { var ErrorTooShort = fmt.Errorf("packet too short") var ErrorCorrupted = fmt.Errorf("packet corrupted") -func (p packet) String() string { - return fmt.Sprintf("[%d] %s:%d %s:%d %s\n%s", p.id, p.srcIP.String(), p.srcPort, p.dstIP.String(), p.dstPort, p.proto, hex.Dump(p.payload)) +func (p Packet) String() string { + return fmt.Sprintf("[%d] %s:%d %s:%d %s\n%s", p.Id, p.srcIP.String(), p.srcPort, p.dstIP.String(), p.dstPort, p.proto, hex.Dump(p.payload)) } // This function is used for JSON output (interface logr.Marshaler) -func (p packet) MarshalLog() any { +func (p Packet) MarshalLog() any { return &struct { ID uint32 Family v1.IPFamily @@ -39,7 +40,7 @@ func (p packet) MarshalLog() any { SrcPort int DstPort int }{ - p.id, + p.Id, p.family, p.srcIP, p.dstIP, @@ -52,8 +53,8 @@ func (p packet) MarshalLog() any { // https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Packet_structure // https://en.wikipedia.org/wiki/IPv6_packet // https://github.com/golang/net/blob/master/ipv4/header.go -func parsePacket(b []byte) (packet, error) { - t := packet{} +func ParsePacket(b []byte) (Packet, error) { + t := Packet{} if len(b) < 20 { // 20 is the minimum length of an IPv4 header (IPv6 is 40) return t, ErrorTooShort diff --git a/pkg/networkpolicy/packet_test.go b/pkg/networkpolicy/packet_test.go index 5a7f41c..4c95445 100644 --- a/pkg/networkpolicy/packet_test.go +++ b/pkg/networkpolicy/packet_test.go @@ -25,12 +25,12 @@ func TestUDPFragmentIPv6(t *testing.T) { name string input []byte err bool - expected packet + expected Packet }{ { name: "UDP first fragment", input: packetsUDPFragIPv6[0], - expected: packet{ + expected: Packet{ family: v1.IPv6Protocol, proto: v1.ProtocolUDP, dstIP: net.ParseIP("fd00::c0a8:101"), @@ -40,7 +40,7 @@ func TestUDPFragmentIPv6(t *testing.T) { { name: "UDP 
not-first fragment", input: packetsUDPFragIPv6[1], - expected: packet{ + expected: Packet{ family: v1.IPv6Protocol, dstIP: net.ParseIP("fd00::c0a8:101"), }, @@ -48,7 +48,7 @@ func TestUDPFragmentIPv6(t *testing.T) { { name: "UDP packet (un-fragmented)", input: packetsUDPFragIPv6[2], - expected: packet{ + expected: Packet{ family: v1.IPv6Protocol, proto: v1.ProtocolUDP, srcIP: net.ParseIP("fd00::c0a8:101"), @@ -57,7 +57,7 @@ func TestUDPFragmentIPv6(t *testing.T) { }, } for _, tc := range tests { - packet, err := parsePacket(tc.input) + packet, err := ParsePacket(tc.input) if err != nil { if !tc.err { t.Fatalf("%s: unexpected error: %v", tc.name, err) @@ -74,12 +74,12 @@ func TestTCPIPv4(t *testing.T) { name string input []byte err bool - expected packet + expected Packet }{ { name: "SYN", input: packetsTCPIPv4[0], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.1"), @@ -90,7 +90,7 @@ func TestTCPIPv4(t *testing.T) { { name: "SYN, ACK", input: packetsTCPIPv4[1], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.201"), @@ -101,7 +101,7 @@ func TestTCPIPv4(t *testing.T) { { name: "ACK 1", input: packetsTCPIPv4[2], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.1"), @@ -112,7 +112,7 @@ func TestTCPIPv4(t *testing.T) { { name: "PSH, ACK", input: packetsTCPIPv4[3], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.201"), @@ -123,7 +123,7 @@ func TestTCPIPv4(t *testing.T) { { name: "ACK 7", input: packetsTCPIPv4[4], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.1"), @@ -134,7 +134,7 @@ func TestTCPIPv4(t *testing.T) { { name: "FIN, ACK 7", input: packetsTCPIPv4[5], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.1"), @@ -145,7 +145,7 @@ func TestTCPIPv4(t *testing.T) { { name: "FIN, ACK 2", input: packetsTCPIPv4[6], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.201"), @@ -156,7 +156,7 @@ func TestTCPIPv4(t *testing.T) { { name: "ACK 8", input: packetsTCPIPv4[7], - expected: packet{ + expected: Packet{ family: v1.IPv4Protocol, proto: v1.ProtocolTCP, dstIP: net.ParseIP("192.168.1.1"), @@ -166,7 +166,7 @@ func TestTCPIPv4(t *testing.T) { }, } for _, tc := range tests { - packet, err := parsePacket(tc.input) + packet, err := ParsePacket(tc.input) if err != nil { if !tc.err { t.Fatalf("%s: unexpected error: %v", tc.name, err) @@ -182,23 +182,23 @@ func TestTooShortPackets(t *testing.T) { rawPacket := packetsUDPFragIPv6[0] var err error // Test a nil packet - _, err = parsePacket(nil) + _, err = ParsePacket(nil) if err == nil { t.Fatalf("No error when parsing a nil-packet") } for i := 0; i < 56; i++ { - _, err = parsePacket(rawPacket[:i]) + _, err = ParsePacket(rawPacket[:i]) if err == nil { t.Fatalf("No error when parsing a packet, length=%d", i) } } - _, err = parsePacket(rawPacket[:56]) + _, err = ParsePacket(rawPacket[:56]) if err != nil { t.Fatalf("Error when parsing a complete packet") } } -func comparePacket(t *testing.T, tc string, expected, got packet) { +func comparePacket(t *testing.T, tc string, expected, got Packet) { if got.family != expected.family { t.Fatalf("%s: family: expected=%v, got=%v", tc, 
expected.family, got.family) } diff --git a/pkg/networkpolicy/verdict_string.go b/pkg/networkpolicy/verdict_string.go new file mode 100644 index 0000000..3d3735a --- /dev/null +++ b/pkg/networkpolicy/verdict_string.go @@ -0,0 +1,24 @@ +// Code generated by "stringer -type=Verdict"; DO NOT EDIT. + +package networkpolicy + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Drop-0] + _ = x[Accept-1] +} + +const _Verdict_name = "DropAccept" + +var _Verdict_index = [...]uint8{0, 4, 10} + +func (i Verdict) String() string { + if i < 0 || i >= Verdict(len(_Verdict_index)-1) { + return "Verdict(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Verdict_name[_Verdict_index[i]:_Verdict_index[i+1]] +} diff --git a/pkg/nfqinterceptor/metrics.go b/pkg/nfqinterceptor/metrics.go new file mode 100644 index 0000000..f842d0c --- /dev/null +++ b/pkg/nfqinterceptor/metrics.go @@ -0,0 +1,134 @@ +package nfqinterceptor + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strconv" + "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/klog/v2" +) + +var ( + nfqueueQueueTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "nfqueue_queue_total", + Help: "The number of packets currently queued and waiting to be processed by the application", + }, []string{"queue"}) + nfqueueQueueDropped = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "nfqueue_queue_dropped", + Help: "Number of packets that had to be dropped by the kernel because too many packets are already waiting for user space to send back the mandatory accept/drop verdicts", + }, []string{"queue"}) + nfqueueUserDropped = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "nfqueue_user_dropped", + Help: "Number of packets that were dropped within the netlink subsystem. Such drops usually happen when the corresponding socket buffer is full; that is, user space is not able to read messages fast enough", + }, []string{"queue"}) + nfqueuePacketID = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "nfqueue_packet_id", + Help: "ID of the most recent packet queued.", + }, []string{"queue"}) +) + +var registerMetricsOnce sync.Once + +// RegisterMetrics registers kube-proxy metrics. +func registerMetrics(ctx context.Context) { + registerMetricsOnce.Do(func() { + klog.Infof("Registering metrics") + prometheus.MustRegister(nfqueueQueueTotal) + prometheus.MustRegister(nfqueueQueueDropped) + prometheus.MustRegister(nfqueueUserDropped) + prometheus.MustRegister(nfqueuePacketID) + }) +} + +// https://man7.org/linux/man-pages/man5/proc.5.html +type nfnetlinkQueue struct { + queue_number string // The ID of the queue. This matches what is specified in the --queue-num or --queue-balance options. + peer_portid int // The netlink port ID subscribed to the queue. + queue_total int // The number of packets currently queued and waiting to be processed by the application. + copy_mode int // The copy mode of the queue. It is either 1 (metadata only) or 2 (also copy payload data to user space). + copy_range int // Copy range; that is, how many bytes of packet payload should be copied to user space at most. + queue_dropped int // Number of packets that had to be dropped by the kernel because too many packets are already waiting for user space to send back the mandatory accept/drop verdicts. 
+ user_dropped int // Number of packets that were dropped within the netlink subsystem. Such drops usually happen when the corresponding socket buffer is full; that is, user space is not able to read messages fast enough. + id_sequence int // sequence number. Every queued packet is associated with a (32-bit) monotonically increasing sequence number. This shows the ID of the most recent packet queued. + // dummy int // Field is always ‘1’ and is ignored, only kept for compatibility reasons. +} + +func readNfnetlinkQueueStats() ([]nfnetlinkQueue, error) { + const maxBufferSize = 1024 * 1024 + + f, err := os.Open("/proc/net/netfilter/nfnetlink_queue") + if err != nil { + return nil, err + } + defer f.Close() + + entries := []nfnetlinkQueue{} + reader := io.LimitReader(f, maxBufferSize) + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + if len(fields) != 9 { + return nil, fmt.Errorf("unexpected number of entries, got %d expected %d", len(fields), 9) + } + + queue_number := fields[0] + + peer_portid, err := parseNfqueueField(fields[1]) + if err != nil { + return nil, err + } + queue_total, err := parseNfqueueField(fields[2]) + if err != nil { + return nil, err + } + copy_mode, err := parseNfqueueField(fields[3]) + if err != nil { + return nil, err + } + copy_range, err := parseNfqueueField(fields[4]) + if err != nil { + return nil, err + } + queue_dropped, err := parseNfqueueField(fields[5]) + if err != nil { + return nil, err + } + user_dropped, err := parseNfqueueField(fields[6]) + if err != nil { + return nil, err + } + id_sequence, err := parseNfqueueField(fields[7]) + if err != nil { + return nil, err + } + + nfqueueEntry := nfnetlinkQueue{ + queue_number: queue_number, + peer_portid: peer_portid, + queue_total: queue_total, + copy_mode: copy_mode, + copy_range: copy_range, + queue_dropped: queue_dropped, + user_dropped: user_dropped, + id_sequence: id_sequence, + } + + entries = append(entries, nfqueueEntry) + } + return entries, nil +} + +func parseNfqueueField(field string) (int, error) { + val, err := strconv.Atoi(field) + if err != nil { + return 0, fmt.Errorf("couldn't parse %q field: %w", field, err) + } + return val, err +} diff --git a/pkg/nfqinterceptor/nfqinterceptor.go b/pkg/nfqinterceptor/nfqinterceptor.go new file mode 100644 index 0000000..97112ee --- /dev/null +++ b/pkg/nfqinterceptor/nfqinterceptor.go @@ -0,0 +1,401 @@ +package nfqinterceptor + +import ( + "context" + "fmt" + "time" + + nfqueue "github.com/florianl/go-nfqueue" + "github.com/mdlayher/netlink" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + + "sigs.k8s.io/knftables" + "sigs.k8s.io/kube-network-policies/pkg/networkpolicy" +) + +// Network policies are hard to implement efficiently, and in large clusters this is +// translated to performance and scalability problems. Most of the existing +// implementations use the same approach of processing the APIs and transforming them in +// the corresponding dataplane implementation; commonly this may be iptables, nftables, +// ebpf or ovs. This takes a different approach, it uses the NFQUEUE functionality +// implemented in netfilter to process the first packet of each connection in userspace +// and emit a verdict. The advantage is that the dataplane implementation does not need to +// represent all the complex logic. 
There are also some performance improvements that can
+// be applied, such as to restrict the packets that are sent to userspace to the ones that
+// have network policies only. This effectively means that network policies are applied
+// ONLY at the time the connection is initiated, by whatever the conntrack kernel
+// understands as a NEW connection.
+//
+// https://home.regit.org/netfilter-en/using-nfqueue-and-libnetfilter_queue/
+// https://netfilter.org/projects/libnetfilter_queue/doxygen/html/
+
+const (
+	controllerName = "kube-network-policies"
+	podIPIndex     = "podIPKeyIndex"
+	syncKey        = "dummy-key" // use the same key to sync to aggregate the events
+	podV4IPsSet    = "podips-v4"
+	podV6IPsSet    = "podips-v6"
+)
+
+// should probably take its own config?
+func New(config networkpolicy.Config) (*nfqInterceptor, error) {
+	nft, err := knftables.New(knftables.InetFamily, config.NFTableName)
+	if err != nil {
+		return nil, err
+	}
+	return &nfqInterceptor{
+		// nfq isn't populated until Run, which is weird
+		nft:                 nft,
+		FailOpen:            config.FailOpen,
+		queueid:             config.QueueID,
+		NetfilterBug1766Fix: config.NetfilterBug1766Fix,
+		interceptAll:        config.AdminNetworkPolicy || config.BaselineAdminNetworkPolicy,
+	}, nil
+}
+
+type nfqInterceptor struct {
+	nft                 knftables.Interface // install the necessary nftables rules
+	flushed             bool
+	FailOpen            bool
+	queueid             int
+	NetfilterBug1766Fix bool
+	interceptAll        bool // set when AdminNetworkPolicy or BaselineAdminNetworkPolicy is enabled; replaces the old !c.config.AdminNetworkPolicy && !c.config.BaselineAdminNetworkPolicy checks
+}
+
+func (n *nfqInterceptor) Run(ctx context.Context, renderVerdict func(context.Context, networkpolicy.Packet) networkpolicy.Verdict) error {
+	logger := klog.FromContext(ctx)
+	registerMetrics(ctx)
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
+		queues, err := readNfnetlinkQueueStats()
+		if err != nil {
+			logger.Error(err, "reading nfqueue stats")
+			return
+		}
+		logger.V(4).Info("Obtained metrics for queues", "nqueues", len(queues))
+		for _, q := range queues {
+			logger.V(4).Info("Updating metrics", "queue", q.id_sequence)
+			nfqueueQueueTotal.WithLabelValues(q.queue_number).Set(float64(q.queue_total))
+			nfqueueQueueDropped.WithLabelValues(q.queue_number).Set(float64(q.queue_dropped))
+			nfqueueUserDropped.WithLabelValues(q.queue_number).Set(float64(q.user_dropped))
+			nfqueuePacketID.WithLabelValues(q.queue_number).Set(float64(q.id_sequence))
+		}
+
+	}, 30*time.Second)
+
+	var flags uint32
+	// https://netfilter.org/projects/libnetfilter_queue/doxygen/html/group__Queue.html
+	// the kernel will not normalize offload packets,
+	// i.e. your application will need to be able to handle packets larger than the mtu.
+	// Normalization is expensive, so this flag should always be set.
+	// This also solves a bug with SCTP
+	// https://github.com/aojea/kube-netpol/issues/8
+	// https://bugzilla.netfilter.org/show_bug.cgi?id=1742
+	flags = nfqueue.NfQaCfgFlagGSO
+	if n.FailOpen {
+		flags += nfqueue.NfQaCfgFlagFailOpen
+	}
+
+	// Set configuration options for nfqueue
+	config := nfqueue.Config{
+		NfQueue:      uint16(n.queueid),
+		Flags:        flags,
+		MaxPacketLen: 128, // only interested in the headers
+		MaxQueueLen:  1024,
+		Copymode:     nfqueue.NfQnlCopyPacket, // headers
+		WriteTimeout: 100 * time.Millisecond,
+	}
+	nf, err := nfqueue.Open(&config)
+	if err != nil {
+		logger.Info("could not open nfqueue socket", "error", err)
+		return err
+	}
+
+	defer func() {
+		n.cleanNFTablesRules(ctx)
+		nf.Close()
+	}()
+
+	logger.Info("Syncing nftables rules")
+	_ = n.Sync(ctx, sets.Set[string]{}, sets.Set[string]{}) // why bother with empties?
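+
+	// NOTE: the SetVerdict calls in the callback below pass int(verdict) straight
+	// to nfqueue, which assumes networkpolicy.Drop/Accept keep the values 0/1 so
+	// that they line up with nfqueue's NfDrop/NfAccept.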
+ fn := func(a nfqueue.Attribute) int {
+ verdict := networkpolicy.Drop
+ if n.FailOpen {
+ verdict = networkpolicy.Accept
+ }
+
+ packet, err := networkpolicy.ParsePacket(*a.Payload)
+ if err != nil {
+ logger.Error(err, "Can not process packet, applying default policy", "id", *a.PacketID, "failOpen", n.FailOpen)
+ nf.SetVerdict(*a.PacketID, int(verdict))
+ return 0
+ }
+ packet.Id = *a.PacketID
+ verdict = renderVerdict(ctx, packet)
+ // log error and return default if not Accept or Drop?
+ nf.SetVerdict(packet.Id, int(verdict))
+ return 0
+ }
+ // Register the callback to process packets arriving on the configured nfqueue
+ err = nf.RegisterWithErrorFunc(ctx, fn, func(err error) int {
+ if opError, ok := err.(*netlink.OpError); ok {
+ if opError.Timeout() || opError.Temporary() {
+ return 0
+ }
+ }
+ logger.Info("Could not receive message", "error", err)
+ return 0
+ })
+ if err != nil {
+ logger.Error(err, "could not register the nfqueue callback")
+ return err
+ }
+
+ // wait here or we'll clean up the nftables rules and close the socket too early
+ <-ctx.Done()
+
+ return nil
+}
+
+// Sync adds the necessary rules to process the first connection packets in userspace
+// and check if network policies must apply.
+// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
+func (c *nfqInterceptor) Sync(ctx context.Context, podV4IPs, podV6IPs sets.Set[string]) error {
+ table := &knftables.Table{
+ Comment: knftables.PtrTo("rules for kubernetes NetworkPolicy"),
+ }
+ tx := c.nft.NewTransaction()
+ // do it once to delete the existing table
+ if !c.flushed {
+ tx.Add(table)
+ tx.Delete(table)
+ c.flushed = true
+ }
+ tx.Add(table)
+
+ // only if no admin network policies are used
+ if !c.interceptAll {
+ // add set with Local Pod IPs impacted by network policies
+ tx.Add(&knftables.Set{
+ Name: podV4IPsSet,
+ Type: "ipv4_addr",
+ Comment: ptr.To("Local V4 Pod IPs with Network Policies"),
+ })
+ tx.Flush(&knftables.Set{
+ Name: podV4IPsSet,
+ })
+ tx.Add(&knftables.Set{
+ Name: podV6IPsSet,
+ Type: "ipv6_addr",
+ Comment: ptr.To("Local V6 Pod IPs with Network Policies"),
+ })
+ tx.Flush(&knftables.Set{
+ Name: podV6IPsSet,
+ })
+
+ for _, ip := range podV4IPs.UnsortedList() {
+ tx.Add(&knftables.Element{
+ Set: podV4IPsSet,
+ Key: []string{ip},
+ })
+ }
+ for _, ip := range podV6IPs.UnsortedList() {
+ tx.Add(&knftables.Element{
+ Set: podV6IPsSet,
+ Key: []string{ip},
+ })
+ }
+ }
+ // Packets are usually processed on the FORWARD hook, but IPVS packets
+ // follow a different path in netfilter, so we process everything in the
+ // POSTROUTING hook, before SNAT happens.
+ // Ref: https://github.com/kubernetes-sigs/kube-network-policies/issues/46
+ hook := knftables.PostroutingHook
+ chainName := string(hook)
+ tx.Add(&knftables.Chain{
+ Name: chainName,
+ Type: knftables.PtrTo(knftables.FilterType),
+ Hook: knftables.PtrTo(hook),
+ Priority: knftables.PtrTo(knftables.SNATPriority + "-5"),
+ })
+ tx.Flush(&knftables.Chain{
+ Name: chainName,
+ })
+
+ // DNS is processed by addDNSRacersWorkaroundRules()
+ // TODO: remove once kernel fix is on most distros
+ if c.NetfilterBug1766Fix {
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: "udp dport 53 accept",
+ Comment: ptr.To("process DNS traffic on the PREROUTING hook with network policy enforcement to avoid the netfilter race condition bug"),
+ })
+ }
+
+ // IPv6 needs ICMP Neighbor Discovery to work
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "icmpv6", "type", "{", "nd-neighbor-solicit, nd-neighbor-advert", "}", "accept"),
+ })
+ // Don't process traffic generated from the root user in the Node, as it can block kubelet probes
+ // or system daemons that depend on the internal node traffic to not be blocked.
+ // Ref: https://github.com/kubernetes-sigs/kube-network-policies/issues/65
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: "meta skuid 0 accept",
+ })
+ // instead of aggregating all the expressions in one rule, use two different
+ // rules to understand if it is causing issues with UDP packets with the same
+ // tuple (https://github.com/kubernetes-sigs/kube-network-policies/issues/12)
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ct", "state", "established,related", "accept"),
+ })
+
+ action := fmt.Sprintf("queue num %d", c.queueid)
+ if c.FailOpen {
+ action += " bypass"
+ }
+
+ // only if no admin network policies are used
+ if !c.interceptAll {
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip", "saddr", "@", podV4IPsSet, action,
+ ),
+ Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip", "daddr", "@", podV4IPsSet, action,
+ ),
+ Comment: ptr.To("process IPv4 traffic with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip6", "saddr", "@", podV6IPsSet, action,
+ ),
+ Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip6", "daddr", "@", podV6IPsSet, action,
+ ),
+ Comment: ptr.To("process IPv6 traffic with network policy enforcement"),
+ })
+ } else {
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: action,
+ })
+ }
+
+ if c.NetfilterBug1766Fix {
+ c.addDNSRacersWorkaroundRules(ctx, tx)
+ }
+
+ if err := c.nft.Run(ctx, tx); err != nil {
+ klog.FromContext(ctx).Error(err, "syncing nftables rules")
+ return err
+ }
+ return nil
+}
+
+// To avoid a kernel bug caused by UDP DNS requests racing with conntrack,
+// process the DNS packets only on the PREROUTING hook, after DNAT happens,
+// so we can see the resolved destination IPs, typically the ones of the Pods
+// that are used for the Kubernetes DNS Service.
+// xref: https://github.com/kubernetes-sigs/kube-network-policies/issues/12
+// This can be removed once all kernels contain the fix in
+// https://github.com/torvalds/linux/commit/8af79d3edb5fd2dce35ea0a71595b6d4f9962350
+// TODO: remove once kernel fix is on most distros
+func (c *nfqInterceptor) addDNSRacersWorkaroundRules(ctx context.Context, tx *knftables.Transaction) {
+ hook := knftables.PreroutingHook
+ chainName := string(hook)
+ tx.Add(&knftables.Chain{
+ Name: chainName,
+ Type: knftables.PtrTo(knftables.FilterType),
+ Hook: knftables.PtrTo(hook),
+ Priority: knftables.PtrTo(knftables.DNATPriority + "+5"),
+ })
+ tx.Flush(&knftables.Chain{
+ Name: chainName,
+ })
+
+ action := fmt.Sprintf("queue num %d", c.queueid)
+ if c.FailOpen {
+ action += " bypass"
+ }
+
+ if !c.interceptAll {
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip", "saddr", "@", podV4IPsSet, "udp dport 53", action,
+ ),
+ Comment: ptr.To("process IPv4 traffic destined to a DNS server with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip", "daddr", "@", podV4IPsSet, "udp dport 53", action,
+ ),
+ Comment: ptr.To("process IPv4 traffic destined to a DNS server with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip6", "saddr", "@", podV6IPsSet, "udp dport 53", action,
+ ),
+ Comment: ptr.To("process IPv6 traffic destined to a DNS server with network policy enforcement"),
+ })
+
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "ip6", "daddr", "@", podV6IPsSet, "udp dport 53", action,
+ ),
+ Comment: ptr.To("process IPv6 traffic destined to a DNS server with network policy enforcement"),
+ })
+ } else {
+ tx.Add(&knftables.Rule{
+ Chain: chainName,
+ Rule: knftables.Concat(
+ "udp dport 53", action,
+ ),
+ })
+ }
+}
+
+func (c *nfqInterceptor) cleanNFTablesRules(ctx context.Context) {
+ tx := c.nft.NewTransaction()
+ // Add+Delete is idempotent and won't return an error if the table doesn't already
+ // exist.
+ tx.Add(&knftables.Table{})
+ tx.Delete(&knftables.Table{})
+
+ // When this function is called, the ctx is likely cancelled. So
+ // we only use it for logging, and create a context with timeout
+ // for nft.Run. Run blocks until this cleanup finishes, so keep
+ // the timeout short to avoid delaying shutdown.
+ nctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
+ defer cancel()
+ if err := c.nft.Run(nctx, tx); err != nil {
+ klog.FromContext(ctx).Error(err, "deleting nftables rules")
+ }
+}
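
Reviewer note: a minimal, hypothetical sketch of how this interceptor could be driven on its own, using only the New/Run signatures introduced above; the config values and the always-accept verdict function are placeholders, the real caller wires in the network policy controller's packet evaluation instead.

    package main

    import (
    	"context"

    	"k8s.io/klog/v2"
    	"sigs.k8s.io/kube-network-policies/pkg/networkpolicy"
    	"sigs.k8s.io/kube-network-policies/pkg/nfqinterceptor"
    )

    func main() {
    	// Hypothetical config values; a real caller fills these from flags/defaults.
    	cfg := networkpolicy.Config{
    		NFTableName: "kube-network-policies",
    		QueueID:     100,
    		FailOpen:    true,
    	}

    	itc, err := nfqinterceptor.New(cfg)
    	if err != nil {
    		klog.Fatalf("could not create nfq interceptor: %v", err)
    	}

    	// Stub verdict function: accept every queued packet.
    	accept := func(_ context.Context, _ networkpolicy.Packet) networkpolicy.Verdict {
    		return networkpolicy.Accept
    	}

    	// Run installs the nftables rules, queues first packets to userspace and
    	// blocks until the context is cancelled, cleaning up its rules on the way out.
    	if err := itc.Run(context.Background(), accept); err != nil {
    		klog.Fatalf("interceptor exited: %v", err)
    	}
    }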