diff --git a/pkg/plugin/linuxutil/linuxutil_linux.go b/pkg/plugin/linuxutil/linuxutil_linux.go index 1999675d4d..ec89eb254d 100644 --- a/pkg/plugin/linuxutil/linuxutil_linux.go +++ b/pkg/plugin/linuxutil/linuxutil_linux.go @@ -65,9 +65,10 @@ func (lu *linuxUtil) run(ctx context.Context) error { return nil case <-ticker.C: opts := &NetstatOpts{ - CuratedKeys: false, - AddZeroVal: false, - ListenSock: false, + CuratedKeys: false, + AddZeroVal: false, + ListenSock: false, + PrevTCPSockStats: lu.prevTCPSockStats, } var wg sync.WaitGroup @@ -76,10 +77,11 @@ func (lu *linuxUtil) run(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - err := nsReader.readAndUpdate() + tcpSocketStats, err := nsReader.readAndUpdate() if err != nil { lu.l.Error("Reading netstat failed", zap.Error(err)) } + lu.prevTCPSockStats = tcpSocketStats }() ethtoolOpts := &EthtoolOpts{ diff --git a/pkg/plugin/linuxutil/netstat_stats_linux.go b/pkg/plugin/linuxutil/netstat_stats_linux.go index 10c9ab65a3..f3fb79ffec 100644 --- a/pkg/plugin/linuxutil/netstat_stats_linux.go +++ b/pkg/plugin/linuxutil/netstat_stats_linux.go @@ -4,6 +4,7 @@ package linuxutil import ( "fmt" + "net/netip" "os" "strconv" "strings" @@ -12,6 +13,7 @@ import ( "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/utils" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -38,20 +40,20 @@ func NewNetstatReader(opts *NetstatOpts, ns NetstatInterface) *NetstatReader { } } -func (nr *NetstatReader) readAndUpdate() error { +func (nr *NetstatReader) readAndUpdate() (*SocketStats, error) { if err := nr.readConnectionStats(pathNetNetstat); err != nil { - return err + return nil, err } // Get Socket stats if err := nr.readSockStats(); err != nil { - return err + return nil, err } nr.updateMetrics() nr.l.Debug("Done reading and updating connections stats") - return nil + return &nr.connStats.TcpSockets, nil } //nolint:gomnd // magic numbers are sufficiently explained in this function @@ -184,6 +186,25 @@ func (nr *NetstatReader) readSockStats() error { return err } else { sockStats := processSocks(socks) + // Compare existing tcp socket connections with updated ones, remove the ones that are not seen in the new sockStats map + // Log the socketByRemoteAddr map + if nr.opts.PrevTCPSockStats != nil { + for remoteAddr := range nr.opts.PrevTCPSockStats.socketByRemoteAddr { + addrPort, err := netip.ParseAddrPort(remoteAddr) + if err != nil { + return errors.Wrapf(err, "failed to parse remote address %s", remoteAddr) + } + addr := addrPort.Addr().String() + port := strconv.Itoa(int(addrPort.Port())) + // Check if the remote address is in the new sockStats map + if _, ok := sockStats.socketByRemoteAddr[remoteAddr]; !ok { + nr.l.Debug("Removing remote address from metrics", zap.String("remoteAddr", remoteAddr)) + // If not, set the value to 0 + metrics.TCPConnectionRemoteGauge.WithLabelValues(addr, port).Set(0) + } + } + } + nr.connStats.TcpSockets = *sockStats } @@ -231,15 +252,13 @@ func (nr *NetstatReader) updateMetrics() { } for remoteAddr, v := range nr.connStats.TcpSockets.socketByRemoteAddr { - addr := "" - port := "" - splitAddr := strings.Split(remoteAddr, ":") - if len(splitAddr) == 2 { - addr = splitAddr[0] - port = splitAddr[1] - } else { - addr = remoteAddr + addrPort, err := netip.ParseAddrPort(remoteAddr) + if err != nil { + nr.l.Error("Failed to parse remote address", zap.Error(err)) + continue } + addr := addrPort.Addr().String() + port := strconv.Itoa(int(addrPort.Port())) if !validateRemoteAddr(addr) { continue } diff --git a/pkg/plugin/linuxutil/netstat_stats_linux_test.go b/pkg/plugin/linuxutil/netstat_stats_linux_test.go index fc5dabcdd4..7491e994a6 100644 --- a/pkg/plugin/linuxutil/netstat_stats_linux_test.go +++ b/pkg/plugin/linuxutil/netstat_stats_linux_test.go @@ -11,6 +11,7 @@ import ( "github.com/microsoft/retina/pkg/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" gomock "go.uber.org/mock/gomock" ) @@ -335,3 +336,44 @@ func TestReadSockStats(t *testing.T) { nr.updateMetrics() } + +// Test IP that belongs to a closed connection being removed from the metrics +func TestReadSockStatsRemoveClosedConnection(t *testing.T) { + _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) + require.NoError(t, err) + opts := &NetstatOpts{ + CuratedKeys: false, + AddZeroVal: false, + ListenSock: false, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ns := NewMockNetstatInterface(ctrl) + nr := NewNetstatReader(opts, ns) + assert.NotNil(t, nr) + InitalizeMetricsForTesting(ctrl) + + testmetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "testmetric", + Help: "testmetric", + }) + + // Initial value + nr.opts.PrevTCPSockStats = &SocketStats{ + socketByRemoteAddr: map[string]int{ + "127.0.0.1:80": 1, + }, + } + + // Latest read would not contain the IP in PrevTCPSockStats + ns.EXPECT().TCPSocks(gomock.Any()).Return([]netstat.SockTabEntry{}, nil).Times(1) + ns.EXPECT().UDPSocks(gomock.Any()).Return([]netstat.SockTabEntry{}, nil).Times(1) + + // We are expecting the gauge to be called once for this value as it is removed + MockGaugeVec.EXPECT().WithLabelValues("127.0.0.1", "80").Return(testmetric).Times(1) + + err = nr.readSockStats() + require.NoError(t, err) +} diff --git a/pkg/plugin/linuxutil/types_linux.go b/pkg/plugin/linuxutil/types_linux.go index 266999cd0b..08f1af74e6 100644 --- a/pkg/plugin/linuxutil/types_linux.go +++ b/pkg/plugin/linuxutil/types_linux.go @@ -15,9 +15,10 @@ const ( //go:generate go run go.uber.org/mock/mockgen@v0.4.0 -source=types_linux.go -destination=linuxutil_mock_generated.go -package=linuxutil type linuxUtil struct { - cfg *kcfg.Config - l *log.ZapLogger - isRunning bool + cfg *kcfg.Config + l *log.ZapLogger + isRunning bool + prevTCPSockStats *SocketStats } var netstatCuratedKeys = map[string]struct{}{ @@ -85,6 +86,9 @@ type NetstatOpts struct { // get only listening sockets ListenSock bool + + // previous TCP socket stats + PrevTCPSockStats *SocketStats } type EthtoolStats struct {