Skip to content

Commit

Permalink
[extension/jsonlogencoding] Support marshaling multiple logs (#36156)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Change how logs are marshaled. Instead of marshaling just the first log,
marshal all logs into a JSON array.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #34064
  • Loading branch information
atoulme authored Dec 11, 2024
1 parent 349ebd1 commit 421d710
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/consume_multiple_records.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: jsonlogencodingextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change how logs are marshaled. Instead of marshaling just the first log, marshal all logs into a JSON array.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34064]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
54 changes: 30 additions & 24 deletions extension/encoding/jsonlogencodingextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ package jsonlogencodingextension // import "github.com/open-telemetry/openteleme
import (
"context"
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -29,38 +28,45 @@ func (e *jsonLogExtension) MarshalLogs(ld plog.Logs) ([]byte, error) {
if e.config.(*Config).Mode == JSONEncodingModeBodyWithInlineAttributes {
return e.logProcessor(ld)
}
logRecord := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body()
var raw map[string]any
switch logRecord.Type() {
case pcommon.ValueTypeMap:
raw = logRecord.Map().AsRaw()
default:
return nil, fmt.Errorf("marshal: expected 'Map' found '%v'", logRecord.Type().String())
}
buf, err := jsoniter.Marshal(raw)
if err != nil {
return nil, err
logs := make([]map[string]any, 0, ld.LogRecordCount())

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
sls := rl.ScopeLogs()
for j := 0; j < sls.Len(); j++ {
sl := sls.At(j)
logSlice := sl.LogRecords()
for k := 0; k < logSlice.Len(); k++ {
log := logSlice.At(k)
switch log.Body().Type() {
case pcommon.ValueTypeMap:
logs = append(logs, log.Body().Map().AsRaw())
default:
return nil, fmt.Errorf("marshal: expected 'Map' found '%v'", log.Body().Type())
}
}
}
}
return buf, nil
return json.Marshal(logs)
}

func (e *jsonLogExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
p := plog.NewLogs()

// get json logs from the buffer
jsonVal := map[string]any{}
if err := jsoniter.Unmarshal(buf, &jsonVal); err != nil {
var jsonVal []map[string]any
if err := json.Unmarshal(buf, &jsonVal); err != nil {
return p, err
}

// create a new log record
logRecords := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
logRecords.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))

// Set the unmarshaled jsonVal as the body of the log record
if err := logRecords.Body().SetEmptyMap().FromRaw(jsonVal); err != nil {
return p, err
sl := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
for _, r := range jsonVal {
if err := sl.LogRecords().AppendEmpty().Body().SetEmptyMap().FromRaw(r); err != nil {
return p, err
}
}

return p, nil
}

Expand Down Expand Up @@ -96,7 +102,7 @@ func (e *jsonLogExtension) logProcessor(ld plog.Logs) ([]byte, error) {
}
}

return jsoniter.Marshal(logs)
return json.Marshal(logs)
}

type logBody struct {
Expand Down
3 changes: 2 additions & 1 deletion extension/encoding/jsonlogencodingextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/encod
go 1.22.0

require (
github.com/json-iterator/go v1.1.12
github.com/goccy/go-json v0.10.3
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.115.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8
Expand All @@ -22,6 +22,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions extension/encoding/jsonlogencodingextension/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions extension/encoding/jsonlogencodingextension/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestMarshalUnmarshal(t *testing.T) {
Mode: JSONEncodingModeBody,
},
}
json := `{"example":"example valid json to test that the unmarshaler is correctly returning a plog value"}`
json := `[{"example":"example valid json to test that the unmarshaler is correctly returning a plog value"}]`
ld, err := e.UnmarshalLogs([]byte(json))
assert.NoError(t, err)
assert.Equal(t, 1, ld.LogRecordCount())
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestInvalidUnmarshal(t *testing.T) {
},
}
_, err := e.UnmarshalLogs([]byte("NOT A JSON"))
assert.ErrorContains(t, err, "ReadMapCB: expect { or n, but found N")
assert.ErrorContains(t, err, "json: slice unexpected end of JSON input")
}

func TestPrettyLogProcessor(t *testing.T) {
Expand Down

0 comments on commit 421d710

Please sign in to comment.