Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][WIP] Span hash sanitizer and enhance span hash adjuster #6499

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
27 changes: 27 additions & 0 deletions cmd/collector/app/sanitizer/hash_sanitizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2022 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer

import (
"github.com/jaegertracing/jaeger/model"
)

// AddHashTag returns the SanitizeSpan that makes sure a span carries a hash
// tag. Spans that already have a hash tag are returned unchanged.
func AddHashTag() SanitizeSpan {
	return SanitizeSpan(hashingSanitizer)
}

// hashingSanitizer makes sure the span carries a hash tag and returns the
// same span pointer it was given.
//
// A span that already has a hash tag is returned as is, so a previously
// stored hash value is never overwritten.
func hashingSanitizer(span *model.Span) *model.Span {
	if _, found := span.GetHashTag(); found {
		return span
	}

	// Hashing is best-effort: the SanitizeSpan signature offers no way to
	// report a failure, so the error is deliberately discarded and the span
	// is returned regardless of the outcome.
	_, _ = span.SetHashTag()
	return span
}
67 changes: 67 additions & 0 deletions cmd/collector/app/sanitizer/hash_sanitizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

// TestHashingSanitizer verifies that the sanitizer returned by AddHashTag
// adds a hash tag to spans lacking one, keeps the value of a pre-existing
// hash tag, and yields equal hashes for spans built from identical fields.
func TestHashingSanitizer(t *testing.T) {
	sanitize := AddHashTag()

	// newSpan builds a minimal span with fixed IDs and the given tags.
	newSpan := func(tags ...model.KeyValue) *model.Span {
		return &model.Span{
			TraceID: model.TraceID{Low: 1},
			SpanID:  model.SpanID(2),
			Tags:    tags,
		}
	}

	t.Run("add hash to span without hash tag", func(t *testing.T) {
		_, found := sanitize(newSpan()).GetHashTag()
		assert.True(t, found, "hash tag should be added")
	})

	t.Run("preserve existing hash tag", func(t *testing.T) {
		const presetHash = int64(12345)
		existing := model.KeyValue{
			Key:    jptrace.HashAttribute,
			VType:  model.ValueType_INT64,
			VInt64: presetHash,
		}

		got, found := sanitize(newSpan(existing)).GetHashTag()
		assert.True(t, found, "hash tag should exist")
		assert.Equal(t, presetHash, got, "hash value should remain unchanged")
	})

	t.Run("identical spans get same hash", func(t *testing.T) {
		first := sanitize(newSpan())
		second := sanitize(newSpan())

		firstHash, firstFound := first.GetHashTag()
		secondHash, secondFound := second.GetHashTag()

		require.True(t, firstFound, "hash tag should be added to span1")
		require.True(t, secondFound, "hash tag should be added to span2")
		assert.Equal(t, firstHash, secondHash, "identical spans should have same hash")
	})
}
1 change: 1 addition & 0 deletions cmd/collector/app/sanitizer/sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type SanitizeSpan func(span *model.Span) *model.Span
func NewStandardSanitizers() []SanitizeSpan {
	return []SanitizeSpan{
		NewEmptyServiceNameSanitizer(),
		// NOTE(review): the hash sanitizer is currently disabled; confirm
		// whether it is meant to be enabled before uncommenting the entry below.
		// AddHashTag(),
	}
}

Expand Down
47 changes: 47 additions & 0 deletions cmd/jaeger/internal/sanitizer/hash_sanitizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer

import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

// NewHashingSanitizer returns the sanitizer Func that annotates spans with
// the jptrace.HashAttribute attribute.
func NewHashingSanitizer() Func {
	return Func(hashingSanitizer)
}

