Skip to content

Commit

Permalink
check stream name in cri decoder (#633)
Browse files Browse the repository at this point in the history
* check stream name in cri decoder

* try to split in cri decode
  • Loading branch information
DmitryRomanov authored Jun 3, 2024
1 parent ed69904 commit 682222e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
18 changes: 11 additions & 7 deletions decoder/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ func DecodeCRI(data []byte) (row CRIRow, _ error) {
row.Time = data[:pos]
data = data[pos+1:]

// stream type
pos = bytes.IndexByte(data, criDelimiter)
if pos < 0 {
return row, fmt.Errorf("stream type is not found")
var stream []byte
// stderr or stdout
for len(stream) != 6 {
// stream type
pos = bytes.IndexByte(data, criDelimiter)
if pos < 0 {
return row, fmt.Errorf("stream type is not found")
}
stream = data[:pos]
data = data[pos+1:]
}

row.Stream = data[:pos]
data = data[pos+1:]
row.Stream = stream

// tags
pos = bytes.IndexByte(data, criDelimiter)
Expand Down
26 changes: 26 additions & 0 deletions decoder/cri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,29 @@ func TestCRIError(t *testing.T) {

assert.Error(t, err, "there must be an error")
}

func TestCRIErrorJoined2Lines(t *testing.T) {
_, err := DecodeCRI([]byte("2016-10-06T00:17:09.669794202Z s2024-05-22T10:15:04.129321194Z 3\n"))

assert.Error(t, err, "there must be an error")
}

func TestCRIJoined2Lines(t *testing.T) {
row, err := DecodeCRI([]byte("2024-05-22T09:51:04.025764351Z s2024-05-22T10:15:04.129321194Z stderr F 2024/05/22 10:15:04 start prepraring file\n"))

assert.NoError(t, err, "error while decoding cri log")
assert.Equal(t, "2024-05-22T09:51:04.025764351Z", string(row.Time))
assert.Equal(t, "stderr", string(row.Stream))
assert.Equal(t, "2024/05/22 10:15:04 start prepraring file\n", string(row.Log))
assert.Equal(t, false, row.IsPartial)
}

func TestCRIJoined3Lines(t *testing.T) {
row, err := DecodeCRI([]byte(`2024-05-22T09:51:04.025764351Z s2024-05-22T10:15:04.129321194Z stder2024-05-22T01:16:15.512917014Z stdo 2024-05-22T06:39:29.230708414Z stdout F {"level":"warn","ts":"2024-05-22T06:39:29.230Z"}\n`))

assert.NoError(t, err, "error while decoding cri log")
assert.Equal(t, "2024-05-22T09:51:04.025764351Z", string(row.Time))
assert.Equal(t, "stdout", string(row.Stream))
assert.Equal(t, `{"level":"warn","ts":"2024-05-22T06:39:29.230Z"}\n`, string(row.Log))
assert.Equal(t, false, row.IsPartial)
}

0 comments on commit 682222e

Please sign in to comment.