Skip to content

Commit

Permalink
Merge branch 'main' into otlp-span-event-logs-parent-id
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip authored Oct 1, 2024
2 parents 61019c6 + bd5e341 commit 4db6f73
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 37 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ require (
github.com/google/go-cmp v0.6.0
github.com/jaegertracing/jaeger v1.61.0
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.110.0
github.com/pkg/errors v0.9.1
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
github.com/stretchr/testify v1.9.0
github.com/xeipuuv/gojsonschema v1.2.0
go.elastic.co/apm/v2 v2.6.2
go.elastic.co/fastjson v1.4.0
go.opentelemetry.io/collector/consumer v0.109.0
go.opentelemetry.io/collector/pdata v1.15.0
go.opentelemetry.io/collector/semconv v0.109.0
go.opentelemetry.io/collector/consumer v0.110.0
go.opentelemetry.io/collector/pdata v1.16.0
go.opentelemetry.io/collector/semconv v0.110.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/sdk v1.30.0
Expand All @@ -25,7 +25,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/sync v0.8.0
golang.org/x/tools v0.25.0
google.golang.org/grpc v1.66.2
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
)

Expand All @@ -42,7 +42,7 @@ require (
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 h1:4VBRgtyh3hHSgAVGgs4bvNwJd0oUGyxVA3eQO2ujNsA=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0/go.mod h1:9MGQCqxdCNBhdD+7QBZ6hH9HipXe5CajMafVKglD5f0=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.109.0 h1:CSYFxtxCBTF7BHbITx3g5ilxsjAI2Mn5nDHotnU4KXg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.109.0/go.mod h1:D0jbiFn1iOXtc/lfotbBKayP3KWUIYdc00GmTFcsWds=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 h1:Pn3SxtOswZyyebq7AIuM1FSDNOUW525QjWdgqUzPHLM=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0/go.mod h1:ZjPILhF0GqsPugqe530whfSWKxamiydp7ukaFgM/aEM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.110.0 h1:WHIl5Y/lmmcl/Ag2hePS4IKzvHinUINSMilJCQipMek=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.110.0/go.mod h1:H4a4Q3092Txvdui31zrl5kUw69YtPhGyBKsidz4/DRs=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -79,12 +79,12 @@ go.elastic.co/apm/v2 v2.6.2 h1:VBplAxgbOgTv+Giw/FS91xJpHYw/q8fz/XKPvqC+7/o=
go.elastic.co/apm/v2 v2.6.2/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo=
go.elastic.co/fastjson v1.4.0 h1:a4BXUKXZHAzjVOPrqtEx2FDsIRBCMek01vCnrtyutWs=
go.elastic.co/fastjson v1.4.0/go.mod h1:ZD5um63l0/8TIdddZbL2znD83FAr2IckYa3KR7VcdNA=
go.opentelemetry.io/collector/consumer v0.109.0 h1:fdXlJi5Rat/poHPiznM2mLiXjcv1gPy3fyqqeirri58=
go.opentelemetry.io/collector/consumer v0.109.0/go.mod h1:E7PZHnVe1DY9hYy37toNxr9/hnsO7+LmnsixW8akLQI=
go.opentelemetry.io/collector/pdata v1.15.0 h1:q/T1sFpRKJnjDrUsHdJ6mq4uSqViR/f92yvGwDby/gY=
go.opentelemetry.io/collector/pdata v1.15.0/go.mod h1:2wcsTIiLAJSbqBq/XUUYbi+cP+N87d0jEJzmb9nT19U=
go.opentelemetry.io/collector/semconv v0.109.0 h1:6CStOFOVhdrzlHg51kXpcPHRKPh5RtV7z/wz+c1TG1g=
go.opentelemetry.io/collector/semconv v0.109.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A=
go.opentelemetry.io/collector/consumer v0.110.0 h1:CnB83KSFQxhFAbNJwTM0blahg16xa6CyUnIIA5qPMbA=
go.opentelemetry.io/collector/consumer v0.110.0/go.mod h1:WlzhfwDfwKkEa5XFdN5x9+jjp9ZF5EUSmtOgVe69zm0=
go.opentelemetry.io/collector/pdata v1.16.0 h1:g02K8jlRnmQ7TQDuXpdgVL6vIxIVqr5Gbb1qIR27rto=
go.opentelemetry.io/collector/pdata v1.16.0/go.mod h1:YZZJIt2ehxosYf/Y1pbvexjNWsIGNNrzzlCTO9jC1F4=
go.opentelemetry.io/collector/semconv v0.110.0 h1:KHQnOHe3gUz0zsxe8ph9kN5OTypCFD4V+06AiBTfeNk=
go.opentelemetry.io/collector/semconv v0.110.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w=
Expand Down Expand Up @@ -143,8 +143,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ func (c *Consumer) convertLogRecord(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
setLabel(replaceDots(k), event, v)
}
Expand Down
14 changes: 14 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package otlp_test
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -532,6 +533,10 @@ func processLogEvents(t *testing.T, logs plog.Logs) modelpb.Batch {
}

func TestConsumerConsumeLogsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -567,6 +572,15 @@ func TestConsumerConsumeLogsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
44 changes: 39 additions & 5 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strconv"
"strings"
"unicode"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
Expand All @@ -30,7 +31,10 @@ import (
)

const (
AgentNameJaeger = "Jaeger"
AgentNameJaeger = "Jaeger"
MaxDataStreamBytes = 100
DisallowedNamespaceRunes = "\\/*?\"<>| ,#:"
DisallowedDatasetRunes = "-\\/*?\"<>| ,#:"
)

var (
Expand Down Expand Up @@ -316,12 +320,12 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
Expand Down Expand Up @@ -459,12 +463,12 @@ func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APM
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
}
return true
})
Expand Down Expand Up @@ -545,3 +549,33 @@ func setLabel(key string, event *modelpb.APMEvent, v pcommon.Value) {
}
}
}

