From 52bb3b75ca1ffd8393e37cc0cbc6c9d53265b676 Mon Sep 17 00:00:00 2001 From: Ebenezer Date: Thu, 19 Dec 2024 13:13:46 +0100 Subject: [PATCH 01/15] Add implementation for v2 storage api `StorageIntegrationV2` supports v2 storage api interfaces as StorageIntegration still supports v1 api Provide a function for extracting flat array of spans from otel traces Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/integration_v2.go | 41 ++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 plugin/storage/integration/integration_v2.go diff --git a/plugin/storage/integration/integration_v2.go b/plugin/storage/integration/integration_v2.go new file mode 100644 index 00000000000..9a18baaa4fa --- /dev/null +++ b/plugin/storage/integration/integration_v2.go @@ -0,0 +1,41 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +type StorageIntegrationV2 struct { + TraceReader tracestore.Reader +} + +// extractSpansFromTraces returns a slice of spans contained in one otel trace +func extractSpansFromTraces(traces ptrace.Traces) []ptrace.Span { + var allSpans []ptrace.Span + + // Iterate through ResourceSpans + resourceSpans := traces.ResourceSpans() + for i := 0; i < resourceSpans.Len(); i++ { + resourceSpan := resourceSpans.At(i) + + // Iterate through ScopeSpans within ResourceSpans + scopeSpans := resourceSpan.ScopeSpans() + for j := 0; j < scopeSpans.Len(); j++ { + scopeSpan := scopeSpans.At(j) + + // Iterate through Spans within ScopeSpans + spans := scopeSpan.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + allSpans = append(allSpans, span) + } + } + } + + return allSpans +} From a802b153a739e14ad261704d9738ea9c1957873b Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Fri, 20 Dec 2024 11:15:37 +0100 Subject: [PATCH 02/15] Resolve merge conflict Accept combination of change in the file Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/integration.go | 62 +++++++++++++++++------ 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index c03c6fb911c..c4aa3226db0 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -21,12 +21,15 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) //go:embed fixtures @@ -41,6 +44,7 @@ var fixtures embed.FS // Some implementations may declare multiple tests, with different settings, // and RunAll() under different conditions. type StorageIntegration struct { + StorageIntegrationV2 SpanWriter spanstore.Writer SpanReader spanstore.Reader ArchiveSpanReader spanstore.Reader @@ -79,7 +83,7 @@ type StorageIntegration struct { // the service name is formatted "query##-service". type QueryFixtures struct { Caption string - Query *spanstore.TraceQueryParameters + Query *tracestore.TraceQueryParams ExpectedFixtures []string } @@ -143,7 +147,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetServices(context.Background()) + actual, err = s.TraceReader.GetServices(context.Background()) if err != nil { t.Log(err) return false @@ -154,16 +158,18 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { // If the storage backend returns more services than expected, let's log traces for those t.Log("🛑 Found unexpected services!") for _, service := range actual { - traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ + iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ ServiceName: service, }) + traces, err := iter.FlattenWithErrors(iterTraces) if err != nil { t.Log(err) continue } for _, trace := range traces { - for _, span := range trace.Spans { - t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) + spans := extractSpansFromTraces(trace) + for _, span := range spans { + t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID(), span.Name()) } } } @@ -213,11 +219,18 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { expected := s.writeLargeTraceWithDuplicateSpanIds(t) expectedTraceID := expected.Spans[0].TraceID + expectedOtelTraceID := expectedTraceID.ToOTELTraceID() var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) + traces, err := iter.FlattenWithErrors(iterTraces) + + batches := otlp2jaeger.ProtoFromTraces(traces[0]) + for _, batch := range batches { + actual.Spans = append(actual.Spans, batch.Spans...) + } return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -242,15 +255,15 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) - var expected []spanstore.Operation + var expected []tracestore.Operation if s.GetOperationsMissingSpanKind { - expected = []spanstore.Operation{ + expected = []tracestore.Operation{ {Name: "example-operation-1"}, {Name: "example-operation-3"}, {Name: "example-operation-4"}, } } else { - expected = []spanstore.Operation{ + expected = []tracestore.Operation{ {Name: "example-operation-1", SpanKind: ""}, {Name: "example-operation-3", SpanKind: "server"}, {Name: "example-operation-4", SpanKind: "client"}, @@ -258,11 +271,11 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { } s.loadParseAndWriteExampleTrace(t) - var actual []spanstore.Operation + var actual []tracestore.Operation found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetOperations(context.Background(), - spanstore.OperationQueryParameters{ServiceName: "example-service-1"}) + actual, err = s.TraceReader.GetOperations(context.Background(), + tracestore.OperationQueryParameters{ServiceName: "example-service-1"}) if err != nil { t.Log(err) return false @@ -286,11 +299,18 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { expected := s.loadParseAndWriteExampleTrace(t) expectedTraceID := expected.Spans[0].TraceID + expectedOtelTraceID := expectedTraceID.ToOTELTraceID() var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) + traces, err := iter.FlattenWithErrors(iterTraces) + + batches := otlp2jaeger.ProtoFromTraces(traces[0]) + for _, batch := range batches { + actual.Spans = append(actual.Spans, batch.Spans...) + } if err != nil { t.Log(err) } @@ -342,15 +362,27 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { } } -func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace { +func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace { var traces []*model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - traces, err = s.SpanReader.FindTraces(context.Background(), query) + iterTraces := s.TraceReader.FindTraces(context.Background(), *query) + tracesFound, err := iter.FlattenWithErrors(iterTraces) if err != nil { t.Log(err) return false } + + for _, traceFound := range tracesFound { + modelTrace := &model.Trace{} + batches := otlp2jaeger.ProtoFromTraces(traceFound) + for _, batch := range batches { + modelTrace.Spans = append(modelTrace.Spans, batch.Spans...) + } + traces = append(traces, modelTrace) + } + + if len(expected) != len(traces) { t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) return false From ea8034dda9aa3c4f588e1f913e3d9ec07f804706 Mon Sep 17 00:00:00 2001 From: Ebenezer Date: Thu, 19 Dec 2024 14:02:09 +0100 Subject: [PATCH 03/15] Initialize StorageIntegration with v2 TraceReader for each storage backend Signed-off-by: Ebenezer Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/badgerstore_test.go | 6 ++++-- plugin/storage/integration/cassandra_test.go | 4 +++- plugin/storage/integration/elasticsearch_test.go | 4 +++- plugin/storage/integration/grpc_test.go | 4 +++- plugin/storage/integration/kafka_test.go | 4 +++- plugin/storage/integration/memstore_test.go | 4 +++- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 6bec5b45365..29d68edd4cd 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -14,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type BadgerIntegrationStorage struct { @@ -35,9 +36,10 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { s.SpanWriter, err = s.factory.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = s.factory.CreateSpanReader() + spanReader, err := s.factory.CreateSpanReader() require.NoError(t, err) - + s.TraceReader = v1adapter.NewTraceReader(spanReader) + s.SamplingStore, err = s.factory.CreateSamplingStore(0) require.NoError(t, err) } diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 41d67c94544..559529498d9 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type CassandraStorageIntegration struct { @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) { var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index cd5cfe47464..5bc8ea02b16 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const ( @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) var err error s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 8ca6df51ddb..17e58887a1b 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type GRPCStorageIntegrationTestSuite struct { @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { s.SpanWriter, err = f.CreateSpanWriter() require.NoError(t, err) - s.SpanReader, err = f.CreateSpanReader() + spanReader, err := f.CreateSpanReader() require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 24b2131993d..2b42773ca4a 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -25,6 +25,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/kafka" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const defaultLocalKafkaBroker = "127.0.0.1:9092" @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) { spanConsumer.Start() s.SpanWriter = spanWriter - s.SpanReader = &ingester{traceStore} + spanReader := &ingester{traceStore} + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.CleanUp = func(_ *testing.T) {} s.SkipArchiveTest = true } diff --git a/plugin/storage/integration/memstore_test.go b/plugin/storage/integration/memstore_test.go index 35eaede7517..f910a2f6ed0 100644 --- a/plugin/storage/integration/memstore_test.go +++ b/plugin/storage/integration/memstore_test.go @@ -11,6 +11,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type MemStorageIntegrationTestSuite struct { @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) { store := memory.NewStore() archiveStore := memory.NewStore() s.SamplingStore = memory.NewSamplingStore(2) - s.SpanReader = store + spanReader := store + s.TraceReader = v1adapter.NewTraceReader(spanReader) s.SpanWriter = store s.ArchiveSpanReader = archiveStore s.ArchiveSpanWriter = archiveStore From 51648ab37d5fe36509c20263bfb2820ef5347598 Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Sat, 21 Dec 2024 22:18:59 +0100 Subject: [PATCH 04/15] Consume otel trace sequence to produce model traces Signed-off-by: Emmanuel Emonueje Ebenezer --- storage_v2/v1adapter/ptrace_model.go | 64 ++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 storage_v2/v1adapter/ptrace_model.go diff --git a/storage_v2/v1adapter/ptrace_model.go b/storage_v2/v1adapter/ptrace_model.go new file mode 100644 index 00000000000..01db4f4816b --- /dev/null +++ b/storage_v2/v1adapter/ptrace_model.go @@ -0,0 +1,64 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" +) + +// PTracesSeq2ToModel consumes an otel trace iterator and returns a jaeger model trace. +// +// When necessary, it groups chunks of traces into one single trace +func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { + var ( + jaegerTraces []*model.Trace + err error + tracesByID map[pcommon.TraceID]*model.Trace + ) + + seqTrace(func(otelTraces []ptrace.Traces, e error) bool { + if e != nil { + err = e + return false + } + + for _, otelTrace := range otelTraces { + spans := modelSpansFromOtelTrace(otelTrace) + for _, span := range spans { + traceId := span.TraceID.ToOTELTraceID() + if _, exists := tracesByID[traceId]; !exists { + tracesByID[traceId] = &model.Trace{} + } + trace := tracesByID[traceId] + trace.Spans = append(trace.Spans, span) + tracesByID[traceId] = trace + } + } + return true + }) + + if err != nil { + return nil, err + } + + for _, trace := range tracesByID { + jaegerTraces = append(jaegerTraces, trace) + } + return jaegerTraces, nil +} + +// modelSpansFromOtelTrace extracts spans from otel traces +func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { + spans := []*model.Span{} + batches := otel2model.ProtoFromTraces(otelTrace) + for _, batch := range batches { + spans = append(spans, batch.Spans...) + } + return spans +} From 5c8bdb7997ab9f83eb55d144d80f3b1aebd8f86b Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Sat, 21 Dec 2024 22:23:16 +0100 Subject: [PATCH 05/15] Refactor storage integrationtest methods use `v1adapter`s `PTracesSeq2ToModel` Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/integration.go | 38 +++++--------------- plugin/storage/integration/integration_v2.go | 34 +++--------------- 2 files changed, 14 insertions(+), 58 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index c4aa3226db0..7dc4f831c6b 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -21,15 +21,14 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) //go:embed fixtures @@ -161,15 +160,14 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ ServiceName: service, }) - traces, err := iter.FlattenWithErrors(iterTraces) + traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) if err != nil { t.Log(err) continue } for _, trace := range traces { - spans := extractSpansFromTraces(trace) - for _, span := range spans { - t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID(), span.Name()) + for _, span := range trace.Spans { + t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) } } } @@ -225,12 +223,8 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { found := s.waitForCondition(t, func(_ *testing.T) bool { var err error iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) - traces, err := iter.FlattenWithErrors(iterTraces) - - batches := otlp2jaeger.ProtoFromTraces(traces[0]) - for _, batch := range batches { - actual.Spans = append(actual.Spans, batch.Spans...) - } + traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + actual = traces[0] return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -305,15 +299,11 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { found := s.waitForCondition(t, func(t *testing.T) bool { var err error iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) - traces, err := iter.FlattenWithErrors(iterTraces) - - batches := otlp2jaeger.ProtoFromTraces(traces[0]) - for _, batch := range batches { - actual.Spans = append(actual.Spans, batch.Spans...) - } + traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) if err != nil { t.Log(err) } + actual = traces[0] return err == nil && len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { @@ -367,21 +357,11 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.T found := s.waitForCondition(t, func(t *testing.T) bool { var err error iterTraces := s.TraceReader.FindTraces(context.Background(), *query) - tracesFound, err := iter.FlattenWithErrors(iterTraces) + traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) if err != nil { t.Log(err) return false } - - for _, traceFound := range tracesFound { - modelTrace := &model.Trace{} - batches := otlp2jaeger.ProtoFromTraces(traceFound) - for _, batch := range batches { - modelTrace.Spans = append(modelTrace.Spans, batch.Spans...) - } - traces = append(traces, modelTrace) - } - if len(expected) != len(traces) { t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) diff --git a/plugin/storage/integration/integration_v2.go b/plugin/storage/integration/integration_v2.go index 9a18baaa4fa..6f11b089b8b 100644 --- a/plugin/storage/integration/integration_v2.go +++ b/plugin/storage/integration/integration_v2.go @@ -5,37 +5,13 @@ package integration import ( - "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) +// StorageIntegrationV2 performs same functions as StorageIntegration +// +// However StorageIntegrationV2 upgrades storage integration tests +// to storage v2 tests type StorageIntegrationV2 struct { TraceReader tracestore.Reader -} - -// extractSpansFromTraces returns a slice of spans contained in one otel trace -func extractSpansFromTraces(traces ptrace.Traces) []ptrace.Span { - var allSpans []ptrace.Span - - // Iterate through ResourceSpans - resourceSpans := traces.ResourceSpans() - for i := 0; i < resourceSpans.Len(); i++ { - resourceSpan := resourceSpans.At(i) - - // Iterate through ScopeSpans within ResourceSpans - scopeSpans := resourceSpan.ScopeSpans() - for j := 0; j < scopeSpans.Len(); j++ { - scopeSpan := scopeSpans.At(j) - - // Iterate through Spans within ScopeSpans - spans := scopeSpan.Spans() - for k := 0; k < spans.Len(); k++ { - span := spans.At(k) - allSpans = append(allSpans, span) - } - } - } - - return allSpans -} +} \ No newline at end of file From dc74211d41e8bce57621e134a8ac37c968e13fc1 Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Tue, 24 Dec 2024 07:38:07 +0100 Subject: [PATCH 06/15] Update `StorageIntegration` span reader with trace reader Upgraded to TraceReader for v2 storage api Signed-off-by: Emmanuel Emonueje Ebenezer --- .../internal/integration/e2e_integration.go | 8 +++++-- .../internal/integration/tailsampling_test.go | 2 +- plugin/storage/integration/integration.go | 21 ++++++++----------- plugin/storage/integration/integration_v2.go | 17 --------------- 4 files changed, 16 insertions(+), 32 deletions(-) delete mode 100644 plugin/storage/integration/integration_v2.go diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index c8681e4ed7e..9cb44d210ab 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -24,6 +24,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" "github.com/jaegertracing/jaeger/plugin/storage/integration" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const otlpPort = 4317 @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { s.SpanWriter, err = createSpanWriter(logger, otlpPort) require.NoError(t, err) - s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC) + spanReader, err := createSpanReader(logger, ports.QueryGRPC) require.NoError(t, err) + s.TraceReader = v1adapter.NewTraceReader(spanReader) t.Cleanup(func() { // Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection. @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { // e2eCleanUp closes the SpanReader and SpanWriter gRPC connection. // This function should be called after all the tests are finished. func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) { - require.NoError(t, s.SpanReader.(io.Closer).Close()) + spanReader, err := v1adapter.GetV1Reader(s.TraceReader) + require.NoError(t, err) + require.NoError(t, spanReader.(io.Closer).Close()) require.NoError(t, s.SpanWriter.(io.Closer).Close()) } diff --git a/cmd/jaeger/internal/integration/tailsampling_test.go b/cmd/jaeger/internal/integration/tailsampling_test.go index 6ce27cc649e..d548552c112 100644 --- a/cmd/jaeger/internal/integration/tailsampling_test.go +++ b/cmd/jaeger/internal/integration/tailsampling_test.go @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) { var actual []string assert.Eventually(t, func() bool { var err error - actual, err = ts.SpanReader.GetServices(context.Background()) + actual, err = ts.TraceReader.GetServices(context.Background()) require.NoError(t, err) sort.Strings(actual) return assert.ObjectsAreEqualValues(ts.expectedServices, actual) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 7dc4f831c6b..67975030a08 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -43,9 +43,8 @@ var fixtures embed.FS // Some implementations may declare multiple tests, with different settings, // and RunAll() under different conditions. type StorageIntegration struct { - StorageIntegrationV2 SpanWriter spanstore.Writer - SpanReader spanstore.Reader + TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer @@ -217,13 +216,12 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { expected := s.writeLargeTraceWithDuplicateSpanIds(t) expectedTraceID := expected.Spans[0].TraceID - expectedOtelTraceID := expectedTraceID.ToOTELTraceID() var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { - var err error - iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + require.NotEmpty(t, traces) actual = traces[0] return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -293,16 +291,15 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { expected := s.loadParseAndWriteExampleTrace(t) expectedTraceID := expected.Spans[0].TraceID - expectedOtelTraceID := expectedTraceID.ToOTELTraceID() var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedOtelTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) if err != nil { t.Log(err) } + require.NotEmpty(t, traces) actual = traces[0] return err == nil && len(actual.Spans) == len(expected.Spans) }) @@ -312,9 +309,10 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { t.Run("NotFound error", func(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} - trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) + iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()}) + traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) assert.Equal(t, spanstore.ErrTraceNotFound, err) - assert.Nil(t, trace) + assert.Nil(t, traces) }) } @@ -357,12 +355,11 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.T found := s.waitForCondition(t, func(t *testing.T) bool { var err error iterTraces := s.TraceReader.FindTraces(context.Background(), *query) - traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err = v1adapter.PTracesSeq2ToModel(iterTraces) if err != nil { t.Log(err) return false } - if len(expected) != len(traces) { t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) return false diff --git a/plugin/storage/integration/integration_v2.go b/plugin/storage/integration/integration_v2.go deleted file mode 100644 index 6f11b089b8b..00000000000 --- a/plugin/storage/integration/integration_v2.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package integration - -import ( - "github.com/jaegertracing/jaeger/storage_v2/tracestore" -) - -// StorageIntegrationV2 performs same functions as StorageIntegration -// -// However StorageIntegrationV2 upgrades storage integration tests -// to storage v2 tests -type StorageIntegrationV2 struct { - TraceReader tracestore.Reader -} \ No newline at end of file From 9ce16ce4011a28259cbf0b5903f32d50df2c6e9d Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Tue, 24 Dec 2024 07:41:13 +0100 Subject: [PATCH 07/15] Consume trace iterator trusting the v2 storage api contracts Signed-off-by: Emmanuel Emonueje Ebenezer --- .../{ptrace_model.go => ptrace2model.go} | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) rename storage_v2/v1adapter/{ptrace_model.go => ptrace2model.go} (57%) diff --git a/storage_v2/v1adapter/ptrace_model.go b/storage_v2/v1adapter/ptrace2model.go similarity index 57% rename from storage_v2/v1adapter/ptrace_model.go rename to storage_v2/v1adapter/ptrace2model.go index 01db4f4816b..dd177a71dab 100644 --- a/storage_v2/v1adapter/ptrace_model.go +++ b/storage_v2/v1adapter/ptrace2model.go @@ -4,23 +4,24 @@ package v1adapter import ( - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage/spanstore" otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) -// PTracesSeq2ToModel consumes an otel trace iterator and returns a jaeger model trace. +// PTracesSeq2ToModel consumes an otel trace iterator and returns a model trace. // -// When necessary, it groups chunks of traces into one single trace +// When necessary, it groups chunks of a trace into a single model trace func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { var ( - jaegerTraces []*model.Trace - err error - tracesByID map[pcommon.TraceID]*model.Trace + err error + lastTraceID *model.TraceID + lastTrace *model.Trace ) + jaegerTraces := []*model.Trace{} seqTrace(func(otelTraces []ptrace.Traces, e error) bool { if e != nil { @@ -30,14 +31,17 @@ func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Tr for _, otelTrace := range otelTraces { spans := modelSpansFromOtelTrace(otelTrace) - for _, span := range spans { - traceId := span.TraceID.ToOTELTraceID() - if _, exists := tracesByID[traceId]; !exists { - tracesByID[traceId] = &model.Trace{} - } - trace := tracesByID[traceId] - trace.Spans = append(trace.Spans, span) - tracesByID[traceId] = trace + if len(spans) == 0 { + continue + } + currentTraceID := spans[0].TraceID + if lastTraceID != nil && *lastTraceID == currentTraceID { + lastTrace.Spans = append(lastTrace.Spans, spans...) + } else { + newTrace := &model.Trace{Spans: spans} + lastTraceID = ¤tTraceID + lastTrace = newTrace + jaegerTraces = append(jaegerTraces, lastTrace) } } return true @@ -47,8 +51,8 @@ func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Tr return nil, err } - for _, trace := range tracesByID { - jaegerTraces = append(jaegerTraces, trace) + if len(jaegerTraces) == 0 { + return nil, spanstore.ErrTraceNotFound } return jaegerTraces, nil } @@ -58,7 +62,10 @@ func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { spans := []*model.Span{} batches := otel2model.ProtoFromTraces(otelTrace) for _, batch := range batches { - spans = append(spans, batch.Spans...) + for _, span := range batch.Spans { + span.Process = batch.Process + spans = append(spans, span) + } } return spans } From aa18bc4e150f2490552e54f3225abd4960ae887a Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Thu, 26 Dec 2024 00:19:35 +0100 Subject: [PATCH 08/15] Test `PTracesSeq2ToModel` Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/integration.go | 8 +- storage_v2/v1adapter/ptrace2model.go | 9 +- storage_v2/v1adapter/ptrace2model_test.go | 149 ++++++++++++++++++ .../v1adapter}/trace_compare.go | 2 +- .../v1adapter}/trace_compare_test.go | 2 +- 5 files changed, 161 insertions(+), 9 deletions(-) create mode 100644 storage_v2/v1adapter/ptrace2model_test.go rename {plugin/storage/integration => storage_v2/v1adapter}/trace_compare.go (99%) rename {plugin/storage/integration => storage_v2/v1adapter}/trace_compare_test.go (95%) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 67975030a08..980396cc64b 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -205,7 +205,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { return err == nil && len(actual.Spans) == 1 }) require.True(t, found) - CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) + v1adapter.CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { @@ -227,7 +227,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { }) if !assert.True(t, found) { - CompareTraces(t, expected, actual) + v1adapter.CompareTraces(t, expected, actual) return } @@ -304,7 +304,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { return err == nil && len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { - CompareTraces(t, expected, actual) + v1adapter.CompareTraces(t, expected, actual) } t.Run("NotFound error", func(t *testing.T) { @@ -345,7 +345,7 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { s.skipIfNeeded(t) expected := expectedTracesPerTestCase[i] actual := s.findTracesByQuery(t, queryTestCase.Query, expected) - CompareSliceOfTraces(t, expected, actual) + v1adapter.CompareSliceOfTraces(t, expected, actual) }) } } diff --git a/storage_v2/v1adapter/ptrace2model.go b/storage_v2/v1adapter/ptrace2model.go index dd177a71dab..469b1162d8a 100644 --- a/storage_v2/v1adapter/ptrace2model.go +++ b/storage_v2/v1adapter/ptrace2model.go @@ -12,9 +12,10 @@ import ( otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) -// PTracesSeq2ToModel consumes an otel trace iterator and returns a model trace. +// PTracesSeq2ToModel consumes an iterator seqTrace. When necessary, +// it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace // -// When necessary, it groups chunks of a trace into a single model trace +// Returns nil, and spanstore.ErrTraceNotFound for empty iterators func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { var ( err error @@ -63,7 +64,9 @@ func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { batches := otel2model.ProtoFromTraces(otelTrace) for _, batch := range batches { for _, span := range batch.Spans { - span.Process = batch.Process + span.Process = &model.Process{} + span.Process.ServiceName = batch.GetProcess().GetServiceName() + span.Process.Tags = batch.GetProcess().GetTags() spans = append(spans, span) } } diff --git a/storage_v2/v1adapter/ptrace2model_test.go b/storage_v2/v1adapter/ptrace2model_test.go new file mode 100644 index 00000000000..66d3f7fa515 --- /dev/null +++ b/storage_v2/v1adapter/ptrace2model_test.go @@ -0,0 +1,149 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + "errors" + "testing" + "time" + + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/crossdock/crossdock-go/require" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { + var ( + ProcessNoServiceName string = "OTLPResourceNoServiceName" + StartTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time + ) + + testCases := []struct { + name string + expectedModelTraces []*model.Trace + seqTrace iter.Seq2[[]ptrace.Traces, error] + expectedErr error + }{ + { + name: "seq with one ptrace.Traces", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-success-a", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + // Add a new span and set attributes + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-success-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + yield([]ptrace.Traces{testTrace}, nil) + }, + expectedErr: nil, + }, + { + name: "seq with two chunks of ptrace.Traces", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-two-chunks-a", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(2), + OperationName: "op-two-chunks-b", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + traceChunk1 := ptrace.NewTraces() + rSpans1 := traceChunk1.ResourceSpans().AppendEmpty() + sSpans1 := rSpans1.ScopeSpans().AppendEmpty() + spans1 := sSpans1.Spans() + modelTraceID := model.NewTraceID(2, 3) + span1 := spans1.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-two-chunks-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + traceChunk2 := ptrace.NewTraces() + rSpans2 := traceChunk2.ResourceSpans().AppendEmpty() + sSpans2 := rSpans2.ScopeSpans().AppendEmpty() + spans2 := sSpans2.Spans() + span2 := spans2.AppendEmpty() + span2.SetTraceID(modelTraceID.ToOTELTraceID()) + span2.SetName("op-two-chunks-b") + span2.SetSpanID(model.NewSpanID(2).ToOTELSpanID()) + // Yield the test trace + yield([]ptrace.Traces{traceChunk1, traceChunk2}, nil) + }, + expectedErr: nil, + }, + { + // a case that occurs when no trace is contained in the iterator + name: "empty seq for ptrace.Traces", + expectedModelTraces: nil, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + }, + expectedErr: spanstore.ErrTraceNotFound, + }, + { + name: "seq of one ptrace and one error", + expectedModelTraces: nil, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-error-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + if !yield([]ptrace.Traces{testTrace}, nil) { + return + } + yield(nil, errors.New("unexpected-op-err")) + }, + expectedErr: errors.New("unexpected-op-err"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualTraces, err := PTracesSeq2ToModel(tc.seqTrace) + require.Equal(t, tc.expectedErr, err) + CompareSliceOfTraces(t, tc.expectedModelTraces, actualTraces) + }) + } +} diff --git a/plugin/storage/integration/trace_compare.go b/storage_v2/v1adapter/trace_compare.go similarity index 99% rename from plugin/storage/integration/trace_compare.go rename to storage_v2/v1adapter/trace_compare.go index 053ae8f5a50..0372ffeede7 100644 --- a/plugin/storage/integration/trace_compare.go +++ b/storage_v2/v1adapter/trace_compare.go @@ -2,7 +2,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package integration +package v1adapter import ( "encoding/json" diff --git a/plugin/storage/integration/trace_compare_test.go b/storage_v2/v1adapter/trace_compare_test.go similarity index 95% rename from plugin/storage/integration/trace_compare_test.go rename to storage_v2/v1adapter/trace_compare_test.go index a419d624ea8..01497adc2c3 100644 --- a/plugin/storage/integration/trace_compare_test.go +++ b/storage_v2/v1adapter/trace_compare_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 -package integration +package v1adapter import ( "testing" From 2d2199a9683afd8669e850f1d5e95ee8953de71f Mon Sep 17 00:00:00 2001 From: Emmanuel Emonueje Ebenezer Date: Thu, 26 Dec 2024 03:29:31 +0100 Subject: [PATCH 09/15] Simplify `pTracesSeq2ToModel` Signed-off-by: Emmanuel Emonueje Ebenezer --- plugin/storage/integration/integration.go | 8 +-- .../storage/integration}/trace_compare.go | 2 +- .../integration}/trace_compare_test.go | 2 +- storage_v2/v1adapter/ptrace2model.go | 59 +++++++------------ storage_v2/v1adapter/ptrace2model_test.go | 23 ++++++-- 5 files changed, 45 insertions(+), 49 deletions(-) rename {storage_v2/v1adapter => plugin/storage/integration}/trace_compare.go (99%) rename {storage_v2/v1adapter => plugin/storage/integration}/trace_compare_test.go (95%) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 980396cc64b..67975030a08 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -205,7 +205,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { return err == nil && len(actual.Spans) == 1 }) require.True(t, found) - v1adapter.CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) + CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { @@ -227,7 +227,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { }) if !assert.True(t, found) { - v1adapter.CompareTraces(t, expected, actual) + CompareTraces(t, expected, actual) return } @@ -304,7 +304,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { return err == nil && len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { - v1adapter.CompareTraces(t, expected, actual) + CompareTraces(t, expected, actual) } t.Run("NotFound error", func(t *testing.T) { @@ -345,7 +345,7 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { s.skipIfNeeded(t) expected := expectedTracesPerTestCase[i] actual := s.findTracesByQuery(t, queryTestCase.Query, expected) - v1adapter.CompareSliceOfTraces(t, expected, actual) + CompareSliceOfTraces(t, expected, actual) }) } } diff --git a/storage_v2/v1adapter/trace_compare.go b/plugin/storage/integration/trace_compare.go similarity index 99% rename from storage_v2/v1adapter/trace_compare.go rename to plugin/storage/integration/trace_compare.go index 0372ffeede7..053ae8f5a50 100644 --- a/storage_v2/v1adapter/trace_compare.go +++ b/plugin/storage/integration/trace_compare.go @@ -2,7 +2,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package v1adapter +package integration import ( "encoding/json" diff --git a/storage_v2/v1adapter/trace_compare_test.go b/plugin/storage/integration/trace_compare_test.go similarity index 95% rename from storage_v2/v1adapter/trace_compare_test.go rename to plugin/storage/integration/trace_compare_test.go index 01497adc2c3..a419d624ea8 100644 --- a/storage_v2/v1adapter/trace_compare_test.go +++ b/plugin/storage/integration/trace_compare_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 -package v1adapter +package integration import ( "testing" diff --git a/storage_v2/v1adapter/ptrace2model.go b/storage_v2/v1adapter/ptrace2model.go index 469b1162d8a..e4153a45add 100644 --- a/storage_v2/v1adapter/ptrace2model.go +++ b/storage_v2/v1adapter/ptrace2model.go @@ -6,55 +6,35 @@ package v1adapter import ( "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/spanstore" otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) -// PTracesSeq2ToModel consumes an iterator seqTrace. When necessary, -// it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace +// PTracesSeq2ToModel consumes tracesSeq. +// When necessary, it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace +// It adheres to the chunking requirement of tracestore.Reader.GetTraces. // +// // Returns nil, and spanstore.ErrTraceNotFound for empty iterators -func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { - var ( - err error - lastTraceID *model.TraceID - lastTrace *model.Trace - ) +func PTracesSeq2ToModel(tracesSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { jaegerTraces := []*model.Trace{} - - seqTrace(func(otelTraces []ptrace.Traces, e error) bool { - if e != nil { - err = e - return false - } - - for _, otelTrace := range otelTraces { - spans := modelSpansFromOtelTrace(otelTrace) - if len(spans) == 0 { - continue - } - currentTraceID := spans[0].TraceID - if lastTraceID != nil && *lastTraceID == currentTraceID { - lastTrace.Spans = append(lastTrace.Spans, spans...) - } else { - newTrace := &model.Trace{Spans: spans} - lastTraceID = ¤tTraceID - lastTrace = newTrace - jaegerTraces = append(jaegerTraces, lastTrace) - } - } - return true - }) - + otelTraces, err := iter.CollectWithErrors(jptrace.AggregateTraces(tracesSeq)) if err != nil { return nil, err } - - if len(jaegerTraces) == 0 { + if len(otelTraces) == 0 { return nil, spanstore.ErrTraceNotFound } + + for _, otelTrace := range otelTraces { + jTrace := &model.Trace{ + Spans: modelSpansFromOtelTrace(otelTrace), + } + jaegerTraces = append(jaegerTraces, jTrace) + } return jaegerTraces, nil } @@ -64,9 +44,12 @@ func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { batches := otel2model.ProtoFromTraces(otelTrace) for _, batch := range batches { for _, span := range batch.Spans { - span.Process = &model.Process{} - span.Process.ServiceName = batch.GetProcess().GetServiceName() - span.Process.Tags = batch.GetProcess().GetTags() + if span.Process == nil { + span.Process = &model.Process{ // give each span it's own process, avoid potential side effects from shared Process objects. + ServiceName: batch.Process.ServiceName, + Tags: batch.Process.Tags, + } + } spans = append(spans, span) } } diff --git a/storage_v2/v1adapter/ptrace2model_test.go b/storage_v2/v1adapter/ptrace2model_test.go index 66d3f7fa515..4abcb26b81d 100644 --- a/storage_v2/v1adapter/ptrace2model_test.go +++ b/storage_v2/v1adapter/ptrace2model_test.go @@ -29,7 +29,7 @@ func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { expectedErr error }{ { - name: "seq with one ptrace.Traces", + name: "sequence with one trace", expectedModelTraces: []*model.Trace{ { Spans: []*model.Span{ @@ -62,7 +62,7 @@ func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { expectedErr: nil, }, { - name: "seq with two chunks of ptrace.Traces", + name: "sequence with two chunks of a trace", expectedModelTraces: []*model.Trace{ { Spans: []*model.Span{ @@ -108,14 +108,14 @@ func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { }, { // a case that occurs when no trace is contained in the iterator - name: "empty seq for ptrace.Traces", + name: "empty sequence", expectedModelTraces: nil, seqTrace: func(yield func([]ptrace.Traces, error) bool) { }, expectedErr: spanstore.ErrTraceNotFound, }, { - name: "seq of one ptrace and one error", + name: "sequence containing error", expectedModelTraces: nil, seqTrace: func(yield func([]ptrace.Traces, error) bool) { testTrace := ptrace.NewTraces() @@ -143,7 +143,20 @@ func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { t.Run(tc.name, func(t *testing.T) { actualTraces, err := PTracesSeq2ToModel(tc.seqTrace) require.Equal(t, tc.expectedErr, err) - CompareSliceOfTraces(t, tc.expectedModelTraces, actualTraces) + require.Equal(t, len(tc.expectedModelTraces), len(actualTraces)) + if len(tc.expectedModelTraces) < 1 { + return + } + for i, etrace := range tc.expectedModelTraces { + eSpans := etrace.Spans + aSpans := actualTraces[i].Spans + require.Equal(t, len(eSpans), len(aSpans)) + for j, espan := range eSpans { + require.Equal(t, espan.TraceID, aSpans[j].TraceID) + require.Equal(t, espan.OperationName, aSpans[j].OperationName) + require.Equal(t, espan.Process, aSpans[j].Process) + } + } }) } } From dd00e871f38669c8ff16d5730e1fb280d0e3add0 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 22:53:44 -0500 Subject: [PATCH 10/15] copy object instead of manual construction Signed-off-by: Yuri Shkuro --- storage_v2/v1adapter/ptrace2model.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/storage_v2/v1adapter/ptrace2model.go b/storage_v2/v1adapter/ptrace2model.go index e4153a45add..3151c3d6c55 100644 --- a/storage_v2/v1adapter/ptrace2model.go +++ b/storage_v2/v1adapter/ptrace2model.go @@ -17,7 +17,6 @@ import ( // When necessary, it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace // It adheres to the chunking requirement of tracestore.Reader.GetTraces. // -// // Returns nil, and spanstore.ErrTraceNotFound for empty iterators func PTracesSeq2ToModel(tracesSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { jaegerTraces := []*model.Trace{} @@ -45,10 +44,8 @@ func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { for _, batch := range batches { for _, span := range batch.Spans { if span.Process == nil { - span.Process = &model.Process{ // give each span it's own process, avoid potential side effects from shared Process objects. - ServiceName: batch.Process.ServiceName, - Tags: batch.Process.Tags, - } + proc := *batch.Process // shallow clone + span.Process = &proc } spans = append(spans, span) } From 2a66f86e9b0b00095927cea39e9e636e609f904f Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 23:14:06 -0500 Subject: [PATCH 11/15] refactor and move to translator Signed-off-by: Yuri Shkuro --- .../storage/integration/badgerstore_test.go | 2 +- plugin/storage/integration/integration.go | 10 +- storage_v2/v1adapter/ptrace2model.go | 54 ------ storage_v2/v1adapter/ptrace2model_test.go | 162 ------------------ storage_v2/v1adapter/translator.go | 50 ++++++ storage_v2/v1adapter/translator_test.go | 150 ++++++++++++++++ 6 files changed, 206 insertions(+), 222 deletions(-) delete mode 100644 storage_v2/v1adapter/ptrace2model.go delete mode 100644 storage_v2/v1adapter/ptrace2model_test.go diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 29d68edd4cd..23778fcb87a 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -39,7 +39,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { spanReader, err := s.factory.CreateSpanReader() require.NoError(t, err) s.TraceReader = v1adapter.NewTraceReader(spanReader) - + s.SamplingStore, err = s.factory.CreateSamplingStore(0) require.NoError(t, err) } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 67975030a08..defec0cacb6 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -159,7 +159,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ ServiceName: service, }) - traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) continue @@ -220,7 +220,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) - traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) require.NotEmpty(t, traces) actual = traces[0] return err == nil && len(actual.Spans) >= len(expected.Spans) @@ -295,7 +295,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) - traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) } @@ -310,7 +310,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { t.Run("NotFound error", func(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()}) - traces, err := v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err := v1adapter.V1TracesFromSeq2(iterTraces) assert.Equal(t, spanstore.ErrTraceNotFound, err) assert.Nil(t, traces) }) @@ -355,7 +355,7 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.T found := s.waitForCondition(t, func(t *testing.T) bool { var err error iterTraces := s.TraceReader.FindTraces(context.Background(), *query) - traces, err = v1adapter.PTracesSeq2ToModel(iterTraces) + traces, err = v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) return false diff --git a/storage_v2/v1adapter/ptrace2model.go b/storage_v2/v1adapter/ptrace2model.go deleted file mode 100644 index 3151c3d6c55..00000000000 --- a/storage_v2/v1adapter/ptrace2model.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package v1adapter - -import ( - "go.opentelemetry.io/collector/pdata/ptrace" - - "github.com/jaegertracing/jaeger/internal/jptrace" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/iter" - "github.com/jaegertracing/jaeger/storage/spanstore" - otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" -) - -// PTracesSeq2ToModel consumes tracesSeq. -// When necessary, it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace -// It adheres to the chunking requirement of tracestore.Reader.GetTraces. -// -// Returns nil, and spanstore.ErrTraceNotFound for empty iterators -func PTracesSeq2ToModel(tracesSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { - jaegerTraces := []*model.Trace{} - otelTraces, err := iter.CollectWithErrors(jptrace.AggregateTraces(tracesSeq)) - if err != nil { - return nil, err - } - if len(otelTraces) == 0 { - return nil, spanstore.ErrTraceNotFound - } - - for _, otelTrace := range otelTraces { - jTrace := &model.Trace{ - Spans: modelSpansFromOtelTrace(otelTrace), - } - jaegerTraces = append(jaegerTraces, jTrace) - } - return jaegerTraces, nil -} - -// modelSpansFromOtelTrace extracts spans from otel traces -func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span { - spans := []*model.Span{} - batches := otel2model.ProtoFromTraces(otelTrace) - for _, batch := range batches { - for _, span := range batch.Spans { - if span.Process == nil { - proc := *batch.Process // shallow clone - span.Process = &proc - } - spans = append(spans, span) - } - } - return spans -} diff --git a/storage_v2/v1adapter/ptrace2model_test.go b/storage_v2/v1adapter/ptrace2model_test.go deleted file mode 100644 index 4abcb26b81d..00000000000 --- a/storage_v2/v1adapter/ptrace2model_test.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package v1adapter - -import ( - "errors" - "testing" - "time" - - "go.opentelemetry.io/collector/pdata/ptrace" - - "github.com/crossdock/crossdock-go/require" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/iter" - "github.com/jaegertracing/jaeger/storage/spanstore" -) - -func TestPTracesSeq2ToModel_SuccessWithSinglePTraceInSeq(t *testing.T) { - var ( - ProcessNoServiceName string = "OTLPResourceNoServiceName" - StartTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time - ) - - testCases := []struct { - name string - expectedModelTraces []*model.Trace - seqTrace iter.Seq2[[]ptrace.Traces, error] - expectedErr error - }{ - { - name: "sequence with one trace", - expectedModelTraces: []*model.Trace{ - { - Spans: []*model.Span{ - { - TraceID: model.NewTraceID(2, 3), - SpanID: model.NewSpanID(1), - OperationName: "op-success-a", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, - }, - }, - }, - }, - seqTrace: func(yield func([]ptrace.Traces, error) bool) { - testTrace := ptrace.NewTraces() - rSpans := testTrace.ResourceSpans().AppendEmpty() - sSpans := rSpans.ScopeSpans().AppendEmpty() - spans := sSpans.Spans() - - // Add a new span and set attributes - modelTraceID := model.NewTraceID(2, 3) - span1 := spans.AppendEmpty() - span1.SetTraceID(modelTraceID.ToOTELTraceID()) - span1.SetName("op-success-a") - span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) - - // Yield the test trace - yield([]ptrace.Traces{testTrace}, nil) - }, - expectedErr: nil, - }, - { - name: "sequence with two chunks of a trace", - expectedModelTraces: []*model.Trace{ - { - Spans: []*model.Span{ - { - TraceID: model.NewTraceID(2, 3), - SpanID: model.NewSpanID(1), - OperationName: "op-two-chunks-a", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, - }, { - TraceID: model.NewTraceID(2, 3), - SpanID: model.NewSpanID(2), - OperationName: "op-two-chunks-b", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, - }, - }, - }, - }, - seqTrace: func(yield func([]ptrace.Traces, error) bool) { - traceChunk1 := ptrace.NewTraces() - rSpans1 := traceChunk1.ResourceSpans().AppendEmpty() - sSpans1 := rSpans1.ScopeSpans().AppendEmpty() - spans1 := sSpans1.Spans() - modelTraceID := model.NewTraceID(2, 3) - span1 := spans1.AppendEmpty() - span1.SetTraceID(modelTraceID.ToOTELTraceID()) - span1.SetName("op-two-chunks-a") - span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) - - traceChunk2 := ptrace.NewTraces() - rSpans2 := traceChunk2.ResourceSpans().AppendEmpty() - sSpans2 := rSpans2.ScopeSpans().AppendEmpty() - spans2 := sSpans2.Spans() - span2 := spans2.AppendEmpty() - span2.SetTraceID(modelTraceID.ToOTELTraceID()) - span2.SetName("op-two-chunks-b") - span2.SetSpanID(model.NewSpanID(2).ToOTELSpanID()) - // Yield the test trace - yield([]ptrace.Traces{traceChunk1, traceChunk2}, nil) - }, - expectedErr: nil, - }, - { - // a case that occurs when no trace is contained in the iterator - name: "empty sequence", - expectedModelTraces: nil, - seqTrace: func(yield func([]ptrace.Traces, error) bool) { - }, - expectedErr: spanstore.ErrTraceNotFound, - }, - { - name: "sequence containing error", - expectedModelTraces: nil, - seqTrace: func(yield func([]ptrace.Traces, error) bool) { - testTrace := ptrace.NewTraces() - rSpans := testTrace.ResourceSpans().AppendEmpty() - sSpans := rSpans.ScopeSpans().AppendEmpty() - spans := sSpans.Spans() - - modelTraceID := model.NewTraceID(2, 3) - span1 := spans.AppendEmpty() - span1.SetTraceID(modelTraceID.ToOTELTraceID()) - span1.SetName("op-error-a") - span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) - - // Yield the test trace - if !yield([]ptrace.Traces{testTrace}, nil) { - return - } - yield(nil, errors.New("unexpected-op-err")) - }, - expectedErr: errors.New("unexpected-op-err"), - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - actualTraces, err := PTracesSeq2ToModel(tc.seqTrace) - require.Equal(t, tc.expectedErr, err) - require.Equal(t, len(tc.expectedModelTraces), len(actualTraces)) - if len(tc.expectedModelTraces) < 1 { - return - } - for i, etrace := range tc.expectedModelTraces { - eSpans := etrace.Spans - aSpans := actualTraces[i].Spans - require.Equal(t, len(eSpans), len(aSpans)) - for j, espan := range eSpans { - require.Equal(t, espan.TraceID, aSpans[j].TraceID) - require.Equal(t, espan.OperationName, aSpans[j].OperationName) - require.Equal(t, espan.Process, aSpans[j].Process) - } - } - }) - } -} diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index dda96a5630a..30f4001d0a1 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -10,10 +10,20 @@ import ( "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage/spanstore" ) +// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces) +// to Jaeger model batches ([]*model.Batch). +func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch { + return ProtoFromTraces(traces) +} + // ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) // to Jaeger model batches ([]*model.Batch). +// +// TODO remove this function in favor of V1BatchesFromTraces func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { batches := jaegerTranslator.ProtoFromTraces(traces) spanMap := createSpanMapFromBatches(batches) @@ -32,6 +42,46 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces { return traces } +// V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces. +// Returns spanstore.ErrTraceNotFound for empty iterators. +func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { + var ( + jaegerTraces []*model.Trace + iterErr error + ) + jptrace.AggregateTraces(otelSeq)(func(otelTrace ptrace.Traces, err error) bool { + if err != nil { + iterErr = err + return false + } + jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace)) + return true + }) + if iterErr != nil { + return nil, iterErr + } + if len(jaegerTraces) == 0 { + return nil, spanstore.ErrTraceNotFound + } + return jaegerTraces, nil +} + +// modelTraceFromOtelTrace extracts spans from otel traces +func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace { + var spans []*model.Span + batches := V1BatchesFromTraces(otelTrace) + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + proc := *batch.Process // shallow clone + span.Process = &proc + } + spans = append(spans, span) + } + } + return &model.Trace{Spans: spans} +} + func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { spanMap := make(map[model.SpanID]*model.Span) for _, batch := range batches { diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index e78a71f423f..3aafdd99d8e 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -4,14 +4,19 @@ package v1adapter import ( + "errors" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage/spanstore" ) func TestProtoFromTraces_AddsWarnings(t *testing.T) { @@ -105,3 +110,148 @@ func TestProtoToTraces_AddsWarnings(t *testing.T) { assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) assert.Equal(t, []string{"test-warning-3"}, jptrace.GetWarnings(span3)) } + +func TestV1TracesFromSeq2(t *testing.T) { + var ( + ProcessNoServiceName string = "OTLPResourceNoServiceName" + StartTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time + ) + + testCases := []struct { + name string + expectedModelTraces []*model.Trace + seqTrace iter.Seq2[[]ptrace.Traces, error] + expectedErr error + }{ + { + name: "sequence with one trace", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-success-a", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + // Add a new span and set attributes + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-success-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + yield([]ptrace.Traces{testTrace}, nil) + }, + expectedErr: nil, + }, + { + name: "sequence with two chunks of a trace", + expectedModelTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(1), + OperationName: "op-two-chunks-a", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, { + TraceID: model.NewTraceID(2, 3), + SpanID: model.NewSpanID(2), + OperationName: "op-two-chunks-b", + Process: model.NewProcess(ProcessNoServiceName, nil), + StartTime: StartTime, + }, + }, + }, + }, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + traceChunk1 := ptrace.NewTraces() + rSpans1 := traceChunk1.ResourceSpans().AppendEmpty() + sSpans1 := rSpans1.ScopeSpans().AppendEmpty() + spans1 := sSpans1.Spans() + modelTraceID := model.NewTraceID(2, 3) + span1 := spans1.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-two-chunks-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + traceChunk2 := ptrace.NewTraces() + rSpans2 := traceChunk2.ResourceSpans().AppendEmpty() + sSpans2 := rSpans2.ScopeSpans().AppendEmpty() + spans2 := sSpans2.Spans() + span2 := spans2.AppendEmpty() + span2.SetTraceID(modelTraceID.ToOTELTraceID()) + span2.SetName("op-two-chunks-b") + span2.SetSpanID(model.NewSpanID(2).ToOTELSpanID()) + // Yield the test trace + yield([]ptrace.Traces{traceChunk1, traceChunk2}, nil) + }, + expectedErr: nil, + }, + { + // a case that occurs when no trace is contained in the iterator + name: "empty sequence", + expectedModelTraces: nil, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + }, + expectedErr: spanstore.ErrTraceNotFound, + }, + { + name: "sequence containing error", + expectedModelTraces: nil, + seqTrace: func(yield func([]ptrace.Traces, error) bool) { + testTrace := ptrace.NewTraces() + rSpans := testTrace.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + spans := sSpans.Spans() + + modelTraceID := model.NewTraceID(2, 3) + span1 := spans.AppendEmpty() + span1.SetTraceID(modelTraceID.ToOTELTraceID()) + span1.SetName("op-error-a") + span1.SetSpanID(model.NewSpanID(1).ToOTELSpanID()) + + // Yield the test trace + if !yield([]ptrace.Traces{testTrace}, nil) { + return + } + yield(nil, errors.New("unexpected-op-err")) + }, + expectedErr: errors.New("unexpected-op-err"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualTraces, err := V1TracesFromSeq2(tc.seqTrace) + require.Equal(t, tc.expectedErr, err) + require.Equal(t, len(tc.expectedModelTraces), len(actualTraces)) + if len(tc.expectedModelTraces) < 1 { + return + } + for i, etrace := range tc.expectedModelTraces { + eSpans := etrace.Spans + aSpans := actualTraces[i].Spans + require.Equal(t, len(eSpans), len(aSpans)) + for j, espan := range eSpans { + assert.Equal(t, espan.TraceID, aSpans[j].TraceID) + assert.Equal(t, espan.OperationName, aSpans[j].OperationName) + assert.Equal(t, espan.Process, aSpans[j].Process) + } + } + }) + } +} From 54b4ecf3e1eda6a23e14c7787bba8177c78fe79d Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 23:16:07 -0500 Subject: [PATCH 12/15] move-code Signed-off-by: Yuri Shkuro --- storage_v2/v1adapter/translator.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index 30f4001d0a1..d08a3b328d8 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -17,7 +17,10 @@ import ( // V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces) // to Jaeger model batches ([]*model.Batch). func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch { - return ProtoFromTraces(traces) + batches := jaegerTranslator.ProtoFromTraces(traces) + spanMap := createSpanMapFromBatches(batches) + transferWarningsToModelSpans(traces, spanMap) + return batches } // ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) @@ -25,10 +28,7 @@ func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch { // // TODO remove this function in favor of V1BatchesFromTraces func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { - batches := jaegerTranslator.ProtoFromTraces(traces) - spanMap := createSpanMapFromBatches(batches) - transferWarningsToModelSpans(traces, spanMap) - return batches + return V1BatchesFromTraces(traces) } // V1BatchesToTraces converts Jaeger model batches ([]*model.Batch) From a4de5b2a8fc4d7fbd802e95eb4ecb56c8d928905 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 23:29:32 -0500 Subject: [PATCH 13/15] remove not-found-error Signed-off-by: Yuri Shkuro --- plugin/storage/integration/integration.go | 4 ++-- storage_v2/v1adapter/translator.go | 5 ----- storage_v2/v1adapter/translator_test.go | 6 ++---- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index defec0cacb6..625a5b2489d 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -311,8 +311,8 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()}) traces, err := v1adapter.V1TracesFromSeq2(iterTraces) - assert.Equal(t, spanstore.ErrTraceNotFound, err) - assert.Nil(t, traces) + assert.NoError(t, err) // v2 TraceReader no longer returns an error for not found + assert.Empty(t, traces) }) } diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index d08a3b328d8..835bff80437 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -11,7 +11,6 @@ import ( "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" - "github.com/jaegertracing/jaeger/storage/spanstore" ) // V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces) @@ -43,7 +42,6 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces { } // V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces. -// Returns spanstore.ErrTraceNotFound for empty iterators. func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) { var ( jaegerTraces []*model.Trace @@ -60,9 +58,6 @@ func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace if iterErr != nil { return nil, iterErr } - if len(jaegerTraces) == 0 { - return nil, spanstore.ErrTraceNotFound - } return jaegerTraces, nil } diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index 3aafdd99d8e..5f666dfe2f9 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -16,7 +16,6 @@ import ( "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" - "github.com/jaegertracing/jaeger/storage/spanstore" ) func TestProtoFromTraces_AddsWarnings(t *testing.T) { @@ -205,9 +204,8 @@ func TestV1TracesFromSeq2(t *testing.T) { // a case that occurs when no trace is contained in the iterator name: "empty sequence", expectedModelTraces: nil, - seqTrace: func(yield func([]ptrace.Traces, error) bool) { - }, - expectedErr: spanstore.ErrTraceNotFound, + seqTrace: func(yield func([]ptrace.Traces, error) bool) {}, + expectedErr: nil, }, { name: "sequence containing error", From 6009579f06859beb4fa3bba6c3322e5abe9fb7d1 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 23:40:31 -0500 Subject: [PATCH 14/15] remove assertions from waitForCondition Signed-off-by: Yuri Shkuro --- plugin/storage/integration/integration.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 625a5b2489d..e9e1df3d6af 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -221,8 +221,9 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { found := s.waitForCondition(t, func(_ *testing.T) bool { iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) traces, err := v1adapter.V1TracesFromSeq2(iterTraces) - require.NotEmpty(t, traces) - actual = traces[0] + if len(traces) > 0 { + actual = traces[0] + } return err == nil && len(actual.Spans) >= len(expected.Spans) }) @@ -298,10 +299,12 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { traces, err := v1adapter.V1TracesFromSeq2(iterTraces) if err != nil { t.Log(err) + return false + } + if len(traces) > 0 { + actual = traces[0] } - require.NotEmpty(t, traces) - actual = traces[0] - return err == nil && len(actual.Spans) == len(expected.Spans) + return len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { CompareTraces(t, expected, actual) @@ -311,7 +314,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { fakeTraceID := model.TraceID{High: 0, Low: 1} iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()}) traces, err := v1adapter.V1TracesFromSeq2(iterTraces) - assert.NoError(t, err) // v2 TraceReader no longer returns an error for not found + require.NoError(t, err) // v2 TraceReader no longer returns an error for not found assert.Empty(t, traces) }) } From e22f53f6f357df05d390b149b05b7e4a52dbe3e1 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Dec 2024 23:52:50 -0500 Subject: [PATCH 15/15] fix Signed-off-by: Yuri Shkuro --- plugin/storage/integration/integration.go | 4 ++-- storage_v2/v1adapter/translator_test.go | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index e9e1df3d6af..f9e6e9a9a78 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -217,7 +217,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { expected := s.writeLargeTraceWithDuplicateSpanIds(t) expectedTraceID := expected.Spans[0].TraceID - var actual *model.Trace + actual := &model.Trace{} // no spans found := s.waitForCondition(t, func(_ *testing.T) bool { iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) traces, err := v1adapter.V1TracesFromSeq2(iterTraces) @@ -293,7 +293,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { expected := s.loadParseAndWriteExampleTrace(t) expectedTraceID := expected.Spans[0].TraceID - var actual *model.Trace + actual := &model.Trace{} // no spans found := s.waitForCondition(t, func(t *testing.T) bool { iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()}) traces, err := v1adapter.V1TracesFromSeq2(iterTraces) diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index 5f666dfe2f9..a3b5ca5ab17 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -112,8 +112,8 @@ func TestProtoToTraces_AddsWarnings(t *testing.T) { func TestV1TracesFromSeq2(t *testing.T) { var ( - ProcessNoServiceName string = "OTLPResourceNoServiceName" - StartTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time + processNoServiceName string = "OTLPResourceNoServiceName" + startTime time.Time = time.Unix(0, 0) // 1970-01-01T00:00:00Z, matches the default for otel span's start time ) testCases := []struct { @@ -131,8 +131,8 @@ func TestV1TracesFromSeq2(t *testing.T) { TraceID: model.NewTraceID(2, 3), SpanID: model.NewSpanID(1), OperationName: "op-success-a", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, }, }, }, @@ -164,14 +164,14 @@ func TestV1TracesFromSeq2(t *testing.T) { TraceID: model.NewTraceID(2, 3), SpanID: model.NewSpanID(1), OperationName: "op-two-chunks-a", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, }, { TraceID: model.NewTraceID(2, 3), SpanID: model.NewSpanID(2), OperationName: "op-two-chunks-b", - Process: model.NewProcess(ProcessNoServiceName, nil), - StartTime: StartTime, + Process: model.NewProcess(processNoServiceName, nil), + StartTime: startTime, }, }, }, @@ -204,7 +204,7 @@ func TestV1TracesFromSeq2(t *testing.T) { // a case that occurs when no trace is contained in the iterator name: "empty sequence", expectedModelTraces: nil, - seqTrace: func(yield func([]ptrace.Traces, error) bool) {}, + seqTrace: func(_ func([]ptrace.Traces, error) bool) {}, expectedErr: nil, }, {