// hashingSanitizer computes a hash for the spans in traces and stores it in
// each span's jptrace.HashAttribute attribute. The traces are modified in
// place and the same value is returned.
//
// A span is hashed through a scratch trace that holds only that span together
// with a copy of its resource attributes and scope attributes, so those
// attributes are part of the hashed input.
//
// Spans that already carry jptrace.HashAttribute are skipped. This keeps a
// previously stored hash intact (the same rule the model.Span-based collector
// sanitizer applies) and avoids hashing a span whose attributes already
// include an earlier hash value.
//
// If hashing fails, the error text is recorded on the span via
// jptrace.AddWarnings.
func hashingSanitizer(traces ptrace.Traces) ptrace.Traces {
	resourceSpans := traces.ResourceSpans()
	spanHasher := jptrace.NewSpanHasher()
	for i := 0; i < resourceSpans.Len(); i++ {
		rs := resourceSpans.At(i)

		// Scratch trace reused for every span under this resource.
		hashTrace := ptrace.NewTraces()
		hashResourceSpan := hashTrace.ResourceSpans().AppendEmpty()
		hashScopeSpan := hashResourceSpan.ScopeSpans().AppendEmpty()
		hashSpan := hashScopeSpan.Spans().AppendEmpty()
		rs.Resource().Attributes().CopyTo(hashResourceSpan.Resource().Attributes())

		scopeSpans := rs.ScopeSpans()
		for j := 0; j < scopeSpans.Len(); j++ {
			ss := scopeSpans.At(j)
			ss.Scope().Attributes().CopyTo(hashScopeSpan.Scope().Attributes())

			spans := ss.Spans()
			for k := 0; k < spans.Len(); k++ {
				span := spans.At(k)
				if _, ok := span.Attributes().Get(jptrace.HashAttribute); ok {
					// Keep the existing hash instead of recomputing it.
					continue
				}

				span.CopyTo(hashSpan)
				spanHash, err := spanHasher.SpanHash(hashTrace)
				if err != nil {
					jptrace.AddWarnings(span, err.Error())
				}
				// A zero hash is treated as "no hash available" and is not stored.
				if spanHash != 0 {
					span.Attributes().PutInt(jptrace.HashAttribute, spanHash)
				}
			}
		}
	}

	return traces
}
56 changes: 56 additions & 0 deletions cmd/jaeger/internal/sanitizer/hash_sanitizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

// createTestTraces returns a ptrace.Traces holding exactly one resource, one
// scope and one span named "test-span".
func createTestTraces() ptrace.Traces {
	td := ptrace.NewTraces()
	td.ResourceSpans().AppendEmpty().
		ScopeSpans().AppendEmpty().
		Spans().AppendEmpty().
		SetName("test-span")
	return td
}

func TestHashingSanitizer(t *testing.T) {
t.Run("new span without hash", func(t *testing.T) {
traces := createTestTraces()
sanitized := hashingSanitizer(traces)

span := sanitized.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
hashAttr, exists := span.Attributes().Get(jptrace.HashAttribute)

require.True(t, exists, "hash should be added")
fmt.Printf("%v", hashAttr.Type())
assert.Equal(t, pcommon.ValueTypeInt, hashAttr.Type())
})
}

