Skip to content

Commit

Permalink
NETOBSERV-1566: ipfix: make RTT optional (#630)
Browse files Browse the repository at this point in the history
- refactor IPFIX fields mapping / definition
- allow optional fields
- add interfaces and directions (plural) fields to IPFIX template
- add tests for partial records & non-enriched records
  • Loading branch information
jotak authored Mar 15, 2024
1 parent 4a8953c commit af02097
Show file tree
Hide file tree
Showing 2 changed files with 364 additions and 206 deletions.
191 changes: 145 additions & 46 deletions pkg/pipeline/write/testnorace/write_ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
},
TimeFlowStart: timestamppb.New(startTime),
TimeFlowEnd: timestamppb.New(endTime),
Interface: "eth0",

AgentIp: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a090807},
},
Expand All @@ -73,6 +73,10 @@ var (
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
},
{
Interface: "a1234567",
Direction: pbflow.Direction_INGRESS,
},
},
}
)
Expand Down Expand Up @@ -115,21 +119,82 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
cp.CloseMsgChan()
cp.Stop()

expectedFields := append(write.IPv4IANAFields, write.KubeFields...)
expectedFields = append(expectedFields, write.CustomNetworkFields...)

// Check template
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
templateSet := tplv4Msg.GetSet()
templateElements := templateSet.GetRecords()[0].GetOrderedElementList()
assert.Len(t, templateElements, 21)
assert.Len(t, templateElements, len(expectedFields))
assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId)

// Check data
assert.Equal(t, uint16(10), dataMsg.GetVersion())
dataSet := dataMsg.GetSet()
record := dataSet.GetRecords()[0]

for _, name := range expectedFields {
element, _, exist := record.GetInfoElementWithValue(name)
assert.Truef(t, exist, "element with name %s should exist in the record", name)
assert.NotNil(t, element)
matchElement(t, element, flow)
}
}

func TestEnrichedIPFIXPartialFlow(t *testing.T) {
cp := startCollector(t)
addr := cp.GetAddress().(*net.UDPAddr)

flow := decode.PBFlowToMap(&FullPBFlow)

// Add partial enrichment
flow["SrcK8S_Name"] = "pod A"
flow["SrcK8S_Namespace"] = "ns1"
flow["SrcK8S_HostName"] = "node1"

// Remove a field
delete(flow, "TimeFlowRttNs")

writer, err := write.NewWriteIpfix(config.StageParam{
Write: &config.Write{
Ipfix: &api.WriteIpfix{
TargetHost: addr.IP.String(),
TargetPort: addr.Port,
Transport: addr.Network(),
EnterpriseID: 9999,
},
},
})
require.NoError(t, err)

writer.Write(flow)

// Read collector
// 1st = IPv4 template
tplv4Msg := <-cp.GetMsgChan()
// 2nd = IPv6 template (ignore)
<-cp.GetMsgChan()
// 3rd = data record
dataMsg := <-cp.GetMsgChan()
cp.CloseMsgChan()
cp.Stop()

expectedFields := append(write.IPv4IANAFields, write.KubeFields...)
expectedFields = append(expectedFields, write.CustomNetworkFields...)

// Check template
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
templateSet := tplv4Msg.GetSet()
templateElements := templateSet.GetRecords()[0].GetOrderedElementList()
assert.Len(t, templateElements, len(expectedFields))
assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId)

// Check data
assert.Equal(t, uint16(10), dataMsg.GetVersion())
dataSet := dataMsg.GetSet()
record := dataSet.GetRecords()[0]

for _, name := range expectedFields {
element, _, exist := record.GetInfoElementWithValue(name)
assert.Truef(t, exist, "element with name %s should exist in the record", name)
Expand All @@ -138,52 +203,86 @@ func TestEnrichedIPFIXFlow(t *testing.T) {
}
}

