diff --git a/.github/workflows/unit-integration-tests.yml b/.github/workflows/unit-integration-tests.yml index 71edd97b45..bc1faec295 100644 --- a/.github/workflows/unit-integration-tests.yml +++ b/.github/workflows/unit-integration-tests.yml @@ -175,20 +175,24 @@ jobs: image: memcached:1.5.9 ports: - 11211:11211 - zookeeper: - image: bitnami/zookeeper:latest - env: - ALLOW_ANONYMOUS_LOGIN: "yes" - ports: - - 2181:2181 kafka: - image: darccio/kafka:2.13-2.8.1 + image: confluentinc/confluent-local:7.5.0 env: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1 - KAFKA_BROKER_ID: 1 + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092" + KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_BROKER_ID: "1" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1" + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + KAFKA_NODE_ID: "1" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" ports: - 9092:9092 localstack: diff --git a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go index e3331c90e1..43633fbf48 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go +++ b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go @@ -3,6 +3,13 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2024 Datadog, Inc. +// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation. +// +// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub. +// +// The motivation of this package is to support orchestrion, which cannot use the main package because it imports +// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself, +// this would cause an import cycle. 
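The decoupling described in the package comment above is the same pattern applied to the segmentio instrumentation later in this diff: the internal tracing package only sees narrow, locally defined interfaces, and the outer contrib package adapts the concrete library types onto them. A minimal illustrative sketch of that shape, with hypothetical names standing in for the real pubsub types:

package main

// libMessage stands in for the instrumented library's message type
// (e.g. *pubsub.Message); it is hypothetical and only for illustration.
type libMessage struct {
	Attributes map[string]string
}

// Message is the narrow view an internal tracing package would declare,
// so that it never has to import the instrumented library itself.
type Message interface {
	GetAttributes() map[string]string
}

// wMessage is the adapter the outer contrib package would provide.
type wMessage struct{ m *libMessage }

func (w *wMessage) GetAttributes() map[string]string { return w.m.Attributes }

func main() {
	// The adapter satisfies the internal interface without creating an import cycle.
	var _ Message = &wMessage{m: &libMessage{Attributes: map[string]string{"k": "v"}}}
}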
package tracing import ( diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go index 8efa05fc58..bc33c09ad9 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) { } } -/* -to run the integration test locally: - - docker network create confluent - - docker run --rm \ - --name zookeeper \ - --network confluent \ - -p 2181:2181 \ - -e ZOOKEEPER_CLIENT_PORT=2181 \ - confluentinc/cp-zookeeper:5.0.0 - - docker run --rm \ - --name kafka \ - --network confluent \ - -p 9092:9092 \ - -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ - -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ - -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ - -e KAFKA_CREATE_TOPICS=gotest:1:1 \ - -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ - confluentinc/cp-kafka:5.0.0 -*/ - func TestConsumerFunctional(t *testing.T) { for _, tt := range []struct { name string diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go index 4707a1e5ae..d7cc103141 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) { } } -/* -to run the integration test locally: - - docker network create confluent - - docker run --rm \ - --name zookeeper \ - --network confluent \ - -p 2181:2181 \ - -e ZOOKEEPER_CLIENT_PORT=2181 \ - confluentinc/cp-zookeeper:5.0.0 - - docker run --rm \ - --name kafka \ - --network confluent \ - -p 9092:9092 \ - -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ - -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ - -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ - -e KAFKA_CREATE_TOPICS=gotest:1:1 \ - -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ - confluentinc/cp-kafka:5.0.0 -*/ - func TestConsumerFunctional(t *testing.T) { for _, tt := range []struct { name string diff --git a/contrib/log/slog/slog.go b/contrib/log/slog/slog.go index 1a27186a27..5cb0fcbdc2 100644 --- a/contrib/log/slog/slog.go +++ b/contrib/log/slog/slog.go @@ -42,6 +42,7 @@ type handler struct { func (h *handler) Handle(ctx context.Context, rec slog.Record) error { span, ok := tracer.SpanFromContext(ctx) if ok { + rec = rec.Clone() rec.Add( slog.Uint64(ext.LogKeyTraceID, span.Context().TraceID()), slog.Uint64(ext.LogKeySpanID, span.Context().SpanID()), diff --git a/contrib/log/slog/slog_test.go b/contrib/log/slog/slog_test.go index 5b74691469..86254dde88 100644 --- a/contrib/log/slog/slog_test.go +++ b/contrib/log/slog/slog_test.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "encoding/json" + "io" "log/slog" "strings" "testing" @@ -74,3 +75,47 @@ func TestWrapHandler(t *testing.T) { return WrapHandler(slog.NewJSONHandler(b, nil)) }) } + +// TestRecordClone is a regression test for https://github.com/DataDog/dd-trace-go/issues/2918. +func TestRecordClone(t *testing.T) { + // start a new span + span, ctx := tracer.StartSpanFromContext(context.Background(), "test") + defer span.Finish() + + r := slog.Record{} + gate := func() { + // Calling Handle below should not overwrite this value + r.Add("sentinel-key", "sentinel-value") + } + h := handlerGate{gate, WrapHandler(slog.NewJSONHandler(io.Discard, nil))} + // Up to slog.nAttrsInline (5) attributes are stored in the front array of + // the record. 
Make sure to add more records than that to trigger the bug. + for i := 0; i < 5*10; i++ { + r.Add("i", i) + } + h.Handle(ctx, r) + + var foundSentinel bool + r.Attrs(func(a slog.Attr) bool { + if a.Key == "sentinel-key" { + foundSentinel = true + return false + } + return true + }) + assert.True(t, foundSentinel) +} + +// handlerGate calls a gate function before calling the underlying handler. This +// allows simulating a concurrent modification of the record that happens after +// Handle is called (and the record has been copied), but before the back array +// of the Record is written to. +type handlerGate struct { + gate func() + slog.Handler +} + +func (h handlerGate) Handle(ctx context.Context, r slog.Record) { + h.gate() + h.Handler.Handle(ctx, r) +} diff --git a/contrib/segmentio/kafka.go.v0/example_test.go b/contrib/segmentio/kafka.go.v0/example_test.go index 11dbd328a3..504b5689a4 100644 --- a/contrib/segmentio/kafka.go.v0/example_test.go +++ b/contrib/segmentio/kafka.go.v0/example_test.go @@ -13,7 +13,7 @@ import ( kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - kafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" ) func ExampleWriter() { diff --git a/contrib/segmentio/kafka.go.v0/headers.go b/contrib/segmentio/kafka.go.v0/headers.go index ce19449d5a..347806d36a 100644 --- a/contrib/segmentio/kafka.go.v0/headers.go +++ b/contrib/segmentio/kafka.go.v0/headers.go @@ -6,49 +6,14 @@ package kafka import ( + "github.com/segmentio/kafka-go" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - - "github.com/segmentio/kafka-go" ) -// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message -type messageCarrier struct { - msg *kafka.Message -} - -var _ interface { - tracer.TextMapReader - tracer.TextMapWriter -} = (*messageCarrier)(nil) - -// ForeachKey conforms to the TextMapReader interface. -func (c messageCarrier) ForeachKey(handler func(key, val string) error) error { - for _, h := range c.msg.Headers { - err := handler(h.Key, string(h.Value)) - if err != nil { - return err - } - } - return nil -} - -// Set implements TextMapWriter -func (c messageCarrier) Set(key, val string) { - // ensure uniqueness of keys - for i := 0; i < len(c.msg.Headers); i++ { - if string(c.msg.Headers[i].Key) == key { - c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) - i-- - } - } - c.msg.Headers = append(c.msg.Headers, kafka.Header{ - Key: key, - Value: []byte(val), - }) -} - // ExtractSpanContext retrieves the SpanContext from a kafka.Message func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) { - return tracer.Extract(messageCarrier{&msg}) + return tracer.Extract(tracing.NewMessageCarrier(wrapMessage(&msg))) } diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/dsm.go b/contrib/segmentio/kafka.go.v0/internal/tracing/dsm.go new file mode 100644 index 0000000000..728308d0ad --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/dsm.go @@ -0,0 +1,86 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. 
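For context on the headers.go change above: ExtractSpanContext remains the exported entry point applications use to continue a trace from a consumed message. A hedged usage sketch; the package name, handler, and operation name are illustrative and not part of this diff:

package consumer

import (
	"github.com/segmentio/kafka-go"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// process continues the producer's trace (if any) from the message headers.
func process(msg kafka.Message) {
	opts := []tracer.StartSpanOption{}
	if spanCtx, err := kafkatrace.ExtractSpanContext(msg); err == nil {
		opts = append(opts, tracer.ChildOf(spanCtx))
	}
	span := tracer.StartSpan("kafka.process", opts...)
	defer span.Finish()
	// ... application logic ...
}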
+ +package tracing + +import ( + "context" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) { + if !tr.dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"} + if tr.kafkaCfg.ConsumerGroupID != "" { + edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID) + } + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, + edges..., + ) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) + if tr.kafkaCfg.ConsumerGroupID != "" { + // only track Kafka lag if a consumer group is set. + // since there is no ack mechanism, we consider that messages read are committed right away. + tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset()) + } +} + +func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) { + if !tr.dataStreamsEnabled || msg == nil { + return + } + + var topic string + if writer.GetTopic() != "" { + topic = writer.GetTopic() + } else { + topic = msg.GetTopic() + } + + edges := []string{"direction:out", "topic:" + topic, "type:kafka"} + carrier := MessageCarrier{msg} + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, + edges..., + ) + if !ok { + return + } + + // Headers will be dropped if the current protocol does not support them + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getProducerMsgSize(msg Message) (size int64) { + for _, header := range msg.GetHeaders() { + size += int64(len(header.GetKey()) + len(header.GetValue())) + } + if msg.GetValue() != nil { + size += int64(len(msg.GetValue())) + } + if msg.GetKey() != nil { + size += int64(len(msg.GetKey())) + } + return size +} + +func getConsumerMsgSize(msg Message) (size int64) { + for _, header := range msg.GetHeaders() { + size += int64(len(header.GetKey()) + len(header.GetValue())) + } + return size + int64(len(msg.GetValue())+len(msg.GetKey())) +} diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/message_carrier.go b/contrib/segmentio/kafka.go.v0/internal/tracing/message_carrier.go new file mode 100644 index 0000000000..f06c40fac1 --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/message_carrier.go @@ -0,0 +1,52 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package tracing + +import ( + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message +type MessageCarrier struct { + msg Message +} + +var _ interface { + tracer.TextMapReader + tracer.TextMapWriter +} = (*MessageCarrier)(nil) + +// ForeachKey conforms to the TextMapReader interface. 
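The two checkpoint functions above inject their state into the message headers through the MessageCarrier defined here; the tests later in this diff read it back essentially as below. This is a sketch of test-internal usage only (wrapMessage is unexported in the outer contrib package), with the helper name being hypothetical:

package kafka

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"

	"gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

// logPathway is an illustrative helper (not part of this diff) mirroring how the
// tests recover the Data Streams pathway that the checkpoint functions inject
// into message headers. It would live next to wrapMessage in the contrib package.
func logPathway(msg *kafka.Message) {
	carrier := tracing.NewMessageCarrier(wrapMessage(msg))
	ctx := datastreams.ExtractFromBase64Carrier(context.Background(), carrier)
	if p, ok := datastreams.PathwayFromContext(ctx); ok {
		log.Printf("pathway hash: %d", p.GetHash())
	}
}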
+func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error { + for _, h := range c.msg.GetHeaders() { + err := handler(h.GetKey(), string(h.GetValue())) + if err != nil { + return err + } + } + return nil +} + +// Set implements TextMapWriter +func (c MessageCarrier) Set(key, val string) { + headers := c.msg.GetHeaders() + // ensure uniqueness of keys + for i := 0; i < len(headers); i++ { + if headers[i].GetKey() == key { + headers = append(headers[:i], headers[i+1:]...) + i-- + } + } + headers = append(headers, KafkaHeader{ + Key: key, + Value: []byte(val), + }) + c.msg.SetHeaders(headers) +} + +func NewMessageCarrier(msg Message) MessageCarrier { + return MessageCarrier{msg: msg} +} diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/tracer.go b/contrib/segmentio/kafka.go.v0/internal/tracing/tracer.go new file mode 100644 index 0000000000..21baa893f3 --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/tracer.go @@ -0,0 +1,89 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package tracing + +import ( + "math" + + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" +) + +const defaultServiceName = "kafka" + +type Tracer struct { + consumerServiceName string + producerServiceName string + consumerSpanName string + producerSpanName string + analyticsRate float64 + dataStreamsEnabled bool + kafkaCfg KafkaConfig +} + +// An Option customizes the Tracer. +type Option func(tr *Tracer) + +func NewTracer(kafkaCfg KafkaConfig, opts ...Option) *Tracer { + tr := &Tracer{ + // analyticsRate: globalConfig.AnalyticsRate(), + analyticsRate: math.NaN(), + kafkaCfg: kafkaCfg, + } + if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) { + tr.analyticsRate = 1.0 + } + + tr.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) + + tr.consumerServiceName = namingschema.ServiceName(defaultServiceName) + tr.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) + tr.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) + tr.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) + + for _, opt := range opts { + opt(tr) + } + return tr +} + +// WithServiceName sets the Tracer service name to serviceName. +func WithServiceName(serviceName string) Option { + return func(tr *Tracer) { + tr.consumerServiceName = serviceName + tr.producerServiceName = serviceName + } +} + +// WithAnalytics enables Trace Analytics for all started spans. +func WithAnalytics(on bool) Option { + return func(tr *Tracer) { + if on { + tr.analyticsRate = 1.0 + } else { + tr.analyticsRate = math.NaN() + } + } +} + +// WithAnalyticsRate sets the sampling rate for Trace Analytics events +// correlated to started spans. 
+func WithAnalyticsRate(rate float64) Option { + return func(tr *Tracer) { + if rate >= 0.0 && rate <= 1.0 { + tr.analyticsRate = rate + } else { + tr.analyticsRate = math.NaN() + } + } +} + +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(tr *Tracer) { + tr.dataStreamsEnabled = true + } +} diff --git a/contrib/segmentio/kafka.go.v0/option_test.go b/contrib/segmentio/kafka.go.v0/internal/tracing/tracer_test.go similarity index 83% rename from contrib/segmentio/kafka.go.v0/option_test.go rename to contrib/segmentio/kafka.go.v0/internal/tracing/tracer_test.go index f811ca0446..0bf6bedc56 100644 --- a/contrib/segmentio/kafka.go.v0/option_test.go +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/tracer_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. -package kafka +package tracing import ( "math" @@ -16,7 +16,7 @@ import ( func TestAnalyticsSettings(t *testing.T) { t.Run("defaults", func(t *testing.T) { - cfg := newConfig() + cfg := NewTracer(KafkaConfig{}) assert.True(t, math.IsNaN(cfg.analyticsRate)) }) @@ -26,12 +26,12 @@ func TestAnalyticsSettings(t *testing.T) { defer globalconfig.SetAnalyticsRate(rate) globalconfig.SetAnalyticsRate(0.4) - cfg := newConfig() + cfg := NewTracer(KafkaConfig{}) assert.Equal(t, 0.4, cfg.analyticsRate) }) t.Run("enabled", func(t *testing.T) { - cfg := newConfig(WithAnalytics(true)) + cfg := NewTracer(KafkaConfig{}, WithAnalytics(true)) assert.Equal(t, 1.0, cfg.analyticsRate) }) @@ -40,19 +40,19 @@ func TestAnalyticsSettings(t *testing.T) { defer globalconfig.SetAnalyticsRate(rate) globalconfig.SetAnalyticsRate(0.4) - cfg := newConfig(WithAnalyticsRate(0.2)) + cfg := NewTracer(KafkaConfig{}, WithAnalyticsRate(0.2)) assert.Equal(t, 0.2, cfg.analyticsRate) }) t.Run("withEnv", func(t *testing.T) { t.Setenv("DD_DATA_STREAMS_ENABLED", "true") - cfg := newConfig() + cfg := NewTracer(KafkaConfig{}) assert.True(t, cfg.dataStreamsEnabled) }) t.Run("optionOverridesEnv", func(t *testing.T) { t.Setenv("DD_DATA_STREAMS_ENABLED", "false") - cfg := newConfig() + cfg := NewTracer(KafkaConfig{}) WithDataStreams()(cfg) assert.True(t, cfg.dataStreamsEnabled) }) diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go b/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go new file mode 100644 index 0000000000..9b5f7bbb9b --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go @@ -0,0 +1,92 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +// Package tracing contains tracing logic for the segmentio/kafka-go.v0 instrumentation. +// +// WARNING: this package SHOULD NOT import segmentio/kafka-go. +// +// The motivation of this package is to support orchestrion, which cannot use the main package because it imports +// the segmentio/kafka-go package, and since orchestrion modifies the library code itself, +// this would cause an import cycle. 
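For orientation, this is roughly how the outer contrib package (and the benchmarks later in this diff) construct a Tracer from a KafkaConfig plus the options defined above. A hedged sketch with illustrative values; KafkaConfig itself is declared in types.go further down:

// Hypothetical snippet from the outer contrib package, which is allowed to
// import this internal package. Broker addresses and group ID are made up.
package kafka

import "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"

func newExampleTracer() *tracing.Tracer {
	cfg := tracing.KafkaConfig{
		BootstrapServers: "localhost:9092,localhost:9093",
		ConsumerGroupID:  "my-group",
	}
	return tracing.NewTracer(cfg, tracing.WithDataStreams(), tracing.WithAnalyticsRate(0.5))
}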
+package tracing + +import ( + "context" + "math" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" +) + +const componentName = "segmentio/kafka.go.v0" + +func init() { + telemetry.LoadIntegration(componentName) + tracer.MarkIntegrationImported("github.com/segmentio/kafka-go") +} + +func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) ddtrace.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.consumerServiceName), + tracer.ResourceName("Consume Topic " + msg.GetTopic()), + tracer.SpanType(ext.SpanTypeMessageConsumer), + tracer.Tag(ext.MessagingKafkaPartition, msg.GetPartition()), + tracer.Tag("offset", msg.GetOffset()), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Tag(ext.KafkaBootstrapServers, tr.kafkaCfg.BootstrapServers), + tracer.Measured(), + } + if !math.IsNaN(tr.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate)) + } + // kafka supports headers, so try to extract a span context + carrier := NewMessageCarrier(msg) + if spanctx, err := tracer.Extract(carrier); err == nil { + opts = append(opts, tracer.ChildOf(spanctx)) + } + span, _ := tracer.StartSpanFromContext(ctx, tr.consumerSpanName, opts...) + // reinject the span context so consumers can pick it up + if err := tracer.Inject(span.Context(), carrier); err != nil { + log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier in reader, %v", err) + } + return span +} + +func (tr *Tracer) StartProduceSpan(ctx context.Context, writer Writer, msg Message, spanOpts ...tracer.StartSpanOption) ddtrace.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.producerServiceName), + tracer.SpanType(ext.SpanTypeMessageProducer), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindProducer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Tag(ext.KafkaBootstrapServers, tr.kafkaCfg.BootstrapServers), + } + if writer.GetTopic() != "" { + opts = append(opts, tracer.ResourceName("Produce Topic "+writer.GetTopic())) + } else { + opts = append(opts, tracer.ResourceName("Produce Topic "+msg.GetTopic())) + } + if !math.IsNaN(tr.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate)) + } + opts = append(opts, spanOpts...) + carrier := NewMessageCarrier(msg) + span, _ := tracer.StartSpanFromContext(ctx, tr.producerSpanName, opts...) + if err := tracer.Inject(span.Context(), carrier); err != nil { + log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier in writer, %v", err) + } + return span +} + +func (*Tracer) FinishProduceSpan(span ddtrace.Span, partition int, offset int64, err error) { + span.SetTag(ext.MessagingKafkaPartition, partition) + span.SetTag("offset", offset) + span.Finish(tracer.WithError(err)) +} diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/types.go b/contrib/segmentio/kafka.go.v0/internal/tracing/types.go new file mode 100644 index 0000000000..6c17b179f4 --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/types.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package tracing + +type Header interface { + GetKey() string + GetValue() []byte +} + +type KafkaHeader struct { + Key string + Value []byte +} + +func (h KafkaHeader) GetKey() string { + return h.Key +} + +func (h KafkaHeader) GetValue() []byte { + return h.Value +} + +type Writer interface { + GetTopic() string +} + +type Message interface { + GetValue() []byte + GetKey() []byte + GetHeaders() []Header + SetHeaders([]Header) + GetTopic() string + GetPartition() int + GetOffset() int64 +} + +// KafkaConfig holds information from the kafka config for span tags. +type KafkaConfig struct { + BootstrapServers string + ConsumerGroupID string +} diff --git a/contrib/segmentio/kafka.go.v0/kafka.go b/contrib/segmentio/kafka.go.v0/kafka.go index ffd881dfa1..93af2643af 100644 --- a/contrib/segmentio/kafka.go.v0/kafka.go +++ b/contrib/segmentio/kafka.go.v0/kafka.go @@ -7,25 +7,21 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka import ( "context" - "math" "strings" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "github.com/segmentio/kafka-go" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" - "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - "github.com/segmentio/kafka-go" ) -const componentName = "segmentio/kafka.go.v0" - -func init() { - telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported("github.com/segmentio/kafka-go") +// A Reader wraps a kafka.Reader. +type Reader struct { + *kafka.Reader + tracer *tracing.Tracer + kafkaCfg *tracing.KafkaConfig + prev ddtrace.Span } // NewReader calls kafka.NewReader and wraps the resulting Consumer. @@ -33,74 +29,23 @@ func NewReader(conf kafka.ReaderConfig, opts ...Option) *Reader { return WrapReader(kafka.NewReader(conf), opts...) } -// NewWriter calls kafka.NewWriter and wraps the resulting Producer. -func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer { - return WrapWriter(kafka.NewWriter(conf), opts...) -} - // WrapReader wraps a kafka.Reader so that any consumed events are traced. func WrapReader(c *kafka.Reader, opts ...Option) *Reader { wrapped := &Reader{ Reader: c, - cfg: newConfig(opts...), } - + kafkaCfg := tracing.KafkaConfig{} if c.Config().Brokers != nil { - wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",") + kafkaCfg.BootstrapServers = strings.Join(c.Config().Brokers, ",") } - if c.Config().GroupID != "" { - wrapped.groupID = c.Config().GroupID + kafkaCfg.ConsumerGroupID = c.Config().GroupID } - - log.Debug("contrib/segmentio/kafka-go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg) + wrapped.tracer = tracing.NewTracer(kafkaCfg, opts...) + log.Debug("contrib/segmentio/kafka-go.v0/kafka: Wrapping Reader: %#v", wrapped.tracer) return wrapped } -// A kafkaConfig struct holds information from the kafka config for span tags -type kafkaConfig struct { - bootstrapServers string - groupID string -} - -// A Reader wraps a kafka.Reader. 
-type Reader struct { - *kafka.Reader - kafkaConfig - cfg *config - prev ddtrace.Span -} - -func (r *Reader) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span { - opts := []tracer.StartSpanOption{ - tracer.ServiceName(r.cfg.consumerServiceName), - tracer.ResourceName("Consume Topic " + msg.Topic), - tracer.SpanType(ext.SpanTypeMessageConsumer), - tracer.Tag(ext.MessagingKafkaPartition, msg.Partition), - tracer.Tag("offset", msg.Offset), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Tag(ext.KafkaBootstrapServers, r.bootstrapServers), - tracer.Measured(), - } - - if !math.IsNaN(r.cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, r.cfg.analyticsRate)) - } - // kafka supports headers, so try to extract a span context - carrier := messageCarrier{msg} - if spanctx, err := tracer.Extract(carrier); err == nil { - opts = append(opts, tracer.ChildOf(spanctx)) - } - span, _ := tracer.StartSpanFromContext(ctx, r.cfg.consumerSpanName, opts...) - // reinject the span context so consumers can pick it up - if err := tracer.Inject(span.Context(), carrier); err != nil { - log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier in reader, %v", err) - } - return span -} - // Close calls the underlying Reader.Close and if polling is enabled, finishes // any remaining span. func (r *Reader) Close() error { @@ -122,8 +67,9 @@ func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) { if err != nil { return kafka.Message{}, err } - r.prev = r.startSpan(ctx, &msg) - setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg) + tMsg := wrapMessage(&msg) + r.prev = r.tracer.StartConsumeSpan(ctx, tMsg) + r.tracer.SetConsumeDSMCheckpoint(tMsg) return msg, nil } @@ -137,147 +83,51 @@ func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error) { if err != nil { return msg, err } - r.prev = r.startSpan(ctx, &msg) - setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg) + tMsg := wrapMessage(&msg) + r.prev = r.tracer.StartConsumeSpan(ctx, tMsg) + r.tracer.SetConsumeDSMCheckpoint(tMsg) return msg, nil } -func setConsumeCheckpoint(enabled bool, groupID string, msg *kafka.Message) { - if !enabled || msg == nil { - return - } - edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} - if groupID != "" { - edges = append(edges, "group:"+groupID) - } - carrier := messageCarrier{msg} - ctx, ok := tracer.SetDataStreamsCheckpointWithParams( - datastreams.ExtractFromBase64Carrier(context.Background(), carrier), - options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, - edges..., - ) - if !ok { - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) - if groupID != "" { - // only track Kafka lag if a consumer group is set. - // since there is no ack mechanism, we consider that messages read are committed right away. - tracer.TrackKafkaCommitOffset(groupID, msg.Topic, int32(msg.Partition), msg.Offset) - } +// Writer wraps a kafka.Writer with tracing config data +type Writer struct { + *kafka.Writer + tracer *tracing.Tracer +} + +// NewWriter calls kafka.NewWriter and wraps the resulting Producer. +func NewWriter(conf kafka.WriterConfig, opts ...Option) *Writer { + return WrapWriter(kafka.NewWriter(conf), opts...) } // WrapWriter wraps a kafka.Writer so requests are traced. 
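Seen from an application, the wrapped Reader (and the Writer below) remain drop-in replacements. A hedged end-to-end consumer sketch with illustrative broker, topic, and group values:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	r := kafkatrace.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
	})
	defer r.Close()

	for {
		// Each call starts a consume span; the previous one is finished on the
		// next call or on Close, as described by the Close doc comment above.
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Println("read error:", err)
			return
		}
		log.Printf("message: %s", msg.Value)
	}
}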
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer { writer := &Writer{ Writer: w, - cfg: newConfig(opts...), } - + kafkaCfg := tracing.KafkaConfig{} if w.Addr.String() != "" { - writer.bootstrapServers = w.Addr.String() + kafkaCfg.BootstrapServers = w.Addr.String() } - log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.cfg) + writer.tracer = tracing.NewTracer(kafkaCfg, opts...) + log.Debug("contrib/segmentio/kafka.go.v0: Wrapping Writer: %#v", writer.tracer) return writer } -// Writer wraps a kafka.Writer with tracing config data -type Writer struct { - *kafka.Writer - kafkaConfig - cfg *config -} - -func (w *Writer) startSpan(ctx context.Context, msg *kafka.Message) ddtrace.Span { - opts := []tracer.StartSpanOption{ - tracer.ServiceName(w.cfg.producerServiceName), - tracer.SpanType(ext.SpanTypeMessageProducer), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindProducer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Tag(ext.KafkaBootstrapServers, w.bootstrapServers), - } - if w.Writer.Topic != "" { - opts = append(opts, tracer.ResourceName("Produce Topic "+w.Writer.Topic)) - } else { - opts = append(opts, tracer.ResourceName("Produce Topic "+msg.Topic)) - } - if !math.IsNaN(w.cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate)) - } - carrier := messageCarrier{msg} - span, _ := tracer.StartSpanFromContext(ctx, w.cfg.producerSpanName, opts...) - if err := tracer.Inject(span.Context(), carrier); err != nil { - log.Debug("contrib/segmentio/kafka.go.v0: Failed to inject span context into carrier in writer, %v", err) - } - return span -} - -func finishSpan(span ddtrace.Span, partition int, offset int64, err error) { - span.SetTag(ext.MessagingKafkaPartition, partition) - span.SetTag("offset", offset) - span.Finish(tracer.WithError(err)) -} - // WriteMessages calls kafka.go.v0.Writer.WriteMessages and traces the requests. func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { // although there's only one call made to the SyncProducer, the messages are // treated individually, so we create a span for each one spans := make([]ddtrace.Span, len(msgs)) for i := range msgs { - spans[i] = w.startSpan(ctx, &msgs[i]) - setProduceCheckpoint(w.cfg.dataStreamsEnabled, &msgs[i], w.Writer) + tMsg := wrapMessage(&msgs[i]) + tWriter := wrapTracingWriter(w.Writer) + spans[i] = w.tracer.StartProduceSpan(ctx, tWriter, tMsg) + w.tracer.SetProduceDSMCheckpoint(tMsg, tWriter) } err := w.Writer.WriteMessages(ctx, msgs...) 
for i, span := range spans { - finishSpan(span, msgs[i].Partition, msgs[i].Offset, err) + w.tracer.FinishProduceSpan(span, msgs[i].Partition, msgs[i].Offset, err) } return err } - -func setProduceCheckpoint(enabled bool, msg *kafka.Message, writer *kafka.Writer) { - if !enabled || msg == nil { - return - } - - var topic string - if writer.Topic != "" { - topic = writer.Topic - } else { - topic = msg.Topic - } - - edges := []string{"direction:out", "topic:" + topic, "type:kafka"} - carrier := messageCarrier{msg} - ctx, ok := tracer.SetDataStreamsCheckpointWithParams( - datastreams.ExtractFromBase64Carrier(context.Background(), carrier), - options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, - edges..., - ) - if !ok { - return - } - - // Headers will be dropped if the current protocol does not support them - datastreams.InjectToBase64Carrier(ctx, carrier) -} - -func getProducerMsgSize(msg *kafka.Message) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - if msg.Value != nil { - size += int64(len(msg.Value)) - } - if msg.Key != nil { - size += int64(len(msg.Key)) - } - return size -} - -func getConsumerMsgSize(msg *kafka.Message) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - return size + int64(len(msg.Value)+len(msg.Key)) -} diff --git a/contrib/segmentio/kafka.go.v0/kafka_test.go b/contrib/segmentio/kafka.go.v0/kafka_test.go index 5d33859f7b..ab4e7839c9 100644 --- a/contrib/segmentio/kafka.go.v0/kafka_test.go +++ b/contrib/segmentio/kafka.go.v0/kafka_test.go @@ -7,19 +7,26 @@ package kafka import ( "context" + "errors" + "fmt" + "log" + "net" "os" + "strconv" "testing" "time" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - - kafka "github.com/segmentio/kafka-go" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( @@ -28,44 +35,139 @@ const ( testReaderMaxWait = 10 * time.Millisecond ) -func skipIntegrationTest(t *testing.T) { - if _, ok := os.LookupEnv("INTEGRATION"); !ok { - t.Skip("🚧 Skipping integration test (INTEGRATION environment variable is not set)") +var ( + // add some dummy values to broker/addr to test bootstrap servers. 
+ kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"} +) + +func TestMain(m *testing.M) { + _, ok := os.LookupEnv("INTEGRATION") + if !ok { + log.Println("🚧 Skipping integration test (INTEGRATION environment variable is not set)") + os.Exit(0) } + cleanup := createTopic() + exitCode := m.Run() + cleanup() + os.Exit(exitCode) } -/* -to setup the integration test locally run: - docker-compose -f local_testing.yaml up -*/ +func testWriter() *kafka.Writer { + return &kafka.Writer{ + Addr: kafka.TCP(kafkaBrokers...), + Topic: testTopic, + RequiredAcks: kafka.RequireOne, + Balancer: &kafka.LeastBytes{}, + } +} + +func testReader() *kafka.Reader { + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: kafkaBrokers, + GroupID: testGroupID, + Topic: testTopic, + MaxWait: testReaderMaxWait, + MaxBytes: 10e6, // 10MB + }) +} + +func createTopic() func() { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + log.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + log.Fatal(err) + } + if err := controllerConn.DeleteTopics(testTopic); err != nil && !errors.Is(err, kafka.UnknownTopicOrPartition) { + log.Fatalf("failed to delete topic: %v", err) + } + topicConfigs := []kafka.TopicConfig{ + { + Topic: testTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + if err := controllerConn.CreateTopics(topicConfigs...); err != nil { + log.Fatal(err) + } + if err := ensureTopicReady(); err != nil { + log.Fatal(err) + } + return func() { + if err := controllerConn.DeleteTopics(testTopic); err != nil { + log.Printf("failed to delete topic: %v", err) + } + if err := controllerConn.Close(); err != nil { + log.Printf("failed to close controller connection: %v", err) + } + } +} + +func ensureTopicReady() error { + const ( + maxRetries = 10 + retryDelay = 100 * time.Millisecond + ) + writer := testWriter() + defer writer.Close() + reader := testReader() + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var ( + retryCount int + err error + ) + for retryCount < maxRetries { + err = writer.WriteMessages(ctx, kafka.Message{Key: []byte("some-key"), Value: []byte("some-value")}) + if err == nil { + break + } + // This error happens sometimes with brand-new topics, as there is a delay between when the topic is created + // on the broker, and when the topic can actually be written to. + if errors.Is(err, kafka.UnknownTopicOrPartition) { + retryCount++ + log.Printf("topic not ready yet, retrying produce in %s (retryCount: %d)\n", retryDelay, retryCount) + time.Sleep(retryDelay) + } + } + if err != nil { + return fmt.Errorf("timeout waiting for topic to be ready: %w", err) + } + // read the message to ensure we don't pollute tests + _, err = reader.ReadMessage(ctx) + if err != nil { + return err + } + return nil +} type readerOpFn func(t *testing.T, r *Reader) func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *Writer), readerOp readerOpFn, writerOpts []Option, readerOpts []Option) ([]mocktracer.Span, []kafka.Message) { - skipIntegrationTest(t) - writtenMessages := []kafka.Message{} - // add some dummy values to broker/addr to test bootstrap servers. 
- kw := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), - Topic: testTopic, - RequiredAcks: kafka.RequireOne, - Completion: func(messages []kafka.Message, err error) { - writtenMessages = append(writtenMessages, messages...) - }, + kw := testWriter() + kw.Completion = func(messages []kafka.Message, err error) { + writtenMessages = append(writtenMessages, messages...) } w := WrapWriter(kw, writerOpts...) writerOp(t, w) err := w.Close() require.NoError(t, err) - r := NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, - GroupID: testGroupID, - Topic: testTopic, - MaxWait: testReaderMaxWait, - }, readerOpts...) + r := WrapReader(testReader(), readerOpts...) readerOp(t, r) err = r.Close() require.NoError(t, err) @@ -113,8 +215,8 @@ func TestReadMessageFunctional(t *testing.T) { []Option{WithDataStreams()}, ) - assert.Len(t, writtenMessages, len(messagesToWrite)) - assert.Len(t, readMessages, len(messagesToWrite)) + require.Len(t, writtenMessages, len(messagesToWrite)) + require.Len(t, readMessages, len(messagesToWrite)) // producer span s0 := spans[0] @@ -129,7 +231,7 @@ func TestReadMessageFunctional(t *testing.T) { assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers)) - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]})) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&writtenMessages[0])))) assert.True(t, ok) expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) @@ -149,10 +251,14 @@ func TestReadMessageFunctional(t *testing.T) { assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers)) - p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]})) + // context propagation + assert.Equal(t, s0.SpanID(), s1.ParentID(), "consume span should be child of the produce span") + assert.Equal(t, s0.TraceID(), s1.TraceID(), "spans should have the same trace id") + + p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&readMessages[0])))) assert.True(t, ok) expectedCtx, _ = tracer.SetDataStreamsCheckpoint( - datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}), + datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&writtenMessages[0]))), "direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID, ) expected, _ = datastreams.PathwayFromContext(expectedCtx) @@ -209,7 +315,7 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers)) - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]})) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&writtenMessages[0])))) 
assert.True(t, ok) expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") expected, _ := datastreams.PathwayFromContext(expectedCtx) @@ -229,10 +335,13 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers)) - p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]})) + // context propagation + assert.Equal(t, s0.SpanID(), s1.ParentID(), "consume span should be child of the produce span") + + p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&readMessages[0])))) assert.True(t, ok) expectedCtx, _ = tracer.SetDataStreamsCheckpoint( - datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}), + datastreams.ExtractFromBase64Carrier(context.Background(), tracing.NewMessageCarrier(wrapMessage(&writtenMessages[0]))), "direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID, ) expected, _ = datastreams.PathwayFromContext(expectedCtx) @@ -240,6 +349,62 @@ func TestFetchMessageFunctional(t *testing.T) { assert.Equal(t, expected.GetHash(), p.GetHash()) } +func TestProduceMultipleMessages(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + messages := []kafka.Message{ + { + Key: []byte("key1"), + Value: []byte("value1"), + }, + { + Key: []byte("key2"), + Value: []byte("value2"), + }, + { + Key: []byte("key3"), + Value: []byte("value3"), + }, + } + + writer := WrapWriter(testWriter()) + reader := WrapReader(testReader()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := writer.WriteMessages(ctx, messages...) + require.NoError(t, err) + require.NoError(t, writer.Close()) + + curMsg := 0 + for curMsg < len(messages) { + readMsg, err := reader.ReadMessage(ctx) + require.NoError(t, err) + require.Equal(t, string(messages[curMsg].Key), string(readMsg.Key)) + require.Equal(t, string(messages[curMsg].Value), string(readMsg.Value)) + curMsg++ + } + require.NoError(t, reader.Close()) + + spans := mt.FinishedSpans() + require.Len(t, spans, 6) + + produceSpans := spans[0:3] + consumeSpans := spans[3:6] + for i := 0; i < 3; i++ { + ps := produceSpans[i] + cs := consumeSpans[i] + + assert.Equal(t, "kafka.produce", ps.OperationName(), "wrong produce span name") + assert.Equal(t, "kafka.consume", cs.OperationName(), "wrong consume span name") + assert.Equal(t, cs.ParentID(), ps.SpanID(), "consume span should be child of a produce span") + assert.Equal(t, uint64(0), ps.ParentID(), "produce span should not be child of any span") + assert.Equal(t, cs.TraceID(), ps.TraceID(), "spans should be part of the same trace") + } +} + func TestNamingSchema(t *testing.T) { genSpans := func(t *testing.T, serviceOverride string) []mocktracer.Span { var opts []Option @@ -282,40 +447,50 @@ func TestNamingSchema(t *testing.T) { namingschematest.NewKafkaTest(genSpans)(t) } -func BenchmarkReaderStartSpan(b *testing.B) { - r := NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, - GroupID: testGroupID, - Topic: testTopic, - MaxWait: testReaderMaxWait, - }) +// benchSpan is a package-level variable used to prevent compiler optimisations in the benchmarks below. 
+var benchSpan ddtrace.Span +func BenchmarkReaderStartSpan(b *testing.B) { + ctx := context.Background() + kafkaCfg := tracing.KafkaConfig{ + BootstrapServers: "localhost:9092,localhost:9093,localhost:9094", + ConsumerGroupID: testGroupID, + } + tr := tracing.NewTracer(kafkaCfg) msg := kafka.Message{ Key: []byte("key1"), Value: []byte("value1"), } + var result ddtrace.Span b.ResetTimer() for n := 0; n < b.N; n++ { - r.startSpan(nil, &msg) + result = tr.StartConsumeSpan(ctx, wrapMessage(&msg)) } + benchSpan = result } func BenchmarkWriterStartSpan(b *testing.B) { + ctx := context.Background() + kafkaCfg := tracing.KafkaConfig{ + BootstrapServers: "localhost:9092,localhost:9093,localhost:9094", + ConsumerGroupID: testGroupID, + } + tr := tracing.NewTracer(kafkaCfg) kw := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: testTopic, RequiredAcks: kafka.RequireOne, } - w := WrapWriter(kw) - msg := kafka.Message{ Key: []byte("key1"), Value: []byte("value1"), } + var result ddtrace.Span b.ResetTimer() for n := 0; n < b.N; n++ { - w.startSpan(nil, &msg) + result = tr.StartProduceSpan(ctx, wrapTracingWriter(kw), wrapMessage(&msg)) } + benchSpan = result } diff --git a/contrib/segmentio/kafka.go.v0/local_testing.yaml b/contrib/segmentio/kafka.go.v0/local_testing.yaml deleted file mode 100644 index 0cc5c8a355..0000000000 --- a/contrib/segmentio/kafka.go.v0/local_testing.yaml +++ /dev/null @@ -1,33 +0,0 @@ -version: "3" -services: - zookeeper: - image: 'bitnami/zookeeper:latest' - ports: - - '2181:2181' - environment: - - ALLOW_ANONYMOUS_LOGIN=yes - kafka: - image: 'bitnami/kafka:2' - ports: - - '9092:9092' - environment: - - KAFKA_BROKER_ID=1 - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - ALLOW_PLAINTEXT_LISTENER=yes - depends_on: - - zookeeper - create-topics: - image: confluentinc/cp-kafka:5.5.0 - hostname: create-topics - container_name: create-topics - restart: on-failure - depends_on: - - kafka - command: " - bash -c 'sleep 5 && \ - kafka-topics --create --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 --topic gosegtest'" - environment: - KAFKA_BROKER_ID: ignored - KAFKA_ZOOKEEPER_CONNECT: ignored diff --git a/contrib/segmentio/kafka.go.v0/option.go b/contrib/segmentio/kafka.go.v0/option.go index 8b14b7aab9..b31d24e2d5 100644 --- a/contrib/segmentio/kafka.go.v0/option.go +++ b/contrib/segmentio/kafka.go.v0/option.go @@ -6,82 +6,21 @@ package kafka import ( - "math" - - "gopkg.in/DataDog/dd-trace-go.v1/internal" - "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" ) -const defaultServiceName = "kafka" - -type config struct { - consumerServiceName string - producerServiceName string - consumerSpanName string - producerSpanName string - analyticsRate float64 - dataStreamsEnabled bool -} - // An Option customizes the config. 
-type Option func(cfg *config) - -func newConfig(opts ...Option) *config { - cfg := &config{ - // analyticsRate: globalconfig.AnalyticsRate(), - analyticsRate: math.NaN(), - } - if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) { - cfg.analyticsRate = 1.0 - } - - cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) - - cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName) - cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) - cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) - cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) - - for _, opt := range opts { - opt(cfg) - } - return cfg -} +type Option = tracing.Option // WithServiceName sets the config service name to serviceName. -func WithServiceName(serviceName string) Option { - return func(cfg *config) { - cfg.consumerServiceName = serviceName - cfg.producerServiceName = serviceName - } -} +var WithServiceName = tracing.WithServiceName // WithAnalytics enables Trace Analytics for all started spans. -func WithAnalytics(on bool) Option { - return func(cfg *config) { - if on { - cfg.analyticsRate = 1.0 - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalytics = tracing.WithAnalytics // WithAnalyticsRate sets the sampling rate for Trace Analytics events // correlated to started spans. -func WithAnalyticsRate(rate float64) Option { - return func(cfg *config) { - if rate >= 0.0 && rate <= 1.0 { - cfg.analyticsRate = rate - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalyticsRate = tracing.WithAnalyticsRate // WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ -func WithDataStreams() Option { - return func(cfg *config) { - cfg.dataStreamsEnabled = true - } -} +var WithDataStreams = tracing.WithDataStreams diff --git a/contrib/segmentio/kafka.go.v0/tracing.go b/contrib/segmentio/kafka.go.v0/tracing.go new file mode 100644 index 0000000000..fdf1e84b9b --- /dev/null +++ b/contrib/segmentio/kafka.go.v0/tracing.go @@ -0,0 +1,90 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
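Because Option and the With* helpers are now aliases of their internal/tracing counterparts, existing call sites keep compiling unchanged. A hedged producer-side sketch with illustrative names and addresses:

package main

import (
	"github.com/segmentio/kafka-go"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
)

func main() {
	kw := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "my-topic",
	}
	// The aliased options are passed exactly as before the refactor.
	w := kafkatrace.WrapWriter(kw,
		kafkatrace.WithServiceName("orders-producer"),
		kafkatrace.WithDataStreams(),
	)
	defer w.Close()
}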
+ +package kafka + +import ( + "github.com/segmentio/kafka-go" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" +) + +type wMessage struct { + *kafka.Message +} + +func wrapMessage(msg *kafka.Message) tracing.Message { + if msg == nil { + return nil + } + return &wMessage{msg} +} + +func (w *wMessage) GetValue() []byte { + return w.Value +} + +func (w *wMessage) GetKey() []byte { + return w.Key +} + +func (w *wMessage) GetHeaders() []tracing.Header { + hs := make([]tracing.Header, 0, len(w.Headers)) + for _, h := range w.Headers { + hs = append(hs, wrapHeader(h)) + } + return hs +} + +func (w *wMessage) SetHeaders(headers []tracing.Header) { + hs := make([]kafka.Header, 0, len(headers)) + for _, h := range headers { + hs = append(hs, kafka.Header{ + Key: h.GetKey(), + Value: h.GetValue(), + }) + } + w.Message.Headers = hs +} + +func (w *wMessage) GetTopic() string { + return w.Topic +} + +func (w *wMessage) GetPartition() int { + return w.Partition +} + +func (w *wMessage) GetOffset() int64 { + return w.Offset +} + +type wHeader struct { + kafka.Header +} + +func wrapHeader(h kafka.Header) tracing.Header { + return &wHeader{h} +} + +func (w wHeader) GetKey() string { + return w.Key +} + +func (w wHeader) GetValue() []byte { + return w.Value +} + +type wWriter struct { + *kafka.Writer +} + +func (w *wWriter) GetTopic() string { + return w.Topic +} + +func wrapTracingWriter(w *kafka.Writer) tracing.Writer { + return &wWriter{w} +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 17788504ee..76680be932 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -113,23 +113,24 @@ services: image: memcached:1.5.9 ports: - "11211:11211" - zookeeper: - image: bitnami/zookeeper:latest + kafka: + image: confluentinc/confluent-local:7.5.0 environment: - ALLOW_ANONYMOUS_LOGIN: "yes" - ports: - - "2181:2181" - kafka2: - image: darccio/kafka:2.13-2.8.1 - environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,LISTENER_NAME:PLAINTEXT - KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1 - KAFKA_BROKER_ID: 1 - depends_on: - - zookeeper + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092" + KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_BROKER_ID: "1" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1" + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1" + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1" + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + KAFKA_NODE_ID: "1" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" ports: - "9092:9092" localstack: diff --git a/internal/version/version.go b/internal/version/version.go index 54219cc5ce..b7b24999c0 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -13,7 +13,7 @@ import ( // Tag specifies the current release tag. It needs to be manually // updated. A test checks that the value of Tag never points to a // git tag that is older than HEAD. 
-const Tag = "v1.69.0-dev" +const Tag = "v1.70.0-dev" // Dissected version number. Filled during init() var (