Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Sanitize OTEL datastream #369

Merged
merged 11 commits into from
Sep 26, 2024
4 changes: 2 additions & 2 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ func (c *Consumer) convertLogRecord(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamField(v.Str())
default:
setLabel(replaceDots(k), event, v)
}
Expand Down
13 changes: 13 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package otlp_test
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -532,6 +533,9 @@ func processLogEvents(t *testing.T, logs plog.Logs) modelpb.Batch {
}

func TestConsumerConsumeLogsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLen := otlp.MaxDataStreamBytes - len(otlp.DisallowedDataStreamRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -567,6 +571,15 @@ func TestConsumerConsumeLogsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
30 changes: 25 additions & 5 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strconv"
"strings"
"unicode"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
Expand All @@ -30,7 +31,9 @@ import (
)

const (
// AgentNameJaeger is the literal agent name "Jaeger".
// NOTE(review): presumably used to recognise events that originate from
// Jaeger; confirm against the callers, which are not visible here.
AgentNameJaeger = "Jaeger"
AgentNameJaeger = "Jaeger"
// MaxDataStreamBytes is the maximum length, in bytes, that
// sanitizeDataStreamField allows a data stream field to have.
MaxDataStreamBytes = 100
// DisallowedDataStreamRunes lists the runes that replaceReservedRune
// replaces with '_' when a data stream field is sanitized.
DisallowedDataStreamRunes = "-\\/*?\"<>| ,#:"
)

var (
Expand Down Expand Up @@ -316,12 +319,12 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamField(v.Str())
default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
Expand Down Expand Up @@ -459,12 +462,12 @@ func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APM
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamField(v.Str())
}
return true
})
Expand Down Expand Up @@ -545,3 +548,20 @@ func setLabel(key string, event *modelpb.APMEvent, v pcommon.Value) {
}
}
}

// sanitizeDataStreamField sanitizes a data stream field (dataset, namespace)
// to apply the restrictions outlined in
// https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
//
// Every rune of field is passed through replaceReservedRune, and the result
// is then limited to at most MaxDataStreamBytes bytes. The cut is moved back
// to the nearest rune boundary so that a multi-byte UTF-8 character is never
// split in half; the returned string may therefore be a few bytes shorter
// than MaxDataStreamBytes.
func sanitizeDataStreamField(field string) string {
	// strings.Map emits valid UTF-8: invalid input bytes come out as U+FFFD.
	field = strings.Map(replaceReservedRune, field)
	if len(field) <= MaxDataStreamBytes {
		return field
	}

	// Slicing at a raw byte offset could leave a partial rune at the end of
	// the string. UTF-8 continuation bytes have the bit pattern 10xxxxxx, so
	// step backwards until the byte at the cut position starts a rune.
	// field[cut] is always in range because len(field) > MaxDataStreamBytes.
	cut := MaxDataStreamBytes
	for cut > 0 && field[cut]&0xC0 == 0x80 {
		cut--
	}
	return field[:cut]
}

// replaceReservedRune is the per-rune mapping used when sanitizing data
// stream fields: a rune listed in DisallowedDataStreamRunes becomes '_',
// and any other rune is returned in its lower-case form.
func replaceReservedRune(r rune) rune {
	if strings.IndexRune(DisallowedDataStreamRunes, r) < 0 {
		// Not a reserved rune: only normalise the case.
		return unicode.ToLower(r)
	}
	return '_'
}
4 changes: 2 additions & 2 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func (c *Consumer) handleScopeMetrics(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
rubvs marked this conversation as resolved.
Show resolved Hide resolved
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamField(v.Str())

// The below fields are required by the Processes tab of the
// curated Kibana's hostmetrics UI. These fields are
Expand Down
12 changes: 12 additions & 0 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}

func TestConsumeMetricsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLen := otlp.MaxDataStreamBytes - len(otlp.DisallowedDataStreamRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -764,6 +767,15 @@ func TestConsumeMetricsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
20 changes: 10 additions & 10 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ func TranslateTransaction(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamField(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamField(stringval)
default:
modelpb.Labels(event.Labels).Set(k, stringval)
}
Expand Down Expand Up @@ -824,12 +824,12 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamField(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamField(stringval)
default:
setLabel(k, event, v)
}
Expand Down Expand Up @@ -1103,12 +1103,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamField(v.Str())

default:
setLabel(replaceDots(k), event, v)
Expand Down Expand Up @@ -1144,12 +1144,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamField(v.Str())
default:
k = replaceDots(k)
if isJaeger && k == "message" {
Expand Down Expand Up @@ -1202,12 +1202,12 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Dataset = v.Str()
apmEvent.DataStream.Dataset = sanitizeDataStreamField(v.Str())
case attributeDataStreamNamespace:
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Namespace = v.Str()
apmEvent.DataStream.Namespace = sanitizeDataStreamField(v.Str())

default:
setLabel(replaceDots(k), apmEvent, v)
Expand Down
12 changes: 12 additions & 0 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ func TestSpanNetworkAttributes(t *testing.T) {
}

func TestSpanDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLen := otlp.MaxDataStreamBytes - len(otlp.DisallowedDataStreamRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -908,6 +911,15 @@ func TestSpanDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDataStreamRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedDataStreamRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedDataStreamRunes)) + randomString[:maxLen],
},
} {
for _, isTxn := range []bool{false, true} {
tcName := fmt.Sprintf("%s,%s,txn=%v", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace, isTxn)
Expand Down