From 3c3b675321e7cbd0ebfc98c06960a731d1464e40 Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Sun, 13 Aug 2023 17:32:47 +0300 Subject: [PATCH 01/10] kafka exporter - using batchpersignal to SplitTraces by traceId --- exporter/kafkaexporter/go.mod | 3 +++ exporter/kafkaexporter/pdata_marshaler.go | 24 +++++++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 103b799d0463..a338ee260a03 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -9,6 +9,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/jaegertracing/jaeger v1.41.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.82.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.82.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.82.0 github.com/stretchr/testify v1.8.4 github.com/xdg-go/scram v1.1.2 @@ -82,6 +83,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger retract ( diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index d4511946b3bc..63bba7ce9144 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -8,6 +8,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) type pdataLogsMarshaler struct { @@ -74,16 +76,22 @@ type pdataTracesMarshaler struct { } func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { - bts, err := p.marshaler.MarshalTraces(td) - if err != nil { - return nil, err - } - return []*sarama.ProducerMessage{ - { + var messages []*sarama.ProducerMessage + + for _, tracesById := range batchpersignal.SplitTraces(td) { + bts, err := p.marshaler.MarshalTraces(tracesById) + if err != nil { + return nil, err + } + var traceId = tracesById.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + key := []byte(traceId.String()) + messages = append(messages, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - }, - }, nil + Key: sarama.ByteEncoder(key), + }) + } + return messages, nil } func (p pdataTracesMarshaler) Encoding() string { From 19b29d21ceb43f72e8b4a23f89b7710c8459e62f Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 12:39:06 +0300 Subject: [PATCH 02/10] using traceutil TraceIDToHexOrEmptyString --- exporter/kafkaexporter/pdata_marshaler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 63bba7ce9144..1e78350bd0d1 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) @@ -83,8 +84,8 @@ func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama if err != nil { return nil, err } - var traceId = tracesById.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() - key := []byte(traceId.String()) + var traceID = tracesById.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + key := traceutil.TraceIDToHexOrEmptyString(traceID) messages = append(messages, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), From 21421bc2e8b586f6983c10eb5079f791e14c8650 Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 15:09:41 +0300 Subject: [PATCH 03/10] add KeyData --- exporter/kafkaexporter/README.md | 3 +++ exporter/kafkaexporter/config.go | 3 +++ exporter/kafkaexporter/config_test.go | 2 ++ exporter/kafkaexporter/factory.go | 2 ++ exporter/kafkaexporter/kafka_exporter_test.go | 2 ++ 5 files changed, 12 insertions(+) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index d2a9d40a1612..db8a5d56e11d 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -34,6 +34,9 @@ The following settings can be optionally configured: - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\ - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. +- `key_data` (default = none): the key of the traces sent to kafka. All available key options: + - `none`: no use of key + - `traceID`: taking the TraceID from `ExportTraceServiceRequest` and use it as a string key - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 4ddffe6d9b41..80301e8f0869 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -28,6 +28,9 @@ type Config struct { // Encoding of messages (default "otlp_proto") Encoding string `mapstructure:"encoding"` + // KeyData of messages (default "none") + KeyData string `mapstructure:"key_data"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index e3fd449d3482..8408976cfec8 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -55,6 +55,7 @@ func TestLoadConfig(t *testing.T) { }, Topic: "spans", Encoding: "otlp_proto", + KeyData: "none", Brokers: []string{"foo:123", "bar:456"}, Authentication: Authentication{ PlainText: &PlainTextConfig{ @@ -107,6 +108,7 @@ func TestLoadConfig(t *testing.T) { }, Topic: "spans", Encoding: "otlp_proto", + KeyData: "none", Brokers: []string{"foo:123", "bar:456"}, Authentication: Authentication{ PlainText: &PlainTextConfig{ diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 15d317652048..b5a3a8a32686 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -21,6 +21,7 @@ const ( defaultMetricsTopic = "otlp_metrics" defaultLogsTopic = "otlp_logs" defaultEncoding = "otlp_proto" + defaultKeyData = "none" defaultBroker = "localhost:9092" // default from sarama.NewConfig() defaultMetadataRetryMax = 3 @@ -96,6 +97,7 @@ func createDefaultConfig() component.Config { // using an empty topic to track when it has not been set by user, default is based on traces or metrics. Topic: "", Encoding: defaultEncoding, + KeyData: defaultKeyData, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index f812842b8a7a..d31444363d1a 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -89,6 +89,7 @@ func TestNewExporter_err_auth_type(t *testing.T) { }, }, Encoding: defaultEncoding, + KeyData: defaultKeyData, Metadata: Metadata{ Full: false, }, @@ -114,6 +115,7 @@ func TestNewExporter_err_auth_type(t *testing.T) { func TestNewExporter_err_compression(t *testing.T) { c := Config{ Encoding: defaultEncoding, + KeyData: defaultKeyData, Producer: Producer{ Compression: "idk", }, From 371ae71063dc2e3e088943ac5f40938b8a880a80 Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 20:30:10 +0300 Subject: [PATCH 04/10] init trace marshalers by KeyData --- exporter/kafkaexporter/factory.go | 10 +++++++++- exporter/kafkaexporter/factory_test.go | 2 ++ exporter/kafkaexporter/jaeger_marshaler.go | 13 +++++++++++++ exporter/kafkaexporter/kafka_exporter.go | 6 +++++- exporter/kafkaexporter/kafka_exporter_test.go | 17 ++++++++++++++--- exporter/kafkaexporter/marshaler.go | 19 +++++++++++++------ exporter/kafkaexporter/marshaler_test.go | 18 ++++++++++-------- exporter/kafkaexporter/pdata_marshaler.go | 8 +++++++- 8 files changed, 73 insertions(+), 20 deletions(-) diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index b5a3a8a32686..41c6d02a8670 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -46,7 +46,7 @@ type FactoryOption func(factory *kafkaExporterFactory) func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption { return func(factory *kafkaExporterFactory) { for _, marshaler := range tracesMarshalers { - factory.tracesMarshalers[marshaler.Encoding()] = marshaler + factory.tracesMarshalers[KeyOfTracerMarshaller(marshaler)] = marshaler } } } @@ -120,6 +120,14 @@ type kafkaExporterFactory struct { logsMarshalers map[string]LogsMarshaler } +func KeyOfTracerMarshaller(marshaler TracesMarshaler) string { + return KeyOfTracerMarshallerBy(marshaler.Encoding(), marshaler.KeyData()) +} + +func KeyOfTracerMarshallerBy(encoding string, keyData string) string { + return encoding + "#" + keyData +} + func (f *kafkaExporterFactory) createTracesExporter( ctx context.Context, set exporter.CreateSettings, diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 4451dbb4de68..47c759eda2a8 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -32,6 +32,8 @@ type mockMarshaler[Data data] struct { func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding } +func (mm *mockMarshaler[Data]) KeyData() string { return "none" } + func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) { if mm.consume != nil { return mm.consume(d, topic) diff --git a/exporter/kafkaexporter/jaeger_marshaler.go b/exporter/kafkaexporter/jaeger_marshaler.go index abc73c22f18a..eed21b73e1b3 100644 --- a/exporter/kafkaexporter/jaeger_marshaler.go +++ b/exporter/kafkaexporter/jaeger_marshaler.go @@ -53,9 +53,14 @@ func (j jaegerMarshaler) Encoding() string { return j.marshaler.encoding() } +func (j jaegerMarshaler) KeyData() string { + return j.marshaler.keyData() +} + type jaegerSpanMarshaler interface { marshal(span *jaegerproto.Span) ([]byte, error) encoding() string + keyData() string } type jaegerProtoSpanMarshaler struct { @@ -71,6 +76,10 @@ func (p jaegerProtoSpanMarshaler) encoding() string { return "jaeger_proto" } +func (p jaegerProtoSpanMarshaler) keyData() string { + return "none" +} + type jaegerJSONSpanMarshaler struct { pbMarshaler *jsonpb.Marshaler } @@ -92,3 +101,7 @@ func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error) func (p jaegerJSONSpanMarshaler) encoding() string { return "jaeger_json" } + +func (p jaegerJSONSpanMarshaler) keyData() string { + return "none" +} diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 638dfc439570..d99ec9b6d6b5 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -18,6 +18,7 @@ import ( ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") +var errUnrecognizedKeyData = fmt.Errorf("unrecognized key_data") // kafkaTracesProducer uses sarama to produce trace messages to Kafka. type kafkaTracesProducer struct { @@ -178,8 +179,11 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m // newTracesExporter creates Kafka exporter. func newTracesExporter(config Config, set exporter.CreateSettings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) { - marshaler := marshalers[config.Encoding] + marshaler := marshalers[KeyOfTracerMarshallerBy(config.Encoding, config.KeyData)] if marshaler == nil { + if config.KeyData != "none" && config.KeyData != "traceID" { + return nil, errUnrecognizedKeyData + } return nil, errUnrecognizedEncoding } producer, err := newSaramaProducer(config) diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index d31444363d1a..3771cdd39b8e 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -30,12 +30,19 @@ func TestNewExporter_err_version(t *testing.T) { } func TestNewExporter_err_encoding(t *testing.T) { - c := Config{Encoding: "foo"} + c := Config{Encoding: "foo", KeyData: defaultKeyData} texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, texp) } +func TestNewExporter_err_keyData(t *testing.T) { + c := Config{Encoding: defaultEncoding, KeyData: "foo"} + texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) + assert.EqualError(t, err, errUnrecognizedKeyData.Error()) + assert.Nil(t, texp) +} + func TestNewMetricsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers()) @@ -133,7 +140,7 @@ func TestTracesPusher(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, defaultKeyData), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -150,7 +157,7 @@ func TestTracesPusher_err(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, defaultKeyData), logger: zap.NewNop(), } t.Cleanup(func() { @@ -297,6 +304,10 @@ func (e tracesErrorMarshaler) Encoding() string { panic("implement me") } +func (e tracesErrorMarshaler) KeyData() string { + panic("implement me") +} + func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMessage, error) { return nil, e.err } diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index 38525f9fbec7..56beac62f7e6 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -17,6 +17,9 @@ type TracesMarshaler interface { // Encoding returns encoding name Encoding() string + + // KeyData returns keyData name + KeyData() string } // MetricsMarshaler marshals metrics into Message array @@ -39,15 +42,19 @@ type LogsMarshaler interface { // tracesMarshalers returns map of supported encodings with TracesMarshaler. func tracesMarshalers() map[string]TracesMarshaler { - otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding) - otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json") + otlpPbAndKeyNone := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, "none") + otlpPbAndKeyTraceId := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, "traceID") + otlpJsonAndKeyNone := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", "none") + otlpJsonAndKeyTraceId := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", "traceID") jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} return map[string]TracesMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, - jaegerProto.Encoding(): jaegerProto, - jaegerJSON.Encoding(): jaegerJSON, + KeyOfTracerMarshaller(otlpPbAndKeyNone): otlpPbAndKeyNone, + KeyOfTracerMarshaller(otlpPbAndKeyTraceId): otlpPbAndKeyTraceId, + KeyOfTracerMarshaller(otlpJsonAndKeyNone): otlpJsonAndKeyNone, + KeyOfTracerMarshaller(otlpJsonAndKeyTraceId): otlpJsonAndKeyTraceId, + KeyOfTracerMarshaller(jaegerProto): jaegerProto, + KeyOfTracerMarshaller(jaegerJSON): jaegerJSON, } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index d1a70f47a04d..55989ff63be7 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -17,15 +17,17 @@ import ( ) func TestDefaultTracesMarshalers(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - "otlp_json", - "jaeger_proto", - "jaeger_json", + expectedKeys := []string{ + KeyOfTracerMarshallerBy("otlp_proto", "none"), + KeyOfTracerMarshallerBy("otlp_proto", "traceID"), + KeyOfTracerMarshallerBy("otlp_json", "none"), + KeyOfTracerMarshallerBy("otlp_json", "traceID"), + KeyOfTracerMarshallerBy("jaeger_proto", "none"), + KeyOfTracerMarshallerBy("jaeger_json", "none"), } marshalers := tracesMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { + assert.Equal(t, len(expectedKeys), len(marshalers)) + for _, e := range expectedKeys { t.Run(e, func(t *testing.T) { m, ok := marshalers[e] require.True(t, ok) @@ -91,7 +93,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14}) - marshaler, ok := tracesMarshalers()["otlp_json"] + marshaler, ok := tracesMarshalers()[KeyOfTracerMarshallerBy("otlp_json", defaultKeyData)] require.True(t, ok, "Must have otlp json marshaller") msg, err := marshaler.Marshal(traces, t.Name()) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 1e78350bd0d1..04d917ecfb6b 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -74,6 +74,7 @@ func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) Metr type pdataTracesMarshaler struct { marshaler ptrace.Marshaler encoding string + keyData string } func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { @@ -99,9 +100,14 @@ func (p pdataTracesMarshaler) Encoding() string { return p.encoding } -func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { +func (p pdataTracesMarshaler) KeyData() string { + return p.keyData +} + +func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, keyData string) TracesMarshaler { return pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, + keyData: keyData, } } From 9bd24284536fddb1e05a8aa7da198eb45a52db73 Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 20:56:14 +0300 Subject: [PATCH 05/10] verify default value for KeyData --- exporter/kafkaexporter/factory_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 47c759eda2a8..62f649f4b4d5 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -58,6 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig().(*Config) assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.Equal(t, defaultKeyData, cfg.KeyData) assert.Equal(t, []string{defaultBroker}, cfg.Brokers) assert.Equal(t, "", cfg.Topic) } From 6d26b8bef778d151a391f6213e2fa879d0938b96 Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 21:10:09 +0300 Subject: [PATCH 06/10] using dedicated pdataTracesMarshalerByTraceId --- exporter/kafkaexporter/pdata_marshaler.go | 38 ++++++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 04d917ecfb6b..984a0bd085ed 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -74,10 +74,32 @@ func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) Metr type pdataTracesMarshaler struct { marshaler ptrace.Marshaler encoding string - keyData string } func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { + bts, err := p.marshaler.MarshalTraces(td) + if err != nil { + return nil, err + } + return []*sarama.ProducerMessage{ + { + Topic: topic, + Value: sarama.ByteEncoder(bts), + }, + }, nil +} + +func (p pdataTracesMarshaler) Encoding() string { + return p.encoding +} + +func (p pdataTracesMarshaler) KeyData() string { + return "none" +} + +type pdataTracesMarshalerByTraceId pdataTracesMarshaler + +func (p pdataTracesMarshalerByTraceId) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { var messages []*sarama.ProducerMessage for _, tracesById := range batchpersignal.SplitTraces(td) { @@ -96,18 +118,24 @@ func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama return messages, nil } -func (p pdataTracesMarshaler) Encoding() string { +func (p pdataTracesMarshalerByTraceId) Encoding() string { return p.encoding } -func (p pdataTracesMarshaler) KeyData() string { - return p.keyData +func (p pdataTracesMarshalerByTraceId) KeyData() string { + return "traceID" } func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, keyData string) TracesMarshaler { + if keyData == "traceID" { + return pdataTracesMarshalerByTraceId{ + marshaler: marshaler, + encoding: encoding, + } + } + return pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, - keyData: keyData, } } From c1f045af8c55c03e4cc4dc4abf4ecbcb4c7dacfb Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 21:23:50 +0300 Subject: [PATCH 07/10] create newPdataTracesMarshalerByTraceId --- exporter/kafkaexporter/kafka_exporter_test.go | 4 ++-- exporter/kafkaexporter/marshaler.go | 20 +++++++++---------- exporter/kafkaexporter/pdata_marshaler.go | 14 ++++++------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 3771cdd39b8e..276dfbcc0537 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -140,7 +140,7 @@ func TestTracesPusher(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, defaultKeyData), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -157,7 +157,7 @@ func TestTracesPusher_err(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, defaultKeyData), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), logger: zap.NewNop(), } t.Cleanup(func() { diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index 56beac62f7e6..ff7405642a56 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -42,19 +42,19 @@ type LogsMarshaler interface { // tracesMarshalers returns map of supported encodings with TracesMarshaler. func tracesMarshalers() map[string]TracesMarshaler { - otlpPbAndKeyNone := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, "none") - otlpPbAndKeyTraceId := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, "traceID") - otlpJsonAndKeyNone := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", "none") - otlpJsonAndKeyTraceId := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", "traceID") + otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding) + otlpPbByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.ProtoMarshaler{}, defaultEncoding) + otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json") + otlpJsonByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.JSONMarshaler{}, "otlp_json") jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} return map[string]TracesMarshaler{ - KeyOfTracerMarshaller(otlpPbAndKeyNone): otlpPbAndKeyNone, - KeyOfTracerMarshaller(otlpPbAndKeyTraceId): otlpPbAndKeyTraceId, - KeyOfTracerMarshaller(otlpJsonAndKeyNone): otlpJsonAndKeyNone, - KeyOfTracerMarshaller(otlpJsonAndKeyTraceId): otlpJsonAndKeyTraceId, - KeyOfTracerMarshaller(jaegerProto): jaegerProto, - KeyOfTracerMarshaller(jaegerJSON): jaegerJSON, + KeyOfTracerMarshaller(otlpPb): otlpPb, + KeyOfTracerMarshaller(otlpPbByTraceId): otlpPbByTraceId, + KeyOfTracerMarshaller(otlpJSON): otlpJSON, + KeyOfTracerMarshaller(otlpJsonByTraceId): otlpJsonByTraceId, + KeyOfTracerMarshaller(jaegerProto): jaegerProto, + KeyOfTracerMarshaller(jaegerJSON): jaegerJSON, } } diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 984a0bd085ed..579ca76d397e 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -126,15 +126,15 @@ func (p pdataTracesMarshalerByTraceId) KeyData() string { return "traceID" } -func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, keyData string) TracesMarshaler { - if keyData == "traceID" { - return pdataTracesMarshalerByTraceId{ - marshaler: marshaler, - encoding: encoding, - } +func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { + return pdataTracesMarshaler{ + marshaler: marshaler, + encoding: encoding, } +} - return pdataTracesMarshaler{ +func newPdataTracesMarshalerByTraceId(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { + return pdataTracesMarshalerByTraceId{ marshaler: marshaler, encoding: encoding, } From 2f7c207fac3d447bd03d1532c2a79f7aa80b68fa Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 21:30:46 +0300 Subject: [PATCH 08/10] add comment about key_data = traceID --- exporter/kafkaexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index db8a5d56e11d..4b3122694353 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -36,7 +36,7 @@ The following settings can be optionally configured: - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `key_data` (default = none): the key of the traces sent to kafka. All available key options: - `none`: no use of key - - `traceID`: taking the TraceID from `ExportTraceServiceRequest` and use it as a string key + - `traceID`: taking the TraceID from `ExportTraceServiceRequest` and use it as a string key. note that it affects the partition number, and allows concurrent consumers - `auth` - `plain_text` - `username`: The username to use. From 3b9388e66235df5504bf8744fffa01c5a960648c Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Mon, 21 Aug 2023 21:34:10 +0300 Subject: [PATCH 09/10] using key_data of traceID --- exporter/kafkaexporter/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 4b3122694353..94d9781cb505 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -34,7 +34,7 @@ The following settings can be optionally configured: - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\ - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. -- `key_data` (default = none): the key of the traces sent to kafka. All available key options: +- `key_data` (default = none): affects key of the traces sent to kafka. All available key options: - `none`: no use of key - `traceID`: taking the TraceID from `ExportTraceServiceRequest` and use it as a string key. note that it affects the partition number, and allows concurrent consumers - `auth` @@ -101,4 +101,5 @@ exporters: brokers: - localhost:9092 protocol_version: 2.0.0 + key_data: "traceID" ``` From 9c7bb5a07dd68c08b0cdc03dbea098c0bcf4534d Mon Sep 17 00:00:00 2001 From: Arik Sher <104715391+arik-dig@users.noreply.github.com> Date: Tue, 22 Aug 2023 16:58:10 +0300 Subject: [PATCH 10/10] add change log yaml file --- .../pr25909-kafka-exporter-by-traceid.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/pr25909-kafka-exporter-by-traceid.yaml diff --git a/.chloggen/pr25909-kafka-exporter-by-traceid.yaml b/.chloggen/pr25909-kafka-exporter-by-traceid.yaml new file mode 100644 index 000000000000..5c711704c176 --- /dev/null +++ b/.chloggen/pr25909-kafka-exporter-by-traceid.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add ability to publish kafka messages with key of TraceID - it will allow to partition the kafka Topic and consume it concurrently. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [12318] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api]