diff --git a/go.mod b/go.mod index 3eb4a509c..757bd0c1c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/bitly/go-simplejson v0.5.0 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/euank/go-kmsg-parser v2.0.0+incompatible - github.com/fsnotify/fsnotify v1.4.7 + github.com/fsnotify/fsnotify v1.4.9 github.com/ghodss/yaml v1.0.0 github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect @@ -40,4 +40,4 @@ require ( k8s.io/utils v0.0.0-20190829053155-3a4a5477acf8 // indirect ) -go 1.13 +go 1.15 diff --git a/go.sum b/go.sum index 31f903e8c..297c27313 100644 --- a/go.sum +++ b/go.sum @@ -36,11 +36,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/euank/go-kmsg-parser v1.0.0 h1:rtNgGgSPLxuBSrjPtDNf6oFvT90i/VraFHuX8YBH+SU= github.com/euank/go-kmsg-parser v2.0.0+incompatible h1:cHD53+PLQuuQyLZeriD1V/esuG4MuU0Pjs5y6iknohY= github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -187,6 +190,7 @@ golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk/jwn79LUL43rES2g8o= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 59a7346f3..997c3ba8a 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -382,7 +382,7 @@ func (p *Pipeline) expandProcs() { for x := 0; x < int(to-from); x++ { proc := p.newProc() - p.Procs = append(p.Procs, ) + p.Procs = append(p.Procs, proc) proc.start(p.actionParams, p.logger) } diff --git a/pipeline/stream.go b/pipeline/stream.go index b52645036..c2739c831 100644 --- a/pipeline/stream.go +++ b/pipeline/stream.go @@ -208,9 +208,9 @@ func (s *stream) get() *Event { s.first = s.first.next } - event.stage = eventStageProcessor s.awaySeq = event.SeqID if event != nil { + event.stage = eventStageProcessor s.len-- } diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index 14b56efa3..cbbc7021a 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -1,3 +1,5 @@ +// +build linux + package dmesg import ( diff --git a/plugin/input/dmesg/doc.go b/plugin/input/dmesg/doc.go new file mode 100644 index 000000000..a9311c78f --- /dev/null +++ b/plugin/input/dmesg/doc.go @@ -0,0 +1 @@ +package dmesg \ No newline at end of file diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index 4ba72b97d..b3eb3fc3a 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -143,6 +143,8 @@ func (p *Plugin) serve(w http.ResponseWriter, r *http.Request) { eventBuff = p.processChunk(sourceID, readBuff[:n], eventBuff) } + + _ = r.Body.Close() p.readBuffs.Put(readBuff) p.eventBuffs.Put(eventBuff) diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index df60562fd..fa3df1a1b 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -173,8 +173,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { } respContent, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() if err != nil { - p.logger.Errorf("can't read batch response from %s, will try other endpoint: %s", endpoint, err.Error()) + p.logger.Errorf("can't read response from %s, will try other endpoint: %s", endpoint, err.Error()) + continue + } + + if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusAccepted { + p.logger.Errorf("response status from %s isn't OK, will try other endpoint: status=%d, body=%s", endpoint, resp.StatusCode, respContent) continue } diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index ecb8bb2c8..bf40adfb0 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -141,7 +141,11 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { data.messages[i] = &sarama.ProducerMessage{} } data.messages[i].Value = outBuf[start:] - data.messages[i].Topic = topic + + // copy topic from json, to temporary out buffer to avoid event reusing issues + start = len(outBuf) + outBuf = append(outBuf, topic...) + data.messages[i].Topic = pipeline.ByteToStringUnsafe(outBuf[start:]) } data.outBuf = outBuf