Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/kafkaexporter] ExporterFactory log/metric marshaler options #13433

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ const (
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// WithTracesMarshalers adds tracesMarshalers.
// WithTracesMarshalers adds tracesMarshalers to the exporter factory.
// This allows custom-built collectors to configure custom Kafka marshaler(s) for trace data.
// An example use case is keying the message by trace ID so downstream collectors could do tail-based sampling with horizontal scale.
func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range tracesMarshalers {
Expand All @@ -62,6 +64,36 @@ func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
}
}

// WithMetricsMarshalers adds metricsMarshalers to the exporter factory.
// This allows custom-built collectors to configure custom Kafka marshaler(s) for metric data.
// An example use case is keying the message by resource attribute values so downstream collectors can be horizontally scaled and still deliver metrics in the correct order to the backend.
// Another use case might be pre-aggregating metrics to reduce backend update throughput.
func WithMetricsMarshalers(metricsMarshalers ...MetricsMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range metricsMarshalers {
factory.metricsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// WithLogsMarshalers adds logsMarshalers to the exporter factory.
// This allows custom-built collectors to configure custom Kafka marshaler(s) for log data.
// An example use case is keying the message by resource attribute values so downstream collectors can be horizontally scaled and still deliver logs in the correct order to the backend.
func WithLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range logsMarshalers {
factory.logsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// WithProducerFactory sets an alternative producer factory.
func WithProducerFactory(producerFactory ProducerFactoryFunc) FactoryOption {
return func(factory *kafkaExporterFactory) {
factory.producerFactory = producerFactory
}
}

Comment on lines +90 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you expand upon why you would need an alternative producer factory, it isn't clear to me why you would need this and why you'd want to use this.

Copy link
Contributor Author

@bgranetzke bgranetzke Sep 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was only to test the custom marshaler option. I did not like it either, but due to how the actual factory is wrapped inside the component.ExporterFactory, I had to inject the mock during NewFactory(). I didn't want to create kafkaExporterFactory directly because then if felt like I wasn't testing the new options I was adding.

Totally open to an alternative approach.

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
Expand Down Expand Up @@ -107,10 +139,13 @@ func createDefaultConfig() config.Exporter {
}
}

type ProducerFactoryFunc func(config Config) (sarama.SyncProducer, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment exported types for their intended use.


type kafkaExporterFactory struct {
tracesMarshalers map[string]TracesMarshaler
metricsMarshalers map[string]MetricsMarshaler
logsMarshalers map[string]LogsMarshaler
producerFactory ProducerFactoryFunc
}

func (f *kafkaExporterFactory) createTracesExporter(
Expand All @@ -125,7 +160,10 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(oCfg, set, f.tracesMarshalers)
if f.producerFactory == nil {
f.producerFactory = newSaramaProducer
}
exp, err := newTracesExporter(oCfg, set, f.producerFactory, f.tracesMarshalers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,7 +193,10 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(oCfg, set, f.metricsMarshalers)
if f.producerFactory == nil {
f.producerFactory = newSaramaProducer
}
exp, err := newMetricsExporter(oCfg, set, f.producerFactory, f.metricsMarshalers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,7 +226,10 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(oCfg, set, f.logsMarshalers)
if f.producerFactory == nil {
f.producerFactory = newSaramaProducer
}
exp, err := newLogsExporter(oCfg, set, f.producerFactory, f.logsMarshalers)
if err != nil {
return nil, err
}
Expand Down
169 changes: 149 additions & 20 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ package kafkaexporter

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -140,35 +146,158 @@ func TestCreateLogsExporter_err(t *testing.T) {
}

func TestWithMarshalers(t *testing.T) {
cm := &customMarshaler{}
f := NewFactory(WithTracesMarshalers(cm))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = cm.Encoding()
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
customMarshalerEncoding := "custom"

// For simplicity, all the custom marshalers will produce the same message
customMarshalMessage := &sarama.ProducerMessage{
Topic: "any_topic",
Key: sarama.StringEncoder("key that could only come from the custom marshaller"),
Value: sarama.StringEncoder("value"),
}

// sets up mock to either expect the custom message or not
setupProducerMock := func(expectCustom bool) sarama.SyncProducer {
producer := mocks.NewSyncProducer(t, nil)
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
isCustom := pm != nil &&
pm.Topic == customMarshalMessage.Topic &&
pm.Key == customMarshalMessage.Key &&
pm.Value == customMarshalMessage.Value
if expectCustom {
if !isCustom {
return fmt.Errorf("marshaled message was not custom")
}
} else {
if isCustom {
return fmt.Errorf("marshaled message was custom and expecting default")
}
}
return nil
})
return producer
}

// config is consistent except encoding to use
setupConfig := func(encoding string) *Config {
cfg := createDefaultConfig().(*Config)
cfg.Encoding = encoding
cfg.QueueSettings = exporterhelper.QueueSettings{
Enabled: false,
}
cfg.RetrySettings = exporterhelper.RetrySettings{
Enabled: false,
}
return cfg
}

// all the marshaler tests are setup the same
setupFactory := func(producer sarama.SyncProducer) component.ExporterFactory {
f := NewFactory(
WithProducerFactory(func(config Config) (sarama.SyncProducer, error) { return producer, nil }),
WithTracesMarshalers(&mockTraceMarshaler{
MarshalFunc: func(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
return []*sarama.ProducerMessage{
customMarshalMessage,
}, nil
},
EncodingValue: customMarshalerEncoding,
}),
WithMetricsMarshalers(&mockMetricsMarshaler{
MarshalFunc: func(metrics pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
return []*sarama.ProducerMessage{
customMarshalMessage,
}, nil
},
EncodingValue: customMarshalerEncoding,
}),
WithLogsMarshalers(&mockLogMarshaler{
MarshalFunc: func(logs plog.Logs, topic string) ([]*sarama.ProducerMessage, error) {
return []*sarama.ProducerMessage{
customMarshalMessage,
}, nil
},
EncodingValue: customMarshalerEncoding,
}),
)

return f
}

traces := ptrace.NewTraces()
traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetName("testSpan")

t.Run("traces_custom_encoding", func(t *testing.T) {
cfg := setupConfig(customMarshalerEncoding)
producer := setupProducerMock(true)
exporterFactory := setupFactory(producer)

exporter, err := exporterFactory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeTraces(context.TODO(), traces)
require.NoError(t, err)
require.NoError(t, consumeErr)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
t.Run("traces_default_encoding", func(t *testing.T) {
cfg := setupConfig(defaultEncoding)
producer := setupProducerMock(false)
exporterFactory := setupFactory(producer)

exporter, err := exporterFactory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeTraces(context.TODO(), traces)
require.NoError(t, err)
require.NoError(t, consumeErr)
assert.NotNil(t, exporter)
})
}

type customMarshaler struct {
}
metrics := pmetric.NewMetrics()
metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("test")
t.Run("metrics_custom_encoding", func(t *testing.T) {
cfg := setupConfig(customMarshalerEncoding)
producer := setupProducerMock(true)
exporterFactory := setupFactory(producer)

var _ TracesMarshaler = (*customMarshaler)(nil)
exporter, err := exporterFactory.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeMetrics(context.TODO(), metrics)
require.NoError(t, err)
require.NoError(t, consumeErr)
require.NotNil(t, exporter)
})
t.Run("metrics_default_encoding", func(t *testing.T) {
cfg := setupConfig(defaultEncoding)
producer := setupProducerMock(false)
exporterFactory := setupFactory(producer)

func (c customMarshaler) Marshal(_ ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
}
exporter, err := exporterFactory.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeMetrics(context.TODO(), metrics)
require.NoError(t, err)
require.NoError(t, consumeErr)
assert.NotNil(t, exporter)
})

logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStringVal("test")
t.Run("logs_custom_encoding", func(t *testing.T) {
cfg := setupConfig(customMarshalerEncoding)
producer := setupProducerMock(true)
exporterFactory := setupFactory(producer)

exporter, err := exporterFactory.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeLogs(context.TODO(), logs)
require.NoError(t, err)
require.NoError(t, consumeErr)
require.NotNil(t, exporter)
})
t.Run("logs_default_encoding", func(t *testing.T) {
cfg := setupConfig(defaultEncoding)
producer := setupProducerMock(false)
exporterFactory := setupFactory(producer)

exporter, err := exporterFactory.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
consumeErr := exporter.ConsumeLogs(context.TODO(), logs)
require.NoError(t, err)
require.NoError(t, consumeErr)
assert.NotNil(t, exporter)
})

func (c customMarshaler) Encoding() string {
return "custom"
}
16 changes: 10 additions & 6 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
return producer, nil
}

func newMetricsExporter(config Config, set component.ExporterCreateSettings, marshalers map[string]MetricsMarshaler) (*kafkaMetricsProducer, error) {
func newMetricsExporter(config Config, set component.ExporterCreateSettings, producerFactory ProducerFactoryFunc, marshalers map[string]MetricsMarshaler) (*kafkaMetricsProducer, error) {
marshaler := marshalers[config.Encoding]
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)

producer, err := producerFactory(config)
if err != nil {
return nil, err
}
Expand All @@ -188,15 +189,17 @@ func newMetricsExporter(config Config, set component.ExporterCreateSettings, mar
}

// newTracesExporter creates Kafka exporter.
func newTracesExporter(config Config, set component.ExporterCreateSettings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) {
func newTracesExporter(config Config, set component.ExporterCreateSettings, producerFactory ProducerFactoryFunc, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) {
marshaler := marshalers[config.Encoding]
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)

producer, err := producerFactory(config)
if err != nil {
return nil, err
}

return &kafkaTracesProducer{
producer: producer,
topic: config.Topic,
Expand All @@ -205,12 +208,13 @@ func newTracesExporter(config Config, set component.ExporterCreateSettings, mars
}, nil
}

func newLogsExporter(config Config, set component.ExporterCreateSettings, marshalers map[string]LogsMarshaler) (*kafkaLogsProducer, error) {
func newLogsExporter(config Config, set component.ExporterCreateSettings, producerFactory ProducerFactoryFunc, marshalers map[string]LogsMarshaler) (*kafkaLogsProducer, error) {
marshaler := marshalers[config.Encoding]
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)

producer, err := producerFactory(config)
if err != nil {
return nil, err
}
Expand Down
Loading