// Sanitize the datastream fields (dataset, namespace) to apply restrictions
// as outlined in https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
func sanitizeDataStreamDataset(field string) string {
field = strings.Map(replaceReservedRune(DisallowedDatasetRunes), field)
if len(field) > MaxDataStreamBytes {
return field[:MaxDataStreamBytes]
}

return field
}

// Sanitize the datastream fields (dataset, namespace) to apply restrictions
// as outlined in https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
func sanitizeDataStreamNamespace(field string) string {
field = strings.Map(replaceReservedRune(DisallowedNamespaceRunes), field)
if len(field) > MaxDataStreamBytes {
return field[:MaxDataStreamBytes]
}
return field
}

func replaceReservedRune(disallowedRunes string) func(r rune) rune {
return func(r rune) rune {
if strings.ContainsRune(disallowedRunes, r) {
return '_'
}
return unicode.ToLower(r)
}
}
4 changes: 2 additions & 2 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func (c *Consumer) handleScopeMetrics(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

// The below fields are required by the Processes tab of the
// curated Kibana's hostmetrics UI. These fields are
Expand Down
13 changes: 13 additions & 0 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}

func TestConsumeMetricsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -764,6 +768,15 @@ func TestConsumeMetricsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
20 changes: 10 additions & 10 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ func TranslateTransaction(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
default:
modelpb.Labels(event.Labels).Set(k, stringval)
}
Expand Down Expand Up @@ -824,12 +824,12 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
default:
setLabel(k, event, v)
}
Expand Down Expand Up @@ -1104,12 +1104,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

default:
setLabel(replaceDots(k), event, v)
Expand Down Expand Up @@ -1145,12 +1145,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
k = replaceDots(k)
if isJaeger && k == "message" {
Expand Down Expand Up @@ -1203,12 +1203,12 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Dataset = v.Str()
apmEvent.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Namespace = v.Str()
apmEvent.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

default:
setLabel(replaceDots(k), apmEvent, v)
Expand Down
13 changes: 13 additions & 0 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,10 @@ func TestSpanNetworkAttributes(t *testing.T) {
}

func TestSpanDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -908,6 +912,15 @@ func TestSpanDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
for _, isTxn := range []bool{false, true} {
tcName := fmt.Sprintf("%s,%s,txn=%v", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace, isTxn)
Expand Down

0 comments on commit 4db6f73

Please sign in to comment.