Skip to content

Commit

Permalink
Merge pull request #619 from ozontech/fix-remove-symlink
Browse files Browse the repository at this point in the history
dont remove symlink from file provider on broken link
  • Loading branch information
DmitryRomanov authored Apr 26, 2024
2 parents 23667e1 + cfb43e7 commit 953aa11
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
13 changes: 7 additions & 6 deletions plugin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func TestReadContinue(t *testing.T) {
blockSize := 2000
stopAfter := 100
processed := 0
inputEvents := make([]string, 0, blockSize*2)
outputEvents := make([]string, 0, cap(inputEvents)+stopAfter)
inputEvents := make(map[string]bool, blockSize*2)
outputEvents := make(map[string]bool, blockSize*2+stopAfter)
file := ""
size := 0

Expand All @@ -444,14 +444,14 @@ func TestReadContinue(t *testing.T) {
for x := 0; x < blockSize; x++ {
line := fmt.Sprintf(`{"data_1":"line_%d"}`, x)
size += len(line) + newLine
inputEvents = append(inputEvents, line)
inputEvents[line] = true
addString(file, line, true, false)
}
},
Assert: func(p *pipeline.Pipeline) {
processed = p.GetEventsTotal()
for i := 0; i < processed; i++ {
outputEvents = append(outputEvents, p.GetEventLogItem(i))
outputEvents[p.GetEventLogItem(i)] = true
}
},
}, stopAfter)
Expand All @@ -466,15 +466,16 @@ func TestReadContinue(t *testing.T) {
for x := 0; x < blockSize; x++ {
line := fmt.Sprintf(`{"data_2":"line_%d"}`, x)
size += len(line) + newLine
inputEvents = append(inputEvents, line)
inputEvents[line] = true
addString(file, line, true, false)
}
},
Assert: func(p *pipeline.Pipeline) {
for i := 0; i < p.GetEventsTotal(); i++ {
outputEvents = append(outputEvents, p.GetEventLogItem(i))
outputEvents[p.GetEventLogItem(i)] = true
}

// we compare maps because we can tolerate dublicates
require.Equalf(
t, inputEvents, outputEvents,
"input events not equal output events (input len=%d, output len=%d)",
Expand Down
12 changes: 9 additions & 3 deletions plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,22 @@ func (jp *jobProvider) addSymlink(inode inodeID, filename string) {
}

func (jp *jobProvider) refreshSymlink(symlink string, inode inodeID, isWrite bool) {
filename, err := filepath.EvalSymlinks(symlink)
if err != nil {
jp.logger.Warnf("symlink have been removed %s", symlink)
if _, err := os.Lstat(symlink); err != nil {
jp.logger.Warnf("symlink has been removed %s: %s", symlink, err.Error())

jp.symlinksMu.Lock()
delete(jp.symlinks, inode)
jp.symlinksMu.Unlock()
return
}

filename, err := os.Readlink(symlink)
if err != nil {
jp.logger.Warnf("symlink %s is broken: %s", symlink, err.Error())
// for example, in the case of rotating the k8s pod logs symlink will be fixed soon
return
}

filename, err = filepath.Abs(filename)
if err != nil {
jp.logger.Warnf("can't follow symlink to %s: %s", filename, err.Error())
Expand Down
50 changes: 50 additions & 0 deletions plugin/input/file/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package file

import (
"os"
"path/filepath"
"testing"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/require"
)

func TestRefreshSymlinkOnBrokenLink(t *testing.T) {
file, err := os.CreateTemp("", "input_file")
if err != nil {
panic(err.Error())
}

linkName := filepath.Join(os.TempDir(), uuid.NewV4().String())
err = os.Symlink(file.Name(), linkName)
if err != nil {
panic(err.Error())
}
defer os.Remove(linkName)
ctl := metric.NewCtl("test", prometheus.NewRegistry())
metrics := newMetricCollection(
ctl.RegisterCounter("worker1", "help_test"),
ctl.RegisterCounter("worker2", "help_test"),
ctl.RegisterGauge("worker3", "help_test"),
ctl.RegisterGauge("worker4", "help_test"),
)
jp := NewJobProvider(&Config{
MaxFiles: 100,
}, metrics, logger.Instance)
jp.addSymlink(1, linkName)
jp.maintenanceSymlinks()
require.Equal(t, 1, len(jp.symlinks))

// broke link
os.Remove(file.Name())
jp.maintenanceSymlinks()
require.Equal(t, 1, len(jp.symlinks))

// delete link
os.Remove(linkName)
jp.maintenanceSymlinks()
require.Equal(t, 0, len(jp.symlinks))
}

0 comments on commit 953aa11

Please sign in to comment.