Skip to content

Commit

Permalink
add read_start option
Browse files Browse the repository at this point in the history
  • Loading branch information
wen-templari committed Dec 22, 2024
1 parent d829a5b commit 1993943
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 17 deletions.
9 changes: 9 additions & 0 deletions plugins/inputs/tail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Read file from beginning.
# from_beginning = false

## Read file method (defaults to 'save-offset-or-end').
## After setting this, the "from_beginning" option will be ignored.
## The following methods are available:
## save-offset-or-end -- start reading from the persisted offset of the file or, if no offset persisted, start from the end (default)
## save-offset-or-beginning -- start reading from the persisted offset of the file or, if no offset persisted, start from the beginning
## end -- start reading from the end of the file ignoring any saved offset
## beginning -- start reading from the beginning of the file ignoring any saved offset
# read_start = "save-offset-or-end"

## Whether file is a named pipe
# pipe = false

Expand Down
9 changes: 9 additions & 0 deletions plugins/inputs/tail/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
## Read file from beginning.
# from_beginning = false

## Read file method (defaults to 'save-offset-or-end').
## After setting this, the "from_beginning" option will be ignored.
## The following methods are available:
## save-offset-or-end -- start reading from the persisted offset of the file or, if no offset persisted, start from the end (default)
## save-offset-or-beginning -- start reading from the persisted offset of the file or, if no offset persisted, start from the beginning
## end -- start reading from the end of the file ignoring any saved offset
## beginning -- start reading from the beginning of the file ignoring any saved offset
# read_start = "save-offset-or-end"

## Whether file is a named pipe
# pipe = false

Expand Down
68 changes: 53 additions & 15 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type semaphore chan empty
type Tail struct {
Files []string `toml:"files"`
FromBeginning bool `toml:"from_beginning"`
ReadStart string `toml:"read_start"`
Pipe bool `toml:"pipe"`
WatchMethod string `toml:"watch_method"`
MaxUndeliveredLines int `toml:"max_undelivered_lines"`
Expand Down Expand Up @@ -162,6 +163,54 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
return err
}

func (t *Tail) getSeekInfo(file string, fromBeginning bool) (*tail.SeekInfo, error) {
var seek *tail.SeekInfo

seekStart := &tail.SeekInfo{
Whence: 0,
Offset: 0,
}
seekEnd := &tail.SeekInfo{
Whence: 2,
Offset: 0,
}

if fromBeginning && t.ReadStart == "" {
return seekStart, nil
}

switch t.ReadStart {
case "end":
seek = seekEnd
case "beginning":
seek = seekStart
case "", "save-offset-or-end":
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = seekEnd
}
case "save-offset-or-beginning":
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = seekStart
}
default:
return nil, errors.New("invalid 'read_start' setting")
}

return seek, nil
}

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var poll bool
if t.WatchMethod == "poll" {
Expand All @@ -180,20 +229,9 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
seek, err := t.getSeekInfo(file, fromBeginning)
if err != nil {
return err
}

tailer, err := tail.TailFile(file,
Expand Down Expand Up @@ -379,7 +417,7 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) {

func (t *Tail) Stop() {
for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
if !t.Pipe {
// store offset for resume
offset, err := tailer.Tell()
if err == nil {
Expand Down
125 changes: 123 additions & 2 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/tail"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func newInfluxParser() (telegraf.Parser, error) {
Expand Down Expand Up @@ -800,3 +800,124 @@ func TestStatePersistence(t *testing.T) {
require.True(t, ok, "state is not a map[string]int64")
require.Equal(t, expectedState, actualState)
}

func TestGetSeekInfo(t *testing.T) {
tests := []struct {
name string
offsets map[string]int64
file string
fromBeginning bool
readStart string
expected *tail.SeekInfo
}{
{
name: "Read from beginning when from_beginning set to true",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
fromBeginning: true,
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Read from beginning when read_start set to beginning",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
readStart: "beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Read from end when read_start set to end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
readStart: "end",
expected: &tail.SeekInfo{
Whence: 2,
Offset: 0,
},
},
{
name: "Read from end when offset not exists and read_start set to save-offset-or-end",
offsets: map[string]int64{},
file: "test.log",
readStart: "save-offset-or-end",
expected: &tail.SeekInfo{
Whence: 2,
Offset: 0,
},
},
{
name: "Read from offset when offset exists and read_start set to save-offset-or-end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
readStart: "save-offset-or-end",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
{
name: "Read from start when offset not exists and read_start set to save-offset-or-start",
offsets: map[string]int64{},
file: "test.log",
readStart: "save-offset-or-beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Read from offset when offset exists and read_start set to save-offset-or-end",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
readStart: "save-offset-or-beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
{
name: "Ignore from_beginning when read_start is set",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
fromBeginning: true,
readStart: "save-offset-or-beginning",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger := &testutil.CaptureLogger{}
tt := NewTestTail()
tt.Log = logger
tt.ReadStart = test.readStart
tt.FromBeginning = test.fromBeginning

require.NoError(t, tt.Init())
tt.offsets = test.offsets

seekInfo, err := tt.getSeekInfo(test.file, test.fromBeginning)
require.NoError(t, err)
require.Equal(t, test.expected, seekInfo)
})
}

t.Run("Return error when read_start is invalid", func(t *testing.T) {
logger := &testutil.CaptureLogger{}
tt := NewTestTail()
tt.Log = logger
tt.ReadStart = "invalid"

require.NoError(t, tt.Init())
_, err := tt.getSeekInfo("test.log", false)
require.Error(t, err)
})
}

0 comments on commit 1993943

Please sign in to comment.