// TestComputeHashCode checks that jptrace.SpanHasher produces a non-zero hash
// for a simple single-span trace, and that the value read back from the
// jptrace.HashAttribute span attribute equals the value that was stored.
func TestComputeHashCode(t *testing.T) {
	t.Run("successful hash computation", func(t *testing.T) {
		traces := createTestTraces()
		spanHasher := jptrace.NewSpanHasher()

		// Validate the hasher's result before it is used any further.
		hash, err := spanHasher.SpanHash(traces)
		require.NoError(t, err)
		assert.NotZero(t, hash)

		span := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
		span.Attributes().PutInt(jptrace.HashAttribute, hash)

		hashAttr, exists := span.Attributes().Get(jptrace.HashAttribute)
		require.True(t, exists)
		assert.Equal(t, hash, hashAttr.Int())
	})
}
1 change: 1 addition & 0 deletions cmd/jaeger/internal/sanitizer/sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func NewStandardSanitizers() []Func {
return []Func{
NewEmptyServiceNameSanitizer(),
NewUTF8Sanitizer(),
NewHashingSanitizer(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/sanitizer/sanitizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestNewStandardSanitizers(t *testing.T) {
sanitizers := NewStandardSanitizers()
require.Len(t, sanitizers, 2)
require.Len(t, sanitizers, 3)
}

func TestNewChainedSanitizer(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
Expand Down Expand Up @@ -50,6 +51,7 @@ var (
},
Warnings: []string{},
}
mockHash = int64(12345)
)

type testQueryService struct {
Expand Down Expand Up @@ -130,10 +132,12 @@ func TestGetTraceWithRawTraces(t *testing.T) {
tags: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
expected: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
},
{
Expand All @@ -142,8 +146,10 @@ func TestGetTraceWithRawTraces(t *testing.T) {
tags: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
expected: model.KeyValues{
model.Int64(jptrace.HashAttribute, mockHash),
model.String("a", "key"),
model.String("z", "key"),
},
Expand Down Expand Up @@ -283,10 +289,12 @@ func TestFindTracesWithRawTraces(t *testing.T) {
tags: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
expected: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
},
{
Expand All @@ -295,8 +303,10 @@ func TestFindTracesWithRawTraces(t *testing.T) {
tags: model.KeyValues{
model.String("z", "key"),
model.String("a", "key"),
model.Int64(jptrace.HashAttribute, mockHash),
},
expected: model.KeyValues{
model.Int64(jptrace.HashAttribute, mockHash),
model.String("a", "key"),
model.String("z", "key"),
},
Expand Down
41 changes: 12 additions & 29 deletions cmd/query/app/querysvc/v2/adjuster/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
package adjuster

import (
"fmt"
"hash/fnv"

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
Expand All @@ -24,18 +21,15 @@ var _ Adjuster = (*SpanHashDeduper)(nil)
// To ensure consistent hash codes, this adjuster should be executed after
// SortAttributesAndEvents, which normalizes the order of collections within the span.
func DeduplicateSpans() *SpanHashDeduper {
return &SpanHashDeduper{
marshaler: &ptrace.ProtoMarshaler{},
}
return &SpanHashDeduper{}
}

type SpanHashDeduper struct {
marshaler ptrace.Marshaler
}
type SpanHashDeduper struct{}

func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
spansByHash := make(map[uint64]ptrace.Span)
func (SpanHashDeduper) Adjust(traces ptrace.Traces) {
spansByHash := make(map[int64]ptrace.Span)
resourceSpans := traces.ResourceSpans()
spanHasher := jptrace.NewSpanHasher()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
Expand All @@ -52,32 +46,21 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
span.CopyTo(hashSpan)
h, err := s.computeHashCode(
hashTrace,
)
spanHash, err := spanHasher.SpanHash(hashTrace)
if err != nil {
jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err))
jptrace.AddWarnings(span, err.Error())
}
if spanHash == 0 {
span.CopyTo(dedupedSpans.AppendEmpty())
continue
}
if _, ok := spansByHash[h]; !ok {
spansByHash[h] = span

if _, ok := spansByHash[spanHash]; !ok {
spansByHash[spanHash] = span
span.CopyTo(dedupedSpans.AppendEmpty())
}
}
dedupedSpans.CopyTo(spans)
}
}
}

func (s *SpanHashDeduper) computeHashCode(
hashTrace ptrace.Traces,
) (uint64, error) {
b, err := s.marshaler.MarshalTraces(hashTrace)
if err != nil {
return 0, err
}
hasher := fnv.New64a()
hasher.Write(b) // the writer in the Hash interface never returns an error
return hasher.Sum64(), nil
}
14 changes: 2 additions & 12 deletions cmd/query/app/querysvc/v2/adjuster/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,8 @@ func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) {
assert.Equal(t, expected(), i)
}

type errorMarshaler struct{}

func (*errorMarshaler) MarshalTraces(ptrace.Traces) ([]byte, error) {
return nil, assert.AnError
}

func TestSpanHash_ErrorInMarshaler(t *testing.T) {
adjuster := SpanHashDeduper{
marshaler: &errorMarshaler{},
}
adjuster := SpanHashDeduper{}

traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand All @@ -314,7 +306,5 @@ func TestSpanHash_ErrorInMarshaler(t *testing.T) {
assert.Equal(t, "span1", gotSpan.Name())

warnings := jptrace.GetWarnings(gotSpan)
assert.Len(t, warnings, 1)
assert.Contains(t, warnings[0], "failed to compute hash code")
assert.Contains(t, warnings[0], assert.AnError.Error())
assert.Empty(t, warnings)
}
4 changes: 4 additions & 0 deletions internal/jptrace/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ const (
// wire format in which the span was received by Jaeger,
// e.g. proto, thrift, json.
FormatAttribute = "@jaeger@format"
// HashAttribute is the name of the span attribute where we can
// store the hash value of the span. The hash value can be used to
// skip hash computation and for deduplication of spans.
HashAttribute = "@jaeger@hash"
)
Loading
Loading