diff --git a/plugin/input/file/file_test.go b/plugin/input/file/file_test.go index 09deba19a..d8649a21d 100644 --- a/plugin/input/file/file_test.go +++ b/plugin/input/file/file_test.go @@ -431,8 +431,8 @@ func TestReadContinue(t *testing.T) { blockSize := 2000 stopAfter := 100 processed := 0 - inputEvents := make([]string, 0, blockSize*2) - outputEvents := make([]string, 0, cap(inputEvents)+stopAfter) + inputEvents := make(map[string]bool, blockSize*2) + outputEvents := make(map[string]bool, blockSize*2+stopAfter) file := "" size := 0 @@ -444,14 +444,14 @@ func TestReadContinue(t *testing.T) { for x := 0; x < blockSize; x++ { line := fmt.Sprintf(`{"data_1":"line_%d"}`, x) size += len(line) + newLine - inputEvents = append(inputEvents, line) + inputEvents[line] = true addString(file, line, true, false) } }, Assert: func(p *pipeline.Pipeline) { processed = p.GetEventsTotal() for i := 0; i < processed; i++ { - outputEvents = append(outputEvents, p.GetEventLogItem(i)) + outputEvents[p.GetEventLogItem(i)] = true } }, }, stopAfter) @@ -466,15 +466,16 @@ func TestReadContinue(t *testing.T) { for x := 0; x < blockSize; x++ { line := fmt.Sprintf(`{"data_2":"line_%d"}`, x) size += len(line) + newLine - inputEvents = append(inputEvents, line) + inputEvents[line] = true addString(file, line, true, false) } }, Assert: func(p *pipeline.Pipeline) { for i := 0; i < p.GetEventsTotal(); i++ { - outputEvents = append(outputEvents, p.GetEventLogItem(i)) + outputEvents[p.GetEventLogItem(i)] = true } + // we compare maps because we can tolerate dublicates require.Equalf( t, inputEvents, outputEvents, "input events not equal output events (input len=%d, output len=%d)", diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index e41f1bb07..d2fb412e2 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -262,9 +262,8 @@ func (jp *jobProvider) addSymlink(inode inodeID, filename string) { } func (jp *jobProvider) refreshSymlink(symlink string, inode inodeID, isWrite bool) { - filename, err := filepath.EvalSymlinks(symlink) - if err != nil { - jp.logger.Warnf("symlink have been removed %s", symlink) + if _, err := os.Lstat(symlink); err != nil { + jp.logger.Warnf("symlink has been removed %s: %s", symlink, err.Error()) jp.symlinksMu.Lock() delete(jp.symlinks, inode) @@ -272,6 +271,13 @@ func (jp *jobProvider) refreshSymlink(symlink string, inode inodeID, isWrite boo return } + filename, err := os.Readlink(symlink) + if err != nil { + jp.logger.Warnf("symlink %s is broken: %s", symlink, err.Error()) + // for example, in the case of rotating the k8s pod logs symlink will be fixed soon + return + } + filename, err = filepath.Abs(filename) if err != nil { jp.logger.Warnf("can't follow symlink to %s: %s", filename, err.Error()) diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go new file mode 100644 index 000000000..7973bea71 --- /dev/null +++ b/plugin/input/file/provider_test.go @@ -0,0 +1,50 @@ +package file + +import ( + "os" + "path/filepath" + "testing" + + "github.com/ozontech/file.d/logger" + "github.com/ozontech/file.d/metric" + "github.com/prometheus/client_golang/prometheus" + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/require" +) + +func TestRefreshSymlinkOnBrokenLink(t *testing.T) { + file, err := os.CreateTemp("", "input_file") + if err != nil { + panic(err.Error()) + } + + linkName := filepath.Join(os.TempDir(), uuid.NewV4().String()) + err = os.Symlink(file.Name(), linkName) + if err != nil { + panic(err.Error()) + } + defer os.Remove(linkName) + ctl := metric.NewCtl("test", prometheus.NewRegistry()) + metrics := newMetricCollection( + ctl.RegisterCounter("worker1", "help_test"), + ctl.RegisterCounter("worker2", "help_test"), + ctl.RegisterGauge("worker3", "help_test"), + ctl.RegisterGauge("worker4", "help_test"), + ) + jp := NewJobProvider(&Config{ + MaxFiles: 100, + }, metrics, logger.Instance) + jp.addSymlink(1, linkName) + jp.maintenanceSymlinks() + require.Equal(t, 1, len(jp.symlinks)) + + // broke link + os.Remove(file.Name()) + jp.maintenanceSymlinks() + require.Equal(t, 1, len(jp.symlinks)) + + // delete link + os.Remove(linkName) + jp.maintenanceSymlinks() + require.Equal(t, 0, len(jp.symlinks)) +}