diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index 2eec1b6b2e79..bd0daf2d3e40 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -25,7 +25,6 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -87,6 +86,7 @@ type Config struct {
 	DeleteAfterRead         bool   `mapstructure:"delete_after_read,omitempty"`
 	IncludeFileRecordNumber bool   `mapstructure:"include_file_record_number,omitempty"`
 	Compression             string `mapstructure:"compression,omitempty"`
+	PollsToArchive          int    `mapstructure:"-"` // TODO: activate this config once archiving is set up
 	AcquireFSLock           bool   `mapstructure:"acquire_fs_lock,omitempty"`
 }
 
@@ -174,13 +174,6 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		AcquireFSLock:           c.AcquireFSLock,
 	}
 
-	var t tracker.Tracker
-	if o.noTracking {
-		t = tracker.NewNoStateTracker(set, c.MaxConcurrentFiles/2)
-	} else {
-		t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
-	}
-
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
 	if err != nil {
 		return nil, err
@@ -192,8 +185,8 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		pollInterval:     c.PollInterval,
 		maxBatchFiles:    c.MaxConcurrentFiles / 2,
 		maxBatches:       c.MaxBatches,
-		tracker:          t,
 		telemetryBuilder: telemetryBuilder,
+		noTracking:       o.noTracking,
 	}, nil
 }
diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index d46507ecf3eb..c5675a90a063 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -30,11 +30,13 @@ type Manager struct {
 	readerFactory reader.Factory
 	fileMatcher   *matcher.Matcher
 	tracker       tracker.Tracker
+	noTracking    bool
 
-	pollInterval  time.Duration
-	persister     operator.Persister
-	maxBatches    int
-	maxBatchFiles int
+	pollInterval   time.Duration
+	persister      operator.Persister
+	maxBatches     int
+	maxBatchFiles  int
+	pollsToArchive int
 
 	telemetryBuilder *metadata.TelemetryBuilder
 }
@@ -47,6 +49,9 @@ func (m *Manager) Start(persister operator.Persister) error {
 		m.set.Logger.Warn("finding files", zap.Error(err))
 	}
 
+	// instantiate the tracker
+	m.instantiateTracker(persister)
+
 	if persister != nil {
 		m.persister = persister
 		offsets, err := checkpoint.Load(ctx, m.persister)
@@ -58,6 +63,8 @@ func (m *Manager) Start(persister operator.Persister) error {
 			m.readerFactory.FromBeginning = true
 			m.tracker.LoadMetadata(offsets)
 		}
+	} else if m.pollsToArchive > 0 {
+		m.set.Logger.Error("archiving is not supported in memory, please use a storage extension")
 	}
 
 	// Start polling goroutine
@@ -73,7 +80,9 @@ func (m *Manager) Stop() error {
 		m.cancel = nil
 	}
 	m.wg.Wait()
-	m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
+	if m.tracker != nil {
+		m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
+	}
 	if m.persister != nil {
 		if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
 			m.set.Logger.Error("save offsets", zap.Error(err))
@@ -261,3 +270,13 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
 	m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
 	return r, nil
 }
+
+func (m *Manager) instantiateTracker(persister operator.Persister) {
+	var t tracker.Tracker
+	if m.noTracking {
+		t = tracker.NewNoStateTracker(m.set, m.maxBatchFiles)
+	} else {
+		t = tracker.NewFileTracker(m.set, m.maxBatchFiles, m.pollsToArchive, persister)
+	}
+	m.tracker = t
+}
diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
index b75933abcc28..b9476fb3d5e9 100644
--- a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
+++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -18,6 +18,10 @@ const knownFilesKey = "knownFiles"
 
 // Save syncs the most recent set of files to the database
 func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error {
+	return SaveKey(ctx, persister, rmds, knownFilesKey)
+}
+
+func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
 	var buf bytes.Buffer
 	enc := json.NewEncoder(&buf)
 
@@ -34,7 +38,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
 		}
 	}
 
-	if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
+	if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
 		errs = append(errs, fmt.Errorf("persist known files: %w", err))
 	}
diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index 5039003a36ed..54bf5e9e12c1 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -4,12 +4,17 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
 
 import (
+	"context"
+	"fmt"
+
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
 
 // Interface for tracking files that are being consumed.
@@ -37,9 +42,16 @@ type fileTracker struct {
 	currentPollFiles  *fileset.Fileset[*reader.Reader]
 	previousPollFiles *fileset.Fileset[*reader.Reader]
 	knownFiles        []*fileset.Fileset[*reader.Metadata]
+
+	// persister is used to store offsets that are older than 3 poll cycles.
+	// These offsets will be stored on disk.
+	persister operator.Persister
+
+	pollsToArchive int
+	archiveIndex   int
 }
 
-func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker {
+func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
 	knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
 	for i := 0; i < len(knownFiles); i++ {
 		knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
@@ -51,6 +63,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker
 		currentPollFiles:  fileset.New[*reader.Reader](maxBatchFiles),
 		previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
 		knownFiles:        knownFiles,
+		pollsToArchive:    pollsToArchive,
+		persister:         persister,
+		archiveIndex:      0,
 	}
 }
@@ -113,6 +128,9 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
 func (t *fileTracker) EndPoll() {
 	// shift the filesets at end of every poll() call
 	// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
+
+	// Instead of throwing knownFiles[2] away, archive it.
+	t.archive(t.knownFiles[2])
 	copy(t.knownFiles[1:], t.knownFiles)
 	t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
 }
@@ -125,6 +143,34 @@ func (t *fileTracker) TotalReaders() int {
 	return total
 }
+
+func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
+	// We make use of a ring buffer, where each set of files is stored under a specific index.
+	// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
+	// Separate storage keys knownFiles0, knownFiles1, ..., knownFilesN, then roll over back to knownFiles0.
+
+	// Archiving:  ┌─────────────────────on-disk archive─────────────────────────┐
+	//             |   ┌───┐     ┌───┐                     ┌──────────────────┐  |
+	// index       | ▶ │ 0 │  ▶  │ 1 │  ▶  ...  ▶          │ polls_to_archive │  |
+	//             | ▲ └───┘     └───┘                     └──────────────────┘  |
+	//             | ▲   ▲                                          ▼            |
+	//             | ▲   │      Roll over, overwriting older offsets, if any   ◀ |
+	//             └─────│──────────────────────────────────────────────────────┘
+	//                   │
+	//                   │
+	//                   │
+	//                  start
+	//                  index
+
+	if t.pollsToArchive <= 0 || t.persister == nil {
+		return
+	}
+	key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
+	if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
+		t.set.Logger.Error("error while saving to the archive", zap.Error(err))
+	}
+	t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
+}
 
 // noStateTracker only tracks the current polled files. Once the poll is
 // complete and telemetry is consumed, the tracked files are closed. The next
 // poll will create fresh readers with no previously tracked offsets.
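Note on the archiving flow above: the tracker keeps only three in-memory filesets, so the oldest set is evicted at the end of every poll; with archiving enabled, the evicted set is written to the storage extension under a rotating key instead of being dropped. Below is a minimal standalone sketch of that rotation. The map-backed store and []string metadata are stand-ins for the real persister and []*reader.Metadata; only the "knownFiles<index>" key format and the modulo roll-over mirror the archive() method in this diff.

package main

import "fmt"

// archiver models fileTracker's ring-buffer archive: a map stands in for the
// storage extension, and each call overwrites the slot at archiveIndex.
type archiver struct {
	store          map[string][]string // stand-in for operator.Persister
	pollsToArchive int
	archiveIndex   int
}

func (a *archiver) archive(metadata []string) {
	if a.pollsToArchive <= 0 || a.store == nil {
		return // archiving disabled or no storage available
	}
	key := fmt.Sprintf("knownFiles%d", a.archiveIndex)
	a.store[key] = metadata
	a.archiveIndex = (a.archiveIndex + 1) % a.pollsToArchive // roll over
}

func main() {
	a := &archiver{store: map[string][]string{}, pollsToArchive: 3}
	for poll := 0; poll < 5; poll++ {
		a.archive([]string{fmt.Sprintf("offsets-from-poll-%d", poll)})
	}
	// Slots 0 and 1 were overwritten by polls 3 and 4; slot 2 still holds poll 2.
	for i := 0; i < a.pollsToArchive; i++ {
		key := fmt.Sprintf("knownFiles%d", i)
		fmt.Println(key, a.store[key])
	}
}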
diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go
index 19d465a6e43b..69bb92ca26cd 100644
--- a/pkg/stanza/fileconsumer/util_test.go
+++ b/pkg/stanza/fileconsumer/util_test.go
@@ -10,6 +10,8 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
 )
 
 func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) {
@@ -20,6 +22,7 @@
 func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager {
 	set := componenttest.NewNopTelemetrySettings()
 	input, err := cfg.Build(set, sink.Callback, opts...)
 	require.NoError(t, err)
+	input.tracker = tracker.NewFileTracker(set, cfg.MaxBatches, cfg.PollsToArchive, testutil.NewUnscopedMockPersister())
 	t.Cleanup(func() { input.tracker.ClosePreviousFiles() })
 	return input
diff --git a/receiver/nginxreceiver/factory.go b/receiver/nginxreceiver/factory.go
index d04e07ea848c..feb633b7ca38 100644
--- a/receiver/nginxreceiver/factory.go
+++ b/receiver/nginxreceiver/factory.go
@@ -28,12 +28,13 @@ func createDefaultConfig() component.Config {
 	cfg := scraperhelper.NewDefaultControllerConfig()
 	cfg.CollectionInterval = 10 * time.Second
 
+	clientConfig := confighttp.NewDefaultClientConfig()
+	clientConfig.Endpoint = "http://localhost:80/status"
+	clientConfig.Timeout = 10 * time.Second
+
 	return &Config{
-		ControllerConfig: cfg,
-		ClientConfig: confighttp.ClientConfig{
-			Endpoint: "http://localhost:80/status",
-			Timeout:  10 * time.Second,
-		},
+		ControllerConfig:     cfg,
+		ClientConfig:         clientConfig,
 		MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
 	}
 }
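The nginxreceiver hunk replaces a hand-written confighttp.ClientConfig struct literal with confighttp.NewDefaultClientConfig(), so fields the collector later adds to the default client config are picked up instead of silently staying zero-valued. A small sketch of that pattern follows; the endpoint and timeout come from the diff, while the helper name and main function are invented for illustration.

package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/config/confighttp"
)

// newStatusClientConfig starts from the library defaults and overrides only
// the fields this receiver cares about, mirroring createDefaultConfig above.
func newStatusClientConfig() confighttp.ClientConfig {
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = "http://localhost:80/status"
	clientConfig.Timeout = 10 * time.Second
	return clientConfig
}

func main() {
	cc := newStatusClientConfig()
	fmt.Println(cc.Endpoint, cc.Timeout) // other defaults remain intact
}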