Skip to content

Commit

Permalink
Merge pull request #3 from ozonru/k8s-input
Browse files Browse the repository at this point in the history
k8s become input plugin instead action
  • Loading branch information
vitkovskii authored Nov 13, 2020
2 parents 4fd14dd + 3716ed9 commit 58a5f9e
Show file tree
Hide file tree
Showing 28 changed files with 763 additions and 405 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION ?= v0.1.4
VERSION ?= v0.1.7

.PHONY: test
test:
Expand All @@ -14,6 +14,10 @@ test-e2e:
bench-file:
go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x

.PHONY: gen-doc
gen-doc:
insane-doc

.PHONY: profile-file
profile-file:
go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x -cpuprofile cpu.pprof -memprofile mem.pprof -mutexprofile mutex.pprof
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ TBD: throughput on production servers.

## Plugins

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [kafka](plugin/input/kafka/README.md)
**Input**: [dmesg](/plugin/input/dmesg/README.md), [fake](/plugin/input/fake/README.md), [file](/plugin/input/file/README.md), [http](/plugin/input/http/README.md), [k8s](/plugin/input/k8s/README.md), [kafka](/plugin/input/kafka/README.md)

**Action**: [convert_date](plugin/action/convert_date/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [json_decode](plugin/action/json_decode/README.md), [k8s](plugin/action/k8s/README.md), [keep_fields](plugin/action/keep_fields/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [convert_date](/plugin/action/convert_date/README.md), [debug](/plugin/action/debug/README.md), [discard](/plugin/action/discard/README.md), [flatten](/plugin/action/flatten/README.md), [join](/plugin/action/join/README.md), [json_decode](/plugin/action/json_decode/README.md), [keep_fields](/plugin/action/keep_fields/README.md), [modify](/plugin/action/modify/README.md), [parse_es](/plugin/action/parse_es/README.md), [remove_fields](/plugin/action/remove_fields/README.md), [rename](/plugin/action/rename/README.md), [throttle](/plugin/action/throttle/README.md)

**Output**: [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [stdout](plugin/output/stdout/README.md)
**Output**: [devnull](/plugin/output/devnull/README.md), [elasticsearch](/plugin/output/elasticsearch/README.md), [gelf](/plugin/output/gelf/README.md), [kafka](/plugin/output/kafka/README.md), [stdout](/plugin/output/stdout/README.md)

## What's next
* [Quick start](/docs/quick-start.md)
Expand Down
46 changes: 23 additions & 23 deletions _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@

- **Plugins**
- Input
- [dmesg](plugin/input/dmesg/README.md)
- [fake](plugin/input/fake/README.md)
- [file](plugin/input/file/README.md)
- [http](plugin/input/http/README.md)
- [kafka](plugin/input/kafka/README.md)
- [dmesg](/plugin/input/dmesg/README.md)
- [fake](/plugin/input/fake/README.md)
- [file](/plugin/input/file/README.md)
- [http](/plugin/input/http/README.md)
- [k8s](/plugin/input/k8s/README.md)
- [kafka](/plugin/input/kafka/README.md)

- Action
- [convert_date](plugin/action/convert_date/README.md)
- [debug](plugin/action/debug/README.md)
- [discard](plugin/action/discard/README.md)
- [flatten](plugin/action/flatten/README.md)
- [join](plugin/action/join/README.md)
- [json_decode](plugin/action/json_decode/README.md)
- [k8s](plugin/action/k8s/README.md)
- [keep_fields](plugin/action/keep_fields/README.md)
- [modify](plugin/action/modify/README.md)
- [parse_es](plugin/action/parse_es/README.md)
- [remove_fields](plugin/action/remove_fields/README.md)
- [rename](plugin/action/rename/README.md)
- [throttle](plugin/action/throttle/README.md)
- [convert_date](/plugin/action/convert_date/README.md)
- [debug](/plugin/action/debug/README.md)
- [discard](/plugin/action/discard/README.md)
- [flatten](/plugin/action/flatten/README.md)
- [join](/plugin/action/join/README.md)
- [json_decode](/plugin/action/json_decode/README.md)
- [keep_fields](/plugin/action/keep_fields/README.md)
- [modify](/plugin/action/modify/README.md)
- [parse_es](/plugin/action/parse_es/README.md)
- [remove_fields](/plugin/action/remove_fields/README.md)
- [rename](/plugin/action/rename/README.md)
- [throttle](/plugin/action/throttle/README.md)

- Output
- [devnull](plugin/output/devnull/README.md)
- [elasticsearch](plugin/output/elasticsearch/README.md)
- [gelf](plugin/output/gelf/README.md)
- [kafka](plugin/output/kafka/README.md)
- [stdout](plugin/output/stdout/README.md)
- [devnull](/plugin/output/devnull/README.md)
- [elasticsearch](/plugin/output/elasticsearch/README.md)
- [gelf](/plugin/output/gelf/README.md)
- [kafka](/plugin/output/kafka/README.md)
- [stdout](/plugin/output/stdout/README.md)


- **Other**
Expand Down
253 changes: 143 additions & 110 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,152 +83,185 @@ func Parse(ptr interface{}, values map[string]int) error {
return nil
}

var vChild reflect.Value
for i := 0; i < t.NumField(); i++ {
vField := v.Field(i)
tField := t.Field(i)

tag := tField.Tag.Get("required")
required := tag == "true"
shouldFill := tField.Tag.Get("child")
if shouldFill == "true" {
vChild = vField
continue
}

tag = tField.Tag.Get("default")
if tag != "" {
switch vField.Kind() {
case reflect.String:
if vField.String() == "" {
vField.SetString(tag)
}
case reflect.Int:
val, err := strconv.Atoi(tag)
if err != nil {
return errors.Wrapf(err, "default value for field %s should be int, got=%s", tField.Name, tag)
}
vField.SetInt(int64(val))
}

err := ParseField(v, vField, tField, values)
if err != nil {
return err
}
}

tag = tField.Tag.Get("options")
if tag != "" {
parts := strings.Split(tag, "|")
if vField.Kind() != reflect.String {
return fmt.Errorf("options deals with strings only, but field %s has %s type", tField.Name, tField.Type.Name())
if vChild.CanAddr() {
for i := 0; i < vChild.NumField(); i++ {
name := vChild.Type().Field(i).Name
val := v.FieldByName(name)
if val.CanAddr() {
vChild.Field(i).Set(val)
}
}

err := Parse(vChild.Addr().Interface(), values)
if err != nil {
return err
}
return nil
}

found := false
for _, part := range parts {
if vField.String() == part {
found = true
break
}
}
return nil
}

func ParseField(v reflect.Value, vField reflect.Value, tField reflect.StructField, values map[string]int) error {
tag := tField.Tag.Get("required")
required := tag == "true"

if !found {
return fmt.Errorf("field %s should be one of %s, got=%s", t.Field(i).Name, tag, vField.String())
tag = tField.Tag.Get("default")
if tag != "" {
switch vField.Kind() {
case reflect.String:
if vField.String() == "" {
vField.SetString(tag)
}
case reflect.Int:
val, err := strconv.Atoi(tag)
if err != nil {
return errors.Wrapf(err, "default value for field %s should be int, got=%s", tField.Name, tag)
}
vField.SetInt(int64(val))
}
}

tag = tField.Tag.Get("parse")
if tag != "" {
if vField.Kind() != reflect.String {
return fmt.Errorf("field %s should be a string, but it's %s", tField.Name, tField.Type.Name())
tag = tField.Tag.Get("options")
if tag != "" {
parts := strings.Split(tag, "|")
if vField.Kind() != reflect.String {
return fmt.Errorf("options deals with strings only, but field %s has %s type", tField.Name, tField.Type.Name())
}

found := false
for _, part := range parts {
if vField.String() == part {
found = true
break
}
}

finalField := v.FieldByName(t.Field(i).Name + "_")
if !found {
return fmt.Errorf("field %s should be one of %s, got=%s", tField.Name, tag, vField.String())
}
}

switch tag {
case "regexp":
re, err := CompileRegex(vField.String())
if err != nil {
return fmt.Errorf("can't compile regexp for field %s: %s", t.Field(i).Name, err.Error())
}
finalField.Set(reflect.ValueOf(re))
case "selector":
fields := ParseFieldSelector(vField.String())
finalField.Set(reflect.ValueOf(fields))
case "duration":
result, err := time.ParseDuration(vField.String())
if err != nil {
return fmt.Errorf("field %s has wrong duration format: %s", t.Field(i).Name, err.Error())
}
tag = tField.Tag.Get("parse")
if tag != "" {
if vField.Kind() != reflect.String {
return fmt.Errorf("field %s should be a string, but it's %s", tField.Name, tField.Type.Name())
}

finalField.SetInt(int64(result))
case "list-map":
listMap := make(map[string]bool)
finalField := v.FieldByName(tField.Name + "_")

parts := strings.Split(vField.String(), ",")
for _, part := range parts {
cleanPart := strings.TrimSpace(part)
listMap[cleanPart] = true
}
switch tag {
case "regexp":
re, err := CompileRegex(vField.String())
if err != nil {
return fmt.Errorf("can't compile regexp for field %s: %s", tField.Name, err.Error())
}
finalField.Set(reflect.ValueOf(re))
case "selector":
fields := ParseFieldSelector(vField.String())
finalField.Set(reflect.ValueOf(fields))
case "duration":
result, err := time.ParseDuration(vField.String())
if err != nil {
return fmt.Errorf("field %s has wrong duration format: %s", tField.Name, err.Error())
}

finalField.Set(reflect.ValueOf(listMap))
case "list":
list := make([]string, 0)
finalField.SetInt(int64(result))
case "list-map":
listMap := make(map[string]bool)

parts := strings.Split(vField.String(), ",")
for _, part := range parts {
cleanPart := strings.TrimSpace(part)
list = append(list, cleanPart)
}
parts := strings.Split(vField.String(), ",")
for _, part := range parts {
cleanPart := strings.TrimSpace(part)
listMap[cleanPart] = true
}

finalField.Set(reflect.ValueOf(list))
case "expression":
pos := strings.IndexAny(vField.String(), "*/+-")
if pos == -1 {
i, err := strconv.Atoi(vField.String())
if err != nil {
return fmt.Errorf("can't convert %s to int", vField.String())
}
finalField.SetInt(int64(i))
return nil
}
finalField.Set(reflect.ValueOf(listMap))
case "list":
list := make([]string, 0)

op1 := strings.TrimSpace(vField.String()[:pos])
op := vField.String()[pos]
op2 := strings.TrimSpace(vField.String()[pos+1:])
parts := strings.Split(vField.String(), ",")
for _, part := range parts {
cleanPart := strings.TrimSpace(part)
list = append(list, cleanPart)
}

op1_, err := strconv.Atoi(op1)
finalField.Set(reflect.ValueOf(list))
case "expression":
pos := strings.IndexAny(vField.String(), "*/+-")
if pos == -1 {
i, err := strconv.Atoi(vField.String())
if err != nil {
has := false
op1_, has = values[op1]
if ! has {
return fmt.Errorf("can't find value for %q in expression", op1)
}
return fmt.Errorf("can't convert %s to int", vField.String())
}
finalField.SetInt(int64(i))
return nil
}

op2_, err := strconv.Atoi(op2)
if err != nil {
has := false
op2_, has = values[op2]
if ! has {
return fmt.Errorf("can't find value for %q in expression", op2)
}
op1 := strings.TrimSpace(vField.String()[:pos])
op := vField.String()[pos]
op2 := strings.TrimSpace(vField.String()[pos+1:])

op1_, err := strconv.Atoi(op1)
if err != nil {
has := false
op1_, has = values[op1]
if ! has {
return fmt.Errorf("can't find value for %q in expression", op1)
}
}

result := 0
switch op {
case '+':
result = op1_ + op2_
case '-':
result = op1_ - op2_
case '*':
result = op1_ * op2_
case '/':
result = op1_ / op2_
default:
return fmt.Errorf("unknown operation %q", op)
op2_, err := strconv.Atoi(op2)
if err != nil {
has := false
op2_, has = values[op2]
if ! has {
return fmt.Errorf("can't find value for %q in expression", op2)
}
}

finalField.SetInt(int64(result))
result := 0
switch op {
case '+':
result = op1_ + op2_
case '-':
result = op1_ - op2_
case '*':
result = op1_ * op2_
case '/':
result = op1_ / op2_
default:
return fmt.Errorf("unsupported parse type %q for field %s", tag, t.Field(i).Name)
return fmt.Errorf("unknown operation %q", op)
}
}

if required && vField.IsZero() {
return fmt.Errorf("field %s should set as non-zero value", t.Field(i).Name)
finalField.SetInt(int64(result))
default:
return fmt.Errorf("unsupported parse type %q for field %s", tag, tField.Name)
}
}

if required && vField.IsZero() {
return fmt.Errorf("field %s should set as non-zero value", tField.Name)
}

return nil
}

Expand Down
Loading

0 comments on commit 58a5f9e

Please sign in to comment.