Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove inactive connections' IP addresses from tcp_connection_remote metric #388

Merged
merged 11 commits into from
Jun 4, 2024
10 changes: 6 additions & 4 deletions pkg/plugin/linuxutil/linuxutil_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ func (lu *linuxUtil) run(ctx context.Context) error {
return nil
case <-ticker.C:
opts := &NetstatOpts{
CuratedKeys: false,
AddZeroVal: false,
ListenSock: false,
CuratedKeys: false,
AddZeroVal: false,
ListenSock: false,
PrevTCPSockStats: lu.prevTCPSockStats,
}
var wg sync.WaitGroup

Expand All @@ -76,10 +77,11 @@ func (lu *linuxUtil) run(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
err := nsReader.readAndUpdate()
tcpSocketStats, err := nsReader.readAndUpdate()
if err != nil {
lu.l.Error("Reading netstat failed", zap.Error(err))
}
lu.prevTCPSockStats = tcpSocketStats
}()

ethtoolOpts := &EthtoolOpts{
Expand Down
43 changes: 31 additions & 12 deletions pkg/plugin/linuxutil/netstat_stats_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package linuxutil

import (
"fmt"
"net/netip"
"os"
"strconv"
"strings"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/metrics"
"github.com/microsoft/retina/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -38,20 +40,20 @@ func NewNetstatReader(opts *NetstatOpts, ns NetstatInterface) *NetstatReader {
}
}

func (nr *NetstatReader) readAndUpdate() error {
func (nr *NetstatReader) readAndUpdate() (*SocketStats, error) {
if err := nr.readConnectionStats(pathNetNetstat); err != nil {
return err
return nil, err
}

// Get Socket stats
if err := nr.readSockStats(); err != nil {
return err
return nil, err
}

nr.updateMetrics()
nr.l.Debug("Done reading and updating connections stats")

return nil
return &nr.connStats.TcpSockets, nil
}

//nolint:gomnd // magic numbers are sufficiently explained in this function
Expand Down Expand Up @@ -184,6 +186,25 @@ func (nr *NetstatReader) readSockStats() error {
return err
} else {
sockStats := processSocks(socks)
// Compare existing tcp socket connections with updated ones, remove the ones that are not seen in the new sockStats map
// Log the socketByRemoteAddr map
if nr.opts.PrevTCPSockStats != nil {
for remoteAddr := range nr.opts.PrevTCPSockStats.socketByRemoteAddr {
addrPort, err := netip.ParseAddrPort(remoteAddr)
if err != nil {
return errors.Wrapf(err, "failed to parse remote address %s", remoteAddr)
}
addr := addrPort.Addr().String()
port := strconv.Itoa(int(addrPort.Port()))
// Check if the remote address is in the new sockStats map
if _, ok := sockStats.socketByRemoteAddr[remoteAddr]; !ok {
nr.l.Debug("Removing remote address from metrics", zap.String("remoteAddr", remoteAddr))
// If not, set the value to 0
metrics.TCPConnectionRemoteGauge.WithLabelValues(addr, port).Set(0)
}
}
}

nr.connStats.TcpSockets = *sockStats
}

Expand Down Expand Up @@ -231,15 +252,13 @@ func (nr *NetstatReader) updateMetrics() {
}

for remoteAddr, v := range nr.connStats.TcpSockets.socketByRemoteAddr {
addr := ""
port := ""
splitAddr := strings.Split(remoteAddr, ":")
if len(splitAddr) == 2 {
addr = splitAddr[0]
port = splitAddr[1]
} else {
addr = remoteAddr
addrPort, err := netip.ParseAddrPort(remoteAddr)
if err != nil {
nr.l.Error("Failed to parse remote address", zap.Error(err))
continue
}
addr := addrPort.Addr().String()
port := strconv.Itoa(int(addrPort.Port()))
if !validateRemoteAddr(addr) {
continue
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/plugin/linuxutil/netstat_stats_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/microsoft/retina/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
)

Expand Down Expand Up @@ -335,3 +336,44 @@ func TestReadSockStats(t *testing.T) {

nr.updateMetrics()
}

// TestReadSockStatsRemoveClosedConnection checks the closed-connection path of readSockStats:
// a remote address recorded in PrevTCPSockStats that is missing from the latest socket read
// must trigger exactly one gauge lookup for that address/port pair.
func TestReadSockStatsRemoveClosedConnection(t *testing.T) {
	_, err := log.SetupZapLogger(log.GetDefaultLogOpts())
	require.NoError(t, err)

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Stand-in gauge handed back by the mocked gauge vector.
	staleGauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "testmetric", Help: "testmetric"})

	readerOpts := &NetstatOpts{
		CuratedKeys: false,
		AddZeroVal:  false,
		ListenSock:  false,
	}
	mockNetstat := NewMockNetstatInterface(ctrl)
	nr := NewNetstatReader(readerOpts, mockNetstat)
	assert.NotNil(t, nr)
	InitalizeMetricsForTesting(ctrl)

	// Seed the previous snapshot with a single connection.
	previous := &SocketStats{
		socketByRemoteAddr: map[string]int{
			"127.0.0.1:80": 1,
		},
	}
	nr.opts.PrevTCPSockStats = previous

	// The fresh read returns no sockets at all, so the seeded address is absent from it.
	noSocks := []netstat.SockTabEntry{}
	mockNetstat.EXPECT().TCPSocks(gomock.Any()).Return(noSocks, nil).Times(1)
	mockNetstat.EXPECT().UDPSocks(gomock.Any()).Return(noSocks, nil).Times(1)

	// Exactly one lookup is expected for the address that disappeared.
	MockGaugeVec.EXPECT().WithLabelValues("127.0.0.1", "80").Return(staleGauge).Times(1)

	require.NoError(t, nr.readSockStats())
}
10 changes: 7 additions & 3 deletions pkg/plugin/linuxutil/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (

//go:generate go run go.uber.org/mock/mockgen@v0.4.0 -source=types_linux.go -destination=linuxutil_mock_generated.go -package=linuxutil
type linuxUtil struct {
cfg *kcfg.Config
l *log.ZapLogger
isRunning bool
cfg *kcfg.Config
l *log.ZapLogger
isRunning bool
prevTCPSockStats *SocketStats
}

var netstatCuratedKeys = map[string]struct{}{
Expand Down Expand Up @@ -85,6 +86,9 @@ type NetstatOpts struct {

// get only listening sockets
ListenSock bool

// previous TCP socket stats
PrevTCPSockStats *SocketStats
}

type EthtoolStats struct {
Expand Down
Loading