From 421d710d3d8ceb28e375f57109ae976423713f9d Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 10 Dec 2024 19:21:57 -0800 Subject: [PATCH] [extension/jsonlogencoding] Support marshaling multiple logs (#36156) #### Description Change how logs are marshaled. Instead of marshaling just the first log, marshal all logs into a JSON array. #### Link to tracking issue Fixes #34064 --- .chloggen/consume_multiple_records.yaml | 27 ++++++++++ .../jsonlogencodingextension/extension.go | 54 ++++++++++--------- .../encoding/jsonlogencodingextension/go.mod | 3 +- .../encoding/jsonlogencodingextension/go.sum | 2 + .../jsonlogencodingextension/json_test.go | 4 +- 5 files changed, 63 insertions(+), 27 deletions(-) create mode 100644 .chloggen/consume_multiple_records.yaml diff --git a/.chloggen/consume_multiple_records.yaml b/.chloggen/consume_multiple_records.yaml new file mode 100644 index 000000000000..6ec779a071e9 --- /dev/null +++ b/.chloggen/consume_multiple_records.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: jsonlogencodingextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change how logs are marshaled. Instead of marshaling just the first log, marshal all logs into a JSON array. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34064] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/encoding/jsonlogencodingextension/extension.go b/extension/encoding/jsonlogencodingextension/extension.go index 266c6d91aa60..18575859ce7f 100644 --- a/extension/encoding/jsonlogencodingextension/extension.go +++ b/extension/encoding/jsonlogencodingextension/extension.go @@ -6,9 +6,8 @@ package jsonlogencodingextension // import "github.com/open-telemetry/openteleme import ( "context" "fmt" - "time" - jsoniter "github.com/json-iterator/go" + "github.com/goccy/go-json" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -29,38 +28,45 @@ func (e *jsonLogExtension) MarshalLogs(ld plog.Logs) ([]byte, error) { if e.config.(*Config).Mode == JSONEncodingModeBodyWithInlineAttributes { return e.logProcessor(ld) } - logRecord := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body() - var raw map[string]any - switch logRecord.Type() { - case pcommon.ValueTypeMap: - raw = logRecord.Map().AsRaw() - default: - return nil, fmt.Errorf("marshal: expected 'Map' found '%v'", logRecord.Type().String()) - } - buf, err := jsoniter.Marshal(raw) - if err != nil { - return nil, err + logs := make([]map[string]any, 0, ld.LogRecordCount()) + + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + rl := rls.At(i) + sls := rl.ScopeLogs() + for j := 0; j < sls.Len(); j++ { + sl := sls.At(j) + logSlice := sl.LogRecords() + for k := 0; k < logSlice.Len(); k++ { + log := logSlice.At(k) + switch log.Body().Type() { + case pcommon.ValueTypeMap: + logs = append(logs, log.Body().Map().AsRaw()) + default: + return nil, fmt.Errorf("marshal: expected 'Map' found '%v'", log.Body().Type()) + } + } + } } - return buf, nil + return json.Marshal(logs) } func (e *jsonLogExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) { p := plog.NewLogs() // get json logs from the buffer - jsonVal := map[string]any{} - if err := jsoniter.Unmarshal(buf, &jsonVal); err != nil { + var jsonVal []map[string]any + if err := json.Unmarshal(buf, &jsonVal); err != nil { return p, err } - // create a new log record - logRecords := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - logRecords.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - - // Set the unmarshaled jsonVal as the body of the log record - if err := logRecords.Body().SetEmptyMap().FromRaw(jsonVal); err != nil { - return p, err + sl := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() + for _, r := range jsonVal { + if err := sl.LogRecords().AppendEmpty().Body().SetEmptyMap().FromRaw(r); err != nil { + return p, err + } } + return p, nil } @@ -96,7 +102,7 @@ func (e *jsonLogExtension) logProcessor(ld plog.Logs) ([]byte, error) { } } - return jsoniter.Marshal(logs) + return json.Marshal(logs) } type logBody struct { diff --git a/extension/encoding/jsonlogencodingextension/go.mod b/extension/encoding/jsonlogencodingextension/go.mod index 1252235f2a77..33ef63938cea 100644 --- a/extension/encoding/jsonlogencodingextension/go.mod +++ b/extension/encoding/jsonlogencodingextension/go.mod @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/encod go 1.22.0 require ( - github.com/json-iterator/go v1.1.12 + github.com/goccy/go-json v0.10.3 github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.115.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 @@ -22,6 +22,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect diff --git a/extension/encoding/jsonlogencodingextension/go.sum b/extension/encoding/jsonlogencodingextension/go.sum index 15de1d72e6d4..f80d74fe7739 100644 --- a/extension/encoding/jsonlogencodingextension/go.sum +++ b/extension/encoding/jsonlogencodingextension/go.sum @@ -9,6 +9,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= diff --git a/extension/encoding/jsonlogencodingextension/json_test.go b/extension/encoding/jsonlogencodingextension/json_test.go index e3fffa6869a4..38daf73916dc 100644 --- a/extension/encoding/jsonlogencodingextension/json_test.go +++ b/extension/encoding/jsonlogencodingextension/json_test.go @@ -17,7 +17,7 @@ func TestMarshalUnmarshal(t *testing.T) { Mode: JSONEncodingModeBody, }, } - json := `{"example":"example valid json to test that the unmarshaler is correctly returning a plog value"}` + json := `[{"example":"example valid json to test that the unmarshaler is correctly returning a plog value"}]` ld, err := e.UnmarshalLogs([]byte(json)) assert.NoError(t, err) assert.Equal(t, 1, ld.LogRecordCount()) @@ -47,7 +47,7 @@ func TestInvalidUnmarshal(t *testing.T) { }, } _, err := e.UnmarshalLogs([]byte("NOT A JSON")) - assert.ErrorContains(t, err, "ReadMapCB: expect { or n, but found N") + assert.ErrorContains(t, err, "json: slice unexpected end of JSON input") } func TestPrettyLogProcessor(t *testing.T) {