
[pkg/ottl]: Add ParseCSV converter #31081

Merged
34 commits
aec4407
add parse CSV function and tests
BinaryFissionGames Feb 6, 2024
bda815f
make gci
BinaryFissionGames Feb 6, 2024
7fadbe5
Add e2e test
BinaryFissionGames Feb 6, 2024
617cfba
Add docs
BinaryFissionGames Feb 6, 2024
650e79f
add license
BinaryFissionGames Feb 6, 2024
d881bf7
tweak readme
BinaryFissionGames Feb 6, 2024
879b1ed
add parsecsv to table of contents
BinaryFissionGames Feb 6, 2024
5e1c365
explain modes
BinaryFissionGames Feb 6, 2024
a4c2e60
add e2e test for specifying optionals
BinaryFissionGames Feb 6, 2024
60ef325
edit ignore quotes example to show why you'd use it
BinaryFissionGames Feb 6, 2024
5254553
add changelog
BinaryFissionGames Feb 6, 2024
91dbe5e
make goporto
BinaryFissionGames Feb 6, 2024
074f32a
add missing word.
BinaryFissionGames Feb 6, 2024
14f06ec
remove double wrapping of error
BinaryFissionGames Feb 9, 2024
571ee1a
test empty row and empty header, tweak to fix bugs with that
BinaryFissionGames Feb 9, 2024
19fd563
clarify newline behaviour in README
BinaryFissionGames Feb 9, 2024
6a376ce
return error if header string is empty.
BinaryFissionGames Feb 9, 2024
48a5415
add some more newline tests
BinaryFissionGames Feb 12, 2024
00aed9c
Update readme newline behavior
BinaryFissionGames Feb 12, 2024
a4896fc
minor tweak to add newline at end of field test
BinaryFissionGames Feb 12, 2024
ca71cb8
pull common logic to coreinternal
BinaryFissionGames Feb 14, 2024
0e973f5
move CSV reader creation into common as well
BinaryFissionGames Feb 14, 2024
c1915ca
add tests for shared parsing, minor tweaks
BinaryFissionGames Feb 14, 2024
d286d1c
add license
BinaryFissionGames Feb 14, 2024
1037fb0
make goporto
BinaryFissionGames Feb 14, 2024
aa9c924
add @djaglowski as codeowner for parseutils
BinaryFissionGames Feb 14, 2024
2c692b7
remove parseutils from codeowners
BinaryFissionGames Feb 14, 2024
415cb0c
checkout csv.go from main
BinaryFissionGames Feb 15, 2024
201b533
make header delimiter a string, not a rune
BinaryFissionGames Feb 15, 2024
f9c5c03
reduce error wrapping
BinaryFissionGames Feb 15, 2024
cf250c4
Collapse parseCSV and parseCSVIgnoreQuotes together
BinaryFissionGames Feb 15, 2024
2e2acf5
swap to using callback for parsing the row
BinaryFissionGames Feb 15, 2024
eed4db3
wrap get error with human-friendly error message
BinaryFissionGames Feb 15, 2024
481f577
fix tests after modifying error message
BinaryFissionGames Feb 15, 2024
13 changes: 13 additions & 0 deletions .chloggen/feat_ottl_csv-parse-function.yaml
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a new ParseCSV converter that can be used to parse CSV strings.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30921]
85 changes: 85 additions & 0 deletions internal/coreinternal/parseutils/csv.go
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package parseutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"

import (
"encoding/csv"
"errors"
"fmt"
"io"
"strings"
)

