From 3716ed9748c455bbf3c8d1340a74e5234dd63421 Mon Sep 17 00:00:00 2001 From: vvitkovskiy Date: Fri, 13 Nov 2020 17:40:10 +0300 Subject: [PATCH] k8s become input plugin instead action --- Makefile | 6 +- README.md | 6 +- _sidebar.md | 46 ++-- cfg/config.go | 253 ++++++++++-------- cfg/config_test.go | 18 ++ cmd/file.d.go | 2 +- cmd/file.d_test.go | 8 +- decoder/consts.go | 11 + decoder/cri.go | 60 +++++ decoder/cri_test.go | 28 ++ docs/examples.md | 6 +- fd/file.d.go | 19 +- fd/util.go | 2 +- pipeline/pipeline.go | 51 +++- pipeline/plugin.go | 7 +- plugin/README.md | 87 +++--- plugin/action/README.md | 40 +-- plugin/action/k8s/README.md | 37 --- plugin/input/README.md | 37 ++- plugin/input/file/worker.go | 4 +- plugin/{action => input}/k8s/README.idoc.md | 0 plugin/input/k8s/README.md | 66 +++++ plugin/{action => input}/k8s/gatherer.go | 100 ++++--- plugin/input/k8s/k8s.go | 140 ++++++++++ plugin/{action => input}/k8s/k8s_test.go | 32 ++- .../k8s.go => input/k8s/multiline_action.go} | 91 ++----- plugin/output/README.md | 10 +- plugin/output/elasticsearch/elasticsearch.go | 1 + 28 files changed, 763 insertions(+), 405 deletions(-) create mode 100644 decoder/consts.go create mode 100644 decoder/cri.go create mode 100644 decoder/cri_test.go delete mode 100755 plugin/action/k8s/README.md rename plugin/{action => input}/k8s/README.idoc.md (100%) create mode 100755 plugin/input/k8s/README.md rename plugin/{action => input}/k8s/gatherer.go (70%) create mode 100644 plugin/input/k8s/k8s.go rename plugin/{action => input}/k8s/k8s_test.go (88%) rename plugin/{action/k8s/k8s.go => input/k8s/multiline_action.go} (57%) diff --git a/Makefile b/Makefile index e02ef37bd..e2196908c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION ?= v0.1.4 +VERSION ?= v0.1.7 .PHONY: test test: @@ -14,6 +14,10 @@ test-e2e: bench-file: go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x +.PHONY: gen-doc +gen-doc: + insane-doc + .PHONY: profile-file profile-file: go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x -cpuprofile cpu.pprof -memprofile mem.pprof -mutexprofile mutex.pprof diff --git a/README.md b/README.md index 000e9a71b..e8291aa49 100755 --- a/README.md +++ b/README.md @@ -29,11 +29,11 @@ TBD: throughput on production servers. 
## Plugins -**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [kafka](plugin/input/kafka/README.md) +**Input**: [dmesg](/plugin/input/dmesg/README.md), [fake](/plugin/input/fake/README.md), [file](/plugin/input/file/README.md), [http](/plugin/input/http/README.md), [k8s](/plugin/input/k8s/README.md), [kafka](/plugin/input/kafka/README.md) -**Action**: [convert_date](plugin/action/convert_date/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [json_decode](plugin/action/json_decode/README.md), [k8s](plugin/action/k8s/README.md), [keep_fields](plugin/action/keep_fields/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [throttle](plugin/action/throttle/README.md) +**Action**: [convert_date](/plugin/action/convert_date/README.md), [debug](/plugin/action/debug/README.md), [discard](/plugin/action/discard/README.md), [flatten](/plugin/action/flatten/README.md), [join](/plugin/action/join/README.md), [json_decode](/plugin/action/json_decode/README.md), [keep_fields](/plugin/action/keep_fields/README.md), [modify](/plugin/action/modify/README.md), [parse_es](/plugin/action/parse_es/README.md), [remove_fields](/plugin/action/remove_fields/README.md), [rename](/plugin/action/rename/README.md), [throttle](/plugin/action/throttle/README.md) -**Output**: [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [stdout](plugin/output/stdout/README.md) +**Output**: [devnull](/plugin/output/devnull/README.md), [elasticsearch](/plugin/output/elasticsearch/README.md), [gelf](/plugin/output/gelf/README.md), [kafka](/plugin/output/kafka/README.md), [stdout](/plugin/output/stdout/README.md) ## What's next * [Quick start](/docs/quick-start.md) diff --git a/_sidebar.md b/_sidebar.md index 6ea1df50b..53fd353ea 100644 --- a/_sidebar.md +++ b/_sidebar.md @@ -13,33 +13,33 @@ - **Plugins** - Input - - [dmesg](plugin/input/dmesg/README.md) - - [fake](plugin/input/fake/README.md) - - [file](plugin/input/file/README.md) - - [http](plugin/input/http/README.md) - - [kafka](plugin/input/kafka/README.md) + - [dmesg](/plugin/input/dmesg/README.md) + - [fake](/plugin/input/fake/README.md) + - [file](/plugin/input/file/README.md) + - [http](/plugin/input/http/README.md) + - [k8s](/plugin/input/k8s/README.md) + - [kafka](/plugin/input/kafka/README.md) - Action - - [convert_date](plugin/action/convert_date/README.md) - - [debug](plugin/action/debug/README.md) - - [discard](plugin/action/discard/README.md) - - [flatten](plugin/action/flatten/README.md) - - [join](plugin/action/join/README.md) - - [json_decode](plugin/action/json_decode/README.md) - - [k8s](plugin/action/k8s/README.md) - - [keep_fields](plugin/action/keep_fields/README.md) - - [modify](plugin/action/modify/README.md) - - [parse_es](plugin/action/parse_es/README.md) - - [remove_fields](plugin/action/remove_fields/README.md) - - [rename](plugin/action/rename/README.md) - - [throttle](plugin/action/throttle/README.md) + - [convert_date](/plugin/action/convert_date/README.md) + - [debug](/plugin/action/debug/README.md) + - [discard](/plugin/action/discard/README.md) + - 
[flatten](/plugin/action/flatten/README.md) + - [join](/plugin/action/join/README.md) + - [json_decode](/plugin/action/json_decode/README.md) + - [keep_fields](/plugin/action/keep_fields/README.md) + - [modify](/plugin/action/modify/README.md) + - [parse_es](/plugin/action/parse_es/README.md) + - [remove_fields](/plugin/action/remove_fields/README.md) + - [rename](/plugin/action/rename/README.md) + - [throttle](/plugin/action/throttle/README.md) - Output - - [devnull](plugin/output/devnull/README.md) - - [elasticsearch](plugin/output/elasticsearch/README.md) - - [gelf](plugin/output/gelf/README.md) - - [kafka](plugin/output/kafka/README.md) - - [stdout](plugin/output/stdout/README.md) + - [devnull](/plugin/output/devnull/README.md) + - [elasticsearch](/plugin/output/elasticsearch/README.md) + - [gelf](/plugin/output/gelf/README.md) + - [kafka](/plugin/output/kafka/README.md) + - [stdout](/plugin/output/stdout/README.md) - **Other** diff --git a/cfg/config.go b/cfg/config.go index 7597f460c..762fb62b7 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -83,152 +83,185 @@ func Parse(ptr interface{}, values map[string]int) error { return nil } + var vChild reflect.Value for i := 0; i < t.NumField(); i++ { vField := v.Field(i) tField := t.Field(i) - tag := tField.Tag.Get("required") - required := tag == "true" + shouldFill := tField.Tag.Get("child") + if shouldFill == "true" { + vChild = vField + continue + } - tag = tField.Tag.Get("default") - if tag != "" { - switch vField.Kind() { - case reflect.String: - if vField.String() == "" { - vField.SetString(tag) - } - case reflect.Int: - val, err := strconv.Atoi(tag) - if err != nil { - return errors.Wrapf(err, "default value for field %s should be int, got=%s", tField.Name, tag) - } - vField.SetInt(int64(val)) - } + + err := ParseField(v, vField, tField, values) + if err != nil { + return err } + } - tag = tField.Tag.Get("options") - if tag != "" { - parts := strings.Split(tag, "|") - if vField.Kind() != reflect.String { - return fmt.Errorf("options deals with strings only, but field %s has %s type", tField.Name, tField.Type.Name()) + if vChild.CanAddr() { + for i := 0; i < vChild.NumField(); i++ { + name := vChild.Type().Field(i).Name + val := v.FieldByName(name) + if val.CanAddr() { + vChild.Field(i).Set(val) } + } + + err := Parse(vChild.Addr().Interface(), values) + if err != nil { + return err + } + return nil + } - found := false - for _, part := range parts { - if vField.String() == part { - found = true - break - } - } + return nil +} + +func ParseField(v reflect.Value, vField reflect.Value, tField reflect.StructField, values map[string]int) error { + tag := tField.Tag.Get("required") + required := tag == "true" - if !found { - return fmt.Errorf("field %s should be one of %s, got=%s", t.Field(i).Name, tag, vField.String()) + tag = tField.Tag.Get("default") + if tag != "" { + switch vField.Kind() { + case reflect.String: + if vField.String() == "" { + vField.SetString(tag) } + case reflect.Int: + val, err := strconv.Atoi(tag) + if err != nil { + return errors.Wrapf(err, "default value for field %s should be int, got=%s", tField.Name, tag) + } + vField.SetInt(int64(val)) } + } - tag = tField.Tag.Get("parse") - if tag != "" { - if vField.Kind() != reflect.String { - return fmt.Errorf("field %s should be a string, but it's %s", tField.Name, tField.Type.Name()) + tag = tField.Tag.Get("options") + if tag != "" { + parts := strings.Split(tag, "|") + if vField.Kind() != reflect.String { + return fmt.Errorf("options deals with strings only, but 
field %s has %s type", tField.Name, tField.Type.Name()) + } + + found := false + for _, part := range parts { + if vField.String() == part { + found = true + break } + } - finalField := v.FieldByName(t.Field(i).Name + "_") + if !found { + return fmt.Errorf("field %s should be one of %s, got=%s", tField.Name, tag, vField.String()) + } + } - switch tag { - case "regexp": - re, err := CompileRegex(vField.String()) - if err != nil { - return fmt.Errorf("can't compile regexp for field %s: %s", t.Field(i).Name, err.Error()) - } - finalField.Set(reflect.ValueOf(re)) - case "selector": - fields := ParseFieldSelector(vField.String()) - finalField.Set(reflect.ValueOf(fields)) - case "duration": - result, err := time.ParseDuration(vField.String()) - if err != nil { - return fmt.Errorf("field %s has wrong duration format: %s", t.Field(i).Name, err.Error()) - } + tag = tField.Tag.Get("parse") + if tag != "" { + if vField.Kind() != reflect.String { + return fmt.Errorf("field %s should be a string, but it's %s", tField.Name, tField.Type.Name()) + } - finalField.SetInt(int64(result)) - case "list-map": - listMap := make(map[string]bool) + finalField := v.FieldByName(tField.Name + "_") - parts := strings.Split(vField.String(), ",") - for _, part := range parts { - cleanPart := strings.TrimSpace(part) - listMap[cleanPart] = true - } + switch tag { + case "regexp": + re, err := CompileRegex(vField.String()) + if err != nil { + return fmt.Errorf("can't compile regexp for field %s: %s", tField.Name, err.Error()) + } + finalField.Set(reflect.ValueOf(re)) + case "selector": + fields := ParseFieldSelector(vField.String()) + finalField.Set(reflect.ValueOf(fields)) + case "duration": + result, err := time.ParseDuration(vField.String()) + if err != nil { + return fmt.Errorf("field %s has wrong duration format: %s", tField.Name, err.Error()) + } - finalField.Set(reflect.ValueOf(listMap)) - case "list": - list := make([]string, 0) + finalField.SetInt(int64(result)) + case "list-map": + listMap := make(map[string]bool) - parts := strings.Split(vField.String(), ",") - for _, part := range parts { - cleanPart := strings.TrimSpace(part) - list = append(list, cleanPart) - } + parts := strings.Split(vField.String(), ",") + for _, part := range parts { + cleanPart := strings.TrimSpace(part) + listMap[cleanPart] = true + } - finalField.Set(reflect.ValueOf(list)) - case "expression": - pos := strings.IndexAny(vField.String(), "*/+-") - if pos == -1 { - i, err := strconv.Atoi(vField.String()) - if err != nil { - return fmt.Errorf("can't convert %s to int", vField.String()) - } - finalField.SetInt(int64(i)) - return nil - } + finalField.Set(reflect.ValueOf(listMap)) + case "list": + list := make([]string, 0) - op1 := strings.TrimSpace(vField.String()[:pos]) - op := vField.String()[pos] - op2 := strings.TrimSpace(vField.String()[pos+1:]) + parts := strings.Split(vField.String(), ",") + for _, part := range parts { + cleanPart := strings.TrimSpace(part) + list = append(list, cleanPart) + } - op1_, err := strconv.Atoi(op1) + finalField.Set(reflect.ValueOf(list)) + case "expression": + pos := strings.IndexAny(vField.String(), "*/+-") + if pos == -1 { + i, err := strconv.Atoi(vField.String()) if err != nil { - has := false - op1_, has = values[op1] - if ! 
has { - return fmt.Errorf("can't find value for %q in expression", op1) - } + return fmt.Errorf("can't convert %s to int", vField.String()) } + finalField.SetInt(int64(i)) + return nil + } - op2_, err := strconv.Atoi(op2) - if err != nil { - has := false - op2_, has = values[op2] - if ! has { - return fmt.Errorf("can't find value for %q in expression", op2) - } + op1 := strings.TrimSpace(vField.String()[:pos]) + op := vField.String()[pos] + op2 := strings.TrimSpace(vField.String()[pos+1:]) + + op1_, err := strconv.Atoi(op1) + if err != nil { + has := false + op1_, has = values[op1] + if ! has { + return fmt.Errorf("can't find value for %q in expression", op1) } + } - result := 0 - switch op { - case '+': - result = op1_ + op2_ - case '-': - result = op1_ - op2_ - case '*': - result = op1_ * op2_ - case '/': - result = op1_ / op2_ - default: - return fmt.Errorf("unknown operation %q", op) + op2_, err := strconv.Atoi(op2) + if err != nil { + has := false + op2_, has = values[op2] + if ! has { + return fmt.Errorf("can't find value for %q in expression", op2) } + } - finalField.SetInt(int64(result)) + result := 0 + switch op { + case '+': + result = op1_ + op2_ + case '-': + result = op1_ - op2_ + case '*': + result = op1_ * op2_ + case '/': + result = op1_ / op2_ default: - return fmt.Errorf("unsupported parse type %q for field %s", tag, t.Field(i).Name) + return fmt.Errorf("unknown operation %q", op) } - } - if required && vField.IsZero() { - return fmt.Errorf("field %s should set as non-zero value", t.Field(i).Name) + finalField.SetInt(int64(result)) + default: + return fmt.Errorf("unsupported parse type %q for field %s", tag, tField.Name) } } + if required && vField.IsZero() { + return fmt.Errorf("field %s should set as non-zero value", tField.Name) + } + return nil } diff --git a/cfg/config_test.go b/cfg/config_test.go index cbdf7f172..a6abf963f 100644 --- a/cfg/config_test.go +++ b/cfg/config_test.go @@ -41,6 +41,15 @@ type strExpression struct { T_ int } +type hierarchyChild struct { + T string `required:"true"` +} + +type hierarchy struct { + T string `default:"sync"` + Child hierarchyChild `child:"true"` +} + func TestParseRequiredOk(t *testing.T) { s := &strRequired{T: "some_value"} err := Parse(s, nil) @@ -133,3 +142,12 @@ func TestParseFieldSelectorEnding(t *testing.T) { assert.Equal(t, "b", path[1], "wrong field") assert.Equal(t, "c.", path[2], "wrong field") } + +func TestHierarchy(t *testing.T) { + s := &hierarchy{T: "10"} + err := Parse(s, map[string]int{}) + + assert.Nil(t, err, "shouldn't be an error") + assert.Equal(t, "10", s.T, "wrong value") + assert.Equal(t, "10", s.Child.T, "wrong value") +} diff --git a/cmd/file.d.go b/cmd/file.d.go index 5929aa6e3..333667d14 100644 --- a/cmd/file.d.go +++ b/cmd/file.d.go @@ -20,7 +20,6 @@ import ( _ "github.com/ozonru/file.d/plugin/action/flatten" _ "github.com/ozonru/file.d/plugin/action/join" _ "github.com/ozonru/file.d/plugin/action/json_decode" - _ "github.com/ozonru/file.d/plugin/action/k8s" _ "github.com/ozonru/file.d/plugin/action/keep_fields" _ "github.com/ozonru/file.d/plugin/action/modify" _ "github.com/ozonru/file.d/plugin/action/parse_es" @@ -28,6 +27,7 @@ import ( _ "github.com/ozonru/file.d/plugin/action/rename" _ "github.com/ozonru/file.d/plugin/action/throttle" _ "github.com/ozonru/file.d/plugin/input/dmesg" + _ "github.com/ozonru/file.d/plugin/input/k8s" _ "github.com/ozonru/file.d/plugin/input/fake" _ "github.com/ozonru/file.d/plugin/input/file" _ "github.com/ozonru/file.d/plugin/input/http" diff --git 
a/cmd/file.d_test.go b/cmd/file.d_test.go index 146732130..91c2c63e7 100644 --- a/cmd/file.d_test.go +++ b/cmd/file.d_test.go @@ -15,12 +15,12 @@ import ( "github.com/ozonru/file.d/fd" _ "github.com/ozonru/file.d/plugin/action/discard" _ "github.com/ozonru/file.d/plugin/action/json_decode" - "github.com/ozonru/file.d/plugin/action/k8s" _ "github.com/ozonru/file.d/plugin/action/keep_fields" _ "github.com/ozonru/file.d/plugin/action/rename" _ "github.com/ozonru/file.d/plugin/action/throttle" _ "github.com/ozonru/file.d/plugin/input/fake" _ "github.com/ozonru/file.d/plugin/input/file" + k8s2 "github.com/ozonru/file.d/plugin/input/k8s" _ "github.com/ozonru/file.d/plugin/output/devnull" _ "github.com/ozonru/file.d/plugin/output/kafka" uuid "github.com/satori/go.uuid" @@ -75,9 +75,9 @@ func TestEndToEnd(t *testing.T) { rand.Seed(0) // disable k8s environment - k8s.DisableMetaUpdates = true - k8s.MetaWaitTimeout = time.Millisecond - k8s.MaintenanceInterval = time.Millisecond * 100 + k8s2.DisableMetaUpdates = true + k8s2.MetaWaitTimeout = time.Millisecond + k8s2.MaintenanceInterval = time.Millisecond * 100 filesDir, _ := ioutil.TempDir("", "file.d") offsetsDir, _ := ioutil.TempDir("", "file.d") diff --git a/decoder/consts.go b/decoder/consts.go new file mode 100644 index 000000000..22540f03b --- /dev/null +++ b/decoder/consts.go @@ -0,0 +1,11 @@ +package decoder + +const ( + NO DecoderType = iota + AUTO + JSON + RAW + CRI +) + +type DecoderType int diff --git a/decoder/cri.go b/decoder/cri.go new file mode 100644 index 000000000..e8d83958e --- /dev/null +++ b/decoder/cri.go @@ -0,0 +1,60 @@ +package decoder + +import ( + "bytes" + "fmt" + + insaneJSON "github.com/vitkovskii/insane-json" +) + +const ( + criDelimiter = ' ' +) + +// Examples of format: +// 2016-10-06T00:17:09.669794202Z stdout P log content 1 +// 2016-10-06T00:17:09.669794203Z stderr F log content +func DecodeCRI(event *insaneJSON.Root, data []byte) error { + // time + pos := bytes.IndexByte(data, criDelimiter) + if pos < 0 { + return fmt.Errorf("timestamp is not found") + } + + time := data[:pos] + data = data[pos+1:] + + // stream type + pos = bytes.IndexByte(data, criDelimiter) + if pos < 0 { + return fmt.Errorf("stream type is not found") + } + + stream := data[:pos] + data = data[pos+1:] + + // tags + pos = bytes.IndexByte(data, criDelimiter) + if pos < 0 { + return fmt.Errorf("log tag is not found") + } + + tags := data[:pos] + data = data[pos+1:] + + isPartial := tags[0] == 'P' + + // log + log := data + + // remove \n from log for partial logs + if isPartial { + log = log[:len(log)-1] + } + + event.AddFieldNoAlloc(event, "log").MutateToBytesCopy(event, log) + event.AddFieldNoAlloc(event, "time").MutateToBytesCopy(event, time) + event.AddFieldNoAlloc(event, "stream").MutateToBytesCopy(event, stream) + + return nil +} diff --git a/decoder/cri_test.go b/decoder/cri_test.go new file mode 100644 index 000000000..22f607756 --- /dev/null +++ b/decoder/cri_test.go @@ -0,0 +1,28 @@ +package decoder + +import ( + "testing" + + "github.com/stretchr/testify/assert" + insaneJSON "github.com/vitkovskii/insane-json" +) + +func TestCRIPartial(t *testing.T) { + root := insaneJSON.Spawn() + err := DecodeCRI(root, []byte("2016-10-06T00:17:09.669794202Z stdout P partial content 1\n")) + + assert.NoError(t, err, "error while decoding cri log") + assert.Equal(t, "2016-10-06T00:17:09.669794202Z", root.Dig("time").AsString()) + assert.Equal(t, "stdout", root.Dig("stream").AsString()) + assert.Equal(t, "partial content 1", root.Dig("log").AsString()) 
+} + +func TestCRIFull(t *testing.T) { + root := insaneJSON.Spawn() + err := DecodeCRI(root, []byte("2016-10-06T00:17:09.669794202Z stdout F full content 2\n")) + + assert.NoError(t, err, "error while decoding cri log") + assert.Equal(t, "2016-10-06T00:17:09.669794202Z", root.Dig("time").AsString()) + assert.Equal(t, "stdout", root.Dig("stream").AsString()) + assert.Equal(t, "full content 2\n", root.Dig("log").AsString()) +} \ No newline at end of file diff --git a/docs/examples.md b/docs/examples.md index 1deea5180..814a52ab7 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -1,7 +1,7 @@ # Examples ## Kubernetes to kafka -The following config reads logs on k8s node with docker containers, processes them and sends into kafka. +The following config reads logs on k8s node, processes them and sends into kafka. It assumes that k8s logs located in `/var/log/containers/` directory. ```yaml pipelines: @@ -46,7 +46,7 @@ pipelines: ## Kafka to graylog The following config reads logs from kafka, processes them and sends into gelf endpoint(graylog). -It assumes that logs are in docker format. +It assumes that logs are in docker json format. ```yaml pipelines: kafka_gelf_example: @@ -56,7 +56,7 @@ pipelines: topics: [k8s-logs] actions: - - type: json_decode # unpack docker "log" field + - type: json_decode # unpack "log" field field: log metric_name: input metric_labels: [k8s_label_app] # expose input metrics to prometheus diff --git a/fd/file.d.go b/fd/file.d.go index 74523b5cb..515edaffe 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -93,16 +93,29 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) { } func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) error { - info, err := f.getStaticInfo(pipelineConfig, pipeline.PluginKindInput, values) + inputInfo, err := f.getStaticInfo(pipelineConfig, pipeline.PluginKindInput, values) if err != nil { return err } p.SetInput(&pipeline.InputPluginInfo{ - PluginStaticInfo: info, - PluginRuntimeInfo: f.instantiatePlugin(info), + PluginStaticInfo: inputInfo, + PluginRuntimeInfo: f.instantiatePlugin(inputInfo), }) + for _, actionType := range inputInfo.AdditionalActions { + actionInfo := f.plugins.GetActionByType(actionType) + + infoCopy := *actionInfo + infoCopy.Config = inputInfo.Config + infoCopy.Type = actionType + + p.AddAction(&pipeline.ActionPluginStaticInfo{ + PluginStaticInfo: &infoCopy, + MatchConditions: pipeline.MatchConditions{}, + }) + } + return nil } diff --git a/fd/util.go b/fd/util.go index 28999e083..96cbd45a8 100644 --- a/fd/util.go +++ b/fd/util.go @@ -17,7 +17,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { avgLogSize := pipeline.DefaultAvgLogSize streamField := pipeline.DefaultStreamField maintenanceInterval := pipeline.DefaultMaintenanceInterval - decoder := "json" + decoder := "auto" isStrict := false if settings != nil { diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 997c3ba8a..1a688a91a 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ozonru/file.d/decoder" "github.com/ozonru/file.d/logger" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -25,16 +26,14 @@ const ( antispamUnbanIterations = 4 metricsGenInterval = time.Hour - - decoderJSON = 0 - decoderRAW = 1 ) type finalizeFn = func(event *Event, notifyInput bool, backEvent bool) type InputPluginController interface { In(sourceID SourceID, sourceName string, offset int64, 
data []byte, isNewSource bool) uint64 - DisableStreams() // don't use stream field and spread all events across all processors + DisableStreams() // don't use stream field and spread all events across all processors + SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder } type ActionPluginController interface { @@ -54,7 +53,8 @@ type Pipeline struct { Name string settings *Settings - decoder int + decoder decoder.DecoderType // decoder set in the config + suggestedDecoder decoder.DecoderType // decoder suggested by input plugin, it is used when config decoder is set to "auto" eventPool *eventPool streamer *streamer @@ -122,9 +122,13 @@ func New(name string, settings *Settings, registry *prometheus.Registry, mux *ht switch settings.Decoder { case "json": - pipeline.decoder = decoderJSON + pipeline.decoder = decoder.JSON case "raw": - pipeline.decoder = decoderRAW + pipeline.decoder = decoder.RAW + case "cri": + pipeline.decoder = decoder.CRI + case "auto": + pipeline.decoder = decoder.AUTO default: pipeline.logger.Fatalf("unknown decoder %q for pipeline %q", settings.Decoder, name) } @@ -213,7 +217,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes length := len(bytes) // don't process shit - isEmpty := length == 0 + isEmpty := length == 0 || (bytes[0] == '\n' && length == 1) isSpam := p.antispamer.isSpam(sourceID, sourceName, isNewSource) if isEmpty || isSpam { return 0 @@ -221,16 +225,33 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes event := p.eventPool.get() - switch p.decoder { - case decoderJSON: + dec := decoder.NO + if p.decoder == decoder.AUTO { + dec = p.suggestedDecoder + } else { + dec = p.decoder + } + if dec == decoder.NO { + dec = decoder.JSON + } + + switch dec { + case decoder.JSON: err := event.parseJSON(bytes) if err != nil { - p.logger.Fatalf("wrong json offset=%d, length=%d, err=%s, source=%d:%s, json=%s", offset, length, err.Error(), sourceID, sourceName, bytes) + p.logger.Fatalf("wrong json format offset=%d, length=%d, err=%s, source=%d:%s, json=%s", offset, length, err.Error(), sourceID, sourceName, bytes) return 0 } - case decoderRAW: + case decoder.RAW: + _ = event.Root.DecodeString("{}") + event.Root.AddFieldNoAlloc(event.Root, "message").MutateToBytesCopy(event.Root, bytes[:len(bytes)-1]) + case decoder.CRI: _ = event.Root.DecodeString("{}") - event.Root.AddFieldNoAlloc(event.Root, "message").MutateToBytesCopy(event.Root, bytes) + err := decoder.DecodeCRI(event.Root, bytes) + if err != nil { + p.logger.Fatalf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes) + return 0 + } default: p.logger.Panicf("unknown decoder %d for pipeline %q", p.decoder, p.Name) } @@ -437,6 +458,10 @@ func (p *Pipeline) DisableStreams() { p.useStreams = false } +func (p *Pipeline) SuggestDecoder(t decoder.DecoderType) { + p.suggestedDecoder = t +} + func (p *Pipeline) DisableParallelism() { p.singleProc = true } diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 1ed3896ce..e1f719eca 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -56,9 +56,10 @@ type InputPluginParams struct { } type PluginStaticInfo struct { - Type string - Factory PluginFactory - Config AnyConfig + Type string + Factory PluginFactory + Config AnyConfig + AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config } type PluginRuntimeInfo 
struct { diff --git a/plugin/README.md b/plugin/README.md index 97e206034..d5f0eda66 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -4,11 +4,11 @@ ## dmesg It reads kernel events from /dev/kmsg -[More details...](plugin/input/dmesg/README.md) +[More details...](/plugin/input/dmesg/README.md) ## fake It provides an API to test pipelines and other plugins. -[More details...](plugin/input/fake/README.md) +[More details...](/plugin/input/fake/README.md) ## file It watches for files in the provided directory and reads them line by line. @@ -39,7 +39,7 @@ pipelines: persistence_mode: async ``` -[More details...](plugin/input/file/README.md) +[More details...](/plugin/input/file/README.md) ## http Reads events from HTTP requests with the body delimited by a new line. @@ -51,22 +51,49 @@ So you can use Elasticsearch filebeat output plugin to send data to `file.d`. > Plugin answers with HTTP code `OK 200` right after it has read all the request body. > It doesn't wait until events are committed. -[More details...](plugin/input/http/README.md) +[More details...](/plugin/input/http/README.md) +## k8s +It reads Kubernetes logs and also adds pod meta-information. Also, it joins split logs into a single event. + +Source log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` + +E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` + +An information which plugin adds: +* `k8s_node` – node name where pod is running; +* `k8s_pod` – pod name; +* `k8s_namespace` – pod namespace name; +* `k8s_container` – pod container name; +* `k8s_label_*` – pod labels. + +**Example:** +```yaml +pipelines: + example_k8s_pipeline: + input: + type: k8s + offsets_file: /data/offsets.yaml + file_config: // customize file plugin + persistence_mode: sync + read_buffer_size: 2048 +``` + +[More details...](/plugin/input/k8s/README.md) ## kafka It reads events from multiple Kafka topics using `sarama` library. > It guarantees at "at-least-once delivery" due to the commitment mechanism. -[More details...](plugin/input/kafka/README.md) +[More details...](/plugin/input/kafka/README.md) # Actions ## convert_date It converts field date/time data to different format. -[More details...](plugin/action/convert_date/README.md) +[More details...](/plugin/action/convert_date/README.md) ## debug It logs event to stdout. Useful for debugging. -[More details...](plugin/action/debug/README.md) +[More details...](/plugin/action/debug/README.md) ## discard It drops an event. It is used in a combination with `match_fields`/`match_mode` parameters to filter out the events. @@ -82,7 +109,7 @@ pipelines: ... ``` -[More details...](plugin/action/discard/README.md) +[More details...](/plugin/action/discard/README.md) ## flatten It extracts the object keys and adds them into the root with some prefix. If the provided field isn't an object, an event will be skipped. @@ -99,7 +126,7 @@ pipelines: ``` It transforms `{"animal":{"type":"cat","paws":4}}` into `{"pet_type":"b","pet_paws":"4"}`. -[More details...](plugin/action/flatten/README.md) +[More details...](/plugin/action/flatten/README.md) ## join It makes one big event from the sequence of the events. It is useful for assembling back together "exceptions" or "panics" if they were written line by line. @@ -123,32 +150,16 @@ pipelines: ... ``` -[More details...](plugin/action/join/README.md) +[More details...](/plugin/action/join/README.md) ## json_decode It decodes a JSON string from the event field and merges the result with the event root. If the decoded JSON isn't an object, the event will be skipped. -[More details...](plugin/action/json_decode/README.md) -## k8s -It adds the Kubernetes meta-information into the events collected from docker log files. Also, it joins split docker logs into a single event. - -Source docker log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` - -E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` - -An information which plugin adds: -* `k8s_node` – node name where pod is running; -* `k8s_pod` – pod name; -* `k8s_namespace` – pod namespace name; -* `k8s_container` – pod container name; -* `k8s_label_*` – pod labels. - - -[More details...](plugin/action/k8s/README.md) +[More details...](/plugin/action/json_decode/README.md) ## keep_fields It keeps the list of the event fields and removes others. -[More details...](plugin/action/keep_fields/README.md) +[More details...](/plugin/action/keep_fields/README.md) ## modify It modifies the content for a field. It works only with strings. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. @@ -177,16 +188,16 @@ The resulting event could look like: } ``` -[More details...](plugin/action/modify/README.md) +[More details...](/plugin/action/modify/README.md) ## parse_es It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored. > Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). -[More details...](plugin/action/parse_es/README.md) +[More details...](/plugin/action/parse_es/README.md) ## remove_fields It removes the list of the event fields and keeps others. -[More details...](plugin/action/remove_fields/README.md) +[More details...](/plugin/action/remove_fields/README.md) ## rename It renames the fields of the event. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`string`. When `override` is set to `false`, the field won't be renamed in the case of field name collision. @@ -214,22 +225,22 @@ The resulting event could look like: }, ``` -[More details...](plugin/action/rename/README.md) +[More details...](/plugin/action/rename/README.md) ## throttle It discards the events if pipeline throughput gets higher than a configured threshold. -[More details...](plugin/action/throttle/README.md) +[More details...](/plugin/action/throttle/README.md) # Outputs ## devnull It provides an API to test pipelines and other plugins. -[More details...](plugin/output/devnull/README.md) +[More details...](/plugin/output/devnull/README.md) ## elasticsearch It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. -[More details...](plugin/output/elasticsearch/README.md) +[More details...](/plugin/output/elasticsearch/README.md) ## gelf It sends event batches to the GELF endpoint. Transport level protocol TCP or UDP is configurable. > It doesn't support UDP chunking. So don't use UDP if event size may be greater than 8192. @@ -248,15 +259,15 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. -[More details...](plugin/output/gelf/README.md) +[More details...](/plugin/output/gelf/README.md) ## kafka It sends the event batches to kafka brokers using `sarama` lib. 
-[More details...](plugin/output/kafka/README.md) +[More details...](/plugin/output/kafka/README.md) ## stdout It writes events to stdout(also known as console). -[More details...](plugin/output/stdout/README.md) +[More details...](/plugin/output/stdout/README.md)
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/README.md b/plugin/action/README.md index 7b46fc3dc..fb2bbb41d 100755 --- a/plugin/action/README.md +++ b/plugin/action/README.md @@ -3,11 +3,11 @@ ## convert_date It converts field date/time data to different format. -[More details...](plugin/action/convert_date/README.md) +[More details...](/plugin/action/convert_date/README.md) ## debug It logs event to stdout. Useful for debugging. -[More details...](plugin/action/debug/README.md) +[More details...](/plugin/action/debug/README.md) ## discard It drops an event. It is used in a combination with `match_fields`/`match_mode` parameters to filter out the events. @@ -23,7 +23,7 @@ pipelines: ... ``` -[More details...](plugin/action/discard/README.md) +[More details...](/plugin/action/discard/README.md) ## flatten It extracts the object keys and adds them into the root with some prefix. If the provided field isn't an object, an event will be skipped. @@ -40,7 +40,7 @@ pipelines: ``` It transforms `{"animal":{"type":"cat","paws":4}}` into `{"pet_type":"b","pet_paws":"4"}`. -[More details...](plugin/action/flatten/README.md) +[More details...](/plugin/action/flatten/README.md) ## join It makes one big event from the sequence of the events. It is useful for assembling back together "exceptions" or "panics" if they were written line by line. @@ -64,32 +64,16 @@ pipelines: ... ``` -[More details...](plugin/action/join/README.md) +[More details...](/plugin/action/join/README.md) ## json_decode It decodes a JSON string from the event field and merges the result with the event root. If the decoded JSON isn't an object, the event will be skipped. -[More details...](plugin/action/json_decode/README.md) -## k8s -It adds the Kubernetes meta-information into the events collected from docker log files. Also, it joins split docker logs into a single event. - -Source docker log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` - -E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` - -An information which plugin adds: -* `k8s_node` – node name where pod is running; -* `k8s_pod` – pod name; -* `k8s_namespace` – pod namespace name; -* `k8s_container` – pod container name; -* `k8s_label_*` – pod labels. - - -[More details...](plugin/action/k8s/README.md) +[More details...](/plugin/action/json_decode/README.md) ## keep_fields It keeps the list of the event fields and removes others. -[More details...](plugin/action/keep_fields/README.md) +[More details...](/plugin/action/keep_fields/README.md) ## modify It modifies the content for a field. It works only with strings. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`cfg.Substitution`. @@ -118,16 +102,16 @@ The resulting event could look like: } ``` -[More details...](plugin/action/modify/README.md) +[More details...](/plugin/action/modify/README.md) ## parse_es It parses HTTP input using Elasticsearch `/_bulk` API format. It converts sources defining create/index actions to the events. Update/delete actions are ignored. > Check out the details in [Elastic Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). -[More details...](plugin/action/parse_es/README.md) +[More details...](/plugin/action/parse_es/README.md) ## remove_fields It removes the list of the event fields and keeps others. -[More details...](plugin/action/remove_fields/README.md) +[More details...](/plugin/action/remove_fields/README.md) ## rename It renames the fields of the event. You can provide an unlimited number of config parameters. Each parameter handled as `cfg.FieldSelector`:`string`. When `override` is set to `false`, the field won't be renamed in the case of field name collision. @@ -155,9 +139,9 @@ The resulting event could look like: }, ``` -[More details...](plugin/action/rename/README.md) +[More details...](/plugin/action/rename/README.md) ## throttle It discards the events if pipeline throughput gets higher than a configured threshold. -[More details...](plugin/action/throttle/README.md) +[More details...](/plugin/action/throttle/README.md)
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/k8s/README.md b/plugin/action/k8s/README.md deleted file mode 100755 index d5928cbe6..000000000 --- a/plugin/action/k8s/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# Kubernetes plugin -It adds the Kubernetes meta-information into the events collected from docker log files. Also, it joins split docker logs into a single event. - -Source docker log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` - -E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` - -An information which plugin adds: -* `k8s_node` – node name where pod is running; -* `k8s_pod` – pod name; -* `k8s_namespace` – pod namespace name; -* `k8s_container` – pod container name; -* `k8s_label_*` – pod labels. - - -### Config params -**`max_event_size`** *`int`* *`default=1000000`* - -Docker splits long logs by 16kb chunks. The plugin joins them back, but if an event is longer than this value in bytes, it will be split after all. -> Due to the optimization process it's not a strict rule. Events may be split even if they won't exceed the limit. - -
- -**`labels_whitelist`** *`[]string`* - -If set, it defines which pod labels to add to the event, others will be ignored. - -
- -**`only_node`** *`bool`* *`default=false`* - -Skips retrieving k8s meta information using Kubernetes API and adds only `k8s_node` field. - -
- - -
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/input/README.md b/plugin/input/README.md index c30eb1318..1f88d3a0d 100755 --- a/plugin/input/README.md +++ b/plugin/input/README.md @@ -3,11 +3,11 @@ ## dmesg It reads kernel events from /dev/kmsg -[More details...](plugin/input/dmesg/README.md) +[More details...](/plugin/input/dmesg/README.md) ## fake It provides an API to test pipelines and other plugins. -[More details...](plugin/input/fake/README.md) +[More details...](/plugin/input/fake/README.md) ## file It watches for files in the provided directory and reads them line by line. @@ -38,7 +38,7 @@ pipelines: persistence_mode: async ``` -[More details...](plugin/input/file/README.md) +[More details...](/plugin/input/file/README.md) ## http Reads events from HTTP requests with the body delimited by a new line. @@ -50,10 +50,37 @@ So you can use Elasticsearch filebeat output plugin to send data to `file.d`. > Plugin answers with HTTP code `OK 200` right after it has read all the request body. > It doesn't wait until events are committed. -[More details...](plugin/input/http/README.md) +[More details...](/plugin/input/http/README.md) +## k8s +It reads Kubernetes logs and also adds pod meta-information. Also, it joins split logs into a single event. + +Source log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` + +E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` + +An information which plugin adds: +* `k8s_node` – node name where pod is running; +* `k8s_pod` – pod name; +* `k8s_namespace` – pod namespace name; +* `k8s_container` – pod container name; +* `k8s_label_*` – pod labels. + +**Example:** +```yaml +pipelines: + example_k8s_pipeline: + input: + type: k8s + offsets_file: /data/offsets.yaml + file_config: // customize file plugin + persistence_mode: sync + read_buffer_size: 2048 +``` + +[More details...](/plugin/input/k8s/README.md) ## kafka It reads events from multiple Kafka topics using `sarama` library. > It guarantees at "at-least-once delivery" due to the commitment mechanism. -[More details...](plugin/input/kafka/README.md) +[More details...](/plugin/input/kafka/README.md)
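A minimal kafka input sketch for reference (broker addresses and topic names below are placeholders; see the kafka-to-graylog pipeline in `docs/examples.md` from this change for a complete example):

```yaml
pipelines:
  kafka_example:
    input:
      type: kafka
      brokers: [kafka:9092, kafka:9091]  # placeholder broker addresses
      topics: [my-topic]                 # placeholder topic name
    output:
      type: stdout
```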
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index a4193907e..c4a68d5d9 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -88,10 +88,10 @@ func (w *worker) work(controller pipeline.InputPluginController, jobProvider *jo } else { offset := lastOffset + accumulated + pos + 1 if len(accumBuffer) != 0 { - accumBuffer = append(accumBuffer, readBuffer[processed:pos]...) + accumBuffer = append(accumBuffer, readBuffer[processed:pos+1]...) seqID = controller.In(sourceID, sourceName, offset, accumBuffer, isVirgin) } else { - seqID = controller.In(sourceID, sourceName, offset, readBuffer[processed:pos], isVirgin) + seqID = controller.In(sourceID, sourceName, offset, readBuffer[processed:pos+1], isVirgin) } job.lastEventSeq = seqID } diff --git a/plugin/action/k8s/README.idoc.md b/plugin/input/k8s/README.idoc.md similarity index 100% rename from plugin/action/k8s/README.idoc.md rename to plugin/input/k8s/README.idoc.md diff --git a/plugin/input/k8s/README.md b/plugin/input/k8s/README.md new file mode 100755 index 000000000..1ea3e2519 --- /dev/null +++ b/plugin/input/k8s/README.md @@ -0,0 +1,66 @@ +# Kubernetes plugin +It reads Kubernetes logs and also adds pod meta-information. Also, it joins split logs into a single event. + +Source log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` + +E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` + +An information which plugin adds: +* `k8s_node` – node name where pod is running; +* `k8s_pod` – pod name; +* `k8s_namespace` – pod namespace name; +* `k8s_container` – pod container name; +* `k8s_label_*` – pod labels. + +**Example:** +```yaml +pipelines: + example_k8s_pipeline: + input: + type: k8s + offsets_file: /data/offsets.yaml + file_config: // customize file plugin + persistence_mode: sync + read_buffer_size: 2048 +``` + +### Config params +**`max_event_size`** *`int`* *`default=1000000`* + +Docker splits long logs by 16kb chunks. The plugin joins them back, but if an event is longer than this value in bytes, it will be split after all. +> Due to the optimization process it's not a strict rule. Events may be split even if they won't exceed the limit. + +
+ +**`labels_whitelist`** *`[]string`* + +If set, it defines which pod labels to add to the event; others will be ignored. + +
+ +**`only_node`** *`bool`* *`default=false`* + +Skips retrieving Kubernetes meta information using the Kubernetes API and adds only the `k8s_node` field. + +
+ +**`watching_dir`** *`string`* *`default=/var/log/containers`* + +The Kubernetes dir with container logs. It works like the `watching_dir` parameter of the [file plugin](/plugin/input/file/README.md) config. + +
+ +**`offsets_file`** *`string`* *`required`* + +The filename used to store offsets of processed files. It works like the `offsets_file` parameter of the [file plugin](/plugin/input/file/README.md) config. + +
+ +**`file_config`** *`file.Config`* + +Under the hood this plugin uses the [file plugin](/plugin/input/file/README.md) to collect logs from files, so you can change any [file plugin](/plugin/input/file/README.md) config parameter using the `file_config` section. Check out the example above. + +
+ + +
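As a rough sketch of how the parameters above fit together (the label names and paths here are illustrative, not taken from this change):

```yaml
pipelines:
  k8s_meta_example:
    input:
      type: k8s
      offsets_file: /data/offsets.yaml   # required
      labels_whitelist: [app, release]   # keep only these pod labels
      only_node: false                   # true skips the k8s API and adds only k8s_node
    output:
      type: stdout
```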
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/k8s/gatherer.go b/plugin/input/k8s/gatherer.go similarity index 70% rename from plugin/action/k8s/gatherer.go rename to plugin/input/k8s/gatherer.go index 19b5727ae..68078a20b 100644 --- a/plugin/action/k8s/gatherer.go +++ b/plugin/input/k8s/gatherer.go @@ -71,15 +71,17 @@ var ( DisableMetaUpdates = false metaAddedCounter atomic.Int64 expiredItemsCounter atomic.Int64 + + criType = "docker" - node string + selfNodeName string - logger *zap.SugaredLogger + localLogger *zap.SugaredLogger ) func enableGatherer(l *zap.SugaredLogger) { - logger = l - logger.Info("enabling k8s meta gatherer") + localLogger = l + localLogger.Info("enabling k8s meta gatherer") if !DisableMetaUpdates { initGatherer() @@ -91,7 +93,7 @@ func enableGatherer(l *zap.SugaredLogger) { } func disableGatherer() { - logger.Info("disabling k8s meta gatherer") + localLogger.Info("disabling k8s meta gatherer") if !DisableMetaUpdates { informerStop <- struct{}{} } @@ -105,38 +107,39 @@ func initGatherer() { kubeConfig := filepath.Join(os.Getenv("HOME"), ".kube", "config") apiConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig) if err != nil { - logger.Fatalf("can't get k8s client config: %s", err.Error()) + localLogger.Fatalf("can't get k8s client config: %s", err.Error()) } } client, err = kubernetes.NewForConfig(apiConfig) if err != nil { - logger.Fatalf("can't create k8s client: %s", err.Error()) + localLogger.Fatalf("can't create k8s client: %s", err.Error()) panic("") } initNodeInfo() initInformer() + initRuntime() } func initNodeInfo() { podName, err := os.Hostname() if err != nil { - logger.Fatalf("can't get host name for k8s plugin: %s", err.Error()) + localLogger.Fatalf("can't get host name for k8s plugin: %s", err.Error()) panic("") } pod, err := client.CoreV1().Pods(getNamespace()).Get(podName, metav1.GetOptions{}) if err != nil { - logger.Fatalf("can't detect node name for k8s plugin using pod %q: %s", podName, err.Error()) + localLogger.Fatalf("can't detect node name for k8s plugin using pod %q: %s", podName, err.Error()) panic("") } - node = pod.Spec.NodeName + selfNodeName = pod.Spec.NodeName } func initInformer() { - selector, err := fields.ParseSelector("spec.nodeName=" + node) + selector, err := fields.ParseSelector("spec.nodeName=" + selfNodeName) if err != nil { - logger.Fatalf("can't create k8s field selector: %s", err.Error()) + localLogger.Fatalf("can't create k8s field selector: %s", err.Error()) } podListWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", "", selector) _, c := cache.NewIndexerInformer(podListWatcher, &corev1.Pod{}, metaExpireDuration/4, cache.ResourceEventHandlerFuncs{ @@ -152,12 +155,28 @@ func initInformer() { controller = c } +func initRuntime() { + node, err := client.CoreV1().Nodes().Get(selfNodeName, metav1.GetOptions{}) + if err != nil || node == nil { + localLogger.Fatalf("can't detect CRI runtime for node %s, api call is unsuccessful: %s", node, err.Error()) + panic("_") + } + runtimeVer := node.Status.NodeInfo.ContainerRuntimeVersion + pos := strings.IndexByte(runtimeVer, ':') + if pos < 0 { + localLogger.Fatalf("can't detect CRI runtime for node %s, wrong runtime version: %s", node, runtimeVer) + } + + criType = runtimeVer[:pos] +} + + func removeExpired() { expiredItems = getExpiredItems(expiredItems) cleanUpItems(expiredItems) if MaintenanceInterval > time.Second { - logger.Infof("k8s meta stat for last %d 
seconds: total=%d, updated=%d, expired=%d", MaintenanceInterval/time.Second, getTotalItems(), metaAddedCounter.Load(), expiredItemsCounter.Load()) + localLogger.Infof("k8s meta stat for last %d seconds: total=%d, updated=%d, expired=%d", MaintenanceInterval/time.Second, getTotalItems(), metaAddedCounter.Load(), expiredItemsCounter.Load()) } metaAddedCounter.Swap(0) @@ -237,7 +256,7 @@ func cleanUpItems(items []*metaItem) { func getMeta(fullFilename string) (ns namespace, pod podName, container containerName, cid containerID, success bool, podMeta *podMeta) { podMeta = nil success = false - ns, pod, container, cid = parseDockerFilename(fullFilename) + ns, pod, container, cid = parseLogFilename(fullFilename) i := time.Nanosecond for { @@ -248,7 +267,7 @@ func getMeta(fullFilename string) (ns namespace, pod podName, container containe if has { if i-metaWaitWarn >= 0 { - logger.Warnf("meta retrieved with delay time=%dms pod=%s container=%s", i/time.Millisecond, string(pod), string(cid)) + localLogger.Warnf("meta retrieved with delay time=%dms pod=%s container=%s", i/time.Millisecond, string(pod), string(cid)) } success = true @@ -271,7 +290,7 @@ func getMeta(fullFilename string) (ns namespace, pod podName, container containe } podBlackList[pod] = true metaDataMu.Unlock() - logger.Errorf("pod %q have blacklisted, cause k8s meta retrieve timeout ns=%s container=%s cid=%s", string(pod), string(ns), string(container), string(cid)) + localLogger.Errorf("pod %q have blacklisted, cause k8s meta retrieve timeout ns=%s container=%s cid=%s", string(pod), string(ns), string(container), string(cid)) return } @@ -319,18 +338,29 @@ func putMeta(podData *corev1.Pod) { metaAddedCounter.Inc() } +// putContainerMeta fullContainerID must be in format XXX://ID, eg docker://4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0 func putContainerMeta(ns namespace, pod podName, fullContainerID string, podInfo *corev1.Pod) { - if len(fullContainerID) == 0 { + l := len(fullContainerID) + if l == 0 { return } - if len(fullContainerID) < 9 || fullContainerID[:9] != "docker://" { - logger.Fatalf("wrong container id: %s", fullContainerID) + pos := strings.IndexByte(fullContainerID, ':') + if pos <= 0 { + localLogger.Fatalf("container id should have format XXXX://ID: %s", fullContainerID) } - - containerID := containerID(fullContainerID[9:]) + + if pos + 3 >= l { + localLogger.Fatalf("container id should have format XXXX://ID: %s", fullContainerID) + } + + if fullContainerID[pos:pos+3] != "://" { + localLogger.Fatalf("container id should have format XXXX://ID: %s", fullContainerID) + } + + containerID := containerID(fullContainerID[pos+3:]) if len(containerID) != 64 { - logger.Fatalf("wrong container id: %s", fullContainerID) + localLogger.Fatalf("wrong container id: %s", fullContainerID) } meta := &podMeta{ @@ -343,26 +373,26 @@ func putContainerMeta(ns namespace, pod podName, fullContainerID string, podInfo metaDataMu.Unlock() } -func parseDockerFilename(fullFilename string) (namespace, podName, containerName, containerID) { +func parseLogFilename(fullFilename string) (namespace, podName, containerName, containerID) { if fullFilename[len(fullFilename)-4:] != ".log" { - logger.Infof(formatInfo) - logger.Fatalf("wrong docker log file name, no .log at ending %s", fullFilename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name, no .log at ending %s", fullFilename) } lastSlash := strings.LastIndexByte(fullFilename, '/') if lastSlash < 0 { - logger.Infof(formatInfo) - logger.Fatalf("wrong 
docker log file name %s, no slashes", fullFilename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name %s, no slashes", fullFilename) } filename := fullFilename[lastSlash+1 : len(fullFilename)-4] if filename == "" { - logger.Infof(formatInfo) - logger.Fatalf("wrong docker log file name, empty", filename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name, empty", filename) } underscore := strings.IndexByte(filename, '_') if underscore < 0 { - logger.Infof(formatInfo) - logger.Fatalf("wrong docker log file name, no underscore for pod: %s", filename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name, no underscore for pod: %s", filename) } pod := filename[:underscore] @@ -370,15 +400,15 @@ func parseDockerFilename(fullFilename string) (namespace, podName, containerName underscore = strings.IndexByte(filename, '_') if underscore < 0 { - logger.Infof(formatInfo) - logger.Fatalf("wrong docker log file name, no underscore for ns: %s", filename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name, no underscore for ns: %s", filename) } ns := filename[:underscore] filename = filename[underscore+1:] if len(filename) < 65 { - logger.Infof(formatInfo) - logger.Fatalf("wrong docker log file name, not enough chars: %s", filename) + localLogger.Infof(formatInfo) + localLogger.Fatalf("wrong log file name, not enough chars: %s", filename) } container := filename[:len(filename)-65] diff --git a/plugin/input/k8s/k8s.go b/plugin/input/k8s/k8s.go new file mode 100644 index 000000000..3ffc38253 --- /dev/null +++ b/plugin/input/k8s/k8s.go @@ -0,0 +1,140 @@ +package k8s + +import ( + "github.com/ozonru/file.d/decoder" + "github.com/ozonru/file.d/fd" + "github.com/ozonru/file.d/pipeline" + "github.com/ozonru/file.d/plugin/input/file" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +/*{ introduction +It reads Kubernetes logs and also adds pod meta-information. Also, it joins split logs into a single event. + +Source log file should be named in the following format:
`[pod-name]_[namespace]_[container-name]-[container-id].log` + +E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log` + +An information which plugin adds: +* `k8s_node` – node name where pod is running; +* `k8s_pod` – pod name; +* `k8s_namespace` – pod namespace name; +* `k8s_container` – pod container name; +* `k8s_label_*` – pod labels. + +**Example:** +```yaml +pipelines: + example_k8s_pipeline: + input: + type: k8s + offsets_file: /data/offsets.yaml + file_config: // customize file plugin + persistence_mode: sync + read_buffer_size: 2048 +``` +}*/ + +type Plugin struct { + config *Config + logger *zap.SugaredLogger + params *pipeline.InputPluginParams + logBuff []byte + logSize int + + fp *file.Plugin +} + +type Config struct { + //! config-params + //^ config-params + + //> @3@4@5@6 + //> + //> Docker splits long logs by 16kb chunks. The plugin joins them back, but if an event is longer than this value in bytes, it will be split after all. + //> > Due to the optimization process it's not a strict rule. Events may be split even if they won't exceed the limit. + MaxEventSize int `json:"max_event_size" default:"1000000"` //* + + //> @3@4@5@6 + //> + //> If set, it defines which pod labels to add to the event, others will be ignored. + LabelsWhitelist []string `json:"labels_whitelist"` //* + LabelsWhitelist_ map[string]bool + + //> @3@4@5@6 + //> + //> Skips retrieving Kubernetes meta information using Kubernetes API and adds only `k8s_node` field. + OnlyNode bool `json:"only_node" default:"false"` //* + + //> @3@4@5@6 + //> + //> Kubernetes dir with container logs. It's like `watching_dir` parameter from [file plugin](/plugin/input/file/README.md) config. + WatchingDir string `json:"watching_dir" default:"/var/log/containers"` //* + WatchingDir_ string + + //> @3@4@5@6 + //> + //> The filename to store offsets of processed files. It's like `offsets_file` parameter from [file plugin](/plugin/input/file/README.md) config. + OffsetsFile string `json:"offsets_file" required:"true"` //* + + //> @3@4@5@6 + //> + //> Under the hood this plugin uses [file plugin](/plugin/input/file/README.md) to collect logs from files. So you can change any [file plugin](/plugin/input/file/README.md) config parameter using `file_config` section. Check out an example. 
diff --git a/plugin/action/k8s/k8s_test.go b/plugin/input/k8s/k8s_test.go
similarity index 88%
rename from plugin/action/k8s/k8s_test.go
rename to plugin/input/k8s/k8s_test.go
index f83dc42f1..a691fa2a7 100644
--- a/plugin/action/k8s/k8s_test.go
+++ b/plugin/input/k8s/k8s_test.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/ozonru/file.d/cfg"
+	"github.com/ozonru/file.d/logger"
 	"github.com/ozonru/file.d/pipeline"
 	"github.com/ozonru/file.d/test"
 	"github.com/stretchr/testify/assert"
@@ -41,7 +42,7 @@ func getPodInfo(item *metaItem, isWhite bool) *corev1.Pod {
 	podInfo.Name = string(item.podName)
 	podInfo.Status.ContainerStatuses = make([]corev1.ContainerStatus, 1)
 	podInfo.Status.ContainerStatuses[0].Name = string(item.containerName)
-	podInfo.Status.ContainerStatuses[0].ContainerID = "docker://" + string(item.containerID)
+	podInfo.Status.ContainerStatuses[0].ContainerID = "containerd://" + string(item.containerID)
 	podInfo.Spec.NodeName = string(item.nodeName)
 	if isWhite {
 		podInfo.Labels = map[string]string{"white_label": "white_value"}
@@ -55,13 +56,13 @@ func config() *Config {
 	config := &Config{LabelsWhitelist: []string{"white_label"}}
 	err := cfg.Parse(config, nil)
 	if err != nil {
-		logger.Panic(err.Error())
+		localLogger.Panic(err.Error())
 	}
 
 	return config
 }
 
 func TestEnrichment(t *testing.T) {
-	p, input, _ := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config(), pipeline.MatchModeAnd, nil))
+	p, input, _ := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil))
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
 
@@ -73,10 +74,10 @@ func TestEnrichment(t *testing.T) {
 	}
 	podInfo := getPodInfo(item, true)
 	putMeta(podInfo)
-	node = "node_1"
+	selfNodeName = "node_1"
 
 	var event *pipeline.Event = nil
-	filename := getLogFilename("/docker-logs", item)
+	filename := getLogFilename("/k8s-logs", item)
 	input.SetCommitFn(func(e *pipeline.Event) {
 		event = e
 		wg.Done()
@@ -94,7 +95,7 @@ func TestEnrichment(t *testing.T) {
 }
 
 func TestWhitelist(t *testing.T) {
-	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config(), pipeline.MatchModeAnd, nil))
+	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil))
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
 
@@ -106,7 +107,7 @@ func TestWhitelist(t *testing.T) {
 		containerID:   "4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0",
 	}
 	putMeta(getPodInfo(item, true))
-	filename1 := getLogFilename("/docker-logs", item)
+	filename1 := getLogFilename("/k8s-logs", item)
 
 	item = &metaItem{
 		nodeName:      "node_1",
@@ -116,7 +117,7 @@ func TestWhitelist(t *testing.T) {
 		containerID:   "4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0",
 	}
 	putMeta(getPodInfo(item, false))
-	filename2 := getLogFilename("/docker-logs", item)
+	filename2 := getLogFilename("/k8s-logs", item)
 
 	outEvents := make([]*pipeline.Event, 0, 0)
 	output.SetOutFn(func(e *pipeline.Event) {
@@ -135,7 +136,7 @@ func TestWhitelist(t *testing.T) {
 }
 
 func TestJoin(t *testing.T) {
-	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config(), pipeline.MatchModeAnd, nil))
+	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil))
 	wg := &sync.WaitGroup{}
 	wg.Add(4)
 
@@ -156,7 +157,7 @@ func TestJoin(t *testing.T) {
 		wg.Done()
 	})
 
-	filename := getLogFilename("/docker-logs", item)
+	filename := getLogFilename("/k8s-logs", item)
 	input.In(0, filename, 10, []byte(`{"ts":"time","stream":"stdout","log":"one line log 1\n"}`))
 	input.In(0, filename, 20, []byte(`{"ts":"time","stream":"stderr","log":"error "}`))
 	input.In(0, filename, 30, []byte(`{"ts":"time","stream":"stdout","log":"this "}`))
@@ -203,8 +204,11 @@ func TestJoin(t *testing.T) {
 }
 
 func TestCleanUp(t *testing.T) {
-	p, _, _ := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config(), pipeline.MatchModeAnd, nil))
+	p, _, _ := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil))
 
+	enableGatherer(logger.Instance)
+	defer disableGatherer()
+
 	putMeta(getPodInfo(&metaItem{
 		nodeName:      "node_1",
 		namespace:     "sre",
@@ -228,13 +232,13 @@ func TestCleanUp(t *testing.T) {
 	}, true))
 
 	time.Sleep(metaExpireDuration + MaintenanceInterval)
-
+
 	p.Stop()
 
 	assert.Equal(t, 0, len(metaData))
 }
 
-func TestParseDockerFilename(t *testing.T) {
-	ns, pod, container, cid := parseDockerFilename("/docker-logs/advanced-logs-checker-1566485760-trtrq_sre_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log")
+func TestParseLogFilename(t *testing.T) {
+	ns, pod, container, cid := parseLogFilename("/k8s-logs/advanced-logs-checker-1566485760-trtrq_sre_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log")
 
 	assert.Equal(t, namespace("sre"), ns)
 	assert.Equal(t, podName("advanced-logs-checker-1566485760-trtrq"), pod)
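The tests above go through the gatherer's in-memory pod meta cache: `putMeta` stores pod objects, enrichment looks them up, and `TestCleanUp` waits `metaExpireDuration + MaintenanceInterval` for entries to expire. Below is a condensed sketch of such an expiring cache; the key shape, field names, and locking are assumptions, not the actual `gatherer.go` structures:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type metaKey struct {
	namespace, pod, containerID string
}

type metaEntry struct {
	nodeName string
	labels   map[string]string
	updated  time.Time
}

// metaCache is a simplified stand-in for the gatherer's pod meta storage.
type metaCache struct {
	mu      sync.RWMutex
	entries map[metaKey]metaEntry
	ttl     time.Duration
}

func newMetaCache(ttl time.Duration) *metaCache {
	return &metaCache{entries: map[metaKey]metaEntry{}, ttl: ttl}
}

func (c *metaCache) put(k metaKey, e metaEntry) {
	c.mu.Lock()
	e.updated = time.Now()
	c.entries[k] = e
	c.mu.Unlock()
}

func (c *metaCache) get(k metaKey) (metaEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.entries[k]
	return e, ok
}

// maintenance drops entries that haven't been refreshed within ttl,
// mirroring what TestCleanUp waits for before asserting len(metaData) == 0.
func (c *metaCache) maintenance() {
	c.mu.Lock()
	for k, e := range c.entries {
		if time.Since(e.updated) > c.ttl {
			delete(c.entries, k)
		}
	}
	c.mu.Unlock()
}

func main() {
	c := newMetaCache(50 * time.Millisecond)
	c.put(metaKey{namespace: "sre", pod: "duty-bot", containerID: "4e03"}, metaEntry{nodeName: "node_1"})

	time.Sleep(100 * time.Millisecond)
	c.maintenance()

	_, ok := c.get(metaKey{namespace: "sre", pod: "duty-bot", containerID: "4e03"})
	fmt.Println("entry still cached:", ok) // false: the entry expired and was removed
}
```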
diff --git a/plugin/action/k8s/k8s.go b/plugin/input/k8s/multiline_action.go
similarity index 57%
rename from plugin/action/k8s/k8s.go
rename to plugin/input/k8s/multiline_action.go
index 7351f5c3a..2c461bc3e 100644
--- a/plugin/action/k8s/k8s.go
+++ b/plugin/input/k8s/multiline_action.go
@@ -2,96 +2,35 @@ package k8s
 
 import (
 	"github.com/ozonru/file.d/cfg"
-	"github.com/ozonru/file.d/fd"
 	"github.com/ozonru/file.d/pipeline"
-	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
-/*{ introduction
-It adds the Kubernetes meta-information into the events collected from docker log files. Also, it joins split docker logs into a single event.
-
-Source docker log file should be named in the following format:
-`[pod-name]_[namespace]_[container-name]-[container-id].log`
-
-E.g. `my_pod-1566485760-trtrq_my-namespace_my-container-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0.log`
-
-An information which plugin adds:
-* `k8s_node` – node name where pod is running;
-* `k8s_pod` – pod name;
-* `k8s_namespace` – pod namespace name;
-* `k8s_container` – pod container name;
-* `k8s_label_*` – pod labels.
-
-}*/
-type Plugin struct {
-	config  *Config
-	logger  *zap.SugaredLogger
-	logBuff []byte
-	logSize int
+type MultilineAction struct {
+	config  *Config
+	logger  *zap.SugaredLogger
+	params  *pipeline.ActionPluginParams
+	logBuff []byte
+	logSize int
 }
 
 const (
-	predictionLookahead = 128 * 1024
+	predictionLookahead = 128 * 1024
 )
 
-//! config-params
-//^ config-params
-type Config struct {
-	//> @3@4@5@6
-	//>
-	//> Docker splits long logs by 16kb chunks. The plugin joins them back, but if an event is longer than this value in bytes, it will be split after all.
-	//> > Due to the optimization process it's not a strict rule. Events may be split even if they won't exceed the limit.
-	MaxEventSize int `json:"max_event_size" default:"1000000"` //*
-
-	//> @3@4@5@6
-	//>
-	//> If set, it defines which pod labels to add to the event, others will be ignored.
-	LabelsWhitelist []string `json:"labels_whitelist"` //*
-	LabelsWhitelist_ map[string]bool
-
-	//> @3@4@5@6
-	//>
-	//> Skips retrieving k8s meta information using Kubernetes API and adds only `k8s_node` field.
-	OnlyNode bool `json:"only_node" default:"false"` //*
-}
-
-var (
-	startCounter atomic.Int32
-)
-
-func init() {
-	fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
-		Type:    "k8s",
-		Factory: factory,
-	})
-}
-
-func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
-	return &Plugin{}, &Config{}
-}
-
-func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
-	p.config = config.(*Config)
+func (p *MultilineAction) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
 	p.logger = params.Logger
-
-	startCounter := startCounter.Inc()
+	p.params = params
+	p.config = config.(*Config)
 
 	p.config.LabelsWhitelist_ = cfg.ListToMap(p.config.LabelsWhitelist)
 
-	if startCounter == 1 {
-		enableGatherer(p.logger)
-	}
-
 	p.logBuff = append(p.logBuff, '"')
 }
 
-func (p *Plugin) Stop() {
-	startCounter := startCounter.Dec()
-	if startCounter == 0 {
-		disableGatherer()
-	}
+func (p *MultilineAction) Stop() {
 }
 
-func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
+func (p *MultilineAction) Do(event *pipeline.Event) pipeline.ActionResult {
 	// todo: do same logic as in join plugin here to send not full logs
 	if event.IsTimeoutKind() {
 		p.logger.Errorf("can't read next sequential event for k8s pod stream")
@@ -99,15 +38,15 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
 		return pipeline.ActionDiscard
 	}
 
-	event.Root.AddFieldNoAlloc(event.Root, "k8s_node").MutateToString(node)
+	event.Root.AddFieldNoAlloc(event.Root, "k8s_node").MutateToString(selfNodeName)
 	if p.config.OnlyNode {
 		return pipeline.ActionPass
 	}
 	// don't need to unescape/escape log fields cause concatenation of escaped strings is escaped string
 	logFragment := event.Root.Dig("log").AsEscapedString()
 	if logFragment == "" {
-		p.logger.Fatalf("wrong docker log format, it doesn't contain log field: %s", event.Root.EncodeToString())
-		panic("")
+		p.logger.Fatalf("wrong event format, it doesn't contain log field: %s", event.Root.EncodeToString())
+		panic("_")
 	}
 
 	// docker splits long logs by 16kb chunks, so let's join them
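The `Do` method above keeps appending escaped `log` fragments to `logBuff` until a fragment terminated by a newline arrives, because the docker json-file driver cuts long container lines into 16 KB pieces. A simplified sketch of that joining idea follows; buffering and the `MaxEventSize` cut-off are reduced to plain strings here, which the real plugin avoids for performance:

```go
package main

import (
	"fmt"
	"strings"
)

// joinFragments concatenates docker-style 16 KB log fragments into full events.
// A fragment that ends with "\n" terminates the current event; an oversized
// buffer is flushed early, mirroring the max_event_size setting.
func joinFragments(fragments []string, maxEventSize int) []string {
	events := []string{}
	buf := strings.Builder{}

	for _, f := range fragments {
		buf.WriteString(f)
		complete := strings.HasSuffix(f, "\n")
		tooBig := maxEventSize > 0 && buf.Len() > maxEventSize

		if complete || tooBig {
			events = append(events, buf.String())
			buf.Reset()
		}
	}
	if buf.Len() > 0 { // trailing unterminated fragment
		events = append(events, buf.String())
	}
	return events
}

func main() {
	fragments := []string{"error ", "this ", "is ", "joined log\n", "one line log 2\n"}
	for _, e := range joinFragments(fragments, 1_000_000) {
		fmt.Printf("%q\n", e)
	}
}
```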
diff --git a/plugin/output/README.md b/plugin/output/README.md
index 51e82dc34..7e1e92e41 100755
--- a/plugin/output/README.md
+++ b/plugin/output/README.md
@@ -3,12 +3,12 @@
 ## devnull
 It provides an API to test pipelines and other plugins.
 
-[More details...](plugin/output/devnull/README.md)
+[More details...](/plugin/output/devnull/README.md)
 
 ## elasticsearch
 It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint.
 
-[More details...](plugin/output/elasticsearch/README.md)
+[More details...](/plugin/output/elasticsearch/README.md)
 
 ## gelf
 It sends event batches to the GELF endpoint. Transport level protocol TCP or UDP is configurable.
 > It doesn't support UDP chunking. So don't use UDP if event size may be greater than 8192.
@@ -27,13 +27,13 @@ GELF messages are separated by null byte. Each message is a JSON with the follow
 Every field with an underscore prefix `_` will be treated as an extra field.
 Allowed characters in field names are letters, numbers, underscores, dashes, and dots.
 
-[More details...](plugin/output/gelf/README.md)
+[More details...](/plugin/output/gelf/README.md)
 
 ## kafka
 It sends the event batches to kafka brokers using `sarama` lib.
 
-[More details...](plugin/output/kafka/README.md)
+[More details...](/plugin/output/kafka/README.md)
 
 ## stdout
 It writes events to stdout(also known as console).
 
-[More details...](plugin/output/stdout/README.md)
+[More details...](/plugin/output/stdout/README.md)
 
 *Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index fa3df1a1b..1947f4239 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -187,6 +187,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
 	root, err := insaneJSON.DecodeBytes(respContent)
 	if err != nil {
 		p.logger.Errorf("wrong response from %s, will try other endpoint: %s", endpoint, err.Error())
+		insaneJSON.Release(root)
 		continue
 	}
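The one-line elasticsearch change above closes a leak on the error path: judging by the patch, `insaneJSON.DecodeBytes` hands back a pooled document even when it reports an error, so that document has to be released before `continue`. A generic sketch of the same release-on-every-path pattern, using a stand-in pool rather than the insane-json API:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// doc stands in for a pooled JSON document like insaneJSON's Root.
type doc struct{ data []byte }

var docPool = sync.Pool{New: func() interface{} { return &doc{} }}

// decode mimics an API that returns the pooled object even when parsing fails.
func decode(b []byte) (*doc, error) {
	d := docPool.Get().(*doc)
	d.data = append(d.data[:0], b...)
	if len(b) == 0 || b[0] != '{' {
		return d, errors.New("not a JSON object")
	}
	return d, nil
}

func release(d *doc) { docPool.Put(d) }

func handleResponses(responses [][]byte) {
	for _, resp := range responses {
		d, err := decode(resp)
		if err != nil {
			// Same idea as the elasticsearch fix: return the document to the pool
			// on the error path too, otherwise it leaks on every bad response.
			release(d)
			continue
		}
		fmt.Printf("decoded %d bytes\n", len(d.data))
		release(d)
	}
}

func main() {
	handleResponses([][]byte{[]byte(`{"took":3}`), []byte(`oops`)})
}
```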