Skip to content

Commit

Permalink
only uniqie offsets filename (#612)
Browse files Browse the repository at this point in the history
* only uniqie offsets filename

* show max_files on reach a limit

* fix data race in TestModifyTrim

* show diff for TestReadContinue

* show path from config on same offset file
  • Loading branch information
DmitryRomanov authored Apr 16, 2024
1 parent fe85e5c commit 42eddbb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 14 deletions.
35 changes: 27 additions & 8 deletions plugin/action/modify/modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ func TestModifyRegex(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}

outEvents := make([]*pipeline.Event, 0)
outEvents := struct {
mu sync.Mutex
events []*pipeline.Event
}{
events: make([]*pipeline.Event, 0),
}
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
outEvents.mu.Lock()
outEvents.events = append(outEvents.events, e)
outEvents.mu.Unlock()
wg.Done()
})
wg.Add(len(testEvents))
Expand All @@ -74,12 +81,14 @@ func TestModifyRegex(t *testing.T) {
wg.Wait()
p.Stop()

assert.Equal(t, len(testEvents), len(outEvents), "wrong out events count")
assert.Equal(t, len(testEvents), len(outEvents.events), "wrong out events count")
for i := 0; i < len(testEvents); i++ {
fvs := testEvents[i].fieldsValues
for field := range fvs {
wantVal := fvs[field]
gotVal := outEvents[i].Root.Dig(field).AsString()
outEvents.mu.Lock()
gotVal := outEvents.events[i].Root.Dig(field).AsString()
outEvents.mu.Unlock()
assert.Equal(t, wantVal, gotVal, "wrong field value")
}
}
Expand Down Expand Up @@ -113,9 +122,17 @@ func TestModifyTrim(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}

outEvents := make([]*pipeline.Event, 0)
outEvents := struct {
mu sync.Mutex
events []*pipeline.Event
}{
events: make([]*pipeline.Event, 0),
}

output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
outEvents.mu.Lock()
outEvents.events = append(outEvents.events, e)
outEvents.mu.Unlock()
wg.Done()
})
wg.Add(len(testEvents))
Expand All @@ -127,12 +144,14 @@ func TestModifyTrim(t *testing.T) {
wg.Wait()
p.Stop()

assert.Equal(t, len(testEvents), len(outEvents), "wrong out events count")
assert.Equal(t, len(testEvents), len(outEvents.events), "wrong out events count")
for i := 0; i < len(testEvents); i++ {
fvs := testEvents[i].fieldsValues
for field := range fvs {
wantVal := fvs[field]
gotVal := outEvents[i].Root.Dig(field).AsString()
outEvents.mu.Lock()
gotVal := outEvents.events[i].Root.Dig(field).AsString()
outEvents.mu.Unlock()
assert.Equal(t, wantVal, gotVal, "wrong field value")
}
}
Expand Down
14 changes: 14 additions & 0 deletions plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package file

import (
"net/http"
"path/filepath"
"time"

"github.com/ozontech/file.d/cfg"
Expand Down Expand Up @@ -174,6 +175,8 @@ type Config struct {
ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
}

var offsetFiles = make(map[string]string)

func init() {
fd.DefaultPluginRegistry.RegisterInput(&pipeline.PluginStaticInfo{
Type: "file",
Expand All @@ -197,6 +200,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa

p.config.OffsetsFileTmp = p.config.OffsetsFile + ".atomic"

offsetFilePath := filepath.Clean(p.config.OffsetsFile)
if pipelineName, alreadyUsed := offsetFiles[offsetFilePath]; alreadyUsed {
p.logger.Fatalf(
"offset file %s is already used in pipeline %s",
p.config.OffsetsFile,
pipelineName,
)
} else {
offsetFiles[offsetFilePath] = params.PipelineName
}

p.jobProvider = NewJobProvider(
p.config,
newMetricCollection(
Expand Down
21 changes: 16 additions & 5 deletions plugin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func rotateFile(file string) string {
func run(testCase *test.Case, eventCount int, opts ...string) {
if !test.Opts(opts).Has("dirty") {
cleanUp()
setupDirs()
}

test.RunCase(testCase, getInputInfo(opts...), eventCount, opts...)
Expand Down Expand Up @@ -362,6 +363,7 @@ func TestWatch(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}

iterations := 4
eventsPerIteration := 2
finalEvent := 1
Expand Down Expand Up @@ -429,8 +431,8 @@ func TestReadContinue(t *testing.T) {
blockSize := 2000
stopAfter := 100
processed := 0
inputEvents := make([]string, 0)
outputEvents := make([]string, 0)
inputEvents := make([]string, 0, blockSize*2)
outputEvents := make([]string, 0, cap(inputEvents)+stopAfter)
file := ""
size := 0

Expand All @@ -454,6 +456,9 @@ func TestReadContinue(t *testing.T) {
},
}, stopAfter)

// restart
offsetFiles = make(map[string]string)

run(&test.Case{
Prepare: func() {
},
Expand All @@ -470,9 +475,11 @@ func TestReadContinue(t *testing.T) {
outputEvents = append(outputEvents, p.GetEventLogItem(i))
}

for i := range inputEvents {
require.Equalf(t, inputEvents[i], outputEvents[i], "wrong event, all events: %v", inputEvents)
}
require.Equalf(
t, inputEvents, outputEvents,
"input events not equal output events (input len=%d, output len=%d)",
len(inputEvents), len(outputEvents),
)

assertOffsetsAreEqual(t, genOffsetsContent(file, size), getContent(getConfigByPipeline(p).OffsetsFile))
},
Expand Down Expand Up @@ -928,6 +935,8 @@ func TestRotationRenameWhileNotWorking(t *testing.T) {
},
}, 2)

// restart
offsetFiles = make(map[string]string)
newFile := rotateFile(file)

run(&test.Case{
Expand Down Expand Up @@ -995,6 +1004,8 @@ func TestTruncationSeq(t *testing.T) {
if testing.Short() {
t.Skip("skip long tests in short mode")
}
setupDirs()
defer cleanUp()
p, _, _ := test.NewPipelineMock(nil, "passive")
p.SetInput(getInputInfo())
p.Start()
Expand Down
5 changes: 4 additions & 1 deletion plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
jp.numberOfCurrentJobsMetric.Set(float64(jobsLen))

if jobsLen > jp.config.MaxFiles {
jp.logger.Fatalf("max_files reached for input plugin, consider increase this parameter")
jp.logger.Fatalf(
"limit max_files=%d is reached for input plugin, consider increase this parameter",
jp.config.MaxFiles,
)
}
jp.jobsLog = append(jp.jobsLog, filename)
jp.jobsDone.Inc()
Expand Down

0 comments on commit 42eddbb

Please sign in to comment.