// ReadCSVRow reads a CSV row from the csv reader, returning the fields parsed from the line.
// We assume the payload being read is a single row, so newline characters are allowed within fields.
// However, the csv package treats unquoted newlines as row separators, so to support newlines
// within a field we stitch together the results of multiple Read calls.
func ReadCSVRow(row string, delimiter rune, lazyQuotes bool) ([]string, error) {
reader := csv.NewReader(strings.NewReader(row))
reader.Comma = delimiter
// -1 indicates a variable length of fields
reader.FieldsPerRecord = -1
reader.LazyQuotes = lazyQuotes

lines := make([][]string, 0, 1)
for {
line, err := reader.Read()
if errors.Is(err, io.EOF) {
break
}

if err != nil && len(line) == 0 {
return nil, fmt.Errorf("read csv line: %w", err)
}

lines = append(lines, line)
}

// If the input is empty, we might not get any lines
if len(lines) == 0 {
return nil, errors.New("no csv lines found")
}

/*
This parser is parsing a single value, which came from a single log entry.
Therefore, if there are multiple lines here, it should be assumed that each
subsequent line contains a continuation of the last field in the previous line.

Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
expect reader.Read() to return bodies:
- ["aa","b"]
- ["b","cc","d"]
- ["d","ee"]
*/

joinedLine := lines[0]
for i := 1; i < len(lines); i++ {
nextLine := lines[i]

// The first element of the next line is a continuation of the previous line's last element
joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]

// The remainder are separate elements
for n := 1; n < len(nextLine); n++ {
joinedLine = append(joinedLine, nextLine[n])
}
}

return joinedLine, nil
}

// MapCSVHeaders creates a map of headers[i] -> fields[i].
func MapCSVHeaders(headers []string, fields []string) (map[string]any, error) {
if len(fields) != len(headers) {
return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields))
}

parsedValues := make(map[string]any, len(headers))

for i, val := range fields {
parsedValues[headers[i]] = val
}

return parsedValues, nil
}
175 changes: 175 additions & 0 deletions internal/coreinternal/parseutils/csv_test.go
@@ -0,0 +1,175 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package parseutils

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_ParseCSV(t *testing.T) {
testCases := []struct {
name string
row string
delimiter rune
lazyQuotes bool
expectedRow []string
expectedErr string
}{
{
name: "Typical CSV row",
row: "field1,field2,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "field2", "field3"},
},
{
name: "Quoted CSV row",
row: `field1,"field2,contains delimiter",field3`,
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "field2,contains delimiter", "field3"},
},
{
name: "Bare quote in field (strict)",
row: `field1,field"2,field3`,
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1"},
},
{
name: "Bare quote in field (lazy quotes)",
row: `field1,field"2,field3`,
delimiter: ',',
lazyQuotes: true,
expectedRow: []string{"field1", `field"2`, "field3"},
},
{
name: "Empty row",
row: "",
delimiter: ',',
lazyQuotes: false,
expectedErr: "no csv lines found",
},
{
name: "Newlines in field",
row: "field1,fie\nld2,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "fie\nld2", "field3"},
},
{
name: "Newlines prefix field",
row: "field1,\nfield2,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "\nfield2", "field3"},
},
{
name: "Newlines suffix field",
row: "field1,field2\n,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "field2\n", "field3"},
},
{
name: "Newlines prefix row",
row: "\nfield1,field2,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "field2", "field3"},
},
{
name: "Newlines suffix row",
row: "field1,field2,field3\n",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"field1", "field2", "field3"},
},
{
name: "Newlines in first row",
row: "fiel\nd1,field2,field3",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"fiel\nd1", "field2", "field3"},
},
{
name: "Newlines in all rows",
row: "\nfiel\nd1,fie\nld2,fie\nld3\n",
delimiter: ',',
lazyQuotes: false,
expectedRow: []string{"fiel\nd1", "fie\nld2", "fie\nld3"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s, err := ReadCSVRow(tc.row, tc.delimiter, tc.lazyQuotes)
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.Equal(t, tc.expectedRow, s)
}
})
}
}

