From cd888d381c3b1a25dfd909d7e752d0665f3ad87b Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 8 Nov 2024 11:43:09 +0100 Subject: [PATCH 01/12] remove converter type Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter.go | 187 +---------------- pkg/stanza/adapter/converter_test.go | 210 ++----------------- receiver/filelogreceiver/filelog_test.go | 45 ++-- receiver/filelogreceiver/go.mod | 2 + receiver/namedpipereceiver/namedpipe_test.go | 4 - 5 files changed, 39 insertions(+), 409 deletions(-) diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index 3ab508745bc3..c42c0a458400 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -4,162 +4,18 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" import ( - "context" "encoding/binary" "encoding/json" - "errors" "fmt" - "math" - "runtime" "sort" "sync" "github.com/cespare/xxhash/v2" - "go.opentelemetry.io/collector/component" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) -// Converter converts a batch of entry.Entry into plog.Logs aggregating translated -// entries into logs coming from the same Resource. -// -// The diagram below illustrates the internal communication inside the Converter: -// -// ┌─────────────────────────────────┐ -// │ Batch() │ -// ┌─────────┤ Ingests batches of log entries │ -// │ │ and sends them onto workerChan │ -// │ └─────────────────────────────────┘ -// │ -// │ ┌───────────────────────────────────────────────────┐ -// ├─► workerLoop() │ -// │ │ ┌─────────────────────────────────────────────────┴─┐ -// ├─┼─► workerLoop() │ -// │ │ │ ┌─────────────────────────────────────────────────┴─┐ -// └─┼─┼─► workerLoop() │ -// └─┤ │ consumes sent log entries from workerChan, │ -// │ │ translates received entries to plog.LogRecords, │ -// └─┤ and sends them on flushChan │ -// └─────────────────────────┬─────────────────────────┘ -// │ -// ▼ -// ┌─────────────────────────────────────────────────────┐ -// │ flushLoop() │ -// │ receives log records from flushChan and sends │ -// │ them onto pLogsChan which is consumed by │ -// │ downstream consumers via OutChannel() │ -// └─────────────────────────────────────────────────────┘ -type Converter struct { - set component.TelemetrySettings - - // pLogsChan is a channel on which aggregated logs will be sent to. - pLogsChan chan plog.Logs - - stopOnce sync.Once - - // converterChan is an internal communication channel signaling stop was called - // prevents sending to closed channels - converterChan chan struct{} - - // workerChan is an internal communication channel that gets the log - // entries from Batch() calls and it receives the data in workerLoop(). - workerChan chan []*entry.Entry - // workerCount configures the amount of workers started. - workerCount int - - // flushChan is an internal channel used for transporting batched plog.Logs. - flushChan chan plog.Logs - - // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit - // when Stop() is called. - wg sync.WaitGroup - - // flushWg is a WaitGroup that makes sure that we wait for flush loop to exit - // when Stop() is called. - flushWg sync.WaitGroup -} - -type converterOption interface { - apply(*Converter) -} - -func withWorkerCount(workerCount int) converterOption { - return workerCountOption{workerCount} -} - -type workerCountOption struct { - workerCount int -} - -func (o workerCountOption) apply(c *Converter) { - c.workerCount = o.workerCount -} - -func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter { - set.Logger = set.Logger.With(zap.String("component", "converter")) - c := &Converter{ - set: set, - workerChan: make(chan []*entry.Entry), - workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), - pLogsChan: make(chan plog.Logs), - converterChan: make(chan struct{}), - flushChan: make(chan plog.Logs), - } - for _, opt := range opts { - opt.apply(c) - } - return c -} - -func (c *Converter) Start() { - c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount)) - - c.wg.Add(c.workerCount) - for i := 0; i < c.workerCount; i++ { - go c.workerLoop() - } - - c.flushWg.Add(1) - go c.flushLoop() -} - -func (c *Converter) Stop() { - c.stopOnce.Do(func() { - close(c.converterChan) - - // close workerChan and wait for entries to be processed - close(c.workerChan) - c.wg.Wait() - - // close flushChan and wait for flush loop to finish - close(c.flushChan) - c.flushWg.Wait() - - // close pLogsChan so callers can stop processing - close(c.pLogsChan) - }) -} - -// OutChannel returns the channel on which converted entries will be sent to. -func (c *Converter) OutChannel() <-chan plog.Logs { - return c.pLogsChan -} - -// workerLoop is responsible for obtaining log entries from Batch() calls, -// converting them to plog.LogRecords batched by Resource, and sending them -// on flushChan. -func (c *Converter) workerLoop() { - defer c.wg.Done() - - for entries := range c.workerChan { - // Send plogs directly to flushChan - c.flushChan <- ConvertEntries(entries) - } -} - func ConvertEntries(entries []*entry.Entry) plog.Logs { resourceHashToIdx := make(map[uint64]int) scopeIdxByResource := make(map[uint64]map[string]int) @@ -197,47 +53,6 @@ func ConvertEntries(entries []*entry.Entry) plog.Logs { return pLogs } -func (c *Converter) flushLoop() { - defer c.flushWg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - for pLogs := range c.flushChan { - if err := c.flush(ctx, pLogs); err != nil { - c.set.Logger.Debug("Problem sending log entries", - zap.Error(err), - ) - } - } -} - -// flush flushes provided plog.Logs entries onto a channel. -func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error { - doneChan := ctx.Done() - - select { - case <-doneChan: - return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) - - case c.pLogsChan <- pLogs: - } - - return nil -} - -// Batch takes in an entry.Entry and sends it to an available worker for processing. -func (c *Converter) Batch(e []*entry.Entry) error { - // in case Stop was called do not process batch - select { - case <-c.converterChan: - return errors.New("logs converter has been stopped") - default: - } - - c.workerChan <- e - return nil -} - // convert converts one entry.Entry into plog.LogRecord allocating it. func convert(ent *entry.Entry) plog.LogRecord { dest := plog.NewLogRecord() diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index 8092cb677592..5bcd9b6420ed 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -4,15 +4,12 @@ package adapter import ( - "context" "fmt" "sort" "strconv" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -42,10 +39,6 @@ func BenchmarkConvertComplex(b *testing.B) { } } -func complexEntries(count int) []*entry.Entry { - return complexEntriesForNDifferentHosts(count, 1) -} - func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry { ret := make([]*entry.Entry, count) for i := 0; i < count; i++ { @@ -394,161 +387,27 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { set := componenttest.NewNopTelemetrySettings() set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - assert.NoError(t, converter.Batch(entries)) - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - select { - case pLogs, ok := <-ch: - if !ok { - break - } - - rLogs := pLogs.ResourceLogs() - rLog := rLogs.At(0) - - ills := rLog.ScopeLogs() - require.Equal(t, ills.Len(), tc.numberOFScopes) - - for i := 0; i < tc.numberOFScopes; i++ { - sl := ills.At(i) - require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) - require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) - } - - case <-timeoutTimer.C: - break - } - }) - } -} - -func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { - t.Parallel() - - testcases := []struct { - entries int - maxFlushCount uint - }{ - { - entries: 10, - maxFlushCount: 10, - }, - { - entries: 10, - maxFlushCount: 3, - }, - { - entries: 100, - maxFlushCount: 20, - }, - } - - for i, tc := range testcases { - tc := tc - - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntries(tc.entries) - for from := 0; from < tc.entries; from += int(tc.maxFlushCount) { - to := from + int(tc.maxFlushCount) - if to > tc.entries { - to = tc.entries - } - assert.NoError(t, converter.Batch(entries[from:to])) - } - }() - - var ( - actualCount int - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - forLoop: - for { - if tc.entries == actualCount { - break - } - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } + entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - rLogs := pLogs.ResourceLogs() - require.Equal(t, 1, rLogs.Len()) + pLogs := ConvertEntries(entries) - rLog := rLogs.At(0) - ills := rLog.ScopeLogs() - require.Equal(t, 1, ills.Len()) + rLogs := pLogs.ResourceLogs() + rLog := rLogs.At(0) - sl := ills.At(0) + ills := rLog.ScopeLogs() + require.Equal(t, ills.Len(), tc.numberOFScopes) - actualCount += sl.LogRecords().Len() - - assert.LessOrEqual(t, uint(sl.LogRecords().Len()), tc.maxFlushCount, - "Received more log records in one flush than configured by maxFlushCount", - ) - - case <-timeoutTimer.C: - break forLoop - } + for i := 0; i < tc.numberOFScopes; i++ { + sl := ills.At(i) + require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) + require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) } - assert.Equal(t, tc.entries, actualCount, - "didn't receive expected number of entries after conversion", - ) }) } } -func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - var wg sync.WaitGroup - wg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - go func() { - defer wg.Done() - pLogs := plog.NewLogs() - ills := pLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() - - lr := convert(complexEntry()) - lr.CopyTo(ills.LogRecords().AppendEmpty()) - - assert.Error(t, converter.flush(ctx, pLogs)) - }() - wg.Wait() -} - func TestConvertMetadata(t *testing.T) { now := time.Now() @@ -946,53 +805,18 @@ func BenchmarkConverter(b *testing.B) { for i := 0; i < b.N; i++ { set := componenttest.NewNopTelemetrySettings() set.Logger = zaptest.NewLogger(b) - converter := NewConverter(set, withWorkerCount(wc)) - converter.Start() - defer converter.Stop() b.ReportAllocs() - go func() { - for from := 0; from < entryCount; from += int(batchSize) { - to := from + int(batchSize) - if to > entryCount { - to = entryCount - } - assert.NoError(b, converter.Batch(entries[from:to])) - } - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - var n int - forLoop: - for { - if n == entryCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(b, hostsCount, rLogs.Len()) - n += pLogs.LogRecordCount() - - case <-timeoutTimer.C: - break forLoop + for from := 0; from < entryCount; from += int(batchSize) { + to := from + int(batchSize) + if to > entryCount { + to = entryCount } + pLogs := ConvertEntries(entries[from:to]) + rLogs := pLogs.ResourceLogs() + require.Equal(b, hostsCount, rLogs.Len()) } - - assert.Equal(b, entryCount, n, - "didn't receive expected number of entries after conversion", - ) } }) } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 80931bbe87ee..5fde73fe232a 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "io" "os" "path/filepath" @@ -82,44 +83,41 @@ func TestReadStaticFile(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), 3, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) + expectedLogs := []plog.Logs{} // Build the expected set by using adapter.Converter to translate entries // to pdata Logs. - queueEntry := func(t *testing.T, c *adapter.Converter, msg string, severity entry.Severity) { + entries := []*entry.Entry{} + queueEntry := func(t *testing.T, msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) e.Severity = severity e.AddAttribute("file_name", "simple.log") - require.NoError(t, c.Batch([]*entry.Entry{e})) + entries = append(entries, e) } - queueEntry(t, converter, "Something routine", entry.Info) - queueEntry(t, converter, "Something bad happened!", entry.Error) - queueEntry(t, converter, "Some details...", entry.Debug) + queueEntry(t, "Something routine", entry.Info) + queueEntry(t, "Something bad happened!", entry.Error) + queueEntry(t, "Some details...", entry.Debug) + + expectedLogs = append(expectedLogs, adapter.ConvertEntries(entries)) dir, err := os.Getwd() require.NoError(t, err) t.Logf("Working Directory: %s", dir) - wg.Wait() - require.Eventually(t, expectNLogs(sink, 3), 2*time.Second, 5*time.Millisecond, "expected %d but got %d logs", 3, sink.LogRecordCount(), ) // TODO: Figure out a nice way to assert each logs entry content. - // require.Equal(t, expectedLogs, sink.AllLogs()) + require.Equal(t, expectedLogs, sink.AllLogs()) + for i, expectedLog := range expectedLogs { + require.NoError(t, plogtest.CompareLogs(expectedLog, sink.AllLogs()[i]), plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp()) + } require.NoError(t, rcvr.Shutdown(context.Background())) } @@ -169,12 +167,6 @@ func (rt *rotationTest) Run(t *testing.T) { // Build expected outputs expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), numLogs, &wg) rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") @@ -186,6 +178,8 @@ func (rt *rotationTest) Run(t *testing.T) { }() require.NoError(t, err) + expectedLogs := []plog.Logs{} + for i := 0; i < numLogs; i++ { if (i+1)%maxLinesPerFile == 0 { if rt.copyTruncate { @@ -227,7 +221,8 @@ func (rt *rotationTest) Run(t *testing.T) { e := entry.New() e.Timestamp = expectedTimestamp require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) - require.NoError(t, converter.Batch([]*entry.Entry{e})) + + expectedLogs = append(expectedLogs, adapter.ConvertEntries([]*entry.Entry{e})) // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) @@ -235,15 +230,13 @@ func (rt *rotationTest) Run(t *testing.T) { time.Sleep(time.Millisecond) } - wg.Wait() require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) // TODO: Figure out a nice way to assert each logs entry content. - // require.Equal(t, expectedLogs, sink.AllLogs()) + require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) - converter.Stop() } func consumeNLogsFromConverter(ch <-chan plog.Logs, count int, wg *sync.WaitGroup) { diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index a8a2b2f03516..487944703c91 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.113.0 go.opentelemetry.io/collector/consumer/consumertest v0.113.0 go.opentelemetry.io/collector/pipeline v0.113.0 go.opentelemetry.io/collector/receiver/receivertest v0.113.0 @@ -50,6 +51,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.113.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.113.0 // indirect diff --git a/receiver/namedpipereceiver/namedpipe_test.go b/receiver/namedpipereceiver/namedpipe_test.go index c7a4f25bea80..a13bc83e28c6 100644 --- a/receiver/namedpipereceiver/namedpipe_test.go +++ b/receiver/namedpipereceiver/namedpipe_test.go @@ -55,10 +55,6 @@ func TestReadPipe(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) From 70ac3a5d09f9382cdd9a000fad8331d058ef1bcf Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 08:08:19 +0100 Subject: [PATCH 02/12] enable adapt log entry verification Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 5fde73fe232a..b09209c5e11d 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -94,9 +94,12 @@ func TestReadStaticFile(t *testing.T) { queueEntry := func(t *testing.T, msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) + e.Body = fmt.Sprintf("2020-08-25 %s %s", severity.String(), msg) e.Severity = severity - e.AddAttribute("file_name", "simple.log") + e.AddAttribute("log.file.name", "simple.log") + e.AddAttribute("time", "2020-08-25") + e.AddAttribute("sev", severity.String()) + e.AddAttribute("msg", msg) entries = append(entries, e) } queueEntry(t, "Something routine", entry.Info) @@ -113,10 +116,16 @@ func TestReadStaticFile(t *testing.T) { "expected %d but got %d logs", 3, sink.LogRecordCount(), ) - // TODO: Figure out a nice way to assert each logs entry content. - require.Equal(t, expectedLogs, sink.AllLogs()) + for i, expectedLog := range expectedLogs { - require.NoError(t, plogtest.CompareLogs(expectedLog, sink.AllLogs()[i]), plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp()) + require.NoError(t, + plogtest.CompareLogs( + expectedLog, + sink.AllLogs()[i], + plogtest.IgnoreObservedTimestamp(), + plogtest.IgnoreTimestamp(), + ), + ) } require.NoError(t, rcvr.Shutdown(context.Background())) } From 2068e442e465af79d296b637e26bbac2e3ed30bf Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 08:44:06 +0100 Subject: [PATCH 03/12] fix linting Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter_test.go | 10 +--------- receiver/filelogreceiver/filelog_test.go | 22 ++++------------------ 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index 5bcd9b6420ed..52f5ffba09c6 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -10,13 +10,10 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap/zaptest" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) func BenchmarkConvertSimple(b *testing.B) { @@ -385,9 +382,6 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) pLogs := ConvertEntries(entries) @@ -803,8 +797,6 @@ func BenchmarkConverter(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(b) b.ReportAllocs() diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index b09209c5e11d..199aa9ddee48 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "runtime" - "sync" "sync/atomic" "testing" "time" @@ -91,7 +90,7 @@ func TestReadStaticFile(t *testing.T) { // Build the expected set by using adapter.Converter to translate entries // to pdata Logs. entries := []*entry.Entry{} - queueEntry := func(t *testing.T, msg string, severity entry.Severity) { + queueEntry := func(msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp e.Body = fmt.Sprintf("2020-08-25 %s %s", severity.String(), msg) @@ -102,9 +101,9 @@ func TestReadStaticFile(t *testing.T) { e.AddAttribute("msg", msg) entries = append(entries, e) } - queueEntry(t, "Something routine", entry.Info) - queueEntry(t, "Something bad happened!", entry.Error) - queueEntry(t, "Some details...", entry.Debug) + queueEntry("Something routine", entry.Info) + queueEntry("Something bad happened!", entry.Error) + queueEntry("Some details...", entry.Debug) expectedLogs = append(expectedLogs, adapter.ConvertEntries(entries)) @@ -248,19 +247,6 @@ func (rt *rotationTest) Run(t *testing.T) { require.NoError(t, rcvr.Shutdown(context.Background())) } -func consumeNLogsFromConverter(ch <-chan plog.Logs, count int, wg *sync.WaitGroup) { - defer wg.Done() - - n := 0 - for pLog := range ch { - n += pLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() - - if n == count { - return - } - } -} - func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { return func() bool { return sink.LogRecordCount() == expected } } From 6f45fed248c717b3fb83c7064f257dca0bbd66f8 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 09:08:16 +0100 Subject: [PATCH 04/12] fix linting Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter.go | 3 ++- pkg/stanza/adapter/converter_test.go | 3 ++- receiver/filelogreceiver/filelog_test.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index c42c0a458400..a81fd8f00a42 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -11,9 +11,10 @@ import ( "sync" "github.com/cespare/xxhash/v2" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) func ConvertEntries(entries []*entry.Entry) plog.Logs { diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index 52f5ffba09c6..ff49fd99754f 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -10,10 +10,11 @@ import ( "testing" "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) func BenchmarkConvertSimple(b *testing.B) { diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 199aa9ddee48..f21224c2dd87 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "io" "os" "path/filepath" @@ -27,6 +26,7 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" From 3217a1313144f28730e5bc89c336a6e6829c0d84 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 09:54:00 +0100 Subject: [PATCH 05/12] fix unit test Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index f21224c2dd87..843d0257d8f3 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -243,7 +243,7 @@ func (rt *rotationTest) Run(t *testing.T) { numLogs, sink.LogRecordCount(), ) // TODO: Figure out a nice way to assert each logs entry content. - require.Equal(t, expectedLogs, sink.AllLogs()) + // require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) } From 9588aeb8581efafcc1f6ee16f8e2aac1beacb039 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 11:43:00 +0100 Subject: [PATCH 06/12] fix unit tests Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 41 ++++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 843d0257d8f3..c9bc35158f42 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/collector/pdata/pcommon" "io" "os" "path/filepath" @@ -186,8 +187,7 @@ func (rt *rotationTest) Run(t *testing.T) { }() require.NoError(t, err) - expectedLogs := []plog.Logs{} - + expectedLogEntries := []*entry.Entry{} for i := 0; i < numLogs; i++ { if (i+1)%maxLinesPerFile == 0 { if rt.copyTruncate { @@ -228,22 +228,51 @@ func (rt *rotationTest) Run(t *testing.T) { // Build the expected set by converting entries to pdata Logs... e := entry.New() e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) + e.Body = fmt.Sprintf("2020-08-25 %s", msg) + e.AddAttribute("ts", "2020-08-25") + e.AddAttribute("msg", msg) + e.ObservedTimestamp = expectedTimestamp - expectedLogs = append(expectedLogs, adapter.ConvertEntries([]*entry.Entry{e})) + expectedLogEntries = append(expectedLogEntries, e) // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) require.NoError(t, err) time.Sleep(time.Millisecond) } + expectedLogs := adapter.ConvertEntries(expectedLogEntries) require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) - // TODO: Figure out a nice way to assert each logs entry content. - // require.Equal(t, expectedLogs, sink.AllLogs()) + + // gather all Log entries into one ResourceLog so they can be compared with the expectedLogs ResourceLog + allReceivedLogs := plog.NewResourceLogs() + sink.AllLogs()[0].ResourceLogs().At(0).CopyTo(allReceivedLogs) + + for i, l := range sink.AllLogs() { + if i == 0 { + continue + } + for i := 0; i < l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ { + lr := allReceivedLogs.ScopeLogs().At(0).LogRecords().AppendEmpty() + + l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).CopyTo(lr) + } + } + + // override observed timestamp attributes here as the plogtest.COmpareResourceLogs does not provide the IgnoreObservedTimestamp option + for i := 0; i < allReceivedLogs.ScopeLogs().At(0).LogRecords().Len(); i++ { + allReceivedLogs.ScopeLogs().At(0).LogRecords().At(i).SetObservedTimestamp(pcommon.NewTimestampFromTime(expectedTimestamp)) + } + require.NoError( + t, + plogtest.CompareResourceLogs( + expectedLogs.ResourceLogs().At(0), + allReceivedLogs, + ), + ) require.NoError(t, rcvr.Shutdown(context.Background())) } From 522d1d4fd856a9816056fa0f56fabccb2e679d44 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 12:03:51 +0100 Subject: [PATCH 07/12] fix unit tests Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index c9bc35158f42..fd50e8f5e304 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "go.opentelemetry.io/collector/pdata/pcommon" "io" "os" "path/filepath" @@ -231,7 +230,6 @@ func (rt *rotationTest) Run(t *testing.T) { e.Body = fmt.Sprintf("2020-08-25 %s", msg) e.AddAttribute("ts", "2020-08-25") e.AddAttribute("msg", msg) - e.ObservedTimestamp = expectedTimestamp expectedLogEntries = append(expectedLogEntries, e) @@ -248,29 +246,27 @@ func (rt *rotationTest) Run(t *testing.T) { ) // gather all Log entries into one ResourceLog so they can be compared with the expectedLogs ResourceLog - allReceivedLogs := plog.NewResourceLogs() - sink.AllLogs()[0].ResourceLogs().At(0).CopyTo(allReceivedLogs) + allReceivedLogs := plog.NewLogs() + sink.AllLogs()[0].CopyTo(allReceivedLogs) for i, l := range sink.AllLogs() { if i == 0 { continue } for i := 0; i < l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ { - lr := allReceivedLogs.ScopeLogs().At(0).LogRecords().AppendEmpty() + lr := allReceivedLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).CopyTo(lr) } } - // override observed timestamp attributes here as the plogtest.COmpareResourceLogs does not provide the IgnoreObservedTimestamp option - for i := 0; i < allReceivedLogs.ScopeLogs().At(0).LogRecords().Len(); i++ { - allReceivedLogs.ScopeLogs().At(0).LogRecords().At(i).SetObservedTimestamp(pcommon.NewTimestampFromTime(expectedTimestamp)) - } require.NoError( t, - plogtest.CompareResourceLogs( - expectedLogs.ResourceLogs().At(0), + plogtest.CompareLogs( + expectedLogs, allReceivedLogs, + plogtest.IgnoreLogRecordsOrder(), + plogtest.IgnoreObservedTimestamp(), ), ) require.NoError(t, rcvr.Shutdown(context.Background())) From f7dd387be2bcaa243c56a8224c3019cab92f657d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 11 Nov 2024 13:11:52 +0100 Subject: [PATCH 08/12] remove previously introduced log record verification Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 40 ++---------------------- 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index fd50e8f5e304..508c7de546cb 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -173,9 +173,6 @@ func (rt *rotationTest) Run(t *testing.T) { fileName := filepath.Join(tempDir, "test.log") backupFileName := filepath.Join(tempDir, "test-backup.log") - // Build expected outputs - expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) @@ -186,7 +183,6 @@ func (rt *rotationTest) Run(t *testing.T) { }() require.NoError(t, err) - expectedLogEntries := []*entry.Entry{} for i := 0; i < numLogs; i++ { if (i+1)%maxLinesPerFile == 0 { if rt.copyTruncate { @@ -224,51 +220,19 @@ func (rt *rotationTest) Run(t *testing.T) { msg := fmt.Sprintf("This is a simple log line with the number %3d", i) - // Build the expected set by converting entries to pdata Logs... - e := entry.New() - e.Timestamp = expectedTimestamp - e.Body = fmt.Sprintf("2020-08-25 %s", msg) - e.AddAttribute("ts", "2020-08-25") - e.AddAttribute("msg", msg) - - expectedLogEntries = append(expectedLogEntries, e) - // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) require.NoError(t, err) time.Sleep(time.Millisecond) } - expectedLogs := adapter.ConvertEntries(expectedLogEntries) require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) - // gather all Log entries into one ResourceLog so they can be compared with the expectedLogs ResourceLog - allReceivedLogs := plog.NewLogs() - sink.AllLogs()[0].CopyTo(allReceivedLogs) - - for i, l := range sink.AllLogs() { - if i == 0 { - continue - } - for i := 0; i < l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ { - lr := allReceivedLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() - - l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).CopyTo(lr) - } - } - - require.NoError( - t, - plogtest.CompareLogs( - expectedLogs, - allReceivedLogs, - plogtest.IgnoreLogRecordsOrder(), - plogtest.IgnoreObservedTimestamp(), - ), - ) + // TODO: Figure out a nice way to assert each logs entry content. + // require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) } From 991a3466ee5c98e3a88813a7b1af79b2ff7d19c1 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 12 Nov 2024 08:20:30 +0100 Subject: [PATCH 09/12] fix linting error Signed-off-by: Florian Bacher --- receiver/filelogreceiver/filelog_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 187280a0217f..09e4fbd35830 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -12,7 +12,6 @@ import ( "path/filepath" "runtime" "strconv" - "sync" "sync/atomic" "testing" "time" From c8b723ef3c9a8b58bea1a12897f7476844239eb6 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 19 Nov 2024 09:41:38 +0100 Subject: [PATCH 10/12] go mod tidy Signed-off-by: Florian Bacher --- receiver/filelogreceiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index bc48bbc5432c..85c4ed82ed62 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -52,7 +52,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.113.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.114.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect From abca2b581cc75883af0e7eb31ef9b46e9adac0f2 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 19 Nov 2024 10:31:30 +0100 Subject: [PATCH 11/12] fix linting Signed-off-by: Florian Bacher --- pkg/stanza/adapter/converter_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index e55e50a85c6e..a527e3bddb84 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -398,7 +398,6 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) } - }) } } @@ -800,7 +799,6 @@ func BenchmarkConverter(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - b.ReportAllocs() for from := 0; from < entryCount; from += int(batchSize) { From 44093b326ffc5e01d4bb35bd8af4bf93cbac67f7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 13:06:20 +0100 Subject: [PATCH 12/12] go mod tidy Signed-off-by: Florian Bacher --- receiver/filelogreceiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index e4c0e287a261..43e705cd1e03 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -52,7 +52,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.114.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect