diff --git a/.chloggen/kafka-exporter-key-by-traceid.yaml b/.chloggen/kafka-exporter-key-by-traceid.yaml new file mode 100644 index 000000000000..f256c8ecefa9 --- /dev/null +++ b/.chloggen/kafka-exporter-key-by-traceid.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add ability to publish kafka messages with message key of TraceID - it will allow partitioning of the kafka Topic. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [12318] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index d916a4246327..bd138b0fde56 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -37,6 +37,7 @@ The following settings can be optionally configured: - `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span. - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. +- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 186441f33d7c..e893d71385b7 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -38,6 +38,11 @@ type Config struct { // Encoding of messages (default "otlp_proto") Encoding string `mapstructure:"encoding"` + // PartitionTracesByID sets the message key of outgoing trace messages to the trace ID. + // Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include + // trace ID as the message key by default. + PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index ac080ae9ee9d..4b41c23d9792 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -54,9 +54,10 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - Brokers: []string{"foo:123", "bar:456"}, + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + Brokers: []string{"foo:123", "bar:456"}, Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -106,9 +107,10 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Encoding: "otlp_proto", - Brokers: []string{"foo:123", "bar:456"}, + Topic: "spans", + Encoding: "otlp_proto", + PartitionTracesByID: true, + Brokers: []string{"foo:123", "bar:456"}, Authentication: kafka.Authentication{ PlainText: &kafka.PlainTextConfig{ Username: "jdoe", @@ -159,6 +161,7 @@ func TestLoadConfig(t *testing.T) { }, Topic: "spans", Encoding: "otlp_proto", + PartitionTracesByID: true, Brokers: []string{"foo:123", "bar:456"}, ResolveCanonicalBootstrapServersOnly: true, Authentication: kafka.Authentication{ diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 01f1711d2a1f..cb6d7837d368 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -9,6 +9,7 @@ require ( github.com/jaegertracing/jaeger v1.51.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.91.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.91.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.91.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.91.0 github.com/openzipkin/zipkin-go v0.4.2 @@ -84,6 +85,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/corei replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger retract ( diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 52ef71c48be3..202e4ca8f090 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -188,6 +188,11 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma if marshaler == nil { return nil, errUnrecognizedEncoding } + if config.PartitionTracesByID { + if keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler); ok { + keyableMarshaler.Key() + } + } producer, err := newSaramaProducer(config) if err != nil { return nil, err diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index a94811c554cd..0d0cfba637f5 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/IBM/sarama" zipkin "github.com/openzipkin/zipkin-go/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -85,6 +86,8 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { ils := rs.ScopeSpans().At(0) ils.SetSchemaUrl(conventions.SchemaURL) ils.Spans().AppendEmpty() + ils.Spans().AppendEmpty() + ils.Spans().AppendEmpty() span := ils.Spans().At(0) span.SetKind(ptrace.SpanKindServer) @@ -95,9 +98,27 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14}) + span = ils.Spans().At(1) + span.SetKind(ptrace.SpanKindClient) + span.SetName("bar") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{15, 16, 17, 18, 19, 20, 21}) + span.SetParentSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) + + span = ils.Spans().At(2) + span.SetKind(ptrace.SpanKindServer) + span.SetName("baz") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Second))) + span.SetTraceID([16]byte{17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}) + span.SetSpanID([8]byte{22, 23, 24, 25, 26, 27, 28}) + span.SetParentSpanID([8]byte{29, 30, 31, 32, 33, 34, 35, 36}) + // Since marshaling json is not guaranteed to be in order // within a string, using a map to compare that the expected values are there - otlpJSON := map[string]any{ + unkeyedOtlpJSON := map[string]any{ "resourceSpans": []any{ map[string]any{ "resource": map[string]any{}, @@ -115,6 +136,95 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), "status": map[string]any{}, }, + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "spanId": "0f10111213141500", + "parentSpanId": "0001020304050607", + "name": "bar", + "kind": float64(ptrace.SpanKindClient), + "startTimeUnixNano": fmt.Sprint(now.UnixNano()), + "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), + "status": map[string]any{}, + }, + map[string]any{ + "traceId": "1112131415161718191a1b1c1d1e1f20", + "spanId": "161718191a1b1c00", + "parentSpanId": "1d1e1f2021222324", + "name": "baz", + "kind": float64(ptrace.SpanKindServer), + "startTimeUnixNano": fmt.Sprint(now.UnixNano()), + "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), + "status": map[string]any{}, + }, + }, + "schemaUrl": conventions.SchemaURL, + }, + }, + "schemaUrl": conventions.SchemaURL, + }, + }, + } + + unkeyedOtlpJSONResult := make([]any, 1) + unkeyedOtlpJSONResult[0] = unkeyedOtlpJSON + + keyedOtlpJSON1 := map[string]any{ + "resourceSpans": []any{ + map[string]any{ + "resource": map[string]any{}, + "scopeSpans": []any{ + map[string]any{ + "scope": map[string]any{}, + "spans": []any{ + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "spanId": "0001020304050607", + "parentSpanId": "08090a0b0c0d0e00", + "name": "foo", + "kind": float64(ptrace.SpanKindServer), + "startTimeUnixNano": fmt.Sprint(now.UnixNano()), + "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), + "status": map[string]any{}, + }, + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "spanId": "0f10111213141500", + "parentSpanId": "0001020304050607", + "name": "bar", + "kind": float64(ptrace.SpanKindClient), + "startTimeUnixNano": fmt.Sprint(now.UnixNano()), + "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), + "status": map[string]any{}, + }, + }, + "schemaUrl": conventions.SchemaURL, + }, + }, + "schemaUrl": conventions.SchemaURL, + }, + }, + } + + unkeyedMessageKey := []sarama.Encoder{nil} + + keyedOtlpJSON2 := map[string]any{ + "resourceSpans": []any{ + map[string]any{ + "resource": map[string]any{}, + "scopeSpans": []any{ + map[string]any{ + "scope": map[string]any{}, + "spans": []any{ + map[string]any{ + "traceId": "1112131415161718191a1b1c1d1e1f20", + "spanId": "161718191a1b1c00", + "parentSpanId": "1d1e1f2021222324", + "name": "baz", + "kind": float64(ptrace.SpanKindServer), + "startTimeUnixNano": fmt.Sprint(now.UnixNano()), + "endTimeUnixNano": fmt.Sprint(now.Add(time.Second).UnixNano()), + "status": map[string]any{}, + }, }, "schemaUrl": conventions.SchemaURL, }, @@ -124,7 +234,13 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { }, } - zipkinJSON := []any{ + keyedOtlpJSONResult := make([]any, 2) + keyedOtlpJSONResult[0] = keyedOtlpJSON1 + keyedOtlpJSONResult[1] = keyedOtlpJSON2 + + keyedMessageKey := []sarama.Encoder{sarama.ByteEncoder("0102030405060708090a0b0c0d0e0f10"), sarama.ByteEncoder("1112131415161718191a1b1c1d1e1f20")} + + unkeyedZipkinJSON := []any{ map[string]any{ "traceId": "0102030405060708090a0b0c0d0e0f10", "id": "0001020304050607", @@ -135,15 +251,83 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { "kind": string(zipkin.Server), "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, }, + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "id": "0f10111213141500", + "parentId": "0001020304050607", + "name": "bar", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Client), + "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, + }, + map[string]any{ + "traceId": "1112131415161718191a1b1c1d1e1f20", + "id": "161718191a1b1c00", + "parentId": "1d1e1f2021222324", + "name": "baz", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Server), + "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, + }, } + unkeyedZipkinJSONResult := make([]any, 1) + unkeyedZipkinJSONResult[0] = unkeyedZipkinJSON + + keyedZipkinJSON1 := []any{ + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "id": "0001020304050607", + "parentId": "08090a0b0c0d0e00", + "name": "foo", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Server), + "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, + }, + map[string]any{ + "traceId": "0102030405060708090a0b0c0d0e0f10", + "id": "0f10111213141500", + "parentId": "0001020304050607", + "name": "bar", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Client), + "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, + }, + } + + keyedZipkinJSON2 := []any{ + map[string]any{ + "traceId": "1112131415161718191a1b1c1d1e1f20", + "id": "161718191a1b1c00", + "parentId": "1d1e1f2021222324", + "name": "baz", + "timestamp": float64(time.Second.Microseconds()), + "duration": float64(time.Second.Microseconds()), + "kind": string(zipkin.Server), + "localEndpoint": map[string]any{"serviceName": "otlpresourcenoservicename"}, + }, + } + + keyedZipkinJSONResult := make([]any, 2) + keyedZipkinJSONResult[0] = keyedZipkinJSON1 + keyedZipkinJSONResult[1] = keyedZipkinJSON2 + tests := []struct { - encoding string - expectedJSON any - unmarshaled any + encoding string + keyed bool + numExpectedMessages int + expectedJSON []any + expectedMessageKey []sarama.Encoder + unmarshaled any }{ - {encoding: "otlp_json", expectedJSON: otlpJSON, unmarshaled: map[string]any{}}, - {encoding: "zipkin_json", expectedJSON: zipkinJSON, unmarshaled: []map[string]any{}}, + {encoding: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}}, + {encoding: "otlp_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}}, + {encoding: "zipkin_json", numExpectedMessages: 1, expectedJSON: unkeyedZipkinJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: []map[string]any{}}, + {encoding: "zipkin_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}}, } for _, test := range tests { @@ -151,19 +335,27 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { marshaler, ok := tracesMarshalers()[test.encoding] require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding)) + if test.keyed { + keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler) + require.True(t, ok, "Must be a KeyableTracesMarshaler") + keyableMarshaler.Key() + } + msg, err := marshaler.Marshal(traces, t.Name()) require.NoError(t, err, "Must have marshaled the data without error") - require.Len(t, msg, 1, "Must have one entry in the message") - - data, err := msg[0].Value.Encode() - require.NoError(t, err, "Must not error when encoding value") - require.NotNil(t, data, "Must have valid data to test") + require.Len(t, msg, test.numExpectedMessages, "Expected number of messages in the message") - unmarshaled := test.unmarshaled - err = json.Unmarshal(data, &unmarshaled) - require.NoError(t, err, "Must not error marshaling expected data") + for idx, singleMsg := range msg { + data, err := singleMsg.Value.Encode() + require.NoError(t, err, "Must not error when encoding value") + require.NotNil(t, data, "Must have valid data to test") - assert.Equal(t, test.expectedJSON, unmarshaled, "Must match the expected value") + unmarshaled := test.unmarshaled + err = json.Unmarshal(data, &unmarshaled) + require.NoError(t, err, "Must not error marshaling expected data") + assert.Equal(t, test.expectedJSON[idx], unmarshaled, "Must match the expected value") + assert.Equal(t, test.expectedMessageKey[idx], singleMsg.Key) + } } } diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index d4511946b3bc..d9e38dd52caf 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -8,6 +8,9 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) type pdataLogsMarshaler struct { @@ -68,30 +71,59 @@ func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) Metr } } +// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities +// for trace messages +type KeyableTracesMarshaler interface { + TracesMarshaler + Key() +} + type pdataTracesMarshaler struct { marshaler ptrace.Marshaler encoding string + keyed bool } -func (p pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { - bts, err := p.marshaler.MarshalTraces(td) - if err != nil { - return nil, err - } - return []*sarama.ProducerMessage{ - { +func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { + var msgs []*sarama.ProducerMessage + if p.keyed { + for _, trace := range batchpersignal.SplitTraces(td) { + bts, err := p.marshaler.MarshalTraces(trace) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(traceutil.TraceIDToHexOrEmptyString(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID())), + }) + + } + } else { + bts, err := p.marshaler.MarshalTraces(td) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - }, - }, nil + }) + } + + return msgs, nil } -func (p pdataTracesMarshaler) Encoding() string { +func (p *pdataTracesMarshaler) Encoding() string { return p.encoding } +// Key configures the pdataTracesMarshaler to set the message key on the kafka messages +func (p *pdataTracesMarshaler) Key() { + p.keyed = true +} + func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { - return pdataTracesMarshaler{ + return &pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, } diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 0f5ca49560b5..38b38bd5ac13 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -11,6 +11,7 @@ kafka: max_message_bytes: 10000000 required_acks: -1 # WaitForAll timeout: 10s + partition_traces_by_id: true auth: plain_text: username: jdoe diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 2c109f197225..d9cd7769c60c 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -56,6 +56,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.90.1 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect @@ -84,6 +85,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal + replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter => ../../exporter/kafkaexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal