Skip to content

Commit

Permalink
Support 3 levels of data_stream.* with precedence
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 17, 2024
1 parent 4695926 commit 972a634
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 6 deletions.
19 changes: 17 additions & 2 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,22 @@ func (c *Consumer) convertInstrumentationLibraryLogs(
) {
otelLogs := in.LogRecords()
for i := 0; i < otelLogs.Len(); i++ {
event := c.convertLogRecord(otelLogs.At(i), baseEvent, timeDelta)
event := c.convertLogRecordWithScope(otelLogs.At(i), in.Scope(), baseEvent, timeDelta)
*out = append(*out, event)
}
}

func (c *Consumer) convertLogRecord(
func (c *Consumer) convertLogRecordWithScope(
record plog.LogRecord,
scope pcommon.InstrumentationScope,
baseEvent *modelpb.APMEvent,
timeDelta time.Duration,
) *modelpb.APMEvent {
event := baseEvent.CloneVT()
initEventLabels(event)

translateScopeMetadata(scope, event)

if record.Timestamp() == 0 {
event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta))
} else {
Expand Down Expand Up @@ -182,6 +186,17 @@ func (c *Consumer) convertLogRecord(
event.Network.Connection = modelpb.NetworkConnectionFromVTPool()
}
event.Network.Connection.Type = v.Str()
// data_stream.*
case attributeDataStreamDataset:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = v.Str()
default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
}
Expand Down
97 changes: 93 additions & 4 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
resourceLogs := logs.ResourceLogs().AppendEmpty()
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "swift")
resourceAttrs.PutStr("data_stream.dataset", "dataset")
resourceAttrs.PutStr("data_stream.namespace", "namespace")
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()

record1 := newLogRecord("") // no log body
Expand Down Expand Up @@ -467,8 +465,99 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
assert.Equal(t, "event", processed[0].Event.Kind)
assert.Equal(t, "device", processed[0].Event.Category)
assert.Equal(t, "MyEvent", processed[0].Event.Action)
assert.Equal(t, "dataset", processed[0].DataStream.Dataset)
assert.Equal(t, "namespace", processed[0].DataStream.Namespace)
}

func TestConsumerConsumeOTelEventLogsDataStream(t *testing.T) {
for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
scopeDataStreamDataset string
scopeDataStreamNamespace string
recordDataStreamDataset string
recordDataStreamNamespace string

expectedDataStreamDataset string
expectedDataStreamNamespace string
}{
{
resourceDataStreamDataset: "1",
resourceDataStreamNamespace: "2",
scopeDataStreamDataset: "3",
scopeDataStreamNamespace: "4",
recordDataStreamDataset: "5",
recordDataStreamNamespace: "6",
expectedDataStreamDataset: "5",
expectedDataStreamNamespace: "6",
},
{
resourceDataStreamDataset: "1",
resourceDataStreamNamespace: "2",
scopeDataStreamDataset: "3",
scopeDataStreamNamespace: "4",
expectedDataStreamDataset: "3",
expectedDataStreamNamespace: "4",
},
{
resourceDataStreamDataset: "1",
resourceDataStreamNamespace: "2",
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
} {
t.Run(tc.resourceDataStreamDataset, func(t *testing.T) {

logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
if tc.resourceDataStreamDataset != "" {
resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset)
}
if tc.resourceDataStreamNamespace != "" {
resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace)
}

scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
scopeAttrs := resourceLogs.ScopeLogs().At(0).Scope().Attributes()
if tc.scopeDataStreamDataset != "" {
scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset)
}
if tc.scopeDataStreamNamespace != "" {
scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace)
}

record1 := newLogRecord("") // no log body
record1.CopyTo(scopeLogs.LogRecords().AppendEmpty())
recordAttrs := scopeLogs.LogRecords().At(0).Attributes()
if tc.recordDataStreamDataset != "" {
recordAttrs.PutStr("data_stream.dataset", tc.recordDataStreamDataset)
}
if tc.recordDataStreamNamespace != "" {
recordAttrs.PutStr("data_stream.namespace", tc.recordDataStreamNamespace)
}

var processed modelpb.Batch
var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
if processed != nil {
panic("already processes batch")
}
processed = batch.Clone()
assert.NotZero(t, processed[0].Timestamp)
processed[0].Timestamp = 0
return nil
}
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 1)
assert.Equal(t, tc.expectedDataStreamDataset, processed[0].DataStream.Dataset)
assert.Equal(t, tc.expectedDataStreamNamespace, processed[0].DataStream.Namespace)
})
}
}

func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,25 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
}
}

func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APMEvent) {
scope.Attributes().Range(func(k string, v pcommon.Value) bool {
switch k {
// data_stream.*
case attributeDataStreamDataset:
if out.DataStream == nil {
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Namespace = v.Str()
}
return true
})
}

func cleanServiceName(name string) string {
return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_")
}
Expand Down

0 comments on commit 972a634

Please sign in to comment.