Skip to content

Commit

Permalink
Read span event data_stream.*
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 25, 2024
1 parent 14781c7 commit 664d9de
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
24 changes: 19 additions & 5 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,12 +1088,26 @@ func (c *Consumer) convertSpanEvent(
event.Message = spanEvent.Name()
setLogContext(event, parent)
spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool {
k = replaceDots(k)
if isJaeger && k == "message" {
event.Message = truncate(v.Str())
return true
switch k {
// data_stream.*
case attributeDataStreamDataset:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = v.Str()
default:
k = replaceDots(k)
if isJaeger && k == "message" {
event.Message = truncate(v.Str())
return true
}
setLabel(k, event, ifaceAttributeValue(v))
}
setLabel(k, event, ifaceAttributeValue(v))
return true
})
}
Expand Down
30 changes: 30 additions & 0 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,36 @@ func TestSpanCodeStacktrace(t *testing.T) {
})
}

func TestSpanEventsDataStream(t *testing.T) {
for _, isException := range []bool{false, true} {
t.Run(fmt.Sprintf("isException=%v", isException), func(t *testing.T) {
timestamp := time.Unix(123, 0).UTC()

event := ptrace.NewSpanEvent()
event.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
if isException {
event.SetName("exception")
event.Attributes().PutStr("exception.type", "java.net.ConnectException.OSError")
event.Attributes().PutStr("exception.message", "Division by zero")
}

event.Attributes().PutStr("data_stream.dataset", "dataset")
event.Attributes().PutStr("data_stream.namespace", "namespace")

_, events := transformTransactionSpanEvents(t, "java", event)
if isException {
// Exceptions data_stream fields will only be stored as labels instead of actual event.DataStream.
assert.Nil(t, events[0].DataStream)
} else {
assert.Equal(t, &modelpb.DataStream{
Dataset: "dataset",
Namespace: "namespace",
}, events[0].DataStream)
}
})
}
}

func testJaegerLogs() []jaegermodel.Log {
return []jaegermodel.Log{{
// errors that can be converted to elastic errors
Expand Down

0 comments on commit 664d9de

Please sign in to comment.