func TestBasicIPFIXFlow(t *testing.T) {
cp := startCollector(t)
addr := cp.GetAddress().(*net.UDPAddr)

flow := decode.PBFlowToMap(&FullPBFlow)

// Add partial enrichment (must be ignored)
flow["SrcK8S_Name"] = "pod A"
flow["SrcK8S_Namespace"] = "ns1"
flow["SrcK8S_HostName"] = "node1"

writer, err := write.NewWriteIpfix(config.StageParam{
Write: &config.Write{
Ipfix: &api.WriteIpfix{
TargetHost: addr.IP.String(),
TargetPort: addr.Port,
Transport: addr.Network(),
// No enterprise ID here
},
},
})
require.NoError(t, err)

writer.Write(flow)

// Read collector
// 1st = IPv4 template
tplv4Msg := <-cp.GetMsgChan()
// 2nd = IPv6 template (ignore)
<-cp.GetMsgChan()
// 3rd = data record
dataMsg := <-cp.GetMsgChan()
cp.CloseMsgChan()
cp.Stop()

// Check template
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
templateSet := tplv4Msg.GetSet()
templateElements := templateSet.GetRecords()[0].GetOrderedElementList()
assert.Len(t, templateElements, len(write.IPv4IANAFields))
assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId)

// Check data
assert.Equal(t, uint16(10), dataMsg.GetVersion())
dataSet := dataMsg.GetSet()
record := dataSet.GetRecords()[0]

for _, name := range write.IPv4IANAFields {
element, _, exist := record.GetInfoElementWithValue(name)
assert.Truef(t, exist, "element with name %s should exist in the record", name)
assert.NotNil(t, element)
matchElement(t, element, flow)
}

// Make sure enriched fields are absent
for _, name := range write.KubeFields {
element, _, exist := record.GetInfoElementWithValue(name)
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", name)
assert.Nil(t, element)
}
}

//nolint:cyclop
func matchElement(t *testing.T, element entities.InfoElementWithValue, flow config.GenericMap) {
switch element.GetName() {
case "sourceIPv4Address":
assert.Equal(t, flow["SrcAddr"], element.GetIPAddressValue().String())
case "destinationIPv4Address":
assert.Equal(t, flow["DstAddr"], element.GetIPAddressValue().String())
case "ethernetType":
assert.Equal(t, flow["Etype"], uint32(element.GetUnsigned16Value()))
case "flowDirection":
assert.Equal(t, flow["IfDirections"], []int{int(element.GetUnsigned8Value())})
case "protocolIdentifier":
assert.Equal(t, flow["Proto"], uint32(element.GetUnsigned8Value()))
case "sourceTransportPort":
assert.Equal(t, flow["SrcPort"], uint32(element.GetUnsigned16Value()))
case "destinationTransportPort":
assert.Equal(t, flow["DstPort"], uint32(element.GetUnsigned16Value()))
case "octetDeltaCount":
assert.Equal(t, flow["Bytes"], element.GetUnsigned64Value())
case "flowStartMilliseconds":
assert.Equal(t, flow["TimeFlowStartMs"], int64(element.GetUnsigned64Value()))
case "flowEndMilliseconds":
assert.Equal(t, flow["TimeFlowEndMs"], int64(element.GetUnsigned64Value()))
case "packetDeltaCount":
assert.Equal(t, flow["Packets"], element.GetUnsigned64Value())
case "interfaceName":
assert.Equal(t, flow["Interfaces"], []string{element.GetStringValue()})
case "sourcePodNamespace":
assert.Equal(t, flow["SrcK8S_Namespace"], element.GetStringValue())
case "sourcePodName":
assert.Equal(t, flow["SrcK8S_Name"], element.GetStringValue())
case "destinationPodNamespace":
assert.Equal(t, flow["DstK8S_Namespace"], element.GetStringValue())
case "destinationPodName":
assert.Equal(t, flow["DstK8S_Name"], element.GetStringValue())
case "sourceNodeName":
assert.Equal(t, flow["SrcK8S_HostName"], element.GetStringValue())
case "destinationNodeName":
assert.Equal(t, flow["DstK8S_HostName"], element.GetStringValue())
case "timeFlowRttNs":
assert.Equal(t, uint64(flow["TimeFlowRttNs"].(int64)), element.GetUnsigned64Value())
case "sourceMacAddress":
case "destinationMacAddress":
// Getting some discrepancies here, need to figure out why
default:
assert.Fail(t, "missing check on element", element.GetName())
name := element.GetName()
mapping, ok := write.MapIPFIXKeys[name]
if !ok {
assert.Fail(t, "missing check on element", name)
return
}
expected := flow[mapping.Key]
if mapping.Matcher != nil {
assert.True(t, mapping.Matcher(element, expected), "unexpected "+name)
} else {
value := mapping.Getter(element)
if expected == nil {
assert.Empty(t, value, "unexpected "+name)
} else {
assert.Equal(t, expected, value, "unexpected "+name)
}
}
}

Expand Down
Loading

0 comments on commit af02097

Please sign in to comment.