Skip to content

Commit

Permalink
[netflow]: disable event normalisation (elastic#40635)
Browse files Browse the repository at this point in the history
* feat: netflow disable event normalisation

* fix: add kibana in docker-compose.yml used for agentbeat integration tests

* fix: switch to normalize and use an active voice for normaliseIPFields
  • Loading branch information
pkoutsovasilis authored Aug 29, 2024
1 parent 3b49a93 commit 3445a3e
Show file tree
Hide file tree
Showing 8 changed files with 1,683 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]
- Document `winlog` input. {issue}40074[40074] {pull}40462[40462]
- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601]
- Disable event normalization for netflow input {pull}40635[40635]

*Auditbeat*

Expand Down
9 changes: 9 additions & 0 deletions x-pack/agentbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
kibana: { condition: service_healthy }
cometd: { condition: service_healthy }

elasticsearch:
Expand All @@ -24,3 +25,11 @@ services:
hostname: cometd
ports:
- 8080:8080

kibana:
extends:
file: ${ES_BEATS}/testing/environments/${STACK_ENVIRONMENT}.yml
service: kibana
healthcheck:
test: [ "CMD-SHELL", "curl -u beats:testing -s http://localhost:5601/api/status?v8format=true | grep -q '\"overall\":{\"level\":\"available\"'" ]
retries: 600
9 changes: 9 additions & 0 deletions x-pack/filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
kibana: { condition: service_healthy }
cometd: { condition: service_healthy }

elasticsearch:
Expand All @@ -24,3 +25,11 @@ services:
hostname: cometd
ports:
- 8080:8080

kibana:
extends:
file: ${ES_BEATS}/testing/environments/${STACK_ENVIRONMENT}.yml
service: kibana
healthcheck:
test: [ "CMD-SHELL", "curl -u beats:testing -s http://localhost:5601/api/status?v8format=true | grep -q '\"overall\":{\"level\":\"available\"'" ]
retries: 600
31 changes: 28 additions & 3 deletions x-pack/filebeat/input/netflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,38 @@ import (
)

func toBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
var e beat.Event
switch flow.Type {
case record.Flow:
return flowToBeatEvent(flow, internalNetworks)
e = flowToBeatEvent(flow, internalNetworks)
case record.Options:
return optionsToBeatEvent(flow)
e = optionsToBeatEvent(flow)
default:
return toBeatEventCommon(flow)
e = toBeatEventCommon(flow)
}

normaliseIPFields(e.Fields)
return e
}

// normaliseIPFields normalizes net.IP fields in the given map from []byte to string.
// This function mutates the map and assumes every net.IP field is a direct entry.
// Fields that don't adhere to this convention (e.g. part of a struct) are not
// normalized.
func normaliseIPFields(fields mapstr.M) {
for key, value := range fields {
switch valueType := value.(type) {
case net.IP:
fields[key] = valueType.String()
case []net.IP:
stringIPs := make([]string, len(valueType))
for i, ip := range valueType {
stringIPs[i] = ip.String()
}
fields[key] = stringIPs
case mapstr.M:
normaliseIPFields(valueType)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (n *netflowInput) Run(env v2.Context, connector beat.PipelineConnector) err
client, err := connector.ConnectWith(beat.ClientConfig{
PublishMode: beat.DefaultGuarantees,
Processing: beat.ProcessingConfig{
EventNormalization: boolPtr(true),
EventNormalization: boolPtr(false),
},
EventListener: nil,
})
Expand Down
125 changes: 88 additions & 37 deletions x-pack/filebeat/input/netflow/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package netflow_test

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -22,7 +23,6 @@ import (

"golang.org/x/time/rate"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/tests/integration"
filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
Expand All @@ -35,7 +35,7 @@ import (
)

const (
waitFor = 10 * time.Second
waitFor = 20 * time.Second
tick = 200 * time.Millisecond
)

Expand All @@ -50,16 +50,26 @@ func TestNetFlowIntegration(t *testing.T) {
outputHost := fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port())
outputHosts := []interface{}{outputHost}

kibanaURL, kibanaUser := integration.GetKibana(t)
kibanaUsername := kibanaUser.Username()
kibanaPassword, ok := kibanaUser.Password()
require.True(t, ok, "kibana user should have a password")

// since beat is managed by a mocked elastic-agent we need to install the netflow package
// through the Kibana API
err := installNetflowPackage(ctx, kibanaURL.String(), kibanaUsername, kibanaPassword)
require.NoError(t, err, "failed to install netflow package")

// we are going to need admin access to query ES about the logs-netflow.log-default data_stream
outputUsername := os.Getenv("ES_SUPERUSER_USER")
require.NotEmpty(t, outputUsername)
require.NotEmpty(t, outputUsername, "ES_SUPERUSER_USER env var must be set")
outputPassword := os.Getenv("ES_SUPERUSER_PASS")
require.NotEmpty(t, outputPassword)
require.NotEmpty(t, outputPassword, "ES_SUPERUSER_PASS env var must be set")
outputProtocol := esConnectionDetails.Scheme

deleted, err := DeleteDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
require.NoError(t, err)
require.True(t, deleted)
require.NoError(t, err, "failed to delete data stream")
require.True(t, deleted, "failed to delete data stream")

// construct expected Agent units
allStreams := []*proto.UnitExpected{
Expand Down Expand Up @@ -129,7 +139,7 @@ func TestNetFlowIntegration(t *testing.T) {
"queue_size": 2 * 4 * 1600,
"detect_sequence_reset": true,
"max_message_size": "10KiB",
"workers": 100,
"workers": 8,
}),
},
},
Expand Down Expand Up @@ -190,72 +200,67 @@ func TestNetFlowIntegration(t *testing.T) {
case err := <-beatRunErr:
t.Fatalf("beat run err: %v", err)
case <-time.After(waitFor):
t.Fatalf("timed out waiting for beat to become healthy")
t.Fatalf("timed out waiting for filebeat to report healthy")
}

registry := monitoring.GetNamespace("dataset").GetRegistry().GetRegistry("netflow_integration_test")

discardedEventsTotalVar, ok := registry.Get("discarded_events_total").(*monitoring.Uint)
require.True(t, ok)
require.True(t, ok, "failed to get discarded_events_total metric")

receivedEventTotalVar, ok := registry.Get("received_events_total").(*monitoring.Uint)
require.True(t, ok)
require.True(t, ok, "failed to get received_events_total metric")

udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6006")
require.NoError(t, err)
require.NoError(t, err, "failed to resolve UDP address")

conn, err := net.DialUDP("udp", nil, udpAddr)
require.NoError(t, err)

data, err := os.ReadFile("testdata/golden/ipfix_cisco.reversed.pcap.golden.json")
require.NoError(t, err)

var expectedFlows struct {
Flows []beat.Event `json:"events,omitempty"`
}
err = json.Unmarshal(data, &expectedFlows)
require.NoError(t, err)
require.NoError(t, err, "failed to open UDP connection")

f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.reversed.pcap")
require.NoError(t, err)
// for more info look testdata/integration/test.md
f, err := pcap.OpenOffline("testdata/integration/test.pcap")
require.NoError(t, err, "failed to open pcap file")
defer f.Close()
expectedEventsNumbers := 32

var totalBytes, totalPackets int
rateLimit := 10000
rateLimit := 3000
limiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit)

packetSource := gopacket.NewPacketSource(f, f.LinkType())
for pkt := range packetSource.Packets() {

if totalPackets%rateLimit == 0 {
err = limiter.WaitN(ctx, rateLimit)
require.NoError(t, err)
require.NoError(t, err, "failed to wait for rate limiter")
}

payloadData := pkt.TransportLayer().LayerPayload()

n, err := conn.Write(payloadData)
require.NoError(t, err)
require.NoError(t, err, "failed to write payload to UDP connection")

totalBytes += n
totalPackets++
}

require.Zero(t, discardedEventsTotalVar.Get())
require.Zero(t, discardedEventsTotalVar.Get(), "expected no discarded events")

require.Eventually(t, func() bool {
return receivedEventTotalVar.Get() == uint64(totalPackets)
}, waitFor, tick)
}, waitFor, tick, "expected all events to be received")

