diff --git a/plugin/action/modify/modify_test.go b/plugin/action/modify/modify_test.go
index 0eac32d7b..103a8c726 100644
--- a/plugin/action/modify/modify_test.go
+++ b/plugin/action/modify/modify_test.go
@@ -60,9 +60,16 @@ func TestModifyRegex(t *testing.T) {
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 	wg := &sync.WaitGroup{}
 
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := struct {
+		mu     sync.Mutex
+		events []*pipeline.Event
+	}{
+		events: make([]*pipeline.Event, 0),
+	}
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents.mu.Lock()
+		outEvents.events = append(outEvents.events, e)
+		outEvents.mu.Unlock()
 		wg.Done()
 	})
 	wg.Add(len(testEvents))
@@ -74,12 +81,14 @@ func TestModifyRegex(t *testing.T) {
 	wg.Wait()
 	p.Stop()
 
-	assert.Equal(t, len(testEvents), len(outEvents), "wrong out events count")
+	assert.Equal(t, len(testEvents), len(outEvents.events), "wrong out events count")
 	for i := 0; i < len(testEvents); i++ {
 		fvs := testEvents[i].fieldsValues
 		for field := range fvs {
 			wantVal := fvs[field]
-			gotVal := outEvents[i].Root.Dig(field).AsString()
+			outEvents.mu.Lock()
+			gotVal := outEvents.events[i].Root.Dig(field).AsString()
+			outEvents.mu.Unlock()
 			assert.Equal(t, wantVal, gotVal, "wrong field value")
 		}
 	}
@@ -113,9 +122,17 @@ func TestModifyTrim(t *testing.T) {
 	p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
 	wg := &sync.WaitGroup{}
 
-	outEvents := make([]*pipeline.Event, 0)
+	outEvents := struct {
+		mu     sync.Mutex
+		events []*pipeline.Event
+	}{
+		events: make([]*pipeline.Event, 0),
+	}
+
 	output.SetOutFn(func(e *pipeline.Event) {
-		outEvents = append(outEvents, e)
+		outEvents.mu.Lock()
+		outEvents.events = append(outEvents.events, e)
+		outEvents.mu.Unlock()
 		wg.Done()
 	})
 	wg.Add(len(testEvents))
@@ -127,12 +144,14 @@ func TestModifyTrim(t *testing.T) {
 	wg.Wait()
 	p.Stop()
 
-	assert.Equal(t, len(testEvents), len(outEvents), "wrong out events count")
+	assert.Equal(t, len(testEvents), len(outEvents.events), "wrong out events count")
 	for i := 0; i < len(testEvents); i++ {
 		fvs := testEvents[i].fieldsValues
 		for field := range fvs {
 			wantVal := fvs[field]
-			gotVal := outEvents[i].Root.Dig(field).AsString()
+			outEvents.mu.Lock()
+			gotVal := outEvents.events[i].Root.Dig(field).AsString()
+			outEvents.mu.Unlock()
 			assert.Equal(t, wantVal, gotVal, "wrong field value")
 		}
 	}
diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go
index e916df15b..083eac8d5 100644
--- a/plugin/input/file/file.go
+++ b/plugin/input/file/file.go
@@ -2,6 +2,7 @@ package file
 
 import (
 	"net/http"
+	"path/filepath"
 	"time"
 
 	"github.com/ozontech/file.d/cfg"
@@ -174,6 +175,8 @@ type Config struct {
 	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
 }
 
+var offsetFiles = make(map[string]string)
+
 func init() {
 	fd.DefaultPluginRegistry.RegisterInput(&pipeline.PluginStaticInfo{
 		Type: "file",
@@ -197,6 +200,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
 
 	p.config.OffsetsFileTmp = p.config.OffsetsFile + ".atomic"
 
+	offsetFilePath := filepath.Clean(p.config.OffsetsFile)
+	if pipelineName, alreadyUsed := offsetFiles[offsetFilePath]; alreadyUsed {
+		p.logger.Fatalf(
+			"offset file %s is already used in pipeline %s",
+			p.config.OffsetsFile,
+			pipelineName,
+		)
+	} else {
+		offsetFiles[offsetFilePath] = params.PipelineName
+	}
+
 	p.jobProvider = NewJobProvider(
 		p.config,
 		newMetricCollection(
diff --git a/plugin/input/file/file_test.go b/plugin/input/file/file_test.go
index d2abb2f42..09deba19a 100644
--- a/plugin/input/file/file_test.go
+++ b/plugin/input/file/file_test.go
@@ -129,6 +129,7 @@ func rotateFile(file string) string {
 func run(testCase *test.Case, eventCount int, opts ...string) {
 	if !test.Opts(opts).Has("dirty") {
 		cleanUp()
+		setupDirs()
 	}
 
 	test.RunCase(testCase, getInputInfo(opts...), eventCount, opts...)
@@ -362,6 +363,7 @@ func TestWatch(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skip test in short mode")
 	}
+
 	iterations := 4
 	eventsPerIteration := 2
 	finalEvent := 1
@@ -429,8 +431,8 @@ func TestReadContinue(t *testing.T) {
 	blockSize := 2000
 	stopAfter := 100
 	processed := 0
-	inputEvents := make([]string, 0)
-	outputEvents := make([]string, 0)
+	inputEvents := make([]string, 0, blockSize*2)
+	outputEvents := make([]string, 0, cap(inputEvents)+stopAfter)
 	file := ""
 	size := 0
 
@@ -454,6 +456,9 @@ func TestReadContinue(t *testing.T) {
 		},
 	}, stopAfter)
 
+	// restart
+	offsetFiles = make(map[string]string)
+
 	run(&test.Case{
 		Prepare: func() {
 		},
@@ -470,9 +475,11 @@ func TestReadContinue(t *testing.T) {
 			outputEvents = append(outputEvents, p.GetEventLogItem(i))
 		}
 
-		for i := range inputEvents {
-			require.Equalf(t, inputEvents[i], outputEvents[i], "wrong event, all events: %v", inputEvents)
-		}
+		require.Equalf(
+			t, inputEvents, outputEvents,
+			"input events not equal output events (input len=%d, output len=%d)",
+			len(inputEvents), len(outputEvents),
+		)
 
 		assertOffsetsAreEqual(t, genOffsetsContent(file, size), getContent(getConfigByPipeline(p).OffsetsFile))
 	},
@@ -928,6 +935,8 @@ func TestRotationRenameWhileNotWorking(t *testing.T) {
 		},
 	}, 2)
 
+	// restart
+	offsetFiles = make(map[string]string)
 	newFile := rotateFile(file)
 
 	run(&test.Case{
@@ -995,6 +1004,8 @@ func TestTruncationSeq(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skip long tests in short mode")
 	}
+	setupDirs()
+	defer cleanUp()
 	p, _, _ := test.NewPipelineMock(nil, "passive")
 	p.SetInput(getInputInfo())
 	p.Start()
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index 2b6e50c43..e41f1bb07 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -367,7 +367,10 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
 	jp.numberOfCurrentJobsMetric.Set(float64(jobsLen))
 
 	if jobsLen > jp.config.MaxFiles {
-		jp.logger.Fatalf("max_files reached for input plugin, consider increase this parameter")
+		jp.logger.Fatalf(
+			"limit max_files=%d is reached for input plugin, consider increase this parameter",
+			jp.config.MaxFiles,
+		)
 	}
 	jp.jobsLog = append(jp.jobsLog, filename)
 	jp.jobsDone.Inc()