Skip to content

Commit

Permalink
[receiver/filelog] Implement ExcludeOlderThan matcher criterion (#3…
Browse files Browse the repository at this point in the history
…1916)

**Description:** 
This PR implements a new matcher criterion in the Stanza fileconsumer
matcher:

```
ExcludeOlderThan time.Duration        `mapstructure:"exclude_older_than"`
```

and the corresponding setting in the `filelog` receiver configuration:

| Field | Default | Description |

|-------------------------------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `exclude_older_than` | | Exclude files whose modification time is
older than the specified age. |


When specified, the matcher will exclude files whose modification times
are older than the specified time.

**Link to tracking Issue:** #31053

**Testing:** Added unit tests.

**Documentation:** Documented `exclude_older_than` configuration setting
in the `filelogreceiver`'s README.

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
ycombinator and djaglowski authored Apr 15, 2024
1 parent e870c7f commit 7579382
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 19 deletions.
27 changes: 27 additions & 0 deletions .chloggen/receiver-filelog-exclude-older-than.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `exclude_older_than` configuration setting"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31053]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
40 changes: 40 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter"
import (
"os"
"time"

"go.uber.org/multierr"
)

type excludeOlderThanOption struct {
age time.Duration
}

func (eot excludeOlderThanOption) apply(items []*item) ([]*item, error) {
filteredItems := make([]*item, 0, len(items))
var errs error
for _, item := range items {
fi, err := os.Stat(item.value)
if err != nil {
errs = multierr.Append(errs, err)
continue
}

// Keep (include) the file if its age (since last modification)
// is the same or less than the configured age.
fileAge := time.Since(fi.ModTime())
if fileAge <= eot.age {
filteredItems = append(filteredItems, item)
}
}

return filteredItems, errs
}

// ExcludeOlderThan excludes files whose modification time is older than the specified age.
func ExcludeOlderThan(age time.Duration) Option {
return excludeOlderThanOption{age: age}
}
110 changes: 110 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestExcludeOlderThanFilter(t *testing.T) {
twoHoursAgo := time.Now().Add(-2 * time.Hour)
threeHoursAgo := twoHoursAgo.Add(-1 * time.Hour)

cases := map[string]struct {
files []string
fileMTimes []time.Time
excludeOlderThan time.Duration

expect []string
expectedErr string
}{
"no_files": {
files: []string{},
fileMTimes: []time.Time{},
excludeOlderThan: 2 * time.Hour,

expect: []string{},
expectedErr: "",
},
"exclude_no_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, twoHoursAgo},
excludeOlderThan: 3 * time.Hour,

expect: []string{"a.log", "b.log"},
expectedErr: "",
},
"exclude_some_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo},
excludeOlderThan: 3 * time.Hour,

expect: []string{"a.log"},
expectedErr: "",
},
"exclude_all_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo},
excludeOlderThan: 90 * time.Minute,

expect: []string{},
expectedErr: "",
},
"file_not_present": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, {}},
excludeOlderThan: 3 * time.Hour,

expect: []string{"a.log"},
expectedErr: "b.log: no such file or directory",
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
tmpDir := t.TempDir()
var items []*item
// Create files with specified mtime
for i, file := range tc.files {
mtime := tc.fileMTimes[i]
fullPath := filepath.Join(tmpDir, file)

// Only create file if mtime is specified
if !mtime.IsZero() {
f, err := os.Create(fullPath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Chtimes(fullPath, twoHoursAgo, mtime))
}

it, err := newItem(fullPath, nil)
require.NoError(t, err)

items = append(items, it)
}

f := ExcludeOlderThan(tc.excludeOlderThan)
result, err := f.apply(items)
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}

relativeResult := make([]string, 0, len(result))
for _, r := range result {
rel, err := filepath.Rel(tmpDir, r.value)
require.NoError(t, err)
relativeResult = append(relativeResult, rel)
}

require.Equal(t, tc.expect, relativeResult)
})
}
}
43 changes: 25 additions & 18 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"

Expand All @@ -33,8 +34,12 @@ var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister(
)

type Criteria struct {
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`

// ExcludeOlderThan allows excluding files whose modification time is older
// than the specified age.
ExcludeOlderThan time.Duration `mapstructure:"exclude_older_than"`
OrderingCriteria OrderingCriteria `mapstructure:"ordering_criteria,omitempty"`
}

Expand Down Expand Up @@ -66,11 +71,17 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("exclude: %w", err)
}

m := &Matcher{
include: c.Include,
exclude: c.Exclude,
}

if c.ExcludeOlderThan != 0 {
m.filterOpts = append(m.filterOpts, filter.ExcludeOlderThan(c.ExcludeOlderThan))
}

if len(c.OrderingCriteria.SortBy) == 0 {
return &Matcher{
include: c.Include,
exclude: c.Exclude,
}, nil
return m, nil
}

if c.OrderingCriteria.TopN < 0 {
Expand All @@ -80,6 +91,7 @@ func New(c Criteria) (*Matcher, error) {
if c.OrderingCriteria.TopN == 0 {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}
m.topN = c.OrderingCriteria.TopN

var regex *regexp.Regexp
if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) {
Expand All @@ -92,46 +104,41 @@ func New(c Criteria) (*Matcher, error) {
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
}

m.regex = regex
}

var filterOpts []filter.Option
for _, sc := range c.OrderingCriteria.SortBy {
switch sc.SortType {
case sortTypeNumeric:
f, err := filter.SortNumeric(sc.RegexKey, sc.Ascending)
if err != nil {
return nil, fmt.Errorf("numeric sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeAlphabetical:
f, err := filter.SortAlphabetical(sc.RegexKey, sc.Ascending)
if err != nil {
return nil, fmt.Errorf("alphabetical sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeTimestamp:
f, err := filter.SortTemporal(sc.RegexKey, sc.Ascending, sc.Layout, sc.Location)
if err != nil {
return nil, fmt.Errorf("timestamp sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeMtime:
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
filterOpts = append(filterOpts, filter.SortMtime())
m.filterOpts = append(m.filterOpts, filter.SortMtime())
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
}

return &Matcher{
include: c.Include,
exclude: c.Exclude,
regex: regex,
topN: c.OrderingCriteria.TopN,
filterOpts: filterOpts,
}, nil
return m, nil
}

// orderingCriteriaNeedsRegex returns true if any of the sort options require a regex to be set.
Expand Down
8 changes: 8 additions & 0 deletions pkg/stanza/fileconsumer/matcher/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -209,6 +210,13 @@ func TestNew(t *testing.T) {
},
expectedErr: `the "filelog.mtimeSortType" feature gate must be enabled to use "mtime" sort type`,
},
{
name: "ExcludeOlderThan",
criteria: Criteria{
Include: []string{"*.log"},
ExcludeOlderThan: 24 * time.Hour,
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion receiver/filelogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ Tails and parses logs from files.
| Field | Default | Description |
|-------------------------------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `include` | required | A list of file glob patterns that match the file paths to be read. |
| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. |
| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. |
| `exclude_older_than` | | Exclude files whose modification time is older than the specified [age](#time-parameters). |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. |
| `multiline` | | A `multiline` configuration block. See [below](#multiline-configuration) for more details. |
| `force_flush_period` | `500ms` | [Time](#time-parameters) since last read of data from file, after which currently buffered log should be send to pipeline. A value of `0` will disable forced flushing. |
Expand Down

0 comments on commit 7579382

Please sign in to comment.