require.Eventually(t, func() bool {
return HasDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil
}, waitFor, tick)
}, waitFor, tick, "expected netflow data stream to be created")

require.Eventually(t, func() bool {
eventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
require.NoError(t, err)
return eventsCount == uint64(len(expectedFlows.Flows))
}, waitFor, tick)
streamEventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
if err != nil {
return false
}
return streamEventsCount == uint64(expectedEventsNumbers)
}, waitFor, tick, fmt.Sprintf("expected netflow data stream to have %d events", expectedEventsNumbers))
}

type unitPayload map[string]interface{}
Expand Down Expand Up @@ -299,7 +304,7 @@ type DataStreamResult struct {
}

func HasDataStream(ctx context.Context, username string, password string, url string, name string) error {
resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name))
resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -339,7 +344,7 @@ type CountResults struct {
}

func DataStreamEventsCount(ctx context.Context, username string, password string, url string, name string) (uint64, error) {
resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/%s/_count?q=!_ignored:*+AND+!event.message:*", url, name))
resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/%s/_count?q=!_ignored:*+AND+!event.message:*", url, name), nil, nil)
if err != nil {
return 0, err
}
Expand All @@ -362,29 +367,75 @@ type DeleteResults struct {
}

func DeleteDataStream(ctx context.Context, username string, password string, url string, name string) (bool, error) {
_, err := request(ctx, http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name))
_, err := request(ctx, http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name), nil, nil)
if err != nil {
return false, err
}

return true, nil
}

func request(ctx context.Context, httpMethod string, username string, password string, url string) ([]byte, error) {
func installNetflowPackage(ctx context.Context, url string, username string, password string) error {

type Response struct {
Item struct {
Version string `json:"version"`
} `json:"item"`
}

resp, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/api/fleet/epm/packages/netflow?prerelease=true", url), nil, nil)
if err != nil {
return err
}

var results Response
err = json.Unmarshal(resp, &results)
if err != nil {
return err
}

version := results.Item.Version

resp, err = request(ctx, http.MethodPost, username, password, fmt.Sprintf("%s/api/fleet/epm/packages/netflow/%s", url, version), map[string]string{
"kbn-xsrf": "true",
}, []byte(`{"force":true}`))
if err != nil {
return err
}

if resp == nil {
return errors.New("http not found error")
}

return nil
}

func request(ctx context.Context, httpMethod string, username string, password string, url string, headers map[string]string, reqBody []byte) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, httpMethod, url, nil)
if err != nil {
return nil, err
}
req.SetBasicAuth(username, password)

for k, v := range headers {
req.Header.Set(k, v)
}

if reqBody != nil {
req.Body = io.NopCloser(bytes.NewReader(reqBody))
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
return nil, nil
} else if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

resultBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 3445a3e

Please sign in to comment.