func Test_MapCSVHeaders(t *testing.T) {
testCases := []struct {
name string
headers []string
fields []string
expectedMap map[string]any
expectedErr string
}{
{
name: "Map headers to fields",
headers: []string{"Col1", "Col2", "Col3"},
fields: []string{"Val1", "Val2", "Val3"},
expectedMap: map[string]any{
"Col1": "Val1",
"Col2": "Val2",
"Col3": "Val3",
},
},
{
name: "Missing field",
headers: []string{"Col1", "Col2", "Col3"},
fields: []string{"Val1", "Val2"},
expectedErr: "wrong number of fields: expected 3, found 2",
},
{
name: "Too many fields",
headers: []string{"Col1", "Col2", "Col3"},
fields: []string{"Val1", "Val2", "Val3", "Val4"},
expectedErr: "wrong number of fields: expected 3, found 4",
},
{
name: "Single field",
headers: []string{"Col1"},
fields: []string{"Val1"},
expectedMap: map[string]any{
"Col1": "Val1",
},
},
{
name: "No fields",
headers: []string{},
fields: []string{},
expectedMap: map[string]any{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m, err := MapCSVHeaders(tc.headers, tc.fields)
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.Equal(t, tc.expectedMap, m)
}
})
}
}
18 changes: 18 additions & 0 deletions pkg/ottl/e2e/e2e_test.go
@@ -430,6 +430,24 @@ func Test_e2e_converters(t *testing.T) {
tCtx.GetLogRecord().Attributes().PutStr("test", "pass")
},
},
{
statement: `set(attributes["test"], ParseCSV("val1;val2;val3","header1|header2|header3",";","|","strict"))`,
want: func(tCtx ottllog.TransformContext) {
m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
m.PutStr("header1", "val1")
m.PutStr("header2", "val2")
m.PutStr("header3", "val3")
},
},
{
statement: `set(attributes["test"], ParseCSV("val1,val2,val3","header1|header2|header3",headerDelimiter="|",mode="strict"))`,
want: func(tCtx ottllog.TransformContext) {
m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
m.PutStr("header1", "val1")
m.PutStr("header2", "val2")
m.PutStr("header3", "val3")
},
},
{
statement: `set(attributes["test"], ParseJSON("{\"id\":1}"))`,
want: func(tCtx ottllog.TransformContext) {
33 changes: 33 additions & 0 deletions pkg/ottl/ottlfuncs/README.md
Expand Up @@ -401,6 +401,7 @@ Available Converters:
- [Minutes](#minutes)
- [Nanoseconds](#nanoseconds)
- [Now](#now)
- [ParseCSV](#parsecsv)
- [ParseJSON](#parsejson)
- [ParseKeyValue](#parsekeyvalue)
- [Seconds](#seconds)
@@ -799,6 +800,38 @@ Examples:
- `UnixSeconds(Now())`
- `set(start_time, Now())`

### ParseCSV

`ParseCSV(target, headers, Optional[delimiter], Optional[headerDelimiter], Optional[mode])`

The `ParseCSV` Converter returns a `pcommon.Map` containing the result of parsing the `target` string as CSV. The resulting map maps each field name to its field value.

`target` is a Getter that returns a string. This string should be a CSV row. If `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` returns an error. Leading and trailing newlines in `target` are stripped. Newlines elsewhere in `target` are not treated as row delimiters during parsing; they are treated as part of the field they appear in.

`headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields.

`delimiter` is an optional string parameter that specifies the delimiter used to split `target` into fields. By default, it is set to `,`.

`headerDelimiter` is an optional string parameter that specifies the delimiter used to split `headers` into fields. By default, it is set to the value of `delimiter`.

`mode` is an optional string parameter that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`.
- The `strict` mode provides typical CSV parsing.
- The `lazyQuotes` mode provides a relaxed version of CSV parsing where a quote may appear in the middle of an unquoted field.
- The `ignoreQuotes` mode ignores all quoting rules and simply splits the row on the delimiter.

Examples:

- `ParseCSV("999-999-9999,Joe Smith,[email protected]", "phone,name,email")`
- `ParseCSV(body, "phone|name|email", delimiter="|")`
- `ParseCSV(attributes["csv_line"], attributes["csv_headers"], delimiter="|", headerDelimiter=",", mode="lazyQuotes")`
- `ParseCSV("\"555-555-5556,Joe Smith\",[email protected]", "phone,name,email", mode="ignoreQuotes")`

### ParseJSON

`ParseJSON(target)`