From 2f70759e3f2bed80f01c3481c402ce567d7d66a5 Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 16:04:01 +0530 Subject: [PATCH 01/10] feat: logsparsing previews: add skeleton for inmemory exporter --- .../preview/inmemoryexporter/config.go | 17 +++ .../preview/inmemoryexporter/config_test.go | 46 ++++++ .../preview/inmemoryexporter/exporter.go | 134 ++++++++++++++++++ .../preview/inmemoryexporter/factory.go | 46 ++++++ 4 files changed, 243 insertions(+) create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go new file mode 100644 index 0000000000..c2156c7520 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go @@ -0,0 +1,17 @@ +package inmemoryexporter + +import "fmt" + +type Config struct { + // Unique id for the exporter. + // Useful for examining received data without having access + // to the exact exporter instance. + Id string `mapstructure:"id"` +} + +func (c *Config) Validate() error { + if len(c.Id) < 1 { + return fmt.Errorf("inmemory exporter: id is required") + } + return nil +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go new file mode 100644 index 0000000000..753787a024 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go @@ -0,0 +1,46 @@ +package inmemoryexporter + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" +) + +func TestValidate(t *testing.T) { + tests := []struct { + name string + rawConf *confmap.Conf + errorExpected bool + }{ + { + name: "with id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "test_exporter", + }), + errorExpected: false, + }, + { + name: "without id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{}), + errorExpected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + err := component.UnmarshalConfig(tt.rawConf, cfg) + require.NoError(t, err, "could not UnmarshalConfig") + + err = component.ValidateConfig(cfg) + if tt.errorExpected { + require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) + } else { + require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) + } + }) + } +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go new file mode 100644 index 0000000000..202b9b6c30 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go @@ -0,0 +1,134 @@ +package inmemoryexporter + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +/* An in-memory exporter for use in testing and previewing log pipelines.*/ + +// InMemoryExporter is an in-memory exporter that can be used for testing. +// It implements component.TracesExporter, component.MetricsExporter and component.LogsExporter interfaces. +type InMemoryExporter struct { + // Unique identifier for the exporter. + id string + // mu protects the below fields. + mu sync.Mutex + // traces is a slice of pdata.Traces that were received by this exporter. + traces []ptrace.Traces + // metrics is a slice of pdata.Metrics that were received by this exporter. + metrics []pmetric.Metrics + // logs is a slice of pdata.Logs that were received by this exporter. + logs []plog.Logs +} + +// Keep track of all exporter instances in the process. +// exporters add themselves to this map when `Start`ed and remove +// themselves when `Stop`ped. +// Useful for getting a hold of the exporter in scenarios where one doesn't +// create the instances. Eg: bringing up a collector service from collector config +var allExporterInstances map[string]*InMemoryExporter + +func init() { + allExporterInstances = make(map[string]*InMemoryExporter) +} + +func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error { + allExporterInstances[e.id] = e + return nil +} + +func (e *InMemoryExporter) Shutdown(ctx context.Context) error { + delete(allExporterInstances, e.id) + return nil +} + +func GetExporterInstance(id string) *InMemoryExporter { + return allExporterInstances[id] +} + +// Rest of InMemoryExporter functions + +func (e *InMemoryExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.traces = append(e.traces, td) + return nil +} + +// ConsumeMetrics implements component.MetricsExporter. +func (e *InMemoryExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.metrics = append(e.metrics, md) + return nil +} + +// ConsumeLogs implements component.LogsExporter. +func (e *InMemoryExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = append(e.logs, ld) + return nil +} + +// GetTraces returns a slice of pdata.Traces that were received by this exporter. +func (e *InMemoryExporter) GetTraces() []ptrace.Traces { + e.mu.Lock() + defer e.mu.Unlock() + + return e.traces +} + +// GetMetrics returns a slice of pdata.Metrics that were received by this exporter. +func (e *InMemoryExporter) GetMetrics() []pmetric.Metrics { + e.mu.Lock() + defer e.mu.Unlock() + + return e.metrics +} + +// GetLogs returns a slice of pdata.Logs that were received by this exporter. +func (e *InMemoryExporter) GetLogs() []plog.Logs { + e.mu.Lock() + defer e.mu.Unlock() + + return e.logs +} + +// ResetTraces removes all traces that were received by this exporter. +func (e *InMemoryExporter) ResetTraces() { + e.mu.Lock() + defer e.mu.Unlock() + + e.traces = nil +} + +// ResetMetrics removes all metrics that were received by this exporter. +func (e *InMemoryExporter) ResetMetrics() { + e.mu.Lock() + defer e.mu.Unlock() + + e.metrics = nil +} + +// ResetLogs removes all logs that were received by this exporter. +func (e *InMemoryExporter) ResetLogs() { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = nil +} + +func (e *InMemoryExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go new file mode 100644 index 0000000000..eaf0af503f --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go @@ -0,0 +1,46 @@ +package inmemoryexporter + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createTracesExporter( + _ context.Context, _ exporter.CreateSettings, config component.Config, +) (exporter.Traces, error) { + return &InMemoryExporter{ + id: config.(*Config).Id, + }, nil +} + +func createMetricsExporter( + _ context.Context, _ exporter.CreateSettings, config component.Config, +) (exporter.Metrics, error) { + return &InMemoryExporter{ + id: config.(*Config).Id, + }, nil +} + +func createLogsExporter( + _ context.Context, _ exporter.CreateSettings, config component.Config, +) (exporter.Logs, error) { + return &InMemoryExporter{ + id: config.(*Config).Id, + }, nil +} + +// NewFactory creates a new OTLP receiver factory. +func NewFactory() exporter.Factory { + return exporter.NewFactory( + "memory", + createDefaultConfig, + exporter.WithTraces(createTracesExporter, component.StabilityLevelStable), + exporter.WithMetrics(createMetricsExporter, component.StabilityLevelStable), + exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta)) +} From 0e1a8618d1b8674d48ab3ec525854511786613ad Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 16:07:50 +0530 Subject: [PATCH 02/10] feat: logsparsing previews: add inmemory exporter factory tests --- .../preview/inmemoryexporter/config_test.go | 6 ++- .../preview/inmemoryexporter/factory.go | 15 +++++- .../preview/inmemoryexporter/factory_test.go | 50 +++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go index 753787a024..29749757dc 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go @@ -22,8 +22,10 @@ func TestValidate(t *testing.T) { errorExpected: false, }, { - name: "without id", - rawConf: confmap.NewFromStringMap(map[string]interface{}{}), + name: "empty id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "", + }), errorExpected: true, }, } diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go index eaf0af503f..de502fca52 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go @@ -3,17 +3,24 @@ package inmemoryexporter import ( "context" + "github.com/google/uuid" + "github.com/pkg/errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" ) func createDefaultConfig() component.Config { - return &Config{} + return &Config{ + Id: uuid.NewString(), + } } func createTracesExporter( _ context.Context, _ exporter.CreateSettings, config component.Config, ) (exporter.Traces, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory exporter config") + } return &InMemoryExporter{ id: config.(*Config).Id, }, nil @@ -22,6 +29,9 @@ func createTracesExporter( func createMetricsExporter( _ context.Context, _ exporter.CreateSettings, config component.Config, ) (exporter.Metrics, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory exporter config") + } return &InMemoryExporter{ id: config.(*Config).Id, }, nil @@ -30,6 +40,9 @@ func createMetricsExporter( func createLogsExporter( _ context.Context, _ exporter.CreateSettings, config component.Config, ) (exporter.Logs, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory exporter config") + } return &InMemoryExporter{ id: config.(*Config).Id, }, nil diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go new file mode 100644 index 0000000000..256f2d7cea --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go @@ -0,0 +1,50 @@ +package inmemoryexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateMetricsExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + me, err := factory.CreateMetricsExporter( + context.Background(), exporter.CreateSettings{}, cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, me) +} + +func TestCreateTracesExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + te, err := factory.CreateTracesExporter( + context.Background(), exporter.CreateSettings{}, cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, te) +} + +func TestCreateLogsExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + te, err := factory.CreateLogsExporter( + context.Background(), exporter.CreateSettings{}, cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, te) +} From b45045dc326dbf3244f1c6add06089720d9df71c Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 16:40:51 +0530 Subject: [PATCH 03/10] feat: logsparsing previews: protect allExporterInstances with a lock --- .../preview/inmemoryexporter/exporter.go | 7 ++++++ .../preview/inmemoryexporter/exporter_test.go | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go index 202b9b6c30..83eb5646fb 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go @@ -34,17 +34,24 @@ type InMemoryExporter struct { // Useful for getting a hold of the exporter in scenarios where one doesn't // create the instances. Eg: bringing up a collector service from collector config var allExporterInstances map[string]*InMemoryExporter +var allExportersLock sync.Mutex func init() { allExporterInstances = make(map[string]*InMemoryExporter) } func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error { + allExportersLock.Lock() + defer allExportersLock.Unlock() + allExporterInstances[e.id] = e return nil } func (e *InMemoryExporter) Shutdown(ctx context.Context) error { + allExportersLock.Lock() + defer allExportersLock.Unlock() + delete(allExporterInstances, e.id) return nil } diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go new file mode 100644 index 0000000000..2ac2adc9d5 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go @@ -0,0 +1,24 @@ +package inmemoryexporter + +import "testing" + +func TestExporterLifecycle(t *testing.T) { + // There should be no exporter with an id before it is started. + + // Should be able to get a hold of the exporter after starting it. + + // Should not be able to start 2 exporters with the same id + t.Fatal("TODO(Raj): Implement in memory exporter tests") + + // Should not be able to get a hold of an exporter after shutdown + + // Should be able to start a new exporter with same id after shuttin + +} + +func TestLogsExporter(t *testing.T) { + // Should be able to wait for logs to arrive at the exporter + t.Fatal("TODO(Raj): Implement in memory logs exporter tests") + + // Should be able to wait with a timeout. +} From 6330bf802e6eace3a74125da2f6af1ff4f2a4cbb Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 16:54:24 +0530 Subject: [PATCH 04/10] feat: logsparsing previews: strip down inmemory exporter to just a logs exporter --- .../preview/inmemoryexporter/config.go | 4 +- .../preview/inmemoryexporter/exporter.go | 141 ------------------ .../preview/inmemoryexporter/factory.go | 26 +--- .../preview/inmemoryexporter/factory_test.go | 22 --- .../preview/inmemoryexporter/logsexporter.go | 79 ++++++++++ ...{exporter_test.go => logsexporter_test.go} | 0 6 files changed, 82 insertions(+), 190 deletions(-) delete mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go rename pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/{exporter_test.go => logsexporter_test.go} (100%) diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go index c2156c7520..9734147444 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go @@ -4,8 +4,8 @@ import "fmt" type Config struct { // Unique id for the exporter. - // Useful for examining received data without having access - // to the exact exporter instance. + // Useful for getting a hold of the exporter in code that doesn't control + // its instantiation. Example: when instantiation happens in collector service Id string `mapstructure:"id"` } diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go deleted file mode 100644 index 83eb5646fb..0000000000 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter.go +++ /dev/null @@ -1,141 +0,0 @@ -package inmemoryexporter - -import ( - "context" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -/* An in-memory exporter for use in testing and previewing log pipelines.*/ - -// InMemoryExporter is an in-memory exporter that can be used for testing. -// It implements component.TracesExporter, component.MetricsExporter and component.LogsExporter interfaces. -type InMemoryExporter struct { - // Unique identifier for the exporter. - id string - // mu protects the below fields. - mu sync.Mutex - // traces is a slice of pdata.Traces that were received by this exporter. - traces []ptrace.Traces - // metrics is a slice of pdata.Metrics that were received by this exporter. - metrics []pmetric.Metrics - // logs is a slice of pdata.Logs that were received by this exporter. - logs []plog.Logs -} - -// Keep track of all exporter instances in the process. -// exporters add themselves to this map when `Start`ed and remove -// themselves when `Stop`ped. -// Useful for getting a hold of the exporter in scenarios where one doesn't -// create the instances. Eg: bringing up a collector service from collector config -var allExporterInstances map[string]*InMemoryExporter -var allExportersLock sync.Mutex - -func init() { - allExporterInstances = make(map[string]*InMemoryExporter) -} - -func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error { - allExportersLock.Lock() - defer allExportersLock.Unlock() - - allExporterInstances[e.id] = e - return nil -} - -func (e *InMemoryExporter) Shutdown(ctx context.Context) error { - allExportersLock.Lock() - defer allExportersLock.Unlock() - - delete(allExporterInstances, e.id) - return nil -} - -func GetExporterInstance(id string) *InMemoryExporter { - return allExporterInstances[id] -} - -// Rest of InMemoryExporter functions - -func (e *InMemoryExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - e.mu.Lock() - defer e.mu.Unlock() - - e.traces = append(e.traces, td) - return nil -} - -// ConsumeMetrics implements component.MetricsExporter. -func (e *InMemoryExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - e.mu.Lock() - defer e.mu.Unlock() - - e.metrics = append(e.metrics, md) - return nil -} - -// ConsumeLogs implements component.LogsExporter. -func (e *InMemoryExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - e.mu.Lock() - defer e.mu.Unlock() - - e.logs = append(e.logs, ld) - return nil -} - -// GetTraces returns a slice of pdata.Traces that were received by this exporter. -func (e *InMemoryExporter) GetTraces() []ptrace.Traces { - e.mu.Lock() - defer e.mu.Unlock() - - return e.traces -} - -// GetMetrics returns a slice of pdata.Metrics that were received by this exporter. -func (e *InMemoryExporter) GetMetrics() []pmetric.Metrics { - e.mu.Lock() - defer e.mu.Unlock() - - return e.metrics -} - -// GetLogs returns a slice of pdata.Logs that were received by this exporter. -func (e *InMemoryExporter) GetLogs() []plog.Logs { - e.mu.Lock() - defer e.mu.Unlock() - - return e.logs -} - -// ResetTraces removes all traces that were received by this exporter. -func (e *InMemoryExporter) ResetTraces() { - e.mu.Lock() - defer e.mu.Unlock() - - e.traces = nil -} - -// ResetMetrics removes all metrics that were received by this exporter. -func (e *InMemoryExporter) ResetMetrics() { - e.mu.Lock() - defer e.mu.Unlock() - - e.metrics = nil -} - -// ResetLogs removes all logs that were received by this exporter. -func (e *InMemoryExporter) ResetLogs() { - e.mu.Lock() - defer e.mu.Unlock() - - e.logs = nil -} - -func (e *InMemoryExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go index de502fca52..6c703f705c 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go @@ -15,35 +15,13 @@ func createDefaultConfig() component.Config { } } -func createTracesExporter( - _ context.Context, _ exporter.CreateSettings, config component.Config, -) (exporter.Traces, error) { - if err := component.ValidateConfig(config); err != nil { - return nil, errors.Wrap(err, "invalid inmemory exporter config") - } - return &InMemoryExporter{ - id: config.(*Config).Id, - }, nil -} - -func createMetricsExporter( - _ context.Context, _ exporter.CreateSettings, config component.Config, -) (exporter.Metrics, error) { - if err := component.ValidateConfig(config); err != nil { - return nil, errors.Wrap(err, "invalid inmemory exporter config") - } - return &InMemoryExporter{ - id: config.(*Config).Id, - }, nil -} - func createLogsExporter( _ context.Context, _ exporter.CreateSettings, config component.Config, ) (exporter.Logs, error) { if err := component.ValidateConfig(config); err != nil { return nil, errors.Wrap(err, "invalid inmemory exporter config") } - return &InMemoryExporter{ + return &InMemoryLogsExporter{ id: config.(*Config).Id, }, nil } @@ -53,7 +31,5 @@ func NewFactory() exporter.Factory { return exporter.NewFactory( "memory", createDefaultConfig, - exporter.WithTraces(createTracesExporter, component.StabilityLevelStable), - exporter.WithMetrics(createMetricsExporter, component.StabilityLevelStable), exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta)) } diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go index 256f2d7cea..1a9481169a 100644 --- a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go @@ -16,28 +16,6 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } -func TestCreateMetricsExporter(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - me, err := factory.CreateMetricsExporter( - context.Background(), exporter.CreateSettings{}, cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, me) -} - -func TestCreateTracesExporter(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - te, err := factory.CreateTracesExporter( - context.Background(), exporter.CreateSettings{}, cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, te) -} - func TestCreateLogsExporter(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go new file mode 100644 index 0000000000..9cd14c7bf5 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go @@ -0,0 +1,79 @@ +package inmemoryexporter + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +/* An in-memory exporter for use in testing and previewing log pipelines.*/ +type InMemoryLogsExporter struct { + // Unique identifier for the exporter. + id string + // mu protects the data below + mu sync.Mutex + // slice of pdata.Logs that were received by this exporter. + logs []plog.Logs +} + +// ConsumeLogs implements component.LogsExporter. +func (e *InMemoryLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = append(e.logs, ld) + return nil +} + +// GetLogs returns a slice of pdata.Logs that were received by this exporter. +func (e *InMemoryLogsExporter) GetLogs() []plog.Logs { + e.mu.Lock() + defer e.mu.Unlock() + + return e.logs +} + +// ResetLogs removes all logs that were received by this exporter. +func (e *InMemoryLogsExporter) ResetLogs() { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = nil +} + +func (e *InMemoryLogsExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// Keep track of all exporter instances in the process. +// Useful for getting a hold of the exporter in scenarios where one doesn't +// create the instances. Eg: bringing up a collector service from collector config +var allExporterInstances map[string]*InMemoryLogsExporter +var allExportersLock sync.Mutex + +func init() { + allExporterInstances = make(map[string]*InMemoryLogsExporter) +} + +func (e *InMemoryLogsExporter) Start(ctx context.Context, host component.Host) error { + allExportersLock.Lock() + defer allExportersLock.Unlock() + + allExporterInstances[e.id] = e + return nil +} + +func (e *InMemoryLogsExporter) Shutdown(ctx context.Context) error { + allExportersLock.Lock() + defer allExportersLock.Unlock() + + delete(allExporterInstances, e.id) + return nil +} + +func GetExporterInstance(id string) *InMemoryLogsExporter { + return allExporterInstances[id] +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/exporter_test.go rename to pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter_test.go From 663a6c346716758c3d78ebe3bee14624715a4756 Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 17:11:48 +0530 Subject: [PATCH 05/10] feat: logsparsing previews: get in memory receiver started --- .../preview/inmemoryreceiver/config.go | 17 ++++++ .../preview/inmemoryreceiver/config_test.go | 48 ++++++++++++++++ .../preview/inmemoryreceiver/factory.go | 41 ++++++++++++++ .../preview/inmemoryreceiver/factory_test.go | 29 ++++++++++ .../preview/inmemoryreceiver/logsreceiver.go | 56 +++++++++++++++++++ .../inmemoryreceiver/logsreceiver_test.go | 17 ++++++ 6 files changed, 208 insertions(+) create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go new file mode 100644 index 0000000000..5f1462e51b --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go @@ -0,0 +1,17 @@ +package inmemoryreceiver + +import "fmt" + +type Config struct { + // Unique id for the receiver. + // Useful for getting a hold of the receiver in code that doesn't control + // its instantiation. Example: when instantiation happens in collector service + Id string `mapstructure:"id"` +} + +func (c *Config) Validate() error { + if len(c.Id) < 1 { + return fmt.Errorf("inmemory receiver: id is required") + } + return nil +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go new file mode 100644 index 0000000000..a0daf71c45 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go @@ -0,0 +1,48 @@ +package inmemoryreceiver + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" +) + +func TestValidate(t *testing.T) { + tests := []struct { + name string + rawConf *confmap.Conf + errorExpected bool + }{ + { + name: "with id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "test_receiver", + }), + errorExpected: false, + }, + { + name: "empty id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "", + }), + errorExpected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + err := component.UnmarshalConfig(tt.rawConf, cfg) + require.NoError(t, err, "could not UnmarshalConfig") + + err = component.ValidateConfig(cfg) + if tt.errorExpected { + require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) + } else { + require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) + } + }) + } +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go new file mode 100644 index 0000000000..09f2e4b39d --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go @@ -0,0 +1,41 @@ +package inmemoryreceiver + +import ( + "context" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" +) + +func createDefaultConfig() component.Config { + return &Config{ + Id: uuid.NewString(), + } +} + +func createLogsReceiver( + _ context.Context, + _ receiver.CreateSettings, + config component.Config, + consumer consumer.Logs, +) (receiver.Logs, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory receiver config") + } + return &InMemoryLogsReceiver{ + id: config.(*Config).Id, + nextConsumer: consumer, + }, nil + +} + +// NewFactory creates a new OTLP receiver factory. +func NewFactory() receiver.Factory { + return receiver.NewFactory( + "memory", + createDefaultConfig, + receiver.WithLogs(createLogsReceiver, component.StabilityLevelBeta)) +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go new file mode 100644 index 0000000000..7bdcd80bee --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go @@ -0,0 +1,29 @@ +package inmemoryreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + te, err := factory.CreateLogsReceiver( + context.Background(), receiver.CreateSettings{}, cfg, consumertest.NewNop(), + ) + assert.NoError(t, err) + assert.NotNil(t, te) +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go new file mode 100644 index 0000000000..dd18012a8c --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go @@ -0,0 +1,56 @@ +package inmemoryreceiver + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +// In memory receivers for testing and simulation +type InMemoryLogsReceiver struct { + // Unique identifier for the receiver. + id string + + nextConsumer consumer.Logs +} + +func (r *InMemoryLogsReceiver) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return r.nextConsumer.ConsumeLogs(ctx, ld) +} + +func (e *InMemoryLogsReceiver) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// Keep track of all receiver instances in the process. +// Useful for getting a hold of the receiver in scenarios where one doesn't +// create the instances. Eg: bringing up a collector service from collector config +var allReceiverInstances map[string]*InMemoryLogsReceiver +var allReceiversLock sync.Mutex + +func init() { + allReceiverInstances = make(map[string]*InMemoryLogsReceiver) +} + +func (e *InMemoryLogsReceiver) Start(ctx context.Context, host component.Host) error { + allReceiversLock.Lock() + defer allReceiversLock.Unlock() + + allReceiverInstances[e.id] = e + return nil +} + +func (e *InMemoryLogsReceiver) Shutdown(ctx context.Context) error { + allReceiversLock.Lock() + defer allReceiversLock.Unlock() + + delete(allReceiverInstances, e.id) + return nil +} + +func GetReceiverInstance(id string) *InMemoryLogsReceiver { + return allReceiverInstances[id] +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go new file mode 100644 index 0000000000..21a5cdbb13 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go @@ -0,0 +1,17 @@ +package inmemoryreceiver + +import "testing" + +func TestReceiverLifecycle(t *testing.T) { + // There should be no exporter with an id before it is started. + + // Should be able to get a hold of the exporter after starting it. + + // Should not be able to start 2 exporters with the same id + t.Fatal("TODO(Raj): Implement in memory exporter tests") + + // Should not be able to get a hold of an exporter after shutdown + + // Should be able to start a new exporter with same id after shuttin + +} From 5cf01cc47688fde0c2d9f8c39e56e9a916c72b2b Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 20:56:23 +0530 Subject: [PATCH 06/10] feat: logsparsing previews: reorganize collector simulation files --- .../collectorsimulator/collectorsimulator.go | 16 ++++++++++++++++ .../collectorsimulator_test.go | 7 +++++++ .../inmemoryexporter/config.go | 0 .../inmemoryexporter/config_test.go | 0 .../inmemoryexporter/factory.go | 0 .../inmemoryexporter/factory_test.go | 0 .../inmemoryexporter/logsexporter.go | 0 .../inmemoryexporter/logsexporter_test.go | 0 .../inmemoryreceiver/config.go | 0 .../inmemoryreceiver/config_test.go | 0 .../inmemoryreceiver/factory.go | 0 .../inmemoryreceiver/factory_test.go | 0 .../inmemoryreceiver/logsreceiver.go | 0 .../inmemoryreceiver/logsreceiver_test.go | 0 14 files changed, 23 insertions(+) create mode 100644 pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go create mode 100644 pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator_test.go rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/config.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/config_test.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/factory.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/factory_test.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/logsexporter.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryexporter/logsexporter_test.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/config.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/config_test.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/factory.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/factory_test.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/logsreceiver.go (100%) rename pkg/query-service/app/logparsingpipeline/{preview => collectorsimulator}/inmemoryreceiver/logsreceiver_test.go (100%) diff --git a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go new file mode 100644 index 0000000000..59c615c745 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go @@ -0,0 +1,16 @@ +package collectorsimulator + +import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.signoz.io/signoz/pkg/query-service/model" +) + +type SignozLog model.GetLogsResponse + +type CollectorConfProviderFn func(baseConfYaml []byte) ([]byte, error) + +func SimulateLogsProcessing(configProvider CollectorConfProviderFn, logs plog.Logs) ( + plog.Logs, error, +) { + return logs, nil +} diff --git a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator_test.go new file mode 100644 index 0000000000..9bc834e32a --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator_test.go @@ -0,0 +1,7 @@ +package collectorsimulator + +import "testing" + +func TestSimulateLogsProcessing(t *testing.T) { + t.Fatal("TODO(Raj): Implement simulateLogsProcessing test") +} diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/config.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/config.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/config_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/config_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/config_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/factory.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/factory.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/factory_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/factory_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/factory_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/logsexporter.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/logsexporter.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/logsexporter_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryexporter/logsexporter_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter/logsexporter_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/config.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/config.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/config_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/config_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/config_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/factory.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/factory.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/factory_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/factory_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/factory_test.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/logsreceiver.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/logsreceiver.go diff --git a/pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/logsreceiver_test.go similarity index 100% rename from pkg/query-service/app/logparsingpipeline/preview/inmemoryreceiver/logsreceiver_test.go rename to pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver/logsreceiver_test.go From a3178d1355d2b63a14679d1da543b30653a75113 Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 21:13:57 +0530 Subject: [PATCH 07/10] feat: logsparsing previews: extract helper for making collector conf with pipeline processors --- pkg/query-service/app/opamp/logspipeline.go | 72 +++++++++++++-------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index 9ad81fe77c..38c0290394 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -40,41 +40,21 @@ func UpsertLogsParsingProcessor( for _, agent := range agents { config := agent.EffectiveConfig - c, err := yaml.Parser().Unmarshal([]byte(config)) - if err != nil { - return confHash, coreModel.BadRequest(err) - } - buildLogParsingProcessors(c, parsingProcessors) - - p, err := getOtelPipelinFromConfig(c) - if err != nil { - return confHash, coreModel.BadRequest(err) - } - if p.Pipelines.Logs == nil { - return confHash, coreModel.InternalError(fmt.Errorf( - "logs pipeline doesn't exist", - )) - } - - // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) - p.Pipelines.Logs.Processors = updatedProcessorList - - // add the new processor to the data ( no checks required as the keys will exists) - c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs - - updatedConf, err := yaml.Parser().Marshal(c) - if err != nil { - return confHash, coreModel.BadRequest(err) + updatedConf, apiErr := GenerateCollectorConfigWithPipelines( + []byte(config), parsingProcessors, parsingProcessorsNames, + ) + if apiErr != nil { + return confHash, apiErr } // zap.S().Infof("sending new config", string(updatedConf)) hash := sha256.New() - _, err = hash.Write(updatedConf) + _, err := hash.Write(updatedConf) if err != nil { return confHash, coreModel.InternalError(err) } + agent.EffectiveConfig = string(updatedConf) err = agent.Upsert() if err != nil { @@ -104,6 +84,44 @@ func UpsertLogsParsingProcessor( return confHash, nil } +func GenerateCollectorConfigWithPipelines( + baseConfig []byte, + parsingProcessors map[string]interface{}, + parsingProcessorsNames []string, +) ([]byte, *coreModel.ApiError) { + + c, err := yaml.Parser().Unmarshal(baseConfig) + if err != nil { + return nil, coreModel.BadRequest(err) + } + + buildLogParsingProcessors(c, parsingProcessors) + + p, err := getOtelPipelinFromConfig(c) + if err != nil { + return nil, coreModel.BadRequest(err) + } + if p.Pipelines.Logs == nil { + return nil, coreModel.InternalError(fmt.Errorf( + "logs pipeline doesn't exist", + )) + } + + // build the new processor list + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList + + // add the new processor to the data ( no checks required as the keys will exists) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs + + updatedConf, err := yaml.Parser().Marshal(c) + if err != nil { + return nil, coreModel.BadRequest(err) + } + + return updatedConf, nil +} + // check if the processors already exist // if yes then update the processor. // if something doesn't exists then remove it. From 2388aaf78eda9fa25dccafad6595d94fd0058775 Mon Sep 17 00:00:00 2001 From: Raj Date: Thu, 28 Sep 2023 21:34:25 +0530 Subject: [PATCH 08/10] feat: logsparsing previews: add pipelines preview test and wire up collectorsimulator --- .../collectorsimulator/collectorsimulator.go | 6 +- .../app/logparsingpipeline/preview.go | 161 ++++++++++++++++++ .../app/logparsingpipeline/preview_test.go | 155 +++++++++++++++++ 3 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 pkg/query-service/app/logparsingpipeline/preview.go create mode 100644 pkg/query-service/app/logparsingpipeline/preview_test.go diff --git a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go index 59c615c745..8702a14c0f 100644 --- a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go +++ b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go @@ -7,10 +7,10 @@ import ( type SignozLog model.GetLogsResponse -type CollectorConfProviderFn func(baseConfYaml []byte) ([]byte, error) +type CollectorConfGeneratorFn func(baseConfYaml []byte) ([]byte, error) -func SimulateLogsProcessing(configProvider CollectorConfProviderFn, logs plog.Logs) ( - plog.Logs, error, +func SimulateLogsProcessing(configProvider CollectorConfGeneratorFn, logs []plog.Logs) ( + []plog.Logs, *model.ApiError, ) { return logs, nil } diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go new file mode 100644 index 0000000000..384b13ead2 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -0,0 +1,161 @@ +package logparsingpipeline + +import ( + "time" + + "github.com/pkg/errors" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/collectorsimulator" + "go.signoz.io/signoz/pkg/query-service/app/opamp" + "go.signoz.io/signoz/pkg/query-service/model" +) + +// TODO(Raj): See if there is a more suitable Log struct +type SignozLog model.GetLogsResponse + +func SimulatePipelinesProcessing( + pipelines []Pipeline, + logs []SignozLog, +) ([]SignozLog, *model.ApiError) { + if len(pipelines) < 1 { + return logs, nil + } + + processors, procNames, err := PreparePipelineProcessor(pipelines) + if err != nil { + return nil, model.BadRequest(errors.Wrap( + err, "could not prepare otel processors for pipelines", + )) + } + + collectorConfGenerator := func( + baseConfYaml []byte, + ) ([]byte, error) { + return opamp.GenerateCollectorConfigWithPipelines( + baseConfYaml, processors, procNames, + ) + } + + plogs := toPlogs(logs) + + resultPlogs, apiErr := collectorsimulator.SimulateLogsProcessing( + collectorConfGenerator, plogs, + ) + if apiErr != nil { + return nil, apiErr + } + + siglogs := toSignozLogs(resultPlogs) + + return siglogs, nil +} + +func toPlogs(logs []SignozLog) []plog.Logs { + result := []plog.Logs{} + + for _, log := range logs { + pl := plog.NewLogs() + rl := pl.ResourceLogs().AppendEmpty() + + resourceAttribs := rl.Resource().Attributes() + for k, v := range log.Resources_string { + resourceAttribs.PutStr(k, v) + } + + scopeLog := rl.ScopeLogs().AppendEmpty() + slRecord := scopeLog.LogRecords().AppendEmpty() + + slRecord.SetTimestamp(pcommon.NewTimestampFromTime( + time.Unix(0, int64(log.Timestamp)), + )) + + var spanIdBuf [8]byte + copy(spanIdBuf[:], []byte(log.SpanID)) + slRecord.SetSpanID(spanIdBuf) + + var traceIdBuf [16]byte + copy(traceIdBuf[:], []byte(log.TraceID)) + slRecord.SetTraceID(traceIdBuf) + + slRecord.SetSeverityText(log.SeverityText) + slRecord.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber)) + + slRecord.Body().SetStr(log.Body) + + slAttribs := slRecord.Attributes() + for k, v := range log.Attributes_string { + slAttribs.PutStr(k, v) + } + for k, v := range log.Attributes_int64 { + slAttribs.PutInt(k, v) + } + for k, v := range log.Attributes_float64 { + slAttribs.PutDouble(k, v) + } + + result = append(result, pl) + } + + return result +} + +func toSignozLogs(plogs []plog.Logs) []SignozLog { + result := []SignozLog{} + + for _, pl := range plogs { + + resourceLogsSlice := pl.ResourceLogs() + for i := 0; i < resourceLogsSlice.Len(); i++ { + rl := resourceLogsSlice.At(i) + + scopeLogsSlice := rl.ScopeLogs() + for j := 0; j < scopeLogsSlice.Len(); j++ { + sl := scopeLogsSlice.At(j) + + lrSlice := sl.LogRecords() + for k := 0; k < lrSlice.Len(); k++ { + lr := lrSlice.At(k) + + signozLog := SignozLog{ + Timestamp: uint64(lr.Timestamp()), + TraceID: lr.TraceID().String(), + SpanID: lr.SpanID().String(), + SeverityText: lr.SeverityText(), + SeverityNumber: uint8(lr.SeverityNumber()), + Body: lr.Body().AsString(), + Resources_string: pMapToStrMap(rl.Resource().Attributes()), + Attributes_string: map[string]string{}, + Attributes_int64: map[string]int64{}, + Attributes_float64: map[string]float64{}, + } + + // Populate signozLog.Attributes_... + lr.Attributes().Range(func(k string, v pcommon.Value) bool { + if v.Type() == pcommon.ValueTypeDouble { + signozLog.Attributes_float64[k] = v.Double() + } else if v.Type() == pcommon.ValueTypeInt { + signozLog.Attributes_int64[k] = v.Int() + } else { + signozLog.Attributes_string[k] = v.AsString() + } + return true + }) + + result = append(result, signozLog) + } + } + } + } + + return result +} + +func pMapToStrMap(pMap pcommon.Map) map[string]string { + result := map[string]string{} + pMap.Range(func(k string, v pcommon.Value) bool { + result[k] = v.AsString() + return true + }) + return result +} diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go new file mode 100644 index 0000000000..7f6952a91b --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -0,0 +1,155 @@ +package logparsingpipeline + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPipelinePreview(t *testing.T) { + require := require.New(t) + + testPipelines := []Pipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + { + OrderId: 2, + Name: "pipeline2", + Alias: "pipeline2", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "resource.test1", + Value: "val1", + Enabled: true, + Name: "test add2", + }, { + OrderId: 2, + ID: "add2", + Type: "add", + Field: "resource.test2", + Value: "val2", + Enabled: true, + Name: "test add3", + }, + }, + }, + } + + matchingLog := makeTestLogEntry( + "test log body", + map[string]string{ + "method": "GET", + }, + ) + nonMatchingLog := makeTestLogEntry( + "test log body", + map[string]string{ + "method": "POST", + }, + ) + + result, err := SimulatePipelinesProcessing( + testPipelines, + []SignozLog{ + matchingLog, + nonMatchingLog, + }, + ) + + require.Nil(err) + require.Equal(2, len(result)) + + // matching log should have been modified as expected. + require.NotEqual( + matchingLog.Attributes_string, + result[0].Attributes_string, + ) + testAttrValue := result[0].Attributes_string["test"] + require.NotNil(testAttrValue) + require.Equal( + testAttrValue, "val", + ) + + require.Equal(result[0].Resources_string, map[string]string{ + "test1": "val1", + "test2": "val2", + }) + + // non-matching log should not be touched. + require.Equal( + nonMatchingLog.Attributes_string, + result[1].Attributes_string, + ) + require.Equal( + nonMatchingLog.Resources_string, + result[1].Resources_string, + ) + +} + +func makeTestLogEntry( + body string, + attributes map[string]string, +) SignozLog { + return SignozLog{ + Timestamp: uint64(time.Now().UnixNano()), + Body: body, + Attributes_string: attributes, + Resources_string: map[string]string{}, + SeverityText: plog.SeverityNumberInfo.String(), + SeverityNumber: uint8(plog.SeverityNumberInfo), + SpanID: uuid.New().String(), + TraceID: uuid.New().String(), + } +} From 34c9e7fa2ec2d75a47cdc2afbf925f91fa7ccdd0 Mon Sep 17 00:00:00 2001 From: Raj Date: Fri, 29 Sep 2023 10:53:38 +0530 Subject: [PATCH 09/10] feat: logsparsing previews: opamp logpipelines account for no processors in agentConf --- pkg/query-service/app/opamp/logspipeline.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index 38c0290394..ced11d5761 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -126,7 +126,11 @@ func GenerateCollectorConfigWithPipelines( // if yes then update the processor. // if something doesn't exists then remove it. func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { + if agentConf["processors"] == nil { + agentConf["processors"] = map[string]interface{}{} + } agentProcessors := agentConf["processors"].(map[string]interface{}) + exists := map[string]struct{}{} for key, params := range parsingProcessors { agentProcessors[key] = params From 0baf8450239fa2bba21ebaa50d38807171e2f07f Mon Sep 17 00:00:00 2001 From: Raj Date: Fri, 29 Sep 2023 10:54:03 +0530 Subject: [PATCH 10/10] feat: logsparsing previews: wire up simulation test e2e --- .../collectorsimulator/collectorsimulator.go | 152 +++++++++++++++++- .../app/logparsingpipeline/preview.go | 6 +- .../app/logparsingpipeline/preview_test.go | 3 + 3 files changed, 156 insertions(+), 5 deletions(-) diff --git a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go index 8702a14c0f..e6026e1a52 100644 --- a/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go +++ b/pkg/query-service/app/logparsingpipeline/collectorsimulator/collectorsimulator.go @@ -1,16 +1,162 @@ package collectorsimulator import ( + "context" + "fmt" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/google/uuid" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/provider/yamlprovider" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/otelcol" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service" + + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver" "go.signoz.io/signoz/pkg/query-service/model" ) type SignozLog model.GetLogsResponse -type CollectorConfGeneratorFn func(baseConfYaml []byte) ([]byte, error) +type CollectorConfGeneratorFn func(baseConfYaml []byte) ([]byte, *model.ApiError) -func SimulateLogsProcessing(configProvider CollectorConfGeneratorFn, logs []plog.Logs) ( +func SimulateLogsProcessing(ctx context.Context, generateConfig CollectorConfGeneratorFn, logs []plog.Logs) ( []plog.Logs, *model.ApiError, ) { - return logs, nil + // Factories for components usable in the simulation + receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory()) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not create receiver factories.")) + } + processorFactories, err := processor.MakeFactoryMap(logstransformprocessor.NewFactory()) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) + } + exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory()) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) + } + factories := otelcol.Factories{ + Receivers: receiverFactories, + Processors: processorFactories, + Exporters: exporterFactories, + } + + // Prepare collector conf yaml for simulation + receiverId := uuid.NewString() + exporterId := uuid.NewString() + baseConf := makeBaseConfig(receiverId, exporterId) + + spew.Printf("\nbaseconf:\n%v\n", string(baseConf)) + + collectorConfYaml, apiErr := generateConfig([]byte(baseConf)) + if err != nil { + return nil, model.WrapApiError(apiErr, "could not generate collector config") + } + + spew.Printf("\ngenerated collector conf:\n%v\n", string(collectorConfYaml)) + + // Parse and validate collector conf + yamlP := yamlprovider.New() + confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{"yaml:" + string(collectorConfYaml)}, + Providers: map[string]confmap.Provider{yamlP.Scheme(): yamlP}, + }, + }) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, "could not create config provider.")) + } + + collectorCfg, err := confProvider.Get(ctx, factories) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) + } + if err = collectorCfg.Validate(); err != nil { + return nil, model.BadRequest(errors.Wrap(err, "invalid collector config")) + } + + // Build and start collector service. + + svcSettings := service.Settings{ + Receivers: receiver.NewBuilder(collectorCfg.Receivers, factories.Receivers), + Processors: processor.NewBuilder(collectorCfg.Processors, factories.Processors), + Exporters: exporter.NewBuilder(collectorCfg.Exporters, factories.Exporters), + Connectors: connector.NewBuilder(collectorCfg.Connectors, factories.Connectors), + Extensions: extension.NewBuilder(collectorCfg.Extensions, factories.Extensions), + } + collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service) + + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not instantiate collector service")) + } + if err = collectorSvc.Start(ctx); err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not start collector service.")) + } + + // Do the simulation + receiver := inmemoryreceiver.GetReceiverInstance(receiverId) + if receiver == nil { + return nil, model.InternalError(fmt.Errorf("could not find in memory receiver")) + } + for _, plog := range logs { + receiver.ConsumeLogs(ctx, plog) + } + + time.Sleep(500 * time.Millisecond) + + //startTs := time.Now().Unix() + //for { + // if time.Now().Unix()-startTs > 1 { + // break + // } + // exportedLogs := exporter.(*inmemoryexporter.InMemoryExporter).GetLogs() + // if len(exportedLogs) > 0 { + // break + // } + // time.Sleep(10 * time.Millisecond) + //} + + exporter := inmemoryexporter.GetExporterInstance(exporterId) + if exporter == nil { + return nil, model.InternalError(fmt.Errorf("could not find in memory exporter")) + } + result := exporter.GetLogs() + + // Shut down the collector service. + if err := collectorSvc.Shutdown(ctx); err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not shutdown the collector service")) + } + + return result, nil +} + +func makeBaseConfig(receiverId string, exporterId string) string { + return fmt.Sprintf(` +receivers: + memory: + id: %s +exporters: + memory: + id: %s +service: + pipelines: + logs: + receivers: + - memory + exporters: + - memory + telemetry: + metrics: + level: none +`, receiverId, exporterId) } diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index 384b13ead2..e5aac517e3 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -1,6 +1,7 @@ package logparsingpipeline import ( + "context" "time" "github.com/pkg/errors" @@ -15,6 +16,7 @@ import ( type SignozLog model.GetLogsResponse func SimulatePipelinesProcessing( + ctx context.Context, pipelines []Pipeline, logs []SignozLog, ) ([]SignozLog, *model.ApiError) { @@ -31,7 +33,7 @@ func SimulatePipelinesProcessing( collectorConfGenerator := func( baseConfYaml []byte, - ) ([]byte, error) { + ) ([]byte, *model.ApiError) { return opamp.GenerateCollectorConfigWithPipelines( baseConfYaml, processors, procNames, ) @@ -40,7 +42,7 @@ func SimulatePipelinesProcessing( plogs := toPlogs(logs) resultPlogs, apiErr := collectorsimulator.SimulateLogsProcessing( - collectorConfGenerator, plogs, + ctx, collectorConfGenerator, plogs, ) if apiErr != nil { return nil, apiErr diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go index 7f6952a91b..cbf214c5ae 100644 --- a/pkg/query-service/app/logparsingpipeline/preview_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -1,6 +1,7 @@ package logparsingpipeline import ( + "context" "testing" "time" @@ -99,7 +100,9 @@ func TestPipelinePreview(t *testing.T) { }, ) + ctx := context.Background() result, err := SimulatePipelinesProcessing( + ctx, testPipelines, []SignozLog{ matchingLog,