Skip to content

Commit

Permalink
Add trim filter to modify plugin (#599)
Browse files Browse the repository at this point in the history
* Add trim filter to modify plugin

* Refactor cfg.substitution package

* Update cfg.substition tests

* Update modify plugin

* Update docs
  • Loading branch information
HeadHunter483 authored Mar 14, 2024
1 parent f78c15f commit 53e3b5d
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 252 deletions.
137 changes: 137 additions & 0 deletions cfg/substitution/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package substitution

import (
"fmt"
"strings"

"go.uber.org/zap"
)

const (
regexFilterPrefix = "re("
trimFilterPrefix = "trim("

bufInitSize = 1024
)

type FieldFilter interface {
// Apply accepts src and dst slices of bytes and returns result stored in modified src slice.
// src slice is needed to avoid unnecessary allocations.
Apply(src []byte, dst []byte) []byte

setBuffer([]byte)
compareArgs([]any) error
}

// parseFilterOps parses a chain of field filters from substitution string
// `${field|filter1|filter2|...|filterN}` -> `<filter1>,<filter2>,...,<filterN>`.
func parseFilterOps(substitution string, pipePos, endPos int, filterBuf []byte, logger *zap.Logger) ([]FieldFilter, error) {
var filterOps []FieldFilter
offset := 0
for pipePos != -1 {
pipePos += offset
filterOp, filterEndPos, err := parseFilter(substitution[pipePos+1:endPos], logger)
if err != nil {
return nil, err
}
// single buffer for all filters because there is only one event for a substitution op simultaneously
// and filters in substitution op are applied sequentially one by one
if filterBuf == nil {
filterBuf = make([]byte, 0, bufInitSize)
}
filterOp.setBuffer(filterBuf)
filterOps = append(filterOps, filterOp)
offset = pipePos + 1 + filterEndPos
pipePos = indexRuneInExpr(substitution[offset:endPos], '|', true, false)
}
return filterOps, nil
}

// parseFilter parses filter data from string with filter args if present in format "<filter-name>(<arg1>, <arg2>, ...)".
func parseFilter(data string, logger *zap.Logger) (FieldFilter, int, error) {
origDataLen := len(data)
data = strings.TrimLeft(data, " ")
offset := origDataLen - len(data)
if strings.HasPrefix(data, regexFilterPrefix) {
return parseRegexFilter(data, offset, logger)
} else if strings.HasPrefix(data, trimFilterPrefix) {
return parseTrimFilter(data, offset)
}
return nil, -1, errInvalidFilter
}

// parseFilterArgs parses args from string in format of "<arg1>, <arg2>, ...)" --
// filter args without its name and starting bracket.
func parseFilterArgs(data string) ([]string, int, error) {
var args []string
// no args
if data[0] == ')' {
return args, 0, nil
}
argsEndPos := -1
quotesOpened := quotesOpenedNo
brackets := make([]byte, 0, 100)
curArg := make([]byte, 0, 100)
// parse args separated by comma ','
// re("((),()),()", [1, 2, 3], "'(,)'")
gatherLoop:
for i, b := range []byte(data) {
// escaped characters handling
if i != 0 && data[i-1] == '\\' {
curArg = append(curArg, b)
continue
}
// quotes have a priority over brackets
switch {
case quotesOpened == quotesOpenedNo:
switch {
case b == '\'':
quotesOpened = quotesOpenedSingle
case b == '"':
quotesOpened = quotesOpenedDouble
case b == '(' || b == '[' || b == '{':
brackets = append(brackets, b)
case b == ')' && len(brackets) == 0:
// early stop on the end of args list
argsEndPos = i
break gatherLoop
case b == ')' || b == ']':
lastBracket := brackets[len(brackets)-1]
switch {
case b == ')' && lastBracket != '(':
return nil, argsEndPos, fmt.Errorf("invalid brackets: %s", data)
case b == ']' && lastBracket != '[':
return nil, argsEndPos, fmt.Errorf("invalid brackets: %s", data)
}
brackets = brackets[:len(brackets)-1]
case b == ',' && len(brackets) == 0:
// condition for separating args
args = append(args, string(curArg))
curArg = curArg[:0]
continue gatherLoop
}
case b == '\'' && quotesOpened == quotesOpenedSingle:
quotesOpened = quotesOpenedNo
case b == '"' && quotesOpened == quotesOpenedDouble:
quotesOpened = quotesOpenedNo
}
curArg = append(curArg, b)
}
if len(brackets) > 0 {
return nil, argsEndPos, fmt.Errorf("invalid brackets: %s", data)
}
if quotesOpened != quotesOpenedNo {
return nil, argsEndPos, fmt.Errorf("not all quotes are closed: %s", data)
}
if argsEndPos == -1 {
return nil, argsEndPos, fmt.Errorf("no closing bracket for args list: %s", data)
}
// last arg
if len(curArg) > 0 {
args = append(args, string(curArg))
}
for i := range args {
args[i] = strings.TrimSpace(args[i])
}
return args, argsEndPos, nil
}
128 changes: 128 additions & 0 deletions cfg/substitution/regex_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package substitution

import (
"encoding/json"
"fmt"
"regexp"

"github.com/ozontech/file.d/cfg"
"go.uber.org/zap"
)

type RegexFilter struct {
re *regexp.Regexp
limit int
groups []int
separator []byte

buf []byte
}

func (r *RegexFilter) Apply(src []byte, dst []byte) []byte {
if len(r.groups) == 0 {
return dst
}
indexes := r.re.FindAllSubmatchIndex(src, r.limit)
if len(indexes) == 0 {
return dst
}
r.buf = r.buf[:0]
for _, index := range indexes {
for _, grp := range r.groups {
// (*regexp.Regexp).FindAllSubmatchIndex(...) returns a slice of indexes in format:
// [<start of whole regexp match>, <end of whole regexp match>, <start of group 1>, <end of group 1>, ...].
// So if there are more than one group in regexp the first two indexes must be skipped.
// Start of group 1 is index[2], end of group 1 is index[3], start of group 2 is index[4], end of group 2 is index[5],
// and so on. Hence, the start of group i is index[2*i], the end of group i is index[2*i+1].
start := index[grp*2]
end := index[grp*2+1]
if len(r.separator) > 0 && len(r.buf) != 0 {
r.buf = append(r.buf, r.separator...)
}
r.buf = append(r.buf, src[start:end]...)
}
}
if cap(dst) < len(r.buf) {
dst = make([]byte, len(r.buf))
} else {
dst = dst[:len(r.buf)]
}
copy(dst, r.buf)
return dst
}

func (r *RegexFilter) setBuffer(buf []byte) {
r.buf = buf
}

// compareArgs is used for testing. Checks filter args values.
func (r *RegexFilter) compareArgs(args []any) error {
wantArgsCnt := 4
if len(args) != wantArgsCnt {
return fmt.Errorf("wrong regex filter amount of args, want=%d got=%d", wantArgsCnt, len(args))
}
wantRe := args[0].(string)
gotRe := r.re.String()
if wantRe != gotRe {
return fmt.Errorf("wrong regex filter regex expr, want=%q got=%q", wantRe, gotRe)
}
wantLimit := args[1].(int)
gotLimit := r.limit
if wantLimit != gotLimit {
return fmt.Errorf("wrong regex filter limit, want=%v got=%v", wantLimit, gotLimit)
}
wantGroups := args[2].([]int)
gotGroups := r.groups
if len(wantGroups) != len(gotGroups) {
return fmt.Errorf("wrong regex filter groups, want=%v got=%v", wantGroups, gotGroups)
}
for i := 0; i < len(wantGroups); i++ {
if wantGroups[i] != gotGroups[i] {
return fmt.Errorf("wrong regex filter groups, want=%v got=%v", wantGroups, gotGroups)
}
}
wantSeparator := args[3].(string)
gotSeparator := string(r.separator)
if wantSeparator != gotSeparator {
return fmt.Errorf("wrong regex filter separator, want=%q got=%q", wantSeparator, gotSeparator)
}
return nil
}

func parseRegexFilter(data string, offset int, logger *zap.Logger) (FieldFilter, int, error) {
expArgsCnt := 4
filterEndPos := -1
args, argsEndPos, err := parseFilterArgs(data[len(regexFilterPrefix):])
if err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse filter args: %w", err)
}
filterEndPos = argsEndPos + len(regexFilterPrefix) + offset
if len(args) != expArgsCnt {
return nil, filterEndPos, fmt.Errorf("invalid args for regexp filter, exptected %d, got %d", expArgsCnt, len(args))
}
var reStr string
var limit int
var groups []int
var separator string
if err := json.Unmarshal([]byte(args[0]), &reStr); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter regexp string: %w", err)
}
re := regexp.MustCompile(reStr)
if err := json.Unmarshal([]byte(args[1]), &limit); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter limit: %w", err)
}
if err := json.Unmarshal([]byte(args[2]), &groups); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter groups: %w", err)
}
cfg.VerifyGroupNumbers(groups, re.NumSubexp(), logger)
if err := json.Unmarshal([]byte(args[3]), &separator); err != nil {
return nil, filterEndPos, fmt.Errorf("failed to parse regexp filter separator: %w", err)
}
filter := &RegexFilter{
re: re,
limit: limit,
groups: groups,
separator: []byte(separator),
}
return filter, filterEndPos, nil
}
Loading

0 comments on commit 53e3b5d

Please sign in to comment.