feature: tag events that come from a filestream with take_over: true #39828

Merged 7 commits on Jun 12, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]

*Heartbeat*
5 changes: 5 additions & 0 deletions filebeat/docs/howto/migrate-to-filestream.asciidoc
@@ -247,3 +247,8 @@ and return to old `log` inputs the files that were taken by `filestream` inputs,
6. Run Filebeat with the old configuration (no `filestream` inputs with `take_over: true`).

NOTE: Reverting to backups might cause some events to repeat, depending on how long the new configuration was running.

=== Debugging on Kibana

Events produced by `filestream` with `take_over: true` contain the `take_over` tag.
You can filter on this tag in Kibana to see the events that came from a filestream in "take over" mode.
8 changes: 8 additions & 0 deletions filebeat/input/filestream/input.go
@@ -41,6 +41,7 @@ import (
	"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

const pluginName = "filestream"
@@ -61,6 +62,7 @@ type filestream struct {
	encodingFactory encoding.EncodingFactory
	closerConfig    closerConfig
	parsers         parser.Config
	takeOver        bool
}

// Plugin creates a new filestream input plugin for creating a stateful input.
@@ -101,6 +103,7 @@ func configure(cfg *conf.C) (loginp.Prospector, loginp.Harvester, error) {
		encodingFactory: encodingFactory,
		closerConfig:    config.Close,
		parsers:         config.Reader.Parsers,
		takeOver:        config.TakeOver,
	}

	return prospector, filestream, nil
@@ -378,6 +381,11 @@ func (inp *filestream) readFromSource(

		metrics.BytesProcessed.Add(uint64(message.Bytes))

		// add "take_over" tag if `take_over` is set to true
		if inp.takeOver {
			_ = mapstr.AddTags(message.Fields, []string{"take_over"})
		}

		if err := p.Publish(message.ToEvent(), s); err != nil {
			metrics.ProcessingErrors.Inc()
			return err
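
Note on the hunk above: the tagging step can be tried in isolation with a minimal sketch that uses only the standard library. `addTags` and `tagTakeOver` are hypothetical stand-ins written for illustration; they are not the `mapstr` API, and the handling of the existing `tags` value is an assumption about how such a helper would reasonably behave, not a copy of the library code.

```go
package main

import "fmt"

// addTags is a hypothetical stand-in for a tag-appending helper such as
// mapstr.AddTags. It appends tags to the "tags" key of an event's fields and
// creates the key when it is absent. The type handling is an assumption.
func addTags(fields map[string]interface{}, tags []string) error {
	existing, ok := fields["tags"]
	if !ok {
		// Copy the slice so the caller's slice is not shared with the event.
		fields["tags"] = append([]string(nil), tags...)
		return nil
	}
	switch current := existing.(type) {
	case []string:
		fields["tags"] = append(current, tags...)
	case []interface{}:
		for _, tag := range tags {
			current = append(current, tag)
		}
		fields["tags"] = current
	default:
		return fmt.Errorf("tags has unexpected type %T", existing)
	}
	return nil
}

// tagTakeOver mirrors the conditional added in readFromSource: the tag is
// attached only when take-over mode is enabled, and the error is discarded.
func tagTakeOver(fields map[string]interface{}, takeOver bool) {
	if takeOver {
		_ = addTags(fields, []string{"take_over"})
	}
}

func main() {
	withTags := map[string]interface{}{"tags": []string{"prod"}}
	tagTakeOver(withTags, true)
	fmt.Println(withTags["tags"])

	plain := map[string]interface{}{}
	tagTakeOver(plain, false)
	_, hasTags := plain["tags"]
	fmt.Println(hasTags)
}
```

With `takeOver` set to false the fields are left untouched, which is the case the `test-take_over-false` test checks by expecting the `tags` key to be missing.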
43 changes: 43 additions & 0 deletions filebeat/input/filestream/input_test.go
@@ -35,6 +35,7 @@ import (
	"github.com/elastic/beats/v7/libbeat/statestore/storetest"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

func BenchmarkFilestream(b *testing.B) {
@@ -115,6 +116,48 @@ paths:
	})
}

func TestTakeOverTags(t *testing.T) {
	testCases := []struct {
		name     string
		takeOver bool
		testFunc func(t *testing.T, event beat.Event)
	}{
		{
			name:     "test-take_over-true",
			takeOver: true,
			testFunc: func(t *testing.T, event beat.Event) {
				tags, err := event.GetValue("tags")
				require.NoError(t, err)
				require.Contains(t, tags, "take_over")
			},
		},
		{
			name:     "test-take_over-false",
			takeOver: false,
			testFunc: func(t *testing.T, event beat.Event) {
				_, err := event.GetValue("tags")
				require.ErrorIs(t, err, mapstr.ErrKeyNotFound)
			},
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			filename := generateFile(t, t.TempDir(), 5)
			cfg := fmt.Sprintf(`
type: filestream
prospector.scanner.check_interval: 1s
take_over: %t
paths:
- %s`, testCase.takeOver, filename)
			runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true)
			events := runner(t)
			for _, event := range events {
				testCase.testFunc(t, event)
			}
		})
	}
}

// runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline.
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration