diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index e82719f698ff..18ad9d84fc38 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" @@ -163,7 +164,10 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli HeaderConfig: hCfg, DeleteAtEOF: c.DeleteAfterRead, } - + knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) + for i := 0; i < len(knownFiles); i++ { + knownFiles[i] = fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2) + } return &Manager{ SugaredLogger: logger.With("component", "fileconsumer"), readerFactory: readerFactory, @@ -171,8 +175,9 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli pollInterval: c.PollInterval, maxBatchFiles: c.MaxConcurrentFiles / 2, maxBatches: c.MaxBatches, - previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2), - knownFiles: []*reader.Metadata{}, + currentPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2), + previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2), + knownFiles: knownFiles, }, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 085389685017..edc4f89cb004 100644 --- a/pkg/stanza/fileconsumer/file.go +++ 
b/pkg/stanza/fileconsumer/file.go @@ -13,6 +13,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" @@ -32,26 +33,17 @@ type Manager struct { maxBatches int maxBatchFiles int - previousPollFiles []*reader.Reader - knownFiles []*reader.Metadata - - // This value approximates the expected number of files which we will find in a single poll cycle. - // It is updated each poll cycle using a simple moving average calculation which assigns 20% weight - // to the most recent poll cycle. - // It is used to regulate the size of knownFiles. The goal is to allow knownFiles - // to contain checkpoints from a few previous poll cycles, but not grow unbounded. - movingAverageMatches int + currentPollFiles *fileset.Fileset[*reader.Reader] + previousPollFiles *fileset.Fileset[*reader.Reader] + knownFiles []*fileset.Fileset[*reader.Metadata] } func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel - if matches, err := m.fileMatcher.MatchFiles(); err != nil { + if _, err := m.fileMatcher.MatchFiles(); err != nil { m.Warnf("finding files: %v", err) - } else { - m.movingAverageMatches = len(matches) - m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches)) } if persister != nil { @@ -63,7 +55,7 @@ func (m *Manager) Start(persister operator.Persister) error { if len(offsets) > 0 { m.Infow("Resuming from previously known offset(s). 
'start_at' setting is not applicable.") m.readerFactory.FromBeginning = true - m.knownFiles = append(m.knownFiles, offsets...) + m.knownFiles[0].Add(offsets...) } } @@ -74,13 +66,18 @@ func (m *Manager) Start(persister operator.Persister) error { } func (m *Manager) closePreviousFiles() { - if len(m.knownFiles) > 4*m.movingAverageMatches { - m.knownFiles = m.knownFiles[m.movingAverageMatches:] - } - for _, r := range m.previousPollFiles { - m.knownFiles = append(m.knownFiles, r.Close()) + // m.previousPollFiles -> m.knownFiles[0] + + for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() { + m.knownFiles[0].Add(r.Close()) } - m.previousPollFiles = nil +} + +func (m *Manager) rotateFilesets() { + // shift the filesets at end of every poll() call + // m.knownFiles[0] -> m.knownFiles[1] -> m.knownFiles[2] + copy(m.knownFiles[1:], m.knownFiles) + m.knownFiles[0] = fileset.New[*reader.Metadata](m.maxBatchFiles / 2) } // Stop will stop the file monitoring process @@ -92,7 +89,11 @@ func (m *Manager) Stop() error { m.wg.Wait() m.closePreviousFiles() if m.persister != nil { - if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil { + checkpoints := make([]*reader.Metadata, 0, m.totalReaders()) + for _, knownFiles := range m.knownFiles { + checkpoints = append(checkpoints, knownFiles.Get()...) 
+ } + if err := checkpoint.Save(context.Background(), m.persister, checkpoints); err != nil { m.Errorw("save offsets", zap.Error(err)) } } @@ -128,9 +129,7 @@ func (m *Manager) poll(ctx context.Context) { // Get the list of paths on disk matches, err := m.fileMatcher.MatchFiles() if err != nil { - m.Debugf("finding files: %v", err) - } else { - m.movingAverageMatches = (m.movingAverageMatches*3 + len(matches)) / 4 + m.Warnf("finding files: %v", err) } m.Debugf("matched files", zap.Strings("paths", matches)) @@ -152,26 +151,31 @@ func (m *Manager) poll(ctx context.Context) { // Any new files that appear should be consumed entirely m.readerFactory.FromBeginning = true if m.persister != nil { - allCheckpoints := make([]*reader.Metadata, 0, len(m.knownFiles)+len(m.previousPollFiles)) - allCheckpoints = append(allCheckpoints, m.knownFiles...) - for _, r := range m.previousPollFiles { + allCheckpoints := make([]*reader.Metadata, 0, m.totalReaders()) + for _, knownFiles := range m.knownFiles { + allCheckpoints = append(allCheckpoints, knownFiles.Get()...) 
+ } + + for _, r := range m.previousPollFiles.Get() { allCheckpoints = append(allCheckpoints, r.Metadata) } if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil { m.Errorw("save offsets", zap.Error(err)) } } + // rotate at end of every poll() + m.rotateFilesets() } func (m *Manager) consume(ctx context.Context, paths []string) { m.Debug("Consuming files", zap.Strings("paths", paths)) - readers := m.makeReaders(paths) + m.makeReaders(paths) - m.preConsume(ctx, readers) + m.preConsume(ctx) // read new readers to end var wg sync.WaitGroup - for _, r := range readers { + for _, r := range m.currentPollFiles.Get() { wg.Add(1) go func(r *reader.Reader) { defer wg.Done() @@ -180,7 +184,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - m.postConsume(readers) + m.postConsume() } func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { @@ -211,9 +215,8 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi // makeReader take a file path, then creates reader, // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval -func (m *Manager) makeReaders(paths []string) []*reader.Reader { - readers := make([]*reader.Reader, 0, len(paths)) -OUTER: +func (m *Manager) makeReaders(paths []string) { + m.currentPollFiles = fileset.New[*reader.Reader](m.maxBatchFiles / 2) for _, path := range paths { fp, file := m.makeFingerprint(path) if fp == nil { @@ -222,13 +225,13 @@ OUTER: // Exclude duplicate paths with the same content. This can happen when files are // being rotated with copy/truncate strategy. (After copy, prior to truncate.) 
- for _, r := range readers { - if fp.Equal(r.Fingerprint) { - if err := file.Close(); err != nil { - m.Debugw("problem closing file", zap.Error(err)) - } - continue OUTER + if r := m.currentPollFiles.Match(fp, fileset.Equal); r != nil { + // re-add the reader as Match() removes duplicates + m.currentPollFiles.Add(r) + if err := file.Close(); err != nil { + m.Debugw("problem closing file", zap.Error(err)) } + continue } r, err := m.newReader(file, fp) @@ -237,29 +240,19 @@ OUTER: continue } - readers = append(readers, r) + m.currentPollFiles.Add(r) } - return readers } func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) { // Check previous poll cycle for match - for i := 0; i < len(m.previousPollFiles); i++ { - oldReader := m.previousPollFiles[i] - if fp.StartsWith(oldReader.Fingerprint) { - // Keep the new reader and discard the old. This ensures that if the file was - // copied to another location and truncated, our handle is updated. - m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...) - return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) - } + if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil { + return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) } // Iterate backwards to match newest first - for i := len(m.knownFiles) - 1; i >= 0; i-- { - oldMetadata := m.knownFiles[i] - if fp.StartsWith(oldMetadata.Fingerprint) { - // Remove the old metadata from the list. We will keep updating it and save it again later. - m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...) 
+ for i := 0; i < len(m.knownFiles); i++ { + if oldMetadata := m.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil { return m.readerFactory.NewReaderFromMetadata(file, oldMetadata) } } @@ -268,3 +261,11 @@ func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader m.Infow("Started watching file", "path", file.Name()) return m.readerFactory.NewReader(file, fp) } + +func (m *Manager) totalReaders() int { + total := m.previousPollFiles.Len() + for i := 0; i < len(m.knownFiles); i++ { + total += m.knownFiles[i].Len() + } + return total +} diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index 6960982801e2..cfa0c80fae09 100644 --- a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -15,11 +15,11 @@ import ( // Take care of files which disappeared from the pattern since the last poll cycle // this can mean either files which were removed, or rotated into a name not matching the pattern // we do this before reading existing files to ensure we emit older log lines before newer ones -func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) { - lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles)) +func (m *Manager) preConsume(ctx context.Context) { + lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len()) OUTER: - for _, oldReader := range m.previousPollFiles { - for _, newReader := range newReaders { + for _, oldReader := range m.previousPollFiles.Get() { + for _, newReader := range m.currentPollFiles.Get() { if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) { continue OUTER } @@ -52,7 +52,9 @@ OUTER: // On non-windows platforms, we keep files open between poll cycles so that we can detect // and read "lost" files, which have been moved out of the matching pattern. 
-func (m *Manager) postConsume(readers []*reader.Reader) { +func (m *Manager) postConsume() { m.closePreviousFiles() - m.previousPollFiles = readers + + // m.currentPollFiles -> m.previousPollFiles + m.previousPollFiles = m.currentPollFiles } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 2e203ef1ca4d..d816089f228e 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -859,7 +859,6 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - operator.movingAverageMatches = 10 temps := make([]*os.File, 0, initFiles+moreFiles) for i := 0; i < initFiles; i++ { @@ -1144,7 +1143,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { require.NoError(t, longFile.Close()) // Verify we have no checkpointed files - require.Equal(t, 0, len(operator.knownFiles)) + require.Equal(t, 0, operator.totalReaders()) // Wait until the only line in the short file and // at least one line from the long file have been consumed @@ -1286,7 +1285,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { operator.wg.Wait() if runtime.GOOS != "windows" { // On windows, we never keep files in previousPollFiles, so we don't expect to see them here - require.Len(t, operator.previousPollFiles, 1) + require.Equal(t, operator.previousPollFiles.Len(), 1) } // keep append data to file1 and file2 diff --git a/pkg/stanza/fileconsumer/file_windows.go b/pkg/stanza/fileconsumer/file_windows.go index 2c22b5ed4401..5481278301dd 100644 --- a/pkg/stanza/fileconsumer/file_windows.go +++ b/pkg/stanza/fileconsumer/file_windows.go @@ -7,15 +7,14 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -func (m *Manager) preConsume(_ context.Context, _ []*reader.Reader) { 
+func (m *Manager) preConsume(ctx context.Context) { } // On windows, we close files immediately after reading because they cannot be moved while open. -func (m *Manager) postConsume(readers []*reader.Reader) { - m.previousPollFiles = readers +func (m *Manager) postConsume() { + // m.currentPollFiles -> m.previousPollFiles + m.previousPollFiles = m.currentPollFiles m.closePreviousFiles() } diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset.go b/pkg/stanza/fileconsumer/internal/fileset/fileset.go index ead43a717ce6..3d1cf50fb143 100644 --- a/pkg/stanza/fileconsumer/internal/fileset/fileset.go +++ b/pkg/stanza/fileconsumer/internal/fileset/fileset.go @@ -55,27 +55,22 @@ func (set *Fileset[T]) Add(readers ...T) { set.readers = append(set.readers, readers...) } -func (set *Fileset[T]) Clear() { - // clear the underlying readers - set.readers = make([]T, 0, cap(set.readers)) -} - -func (set *Fileset[T]) Reset(readers ...T) []T { - // empty the underlying set and return the old array - arr := make([]T, len(set.readers)) - copy(arr, set.readers) - set.Clear() - set.readers = append(set.readers, readers...) - return arr -} - -func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint) T { +func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T { var val T for idx, r := range set.readers { - if fp.StartsWith(r.GetFingerprint()) { + if cmp(fp, r.GetFingerprint()) { set.readers = append(set.readers[:idx], set.readers[idx+1:]...) 
return r } } return val } + +// comparators +func StartsWith(a, b *fingerprint.Fingerprint) bool { + return a.StartsWith(b) +} + +func Equal(a, b *fingerprint.Fingerprint) bool { + return a.Equal(b) +} diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go index 9e6d9269a4cb..878791569f86 100644 --- a/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go +++ b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go @@ -32,27 +32,22 @@ func push[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) { func pop[T Matchable](expectedErr error, expectedElemet T) func(t *testing.T, fileset *Fileset[T]) { return func(t *testing.T, fileset *Fileset[T]) { + pr := fileset.Len() el, err := fileset.Pop() if expectedErr == nil { require.NoError(t, err) require.Equal(t, el, expectedElemet) + require.Equal(t, pr-1, fileset.Len()) } else { require.ErrorIs(t, err, expectedErr) } } } -func reset[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) { - return func(t *testing.T, fileset *Fileset[T]) { - fileset.Reset(ele...) 
- require.Equal(t, fileset.Len(), len(ele)) - } -} - func match[T Matchable](ele T, expect bool) func(t *testing.T, fileset *Fileset[T]) { return func(t *testing.T, fileset *Fileset[T]) { pr := fileset.Len() - r := fileset.Match(ele.GetFingerprint()) + r := fileset.Match(ele.GetFingerprint(), StartsWith) if expect { require.NotNil(t, r) require.Equal(t, pr-1, fileset.Len()) @@ -90,12 +85,13 @@ func TestFilesetReader(t *testing.T) { // match() removes the matched item and returns it match(newReader([]byte("ABCDEFGHI")), true), - match(newReader([]byte("ABCEFGHI")), false), + match(newReader([]byte("ABCDEFGHI")), false), - reset(newReader([]byte("XYZ"))), + push(newReader([]byte("XYZ"))), match(newReader([]byte("ABCDEF")), false), - match(newReader([]byte("QWERT")), false), + match(newReader([]byte("QWERT")), true), // should still be present match(newReader([]byte("XYZabc")), true), + pop(errFilesetEmpty, newReader([]byte(""))), }, }, { @@ -106,7 +102,7 @@ func TestFilesetReader(t *testing.T) { pop(nil, newReader([]byte("QWERT"))), pop(errFilesetEmpty, newReader([]byte(""))), - reset(newReader([]byte("XYZ"))), + push(newReader([]byte("XYZ"))), pop(nil, newReader([]byte("XYZ"))), pop(errFilesetEmpty, newReader([]byte(""))), }, diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index 1445cc712830..e485d8e449b4 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -57,6 +57,7 @@ require ( go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/receiver/filelogreceiver/go.sum b/receiver/filelogreceiver/go.sum index f64eb2fd729f..fe66e96e87e8 100644 --- a/receiver/filelogreceiver/go.sum +++ b/receiver/filelogreceiver/go.sum @@ -120,6 
+120,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 h1:FqrVOBQxQ8r/UwwXibI0KMolVhvFiGobSfdE33deHJM= +golang.org/x/exp v0.0.0-20230711023510-fffb14384f22/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/receiver/otlpjsonfilereceiver/go.mod b/receiver/otlpjsonfilereceiver/go.mod index 795abc23c976..e00d734e5ef2 100644 --- a/receiver/otlpjsonfilereceiver/go.mod +++ b/receiver/otlpjsonfilereceiver/go.mod @@ -55,6 +55,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect + golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/receiver/otlpjsonfilereceiver/go.sum b/receiver/otlpjsonfilereceiver/go.sum index 46a35c197ecb..74c1dc0a6589 100644 --- a/receiver/otlpjsonfilereceiver/go.sum +++ b/receiver/otlpjsonfilereceiver/go.sum @@ -118,6 +118,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 h1:FqrVOBQxQ8r/UwwXibI0KMolVhvFiGobSfdE33deHJM= +golang.org/x/exp 
v0.0.0-20230711023510-fffb14384f22/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=