Skip to content

Commit

Permalink
minor plugin fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vvitkovskiy committed Nov 11, 2020
1 parent 99a4408 commit 4fd14dd
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 6 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/fsnotify/fsnotify v1.4.7
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect
Expand Down Expand Up @@ -40,4 +40,4 @@ require (
k8s.io/utils v0.0.0-20190829053155-3a4a5477acf8 // indirect
)

go 1.13
go 1.15
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/euank/go-kmsg-parser v1.0.0 h1:rtNgGgSPLxuBSrjPtDNf6oFvT90i/VraFHuX8YBH+SU=
github.com/euank/go-kmsg-parser v2.0.0+incompatible h1:cHD53+PLQuuQyLZeriD1V/esuG4MuU0Pjs5y6iknohY=
github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw=
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down Expand Up @@ -187,6 +190,7 @@ golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk/jwn79LUL43rES2g8o=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (p *Pipeline) expandProcs() {

for x := 0; x < int(to-from); x++ {
proc := p.newProc()
p.Procs = append(p.Procs, )
p.Procs = append(p.Procs, proc)
proc.start(p.actionParams, p.logger)
}

Expand Down
2 changes: 1 addition & 1 deletion pipeline/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func (s *stream) get() *Event {
s.first = s.first.next
}

event.stage = eventStageProcessor
s.awaySeq = event.SeqID
if event != nil {
event.stage = eventStageProcessor
s.len--
}

Expand Down
2 changes: 2 additions & 0 deletions plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build linux

package dmesg

import (
Expand Down
1 change: 1 addition & 0 deletions plugin/input/dmesg/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package dmesg
2 changes: 2 additions & 0 deletions plugin/input/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (p *Plugin) serve(w http.ResponseWriter, r *http.Request) {

eventBuff = p.processChunk(sourceID, readBuff[:n], eventBuff)
}

_ = r.Body.Close()

p.readBuffs.Put(readBuff)
p.eventBuffs.Put(eventBuff)
Expand Down
8 changes: 7 additions & 1 deletion plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}

respContent, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
p.logger.Errorf("can't read batch response from %s, will try other endpoint: %s", endpoint, err.Error())
p.logger.Errorf("can't read response from %s, will try other endpoint: %s", endpoint, err.Error())
continue
}

if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusAccepted {
p.logger.Errorf("response status from %s isn't OK, will try other endpoint: status=%d, body=%s", endpoint, resp.StatusCode, respContent)
continue
}

Expand Down
6 changes: 5 additions & 1 deletion plugin/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.messages[i] = &sarama.ProducerMessage{}
}
data.messages[i].Value = outBuf[start:]
data.messages[i].Topic = topic

// copy topic from json, to temporary out buffer to avoid event reusing issues
start = len(outBuf)
outBuf = append(outBuf, topic...)
data.messages[i].Topic = pipeline.ByteToStringUnsafe(outBuf[start:])
}

data.outBuf = outBuf
Expand Down

0 comments on commit 4fd14dd

Please sign in to comment.