diff --git a/tail.go b/tail.go index 1a3608a..57a3277 100644 --- a/tail.go +++ b/tail.go @@ -9,6 +9,7 @@ import ( "syscall" "time" + "github.com/fsnotify/fsnotify" "github.com/go-faster/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -75,7 +76,7 @@ type Config struct { // Logger to use, optional. Logger *zap.Logger // Tracker is optional custom *Tracker. - Tracker *Tracker + Tracker Tracker } // Handler is called on each log line. @@ -83,6 +84,15 @@ type Config struct { // Implementation should not retain Line or Line.Data. type Handler func(ctx context.Context, l *Line) error +// Tracker tracks file changes. +type Tracker interface { + watchFile(name string) error + watchCreate(name string) error + removeWatchName(name string) error + removeWatchCreate(name string) error + listenEvents(name string) <-chan fsnotify.Event +} + // Tailer implements file tailing. // // Use Tail() to start. diff --git a/tail_test.go b/tail_test.go index 60e2856..96bb70f 100644 --- a/tail_test.go +++ b/tail_test.go @@ -169,7 +169,7 @@ func TestMultipleTails(t *testing.T) { f := file(t) lg := zaptest.NewLogger(t) - tracker := NewTracker(lg) + tr := NewTracker(lg) g, ctx := errgroup.WithContext(context.Background()) const ( @@ -183,7 +183,7 @@ func TestMultipleTails(t *testing.T) { NotifyTimeout: notifyTimeout, Follow: true, Logger: lg.Named(fmt.Sprintf("t%d", i)), - Tracker: tracker, + Tracker: tr, }) g.Go(func() error { var gotLines int @@ -224,7 +224,7 @@ func TestDelete(t *testing.T) { f := file(t) lg := zaptest.NewLogger(t) - tracker := NewTracker(lg) + tr := NewTracker(lg) g, ctx := errgroup.WithContext(context.Background()) const lines = 10 @@ -240,7 +240,7 @@ func TestDelete(t *testing.T) { NotifyTimeout: notifyTimeout, Follow: true, Logger: lg, - Tracker: tracker, + Tracker: tr, }) read := make(chan struct{}) diff --git a/tracker.go b/tracker.go index 1cfe97b..03d5a1b 100644 --- a/tracker.go +++ b/tracker.go @@ -11,8 +11,8 @@ import ( "go.uber.org/zap" ) -// Tracker multiplexes fsnotify events. -type Tracker struct { +// tracker multiplexes fsnotify events. +type tracker struct { init sync.Once mux sync.Mutex watcher *fsnotify.Watcher @@ -34,11 +34,11 @@ func (i *watchInfo) isCreate() bool { return i.op == fsnotify.Create } -// NewTracker creates new custom *Tracker with provided logger. +// NewTracker creates new custom Tracker with provided logger. // // It is recommended to use it as singleton and create only once. -func NewTracker(log *zap.Logger) *Tracker { - return &Tracker{ +func NewTracker(log *zap.Logger) Tracker { + return &tracker{ chans: make(map[string]chan fsnotify.Event), done: make(map[string]chan bool), watchNums: make(map[string]int), @@ -52,7 +52,7 @@ func NewTracker(log *zap.Logger) *Tracker { var defaultTracker = NewTracker(zap.NewNop()) // watchFile signals the run goroutine to begin watching the input filename -func (t *Tracker) watchFile(name string) error { +func (t *tracker) watchFile(name string) error { return t.watchInfo(&watchInfo{ name: name, }) @@ -60,14 +60,14 @@ func (t *Tracker) watchFile(name string) error { // watchCreate watches create signals the run goroutine to begin watching the input filename // if call the watchCreate function, don't call the Cleanup, call the removeWatchCreate -func (t *Tracker) watchCreate(name string) error { +func (t *tracker) watchCreate(name string) error { return t.watchInfo(&watchInfo{ op: fsnotify.Create, name: name, }) } -func (t *Tracker) watchInfo(winfo *watchInfo) error { +func (t *tracker) watchInfo(winfo *watchInfo) error { if err := t.ensure(); err != nil { return err } @@ -78,7 +78,7 @@ func (t *Tracker) watchInfo(winfo *watchInfo) error { } // removeWatchInfo signals the run goroutine to remove the watch for the input filename -func (t *Tracker) removeWatchName(name string) error { +func (t *tracker) removeWatchName(name string) error { return t.removeInfo(&watchInfo{ name: name, }) @@ -86,16 +86,16 @@ func (t *Tracker) removeWatchName(name string) error { // removeWatchCreate signals the run goroutine to remove the // watch for the input filename. -func (t *Tracker) removeWatchCreate(name string) error { +func (t *tracker) removeWatchCreate(name string) error { return t.removeInfo(&watchInfo{ op: fsnotify.Create, name: name, }) } -func (t *Tracker) ensure() (err error) { +func (t *tracker) ensure() (err error) { if t == nil { - return errors.New("Tracker: invalid call (nil)") + return errors.New("tracker: invalid call (nil)") } t.init.Do(func() { @@ -111,7 +111,7 @@ func (t *Tracker) ensure() (err error) { return err } -func (t *Tracker) removeInfo(winfo *watchInfo) error { +func (t *tracker) removeInfo(winfo *watchInfo) error { if err := t.ensure(); err != nil { return err } @@ -132,7 +132,7 @@ func (t *Tracker) removeInfo(winfo *watchInfo) error { // listenEvents returns a channel to which FileEvents corresponding to the input filename // will be sent. This channel will be closed when removeWatchInfo is called on this // filename. -func (t *Tracker) listenEvents(name string) <-chan fsnotify.Event { +func (t *tracker) listenEvents(name string) <-chan fsnotify.Event { t.mux.Lock() defer t.mux.Unlock() @@ -141,7 +141,7 @@ func (t *Tracker) listenEvents(name string) <-chan fsnotify.Event { // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating // a new watcher if the previous watcher was closed. -func (t *Tracker) addWatchInfo(winfo *watchInfo) error { +func (t *tracker) addWatchInfo(winfo *watchInfo) error { t.mux.Lock() defer t.mux.Unlock() @@ -172,7 +172,7 @@ func (t *Tracker) addWatchInfo(winfo *watchInfo) error { // removeWatchInfo calls fsnotify.Remove for the input filename and closes the // corresponding events channel. -func (t *Tracker) removeWatchInfo(winfo *watchInfo) error { +func (t *tracker) removeWatchInfo(winfo *watchInfo) error { t.mux.Lock() ch := t.chans[winfo.name] @@ -206,7 +206,7 @@ func (t *Tracker) removeWatchInfo(winfo *watchInfo) error { } // sendEvent sends the input event to the appropriate Tail. -func (t *Tracker) sendEvent(event fsnotify.Event) { +func (t *tracker) sendEvent(event fsnotify.Event) { name := filepath.Clean(event.Name) t.mux.Lock() @@ -223,7 +223,7 @@ func (t *Tracker) sendEvent(event fsnotify.Event) { } // run starts reading from inotify events. -func (t *Tracker) run() { +func (t *tracker) run() { for { select { case winfo := <-t.watch: diff --git a/watcher.go b/watcher.go index ba7670f..764d441 100644 --- a/watcher.go +++ b/watcher.go @@ -36,13 +36,13 @@ type watchHandler func(ctx context.Context, e event) error // watcher uses newWatcher to monitor file changes. type watcher struct { - t *Tracker + t Tracker lg *zap.Logger name string size int64 } -func newWatcher(lg *zap.Logger, t *Tracker, filename string) *watcher { +func newWatcher(lg *zap.Logger, t Tracker, filename string) *watcher { return &watcher{ t: t, name: filepath.Clean(filename), diff --git a/watcher_test.go b/watcher_test.go new file mode 100644 index 0000000..3d01cf8 --- /dev/null +++ b/watcher_test.go @@ -0,0 +1,123 @@ +package tail + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/fsnotify/fsnotify" + "github.com/go-faster/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "golang.org/x/sync/errgroup" +) + +type wrapTracker struct { + file func(name string) + create func(name string) + t Tracker +} + +func (w wrapTracker) watchFile(name string) error { + if w.file != nil { + defer w.file(name) + } + return w.t.watchFile(name) +} + +func (w wrapTracker) watchCreate(name string) error { + if w.create != nil { + defer w.create(name) + } + return w.t.watchCreate(name) +} + +func (w wrapTracker) removeWatchName(name string) error { + return w.t.removeWatchName(name) +} + +func (w wrapTracker) removeWatchCreate(name string) error { + return w.t.removeWatchCreate(name) +} + +func (w wrapTracker) listenEvents(name string) <-chan fsnotify.Event { + return w.t.listenEvents(name) +} + +func TestCreateAfterWatch(t *testing.T) { + lg := zaptest.NewLogger(t) + g, ctx := errgroup.WithContext(context.Background()) + name := filepath.Join(t.TempDir(), "foo.txt") + + const lines = 10 + + started := make(chan struct{}) + g.Go(func() error { + select { + case <-started: + case <-ctx.Done(): + return ctx.Err() + } + + f, err := os.Create(name) + if err != nil { + return err + } + for i := 0; i < lines; i++ { + if _, err := fmt.Fprintln(f, line); err != nil { + return err + } + } + return f.Close() + }) + + tailer := File(name, Config{ + NotifyTimeout: notifyTimeout, + Follow: true, + Logger: lg, + Tracker: wrapTracker{ + t: NewTracker(lg), + create: func(name string) { + close(started) + }, + }, + }) + + read := make(chan struct{}) + g.Go(func() error { + var gotLines int + // Ensure that each tailer got all lines. + h := func(ctx context.Context, l *Line) error { + assert.Equal(t, line, string(l.Data)) + gotLines++ + if gotLines == lines { + close(read) + } + return nil + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + if err := tailer.Tail(ctx, h); !errors.Is(err, errStop) { + return err + } + + return nil + }) + + // Read lines. + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-read: // ok + } + return os.Remove(name) + }) + + require.NoError(t, g.Wait()) +}