
Commit

[chore][pkg/stanza] fileconsumer: integrate recently added fileset (#30728)

**Description:** Integrate recently added fileset

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
VihasMakwana and djaglowski authored Feb 4, 2024
1 parent 24cfa93 commit 416f296
Showing 11 changed files with 101 additions and 100 deletions.
11 changes: 8 additions & 3 deletions pkg/stanza/fileconsumer/config.go
@@ -16,6 +16,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
@@ -163,16 +164,20 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
}

knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2)
}
return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: []*reader.Metadata{},
currentPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
knownFiles: knownFiles,
}, nil
}

111 changes: 56 additions & 55 deletions pkg/stanza/fileconsumer/file.go
@@ -13,6 +13,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
@@ -32,26 +33,17 @@ type Manager struct {
maxBatches int
maxBatchFiles int

previousPollFiles []*reader.Reader
knownFiles []*reader.Metadata

// This value approximates the expected number of files which we will find in a single poll cycle.
// It is updated each poll cycle using a simple moving average calculation which assigns 20% weight
// to the most recent poll cycle.
// It is used to regulate the size of knownFiles. The goal is to allow knownFiles
// to contain checkpoints from a few previous poll cycles, but not grow unbounded.
movingAverageMatches int
currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]
}

func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

if matches, err := m.fileMatcher.MatchFiles(); err != nil {
if _, err := m.fileMatcher.MatchFiles(); err != nil {
m.Warnf("finding files: %v", err)
} else {
m.movingAverageMatches = len(matches)
m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches))
}

if persister != nil {
@@ -63,7 +55,7 @@ func (m *Manager) Start(persister operator.Persister) error {
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
m.knownFiles = append(m.knownFiles, offsets...)
m.knownFiles[0].Add(offsets...)
}
}

@@ -74,13 +66,18 @@ func (m *Manager) Start(persister operator.Persister) error {
}

func (m *Manager) closePreviousFiles() {
if len(m.knownFiles) > 4*m.movingAverageMatches {
m.knownFiles = m.knownFiles[m.movingAverageMatches:]
}
for _, r := range m.previousPollFiles {
m.knownFiles = append(m.knownFiles, r.Close())
// m.previousPollFiles -> m.knownFiles[0]

for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() {
m.knownFiles[0].Add(r.Close())
}
m.previousPollFiles = nil
}

func (m *Manager) rotateFilesets() {
// shift the filesets at end of every consume() call
// m.knownFiles[0] -> m.knownFiles[1] -> m.knownFiles[2]
copy(m.knownFiles[1:], m.knownFiles)
m.knownFiles[0] = fileset.New[*reader.Metadata](m.maxBatchFiles / 2)
}

// Stop will stop the file monitoring process
@@ -92,7 +89,11 @@ func (m *Manager) Stop() error {
m.wg.Wait()
m.closePreviousFiles()
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
checkpoints := make([]*reader.Metadata, 0, m.totalReaders())
for _, knownFiles := range m.knownFiles {
checkpoints = append(checkpoints, knownFiles.Get()...)
}
if err := checkpoint.Save(context.Background(), m.persister, checkpoints); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
@@ -128,9 +129,7 @@ func (m *Manager) poll(ctx context.Context) {
// Get the list of paths on disk
matches, err := m.fileMatcher.MatchFiles()
if err != nil {
m.Debugf("finding files: %v", err)
} else {
m.movingAverageMatches = (m.movingAverageMatches*3 + len(matches)) / 4
m.Warnf("finding files: %v", err)
}
m.Debugf("matched files", zap.Strings("paths", matches))

@@ -152,26 +151,31 @@ func (m *Manager) poll(ctx context.Context) {
// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true
if m.persister != nil {
allCheckpoints := make([]*reader.Metadata, 0, len(m.knownFiles)+len(m.previousPollFiles))
allCheckpoints = append(allCheckpoints, m.knownFiles...)
for _, r := range m.previousPollFiles {
allCheckpoints := make([]*reader.Metadata, 0, m.totalReaders())
for _, knownFiles := range m.knownFiles {
allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
}

for _, r := range m.previousPollFiles.Get() {
allCheckpoints = append(allCheckpoints, r.Metadata)
}
if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
// rotate at end of every poll()
m.rotateFilesets()
}

func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files", zap.Strings("paths", paths))
readers := m.makeReaders(paths)
m.makeReaders(paths)

m.preConsume(ctx, readers)
m.preConsume(ctx)

// read new readers to end
var wg sync.WaitGroup
for _, r := range readers {
for _, r := range m.currentPollFiles.Get() {
wg.Add(1)
go func(r *reader.Reader) {
defer wg.Done()
@@ -180,7 +184,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.postConsume(readers)
m.postConsume()
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -211,9 +215,8 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(paths []string) []*reader.Reader {
readers := make([]*reader.Reader, 0, len(paths))
OUTER:
func (m *Manager) makeReaders(paths []string) {
m.currentPollFiles = fileset.New[*reader.Reader](m.maxBatchFiles / 2)
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
@@ -222,13 +225,13 @@ OUTER:

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
for _, r := range readers {
if fp.Equal(r.Fingerprint) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue OUTER
if r := m.currentPollFiles.Match(fp, fileset.Equal); r != nil {
// re-add the reader as Match() removes duplicates
m.currentPollFiles.Add(r)
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue
}

r, err := m.newReader(file, fp)
@@ -237,29 +240,19 @@ OUTER:
continue
}

readers = append(readers, r)
m.currentPollFiles.Add(r)
}
return readers
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
for i := 0; i < len(m.previousPollFiles); i++ {
oldReader := m.previousPollFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
// Keep the new reader and discard the old. This ensures that if the file was
// copied to another location and truncated, our handle is updated.
m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}
if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

// Iterate backwards to match newest first
for i := len(m.knownFiles) - 1; i >= 0; i-- {
oldMetadata := m.knownFiles[i]
if fp.StartsWith(oldMetadata.Fingerprint) {
// Remove the old metadata from the list. We will keep updating it and save it again later.
m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
for i := 0; i < len(m.knownFiles); i++ {
if oldMetadata := m.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}
}
@@ -268,3 +261,11 @@ func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader
m.Infow("Started watching file", "path", file.Name())
return m.readerFactory.NewReader(file, fp)
}

func (m *Manager) totalReaders() int {
total := m.previousPollFiles.Len()
for i := 0; i < len(m.knownFiles); i++ {
total += m.knownFiles[i].Len()
}
return total
}
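
Taken together, the new fields give the `Manager` a simple generational flow: readers created in the current poll live in `currentPollFiles`, `postConsume` promotes them to `previousPollFiles`, `closePreviousFiles` drains their checkpoints into `knownFiles[0]`, and `rotateFilesets` shifts the three generations at the end of each poll so checkpoints age out after a few cycles instead of growing without bound (replacing the `movingAverageMatches` heuristic). Below is a minimal, self-contained sketch of just the rotation step; the stand-in `set` type and variable names are illustrative and do not come from the internal `fileset` package:

```go
package main

import "fmt"

// Stand-in for the internal fileset.Fileset type; only Add and Get are modeled.
type set[T any] struct{ items []T }

func newSet[T any](capacity int) *set[T] { return &set[T]{items: make([]T, 0, capacity)} }

func (s *set[T]) Add(items ...T) { s.items = append(s.items, items...) }
func (s *set[T]) Get() []T       { return s.items }

func main() {
	// Three generations of checkpoints, newest first, mirroring Manager.knownFiles.
	knownFiles := make([]*set[string], 3)
	for i := range knownFiles {
		knownFiles[i] = newSet[string](4)
	}

	for cycle := 1; cycle <= 4; cycle++ {
		// closePreviousFiles: finished readers' checkpoints land in generation 0.
		knownFiles[0].Add(fmt.Sprintf("checkpoint-from-poll-%d", cycle))

		// rotateFilesets: shift [0] -> [1] -> [2]; the oldest generation falls away.
		copy(knownFiles[1:], knownFiles)
		knownFiles[0] = newSet[string](4)
	}

	// After four polls, only the two most recent generations' checkpoints remain.
	for i, gen := range knownFiles {
		fmt.Printf("knownFiles[%d]: %v\n", i, gen.Get())
	}
}
```

Because Go's `copy` is defined to handle overlapping slices, the single `copy(knownFiles[1:], knownFiles)` call shifts every generation one slot toward the tail before a fresh set is installed at index 0.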
14 changes: 8 additions & 6 deletions pkg/stanza/fileconsumer/file_other.go
@@ -15,11 +15,11 @@ import (
// Take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) {
lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
func (m *Manager) preConsume(ctx context.Context) {
lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
OUTER:
for _, oldReader := range m.previousPollFiles {
for _, newReader := range newReaders {
for _, oldReader := range m.previousPollFiles.Get() {
for _, newReader := range m.currentPollFiles.Get() {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
@@ -52,7 +52,9 @@ OUTER:

// On non-windows platforms, we keep files open between poll cycles so that we can detect
// and read "lost" files, which have been moved out of the matching pattern.
func (m *Manager) postConsume(readers []*reader.Reader) {
func (m *Manager) postConsume() {
m.closePreviousFiles()
m.previousPollFiles = readers

// m.currentPollFiles -> m.previousPollFiles
m.previousPollFiles = m.currentPollFiles
}
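
The non-Windows `preConsume` is where "lost" files are detected: any reader from the previous poll whose fingerprint no current reader starts with must have been rotated out of the matching pattern or deleted, so it is drained to EOF before the new files are read, preserving log ordering. A rough sketch of that set difference, using stand-in types and a hypothetical `lost` helper rather than the real `reader` and `fingerprint` packages:

```go
package main

import (
	"bytes"
	"fmt"
)

// Stand-in reader: a path plus the first bytes of its fingerprint.
type rdr struct {
	path       string
	firstBytes []byte
}

// lost is a hypothetical helper mirroring the nested loop in preConsume: it
// returns previous-cycle readers whose content no current reader still starts
// with, i.e. files that were rotated out of the pattern or deleted.
func lost(previous, current []rdr) []rdr {
	var out []rdr
OUTER:
	for _, old := range previous {
		for _, cur := range current {
			if bytes.HasPrefix(cur.firstBytes, old.firstBytes) {
				continue OUTER // still present (possibly grown); not lost
			}
		}
		out = append(out, old)
	}
	return out
}

func main() {
	previous := []rdr{
		{path: "app.log", firstBytes: []byte("first line")},
		{path: "app.log.1", firstBytes: []byte("older rotated content")},
	}
	current := []rdr{
		// app.log grew since the last poll, so its fingerprint still starts
		// with the previous bytes and it is not considered lost.
		{path: "app.log", firstBytes: []byte("first line\nsecond line")},
	}
	for _, r := range lost(previous, current) {
		fmt.Println("lost file, drain before reading new files:", r.path)
	}
}
```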
5 changes: 2 additions & 3 deletions pkg/stanza/fileconsumer/file_test.go
@@ -859,7 +859,6 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) {

operator, sink := testManager(t, cfg)
operator.persister = testutil.NewUnscopedMockPersister()
operator.movingAverageMatches = 10

temps := make([]*os.File, 0, initFiles+moreFiles)
for i := 0; i < initFiles; i++ {
Expand Down Expand Up @@ -1144,7 +1143,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
require.NoError(t, longFile.Close())

// Verify we have no checkpointed files
require.Equal(t, 0, len(operator.knownFiles))
require.Equal(t, 0, operator.totalReaders())

// Wait until the only line in the short file and
// at least one line from the long file have been consumed
Expand Down Expand Up @@ -1286,7 +1285,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
operator.wg.Wait()
if runtime.GOOS != "windows" {
// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
require.Len(t, operator.previousPollFiles, 1)
require.Equal(t, operator.previousPollFiles.Len(), 1)
}

// keep append data to file1 and file2
9 changes: 4 additions & 5 deletions pkg/stanza/fileconsumer/file_windows.go
@@ -7,15 +7,14 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

func (m *Manager) preConsume(_ context.Context, _ []*reader.Reader) {
func (m *Manager) preConsume(ctx context.Context) {
}

// On windows, we close files immediately after reading because they cannot be moved while open.
func (m *Manager) postConsume(readers []*reader.Reader) {
m.previousPollFiles = readers
func (m *Manager) postConsume() {
// m.currentPollFiles -> m.previousPollFiles
m.previousPollFiles = m.currentPollFiles
m.closePreviousFiles()
}
27 changes: 11 additions & 16 deletions pkg/stanza/fileconsumer/internal/fileset/fileset.go
@@ -55,27 +55,22 @@ func (set *Fileset[T]) Add(readers ...T) {
set.readers = append(set.readers, readers...)
}

func (set *Fileset[T]) Clear() {
// clear the underlying readers
set.readers = make([]T, 0, cap(set.readers))
}

func (set *Fileset[T]) Reset(readers ...T) []T {
// empty the underlying set and return the old array
arr := make([]T, len(set.readers))
copy(arr, set.readers)
set.Clear()
set.readers = append(set.readers, readers...)
return arr
}

func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint) T {
func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T {
var val T
for idx, r := range set.readers {
if fp.StartsWith(r.GetFingerprint()) {
if cmp(fp, r.GetFingerprint()) {
set.readers = append(set.readers[:idx], set.readers[idx+1:]...)
return r
}
}
return val
}

// comparators
func StartsWith(a, b *fingerprint.Fingerprint) bool {
return a.StartsWith(b)
}

func Equal(a, b *fingerprint.Fingerprint) bool {
return a.Equal(b)
}
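
The notable change in `fileset` itself is that `Match` now takes a comparator, with `Equal` and `StartsWith` provided, and it removes the element it returns. That removal is why `makeReaders` re-adds the reader after a duplicate check, while `newReader` relies on it so the old reader or metadata cannot be matched twice. A small self-contained sketch of that behavior, with stand-in types in place of the internal `fingerprint` and `reader` packages:

```go
package main

import (
	"bytes"
	"fmt"
)

// Stand-ins for the internal fingerprint and reader types.
type fp struct{ firstBytes []byte }

func equal(a, b *fp) bool      { return bytes.Equal(a.firstBytes, b.firstBytes) }
func startsWith(a, b *fp) bool { return bytes.HasPrefix(a.firstBytes, b.firstBytes) }

type rdr struct {
	path   string
	fprint *fp
}

// fileSet mimics Fileset.Match: remove and return the first element whose
// fingerprint satisfies cmp, or nil if nothing matches.
type fileSet struct{ readers []*rdr }

func (s *fileSet) add(rs ...*rdr) { s.readers = append(s.readers, rs...) }

func (s *fileSet) match(f *fp, cmp func(a, b *fp) bool) *rdr {
	for i, r := range s.readers {
		if cmp(f, r.fprint) {
			s.readers = append(s.readers[:i], s.readers[i+1:]...) // match removes the element
			return r
		}
	}
	return nil
}

func main() {
	set := &fileSet{}
	set.add(&rdr{path: "a.log", fprint: &fp{firstBytes: []byte("hello world")}})

	// Equal comparator: duplicate-content check, as in makeReaders. Because
	// match removed the element, the caller puts it back after the check.
	if r := set.match(&fp{firstBytes: []byte("hello world")}, equal); r != nil {
		set.add(r)
		fmt.Println("duplicate content of", r.path)
	}

	// StartsWith comparator: previous-cycle check, as in newReader. The new
	// fingerprint extends the old one because the file has simply grown.
	grown := &fp{firstBytes: []byte("hello world, plus appended data")}
	if r := set.match(grown, startsWith); r != nil {
		fmt.Println("same file as last poll:", r.path)
	}
}
```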

