Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into splunk_output_plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleb Zakharov committed Sep 1, 2021
2 parents 7c4430d + 00a41f5 commit a2dfa78
Show file tree
Hide file tree
Showing 11 changed files with 1,046 additions and 133 deletions.
1 change: 1 addition & 0 deletions cmd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/ozonru/file.d/plugin/output/file"
_ "github.com/ozonru/file.d/plugin/output/gelf"
_ "github.com/ozonru/file.d/plugin/output/kafka"
_ "github.com/ozonru/file.d/plugin/output/s3"
_ "github.com/ozonru/file.d/plugin/output/splunk"
_ "github.com/ozonru/file.d/plugin/output/stdout"
)
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ require (
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/go-ini/ini v1.62.0 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc // indirect
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/minio/minio-go v6.0.14+incompatible
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.3.0
github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.7.0
github.com/vitkovskii/insane-json v0.1.0
Expand All @@ -27,6 +31,7 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190704094625-facf06a8f4b8
k8s.io/client-go v11.0.0+incompatible
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-ini/ini v1.62.0 h1:7VJT/ZXjzqSrvtraFp4ONq80hTcRQth1c9ZnQ3uNQvU=
github.com/go-ini/ini v1.62.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
Expand Down Expand Up @@ -79,6 +81,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/googleapis/gnostic v0.3.1 h1:WeAefnSUHlBb0iJKwxFDZdbfGwkd7xRNuV+IpXMJhYk=
github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI=
Expand Down Expand Up @@ -107,6 +111,8 @@ github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBv
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand All @@ -123,6 +129,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -162,6 +172,10 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
Expand Down Expand Up @@ -242,6 +256,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
Expand All @@ -262,6 +277,8 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
53 changes: 30 additions & 23 deletions plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Plugin struct {
fileName string
tsFileName string

SealUpCallback func(string) ()

mu *sync.RWMutex
}

Expand All @@ -46,7 +48,7 @@ const (
)

var (
fileSealUpInterval = time.Second
FileSealUpInterval = time.Second
)

type Config struct {
Expand Down Expand Up @@ -92,13 +94,16 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.controller = params.Controller
p.logger = params.Logger
p.config = config.(*Config)

dir, file := filepath.Split(p.config.TargetFile)

if p.file == nil {
p.logger.Panic("file struct is nil!")
}
p.targetDir = dir
p.fileExtension = filepath.Ext(file)
p.fileName = file[0 : len(file)-len(p.fileExtension)]
p.tsFileName = "%s" + "-" + p.fileName

p.batcher = pipeline.NewBatcher(
params.PipelineName,
"file",
Expand All @@ -110,23 +115,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config.BatchFlushTimeout_,
0,
)

p.mu = &sync.RWMutex{}
ctx, cancel := context.WithCancel(context.Background())
p.ctx = ctx
p.cancelFunc = cancel
//create target dir
if _, err := os.Stat(p.targetDir); os.IsNotExist(err) {
if err := os.MkdirAll(p.targetDir, os.ModePerm); err != nil {
p.logger.Fatalf("could not create target dir: %s, error: %s", p.targetDir, err.Error())
}

if err := os.MkdirAll(p.targetDir, os.ModePerm); err != nil {
p.logger.Fatalf("could not create target dir: %s, error: %s", p.targetDir, err.Error())
}

p.idx = p.getStartIdx()
p.createNew()
p.setNextSealUpTime()
//additional checks
if p.file == nil {
p.logger.Panic("file struct is nil!")
}

if p.nextSealUpTime.IsZero() {
p.logger.Panic("next seal up time is nil!")
}
Expand Down Expand Up @@ -169,15 +171,13 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}

func (p *Plugin) fileSealUpTicker() {
ticker := time.NewTicker(fileSealUpInterval)
defer ticker.Stop()
for {
timer := time.NewTimer(time.Until(p.nextSealUpTime))
select {
case t := <-ticker.C:
if t.After(p.nextSealUpTime) {
p.sealUp()
}
case <-timer.C:
p.sealUp()
case <-p.ctx.Done():
timer.Stop()
return
}
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func (p *Plugin) createNew() {
p.file = file
}

//sealUp manages current file: renames, closes, and creates new.
// sealUp manages current file: renames, closes, and creates new.
func (p *Plugin) sealUp() {
info, err := p.file.Stat()
if err != nil {
Expand All @@ -226,7 +226,10 @@ func (p *Plugin) sealUp() {
if info.Size() == 0 {
return
}
p.rename()

// newFileName will be like: ".var/log/log_1_01-02-2009_15:04.log
newFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, time.Now().Format(p.config.Layout), p.fileExtension))
p.rename(newFileName)
oldFile := p.file
p.mu.Lock()
p.createNew()
Expand All @@ -235,12 +238,16 @@ func (p *Plugin) sealUp() {
if err := oldFile.Close(); err != nil {
p.logger.Panicf("could not close file: %s, error: %s", oldFile.Name(), err.Error())
}

if p.SealUpCallback != nil {
go p.SealUpCallback(newFileName)
}
}

func (p *Plugin) rename() {
fullFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, time.Now().Format(p.config.Layout), p.fileExtension))

func (p *Plugin) rename(newFileName string) {
f := fmt.Sprintf("%s%s", p.targetDir, p.tsFileName)
if err := os.Rename(f, fullFileName); err != nil {
if err := os.Rename(f, newFileName); err != nil {
p.logger.Panicf("could not rename file, error: %s", err.Error())
}
p.idx++
Expand Down
Loading

0 comments on commit a2dfa78

Please sign in to comment.