From b12a82812a34d1b377391fd9b0d41408cf39afd0 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 25 Jan 2024 13:03:26 +0000 Subject: [PATCH] error: inconfigurable dataset & configurable namespace --- input/otlp/traces.go | 14 +++++++++ input/otlp/traces_test.go | 41 +++++++++++++++---------- model/modelprocessor/datastream_test.go | 6 ++++ 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/input/otlp/traces.go b/input/otlp/traces.go index 8f4c3d8b..d04d7f22 100644 --- a/input/otlp/traces.go +++ b/input/otlp/traces.go @@ -1060,6 +1060,20 @@ func (c *Consumer) convertSpanEvent( exceptionType = v.Str() case "exception.escaped": exceptionEscaped = v.Bool() + + // data_stream.* + // Note: fields are parsed but dataset will be overridden by SetDataStream because it is an error + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = v.Str() + default: setLabel(replaceDots(k), event, ifaceAttributeValue(v)) } diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index 954473ae..f6d94aed 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -1702,27 +1702,34 @@ func TestSpanEventsDataStream(t *testing.T) { t.Run(fmt.Sprintf("isException=%v", isException), func(t *testing.T) { timestamp := time.Unix(123, 0).UTC() - event := ptrace.NewSpanEvent() - event.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + traces, spans := newTracesSpans() + traces.ResourceSpans().At(0).Resource().Attributes().PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + traces.ResourceSpans().At(0).Resource().Attributes().PutStr("data_stream.dataset", "1") + traces.ResourceSpans().At(0).Resource().Attributes().PutStr("data_stream.namespace", "2") + otelSpan := spans.Spans().AppendEmpty() + otelSpan.SetTraceID(pcommon.TraceID{1}) + otelSpan.SetSpanID(pcommon.SpanID{2}) + otelSpan.Attributes().PutStr("data_stream.dataset", "3") + otelSpan.Attributes().PutStr("data_stream.namespace", "4") + + spanEvent := ptrace.NewSpanEvent() + spanEvent.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) if isException { - event.SetName("exception") - event.Attributes().PutStr("exception.type", "java.net.ConnectException.OSError") - event.Attributes().PutStr("exception.message", "Division by zero") + spanEvent.SetName("exception") + spanEvent.Attributes().PutStr("exception.type", "java.net.ConnectException.OSError") + spanEvent.Attributes().PutStr("exception.message", "Division by zero") } - event.Attributes().PutStr("data_stream.dataset", "dataset") - event.Attributes().PutStr("data_stream.namespace", "namespace") + spanEvent.Attributes().PutStr("data_stream.dataset", "5") + spanEvent.Attributes().PutStr("data_stream.namespace", "6") + spanEvent.CopyTo(otelSpan.Events().AppendEmpty()) - _, events := transformTransactionSpanEvents(t, "java", event) - if isException { - // Exceptions data_stream fields will only be stored as labels instead of actual event.DataStream. - assert.Nil(t, events[0].DataStream) - } else { - assert.Equal(t, &modelpb.DataStream{ - Dataset: "dataset", - Namespace: "namespace", - }, events[0].DataStream) - } + allEvents := transformTraces(t, traces) + events := (*allEvents)[1:] + assert.Equal(t, &modelpb.DataStream{ + Dataset: "5", + Namespace: "6", + }, events[0].DataStream) }) } } diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 75013922..572bc544 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -80,6 +80,12 @@ func TestSetDataStream(t *testing.T) { }, { input: &modelpb.APMEvent{Error: &modelpb.Error{}}, output: &modelpb.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{Error: &modelpb.Error{}, DataStream: &modelpb.DataStream{ + Dataset: "dataset", + Namespace: "namespace", + }}, + output: &modelpb.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "namespace"}, }, { input: &modelpb.APMEvent{Log: &modelpb.Log{}}, output: &modelpb.DataStream{Type: "logs", Dataset: "apm.app.unknown", Namespace: "custom"},