From b93f1d7da127939a7fc15529b112e01f88651d5b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 15 Apr 2024 14:30:15 -0700 Subject: [PATCH] [receiver/filelog] Implement `ExcludeOlderThan` matcher criterion (#31916) **Description:** This PR implements a new matcher criterion in the Stanza fileconsumer matcher: ``` ExcludeOlderThan time.Duration `mapstructure:"exclude_older_than"` ``` and the corresponding setting in the `filelog` receiver configuration: | Field | Default | Description | |-------------------------------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `exclude_older_than` | | Exclude files whose modification time is older than the specified age. | When specified, the matcher will exclude files whose modification times are older than the specified time. **Link to tracking Issue:** #31053 **Testing:** Added unit tests. **Documentation:** Documented `exclude_older_than` configuration setting in the `filelogreceiver`'s README. --------- Co-authored-by: Daniel Jaglowski --- .../receiver-filelog-exclude-older-than.yaml | 27 +++++ .../matcher/internal/filter/exclude.go | 40 +++++++ .../matcher/internal/filter/exclude_test.go | 110 ++++++++++++++++++ pkg/stanza/fileconsumer/matcher/matcher.go | 43 ++++--- .../fileconsumer/matcher/matcher_test.go | 8 ++ receiver/filelogreceiver/README.md | 3 +- 6 files changed, 212 insertions(+), 19 deletions(-) create mode 100644 .chloggen/receiver-filelog-exclude-older-than.yaml create mode 100644 pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go create mode 100644 pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go diff --git a/.chloggen/receiver-filelog-exclude-older-than.yaml b/.chloggen/receiver-filelog-exclude-older-than.yaml new file mode 100644 index 000000000000..a3a9e70221aa --- /dev/null +++ b/.chloggen/receiver-filelog-exclude-older-than.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `exclude_older_than` configuration setting" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31053] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go b/pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go new file mode 100644 index 000000000000..8e6e254f597e --- /dev/null +++ b/pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter" +import ( + "os" + "time" + + "go.uber.org/multierr" +) + +type excludeOlderThanOption struct { + age time.Duration +} + +func (eot excludeOlderThanOption) apply(items []*item) ([]*item, error) { + filteredItems := make([]*item, 0, len(items)) + var errs error + for _, item := range items { + fi, err := os.Stat(item.value) + if err != nil { + errs = multierr.Append(errs, err) + continue + } + + // Keep (include) the file if its age (since last modification) + // is the same or less than the configured age. + fileAge := time.Since(fi.ModTime()) + if fileAge <= eot.age { + filteredItems = append(filteredItems, item) + } + } + + return filteredItems, errs +} + +// ExcludeOlderThan excludes files whose modification time is older than the specified age. +func ExcludeOlderThan(age time.Duration) Option { + return excludeOlderThanOption{age: age} +} diff --git a/pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go b/pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go new file mode 100644 index 000000000000..1cf18a02ae14 --- /dev/null +++ b/pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filter + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestExcludeOlderThanFilter(t *testing.T) { + twoHoursAgo := time.Now().Add(-2 * time.Hour) + threeHoursAgo := twoHoursAgo.Add(-1 * time.Hour) + + cases := map[string]struct { + files []string + fileMTimes []time.Time + excludeOlderThan time.Duration + + expect []string + expectedErr string + }{ + "no_files": { + files: []string{}, + fileMTimes: []time.Time{}, + excludeOlderThan: 2 * time.Hour, + + expect: []string{}, + expectedErr: "", + }, + "exclude_no_files": { + files: []string{"a.log", "b.log"}, + fileMTimes: []time.Time{twoHoursAgo, twoHoursAgo}, + excludeOlderThan: 3 * time.Hour, + + expect: []string{"a.log", "b.log"}, + expectedErr: "", + }, + "exclude_some_files": { + files: []string{"a.log", "b.log"}, + fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo}, + excludeOlderThan: 3 * time.Hour, + + expect: []string{"a.log"}, + expectedErr: "", + }, + "exclude_all_files": { + files: []string{"a.log", "b.log"}, + fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo}, + excludeOlderThan: 90 * time.Minute, + + expect: []string{}, + expectedErr: "", + }, + "file_not_present": { + files: []string{"a.log", "b.log"}, + fileMTimes: []time.Time{twoHoursAgo, {}}, + excludeOlderThan: 3 * time.Hour, + + expect: []string{"a.log"}, + expectedErr: "b.log: no such file or directory", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + tmpDir := t.TempDir() + var items []*item + // Create files with specified mtime + for i, file := range tc.files { + mtime := tc.fileMTimes[i] + fullPath := filepath.Join(tmpDir, file) + + // Only create file if mtime is specified + if !mtime.IsZero() { + f, err := os.Create(fullPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + require.NoError(t, os.Chtimes(fullPath, twoHoursAgo, mtime)) + } + + it, err := newItem(fullPath, nil) + require.NoError(t, err) + + items = append(items, it) + } + + f := ExcludeOlderThan(tc.excludeOlderThan) + result, err := f.apply(items) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + } + + relativeResult := make([]string, 0, len(result)) + for _, r := range result { + rel, err := filepath.Rel(tmpDir, r.value) + require.NoError(t, err) + relativeResult = append(relativeResult, rel) + } + + require.Equal(t, tc.expect, relativeResult) + }) + } +} diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index a1fc7109a19e..a01a9fe1641a 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "regexp" + "time" "go.opentelemetry.io/collector/featuregate" @@ -33,8 +34,12 @@ var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister( ) type Criteria struct { - Include []string `mapstructure:"include,omitempty"` - Exclude []string `mapstructure:"exclude,omitempty"` + Include []string `mapstructure:"include,omitempty"` + Exclude []string `mapstructure:"exclude,omitempty"` + + // ExcludeOlderThan allows excluding files whose modification time is older + // than the specified age. + ExcludeOlderThan time.Duration `mapstructure:"exclude_older_than"` OrderingCriteria OrderingCriteria `mapstructure:"ordering_criteria,omitempty"` } @@ -66,11 +71,17 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("exclude: %w", err) } + m := &Matcher{ + include: c.Include, + exclude: c.Exclude, + } + + if c.ExcludeOlderThan != 0 { + m.filterOpts = append(m.filterOpts, filter.ExcludeOlderThan(c.ExcludeOlderThan)) + } + if len(c.OrderingCriteria.SortBy) == 0 { - return &Matcher{ - include: c.Include, - exclude: c.Exclude, - }, nil + return m, nil } if c.OrderingCriteria.TopN < 0 { @@ -80,6 +91,7 @@ func New(c Criteria) (*Matcher, error) { if c.OrderingCriteria.TopN == 0 { c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN } + m.topN = c.OrderingCriteria.TopN var regex *regexp.Regexp if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) { @@ -92,9 +104,10 @@ func New(c Criteria) (*Matcher, error) { if err != nil { return nil, fmt.Errorf("compile regex: %w", err) } + + m.regex = regex } - var filterOpts []filter.Option for _, sc := range c.OrderingCriteria.SortBy { switch sc.SortType { case sortTypeNumeric: @@ -102,36 +115,30 @@ func New(c Criteria) (*Matcher, error) { if err != nil { return nil, fmt.Errorf("numeric sort: %w", err) } - filterOpts = append(filterOpts, f) + m.filterOpts = append(m.filterOpts, f) case sortTypeAlphabetical: f, err := filter.SortAlphabetical(sc.RegexKey, sc.Ascending) if err != nil { return nil, fmt.Errorf("alphabetical sort: %w", err) } - filterOpts = append(filterOpts, f) + m.filterOpts = append(m.filterOpts, f) case sortTypeTimestamp: f, err := filter.SortTemporal(sc.RegexKey, sc.Ascending, sc.Layout, sc.Location) if err != nil { return nil, fmt.Errorf("timestamp sort: %w", err) } - filterOpts = append(filterOpts, f) + m.filterOpts = append(m.filterOpts, f) case sortTypeMtime: if !mtimeSortTypeFeatureGate.IsEnabled() { return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime) } - filterOpts = append(filterOpts, filter.SortMtime()) + m.filterOpts = append(m.filterOpts, filter.SortMtime()) default: return nil, fmt.Errorf("'sort_type' must be specified") } } - return &Matcher{ - include: c.Include, - exclude: c.Exclude, - regex: regex, - topN: c.OrderingCriteria.TopN, - filterOpts: filterOpts, - }, nil + return m, nil } // orderingCriteriaNeedsRegex returns true if any of the sort options require a regex to be set. diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index 2c8f799ac915..177a091b8658 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -209,6 +210,13 @@ func TestNew(t *testing.T) { }, expectedErr: `the "filelog.mtimeSortType" feature gate must be enabled to use "mtime" sort type`, }, + { + name: "ExcludeOlderThan", + criteria: Criteria{ + Include: []string{"*.log"}, + ExcludeOlderThan: 24 * time.Hour, + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md index 3e3520803717..2b0723d90db5 100644 --- a/receiver/filelogreceiver/README.md +++ b/receiver/filelogreceiver/README.md @@ -22,7 +22,8 @@ Tails and parses logs from files. | Field | Default | Description | |-------------------------------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `include` | required | A list of file glob patterns that match the file paths to be read. | -| `exclude` | [] | A list of file glob patterns to exclude from reading. | +| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. | +| `exclude_older_than` | | Exclude files whose modification time is older than the specified [age](#time-parameters). | | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. | | `multiline` | | A `multiline` configuration block. See [below](#multiline-configuration) for more details. | | `force_flush_period` | `500ms` | [Time](#time-parameters) since last read of data from file, after which currently buffered log should be send to pipeline. A value of `0` will disable forced flushing. |