From 5dca21719d8206808ab6aaa953414ae6d923dc55 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 7 Jun 2024 18:39:33 +0530 Subject: [PATCH 1/6] filestream: tag events with `take_over: true` --- filebeat/input/filestream/input.go | 7 +++++++ filebeat/input/filestream/input_test.go | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 0136b062b48..1f7faa84306 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -41,6 +41,7 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" ) const pluginName = "filestream" @@ -61,6 +62,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory closerConfig closerConfig parsers parser.Config + takeOver bool } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -101,6 +103,7 @@ func configure(cfg *conf.C) (loginp.Prospector, loginp.Harvester, error) { encodingFactory: encodingFactory, closerConfig: config.Close, parsers: config.Reader.Parsers, + takeOver: config.TakeOver, } return prospector, filestream, nil @@ -378,6 +381,10 @@ func (inp *filestream) readFromSource( metrics.BytesProcessed.Add(uint64(message.Bytes)) + if inp.takeOver { + mapstr.AddTags(message.Fields, []string{"take_over"}) + } + if err := p.Publish(message.ToEvent(), s); err != nil { metrics.ProcessingErrors.Inc() return err diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index a1d9729c5aa..e895dd6ab16 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -115,6 +115,25 @@ paths: }) } +func TestTakeOverTags(t *testing.T) { + filename := generateFile(t, t.TempDir(), 5) + + cfg := ` +type: filestream +prospector.scanner.check_interval: 1s +take_over: true +paths: + - ` + filename + ` +` + runner := createFilestreamTestRunner(context.Background(), t, "test-take_over-tag", cfg, 5, true) + events := runner(t) + for _, event := range events { + tags, err := event.GetValue("tags") + require.NoError(t, err) + require.Contains(t, tags, "take_over") + } +} + // runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline. // `testID` must be unique for each test run // `cfg` must be a valid YAML string containing valid filestream configuration From 994aba5dd66ad8f399db935470105abf562cd95f Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 7 Jun 2024 18:54:38 +0530 Subject: [PATCH 2/6] filestream: modify test cases --- filebeat/input/filestream/input_test.go | 47 +++++++++++++++++++------ 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index e895dd6ab16..b267fce2740 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/statestore/storetest" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" ) func BenchmarkFilestream(b *testing.B) { @@ -116,21 +117,45 @@ paths: } func TestTakeOverTags(t *testing.T) { - filename := generateFile(t, t.TempDir(), 5) - cfg := ` + testCases := []struct { + name string + takeOver bool + testFunc func(event beat.Event) + }{ + { + name: "test-take_over-true", + takeOver: true, + testFunc: func(event beat.Event) { + tags, err := event.GetValue("tags") + require.NoError(t, err) + require.Contains(t, tags, "take_over") + }, + }, + { + name: "test-take_over-false", + takeOver: false, + testFunc: func(event beat.Event) { + _, err := event.GetValue("tags") + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + filename := generateFile(t, t.TempDir(), 5) + cfg := fmt.Sprintf(` type: filestream prospector.scanner.check_interval: 1s -take_over: true +take_over: %t paths: - - ` + filename + ` -` - runner := createFilestreamTestRunner(context.Background(), t, "test-take_over-tag", cfg, 5, true) - events := runner(t) - for _, event := range events { - tags, err := event.GetValue("tags") - require.NoError(t, err) - require.Contains(t, tags, "take_over") + - %s`, testCase.takeOver, filename) + runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true) + events := runner(t) + for _, event := range events { + testCase.testFunc(event) + } + }) } } From d5b734a0ca6a48023508c3366df324b90f136d3f Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 7 Jun 2024 18:59:42 +0530 Subject: [PATCH 3/6] add comments --- filebeat/input/filestream/input.go | 1 + 1 file changed, 1 insertion(+) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 1f7faa84306..2294c12f331 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -381,6 +381,7 @@ func (inp *filestream) readFromSource( metrics.BytesProcessed.Add(uint64(message.Bytes)) + // add "take_over" tag if `take_over` is set to true if inp.takeOver { mapstr.AddTags(message.Fields, []string{"take_over"}) } From 5b6d5cc8988adb485ff61fa89660f2ec36ef387b Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 7 Jun 2024 19:28:34 +0530 Subject: [PATCH 4/6] update documentation --- filebeat/docs/howto/migrate-to-filestream.asciidoc | 5 +++++ filebeat/input/filestream/input.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/filebeat/docs/howto/migrate-to-filestream.asciidoc b/filebeat/docs/howto/migrate-to-filestream.asciidoc index a57105adb3e..30057fab725 100644 --- a/filebeat/docs/howto/migrate-to-filestream.asciidoc +++ b/filebeat/docs/howto/migrate-to-filestream.asciidoc @@ -247,3 +247,8 @@ and return to old `log` inputs the files that were taken by `filestream` inputs, 6. Run Filebeat with the old configuration (no `filestream` inputs with `take_over: true`). NOTE: Reverting to backups might cause some events to repeat, depends on the amount of time the new configuration was running. + +=== Debugging on Kibana + +Events produced by `filestream` with `take_over: true` contains `take_over` tag. +You can filter on this tag in Kibana and see the events which came from a filestream in the "take over" mode. \ No newline at end of file diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 2294c12f331..7da25654a25 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -383,7 +383,7 @@ func (inp *filestream) readFromSource( // add "take_over" tag if `take_over` is set to true if inp.takeOver { - mapstr.AddTags(message.Fields, []string{"take_over"}) + _ = mapstr.AddTags(message.Fields, []string{"take_over"}) } if err := p.Publish(message.ToEvent(), s); err != nil { From 48e5924837e37551613cc215aa16ea8b7732f2ad Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Fri, 7 Jun 2024 20:04:22 +0530 Subject: [PATCH 5/6] Update filebeat/input/filestream/input_test.go Co-authored-by: Tiago Queiroz --- filebeat/input/filestream/input_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index b267fce2740..3dfe176ac01 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -117,16 +117,15 @@ paths: } func TestTakeOverTags(t *testing.T) { - testCases := []struct { name string takeOver bool - testFunc func(event beat.Event) + testFunc func(t *testing.T, event beat.Event) }{ { name: "test-take_over-true", takeOver: true, - testFunc: func(event beat.Event) { + testFunc: func(t *testing.T, event beat.Event) { tags, err := event.GetValue("tags") require.NoError(t, err) require.Contains(t, tags, "take_over") @@ -135,7 +134,7 @@ func TestTakeOverTags(t *testing.T) { { name: "test-take_over-false", takeOver: false, - testFunc: func(event beat.Event) { + testFunc: func(t *testing.T, event beat.Event) { _, err := event.GetValue("tags") require.ErrorIs(t, err, mapstr.ErrKeyNotFound) }, @@ -153,7 +152,7 @@ paths: runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true) events := runner(t) for _, event := range events { - testCase.testFunc(event) + testCase.testFunc(t, event) } }) } From 480b2c7ac1d2dcc97c8a92ea67e66a815c0cc2e1 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 7 Jun 2024 21:31:48 +0530 Subject: [PATCH 6/6] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4a02b3c9a1d..8b0d3b41e1e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037] - Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] +- Tag events that come from a filestream in "take over" mode. {pull}39828[39828] *Heartbeat*