Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logsparsing pipeline previews in QS #3651

Closed
wants to merge 10 commits into from
Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package collectorsimulator

import (
"context"
"fmt"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"

"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryexporter"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/collectorsimulator/inmemoryreceiver"
"go.signoz.io/signoz/pkg/query-service/model"
)

type SignozLog model.GetLogsResponse

type CollectorConfGeneratorFn func(baseConfYaml []byte) ([]byte, *model.ApiError)

func SimulateLogsProcessing(ctx context.Context, generateConfig CollectorConfGeneratorFn, logs []plog.Logs) (
[]plog.Logs, *model.ApiError,
) {
// Factories for components usable in the simulation
receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory())
if err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not create receiver factories."))
}
processorFactories, err := processor.MakeFactoryMap(logstransformprocessor.NewFactory())
if err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not create processor factories."))
}
exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory())
if err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not create processor factories."))
}
factories := otelcol.Factories{
Receivers: receiverFactories,
Processors: processorFactories,
Exporters: exporterFactories,
}

// Prepare collector conf yaml for simulation
receiverId := uuid.NewString()
exporterId := uuid.NewString()
baseConf := makeBaseConfig(receiverId, exporterId)

spew.Printf("\nbaseconf:\n%v\n", string(baseConf))

collectorConfYaml, apiErr := generateConfig([]byte(baseConf))
if err != nil {
return nil, model.WrapApiError(apiErr, "could not generate collector config")
}

spew.Printf("\ngenerated collector conf:\n%v\n", string(collectorConfYaml))

// Parse and validate collector conf
yamlP := yamlprovider.New()
confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{"yaml:" + string(collectorConfYaml)},
Providers: map[string]confmap.Provider{yamlP.Scheme(): yamlP},
},
})
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "could not create config provider."))
}

collectorCfg, err := confProvider.Get(ctx, factories)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "failed to parse collector config"))
}
if err = collectorCfg.Validate(); err != nil {
return nil, model.BadRequest(errors.Wrap(err, "invalid collector config"))
}

// Build and start collector service.

svcSettings := service.Settings{
Receivers: receiver.NewBuilder(collectorCfg.Receivers, factories.Receivers),
Processors: processor.NewBuilder(collectorCfg.Processors, factories.Processors),
Exporters: exporter.NewBuilder(collectorCfg.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(collectorCfg.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(collectorCfg.Extensions, factories.Extensions),
}
collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service)

if err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not instantiate collector service"))
}
if err = collectorSvc.Start(ctx); err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not start collector service."))
}

// Do the simulation
receiver := inmemoryreceiver.GetReceiverInstance(receiverId)
if receiver == nil {
return nil, model.InternalError(fmt.Errorf("could not find in memory receiver"))
}
for _, plog := range logs {
receiver.ConsumeLogs(ctx, plog)
}

time.Sleep(500 * time.Millisecond)

//startTs := time.Now().Unix()
//for {
// if time.Now().Unix()-startTs > 1 {
// break
// }
// exportedLogs := exporter.(*inmemoryexporter.InMemoryExporter).GetLogs()
// if len(exportedLogs) > 0 {
// break
// }
// time.Sleep(10 * time.Millisecond)
//}

exporter := inmemoryexporter.GetExporterInstance(exporterId)
if exporter == nil {
return nil, model.InternalError(fmt.Errorf("could not find in memory exporter"))
}
result := exporter.GetLogs()

// Shut down the collector service.
if err := collectorSvc.Shutdown(ctx); err != nil {
return nil, model.InternalError(errors.Wrap(err, "could not shutdown the collector service"))
}

return result, nil
}

func makeBaseConfig(receiverId string, exporterId string) string {
return fmt.Sprintf(`
receivers:
memory:
id: %s
exporters:
memory:
id: %s
service:
pipelines:
logs:
receivers:
- memory
exporters:
- memory
telemetry:
metrics:
level: none
`, receiverId, exporterId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package collectorsimulator

import "testing"

func TestSimulateLogsProcessing(t *testing.T) {
t.Fatal("TODO(Raj): Implement simulateLogsProcessing test")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package inmemoryexporter

import "fmt"

type Config struct {
// Unique id for the exporter.
// Useful for getting a hold of the exporter in code that doesn't control
// its instantiation. Example: when instantiation happens in collector service
Id string `mapstructure:"id"`
}

func (c *Config) Validate() error {
if len(c.Id) < 1 {
return fmt.Errorf("inmemory exporter: id is required")
}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package inmemoryexporter

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)

func TestValidate(t *testing.T) {
tests := []struct {
name string
rawConf *confmap.Conf
errorExpected bool
}{
{
name: "with id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "test_exporter",
}),
errorExpected: false,
},
{
name: "empty id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "",
}),
errorExpected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
err := component.UnmarshalConfig(tt.rawConf, cfg)
require.NoError(t, err, "could not UnmarshalConfig")

err = component.ValidateConfig(cfg)
if tt.errorExpected {
require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg)
} else {
require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package inmemoryexporter

import (
"context"

"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)

func createDefaultConfig() component.Config {
return &Config{
Id: uuid.NewString(),
}
}

func createLogsExporter(
_ context.Context, _ exporter.CreateSettings, config component.Config,
) (exporter.Logs, error) {
if err := component.ValidateConfig(config); err != nil {
return nil, errors.Wrap(err, "invalid inmemory exporter config")
}
return &InMemoryLogsExporter{
id: config.(*Config).Id,
}, nil
}

// NewFactory creates a new OTLP receiver factory.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
"memory",
createDefaultConfig,
exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package inmemoryexporter

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestCreateLogsExporter(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

te, err := factory.CreateLogsExporter(
context.Background(), exporter.CreateSettings{}, cfg,
)
assert.NoError(t, err)
assert.NotNil(t, te)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package inmemoryexporter

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)

/* An in-memory exporter for use in testing and previewing log pipelines.*/
type InMemoryLogsExporter struct {
// Unique identifier for the exporter.
id string
// mu protects the data below
mu sync.Mutex
// slice of pdata.Logs that were received by this exporter.
logs []plog.Logs
}

// ConsumeLogs implements component.LogsExporter.
func (e *InMemoryLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.mu.Lock()
defer e.mu.Unlock()

e.logs = append(e.logs, ld)
return nil
}

// GetLogs returns a slice of pdata.Logs that were received by this exporter.
func (e *InMemoryLogsExporter) GetLogs() []plog.Logs {
e.mu.Lock()
defer e.mu.Unlock()

return e.logs
}

// ResetLogs removes all logs that were received by this exporter.
func (e *InMemoryLogsExporter) ResetLogs() {
e.mu.Lock()
defer e.mu.Unlock()

e.logs = nil
}

func (e *InMemoryLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// Keep track of all exporter instances in the process.
// Useful for getting a hold of the exporter in scenarios where one doesn't
// create the instances. Eg: bringing up a collector service from collector config
var allExporterInstances map[string]*InMemoryLogsExporter
var allExportersLock sync.Mutex

func init() {
allExporterInstances = make(map[string]*InMemoryLogsExporter)
}

func (e *InMemoryLogsExporter) Start(ctx context.Context, host component.Host) error {
allExportersLock.Lock()
defer allExportersLock.Unlock()

allExporterInstances[e.id] = e
return nil
}

func (e *InMemoryLogsExporter) Shutdown(ctx context.Context) error {
allExportersLock.Lock()
defer allExportersLock.Unlock()

delete(allExporterInstances, e.id)
return nil
}

func GetExporterInstance(id string) *InMemoryLogsExporter {
return allExporterInstances[id]
}
Loading