Skip to content
This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

Commit

Permalink
refactor: extract Tracker to interface
Browse files Browse the repository at this point in the history
Improve test coverage.

Update #6
  • Loading branch information
ernado committed Jan 22, 2022
1 parent 32050cb commit 4c49769
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 25 deletions.
12 changes: 11 additions & 1 deletion tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-faster/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -75,14 +76,23 @@ type Config struct {
// Logger to use, optional.
Logger *zap.Logger
// Tracker is optional custom *Tracker.
Tracker *Tracker
Tracker Tracker
}

// Handler is called on each log line.
//
// Implementation should not retain Line or Line.Data.
type Handler func(ctx context.Context, l *Line) error

// Tracker tracks file changes.
type Tracker interface {
watchFile(name string) error
watchCreate(name string) error
removeWatchName(name string) error
removeWatchCreate(name string) error
listenEvents(name string) <-chan fsnotify.Event
}

// Tailer implements file tailing.
//
// Use Tail() to start.
Expand Down
8 changes: 4 additions & 4 deletions tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestMultipleTails(t *testing.T) {
f := file(t)

lg := zaptest.NewLogger(t)
tracker := NewTracker(lg)
tr := NewTracker(lg)
g, ctx := errgroup.WithContext(context.Background())

const (
Expand All @@ -183,7 +183,7 @@ func TestMultipleTails(t *testing.T) {
NotifyTimeout: notifyTimeout,
Follow: true,
Logger: lg.Named(fmt.Sprintf("t%d", i)),
Tracker: tracker,
Tracker: tr,
})
g.Go(func() error {
var gotLines int
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestDelete(t *testing.T) {
f := file(t)

lg := zaptest.NewLogger(t)
tracker := NewTracker(lg)
tr := NewTracker(lg)
g, ctx := errgroup.WithContext(context.Background())

const lines = 10
Expand All @@ -240,7 +240,7 @@ func TestDelete(t *testing.T) {
NotifyTimeout: notifyTimeout,
Follow: true,
Logger: lg,
Tracker: tracker,
Tracker: tr,
})

read := make(chan struct{})
Expand Down
36 changes: 18 additions & 18 deletions tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"go.uber.org/zap"
)

// Tracker multiplexes fsnotify events.
type Tracker struct {
// tracker multiplexes fsnotify events.
type tracker struct {
init sync.Once
mux sync.Mutex
watcher *fsnotify.Watcher
Expand All @@ -34,11 +34,11 @@ func (i *watchInfo) isCreate() bool {
return i.op == fsnotify.Create
}

// NewTracker creates new custom *Tracker with provided logger.
// NewTracker creates new custom Tracker with provided logger.
//
// It is recommended to use it as singleton and create only once.
func NewTracker(log *zap.Logger) *Tracker {
return &Tracker{
func NewTracker(log *zap.Logger) Tracker {
return &tracker{
chans: make(map[string]chan fsnotify.Event),
done: make(map[string]chan bool),
watchNums: make(map[string]int),
Expand All @@ -52,22 +52,22 @@ func NewTracker(log *zap.Logger) *Tracker {
var defaultTracker = NewTracker(zap.NewNop())

// watchFile signals the run goroutine to begin watching the input filename
func (t *Tracker) watchFile(name string) error {
func (t *tracker) watchFile(name string) error {
return t.watchInfo(&watchInfo{
name: name,
})
}

// watchCreate watches create signals the run goroutine to begin watching the input filename
// if call the watchCreate function, don't call the Cleanup, call the removeWatchCreate
func (t *Tracker) watchCreate(name string) error {
func (t *tracker) watchCreate(name string) error {
return t.watchInfo(&watchInfo{
op: fsnotify.Create,
name: name,
})
}

func (t *Tracker) watchInfo(winfo *watchInfo) error {
func (t *tracker) watchInfo(winfo *watchInfo) error {
if err := t.ensure(); err != nil {
return err
}
Expand All @@ -78,24 +78,24 @@ func (t *Tracker) watchInfo(winfo *watchInfo) error {
}

// removeWatchInfo signals the run goroutine to remove the watch for the input filename
func (t *Tracker) removeWatchName(name string) error {
func (t *tracker) removeWatchName(name string) error {
return t.removeInfo(&watchInfo{
name: name,
})
}

// removeWatchCreate signals the run goroutine to remove the
// watch for the input filename.
func (t *Tracker) removeWatchCreate(name string) error {
func (t *tracker) removeWatchCreate(name string) error {
return t.removeInfo(&watchInfo{
op: fsnotify.Create,
name: name,
})
}

func (t *Tracker) ensure() (err error) {
func (t *tracker) ensure() (err error) {
if t == nil {
return errors.New("Tracker: invalid call (nil)")
return errors.New("tracker: invalid call (nil)")
}

t.init.Do(func() {
Expand All @@ -111,7 +111,7 @@ func (t *Tracker) ensure() (err error) {
return err
}

func (t *Tracker) removeInfo(winfo *watchInfo) error {
func (t *tracker) removeInfo(winfo *watchInfo) error {
if err := t.ensure(); err != nil {
return err
}
Expand All @@ -132,7 +132,7 @@ func (t *Tracker) removeInfo(winfo *watchInfo) error {
// listenEvents returns a channel to which FileEvents corresponding to the input filename
// will be sent. This channel will be closed when removeWatchInfo is called on this
// filename.
func (t *Tracker) listenEvents(name string) <-chan fsnotify.Event {
func (t *tracker) listenEvents(name string) <-chan fsnotify.Event {
t.mux.Lock()
defer t.mux.Unlock()

Expand All @@ -141,7 +141,7 @@ func (t *Tracker) listenEvents(name string) <-chan fsnotify.Event {

// watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating
// a new watcher if the previous watcher was closed.
func (t *Tracker) addWatchInfo(winfo *watchInfo) error {
func (t *tracker) addWatchInfo(winfo *watchInfo) error {
t.mux.Lock()
defer t.mux.Unlock()

Expand Down Expand Up @@ -172,7 +172,7 @@ func (t *Tracker) addWatchInfo(winfo *watchInfo) error {

// removeWatchInfo calls fsnotify.Remove for the input filename and closes the
// corresponding events channel.
func (t *Tracker) removeWatchInfo(winfo *watchInfo) error {
func (t *tracker) removeWatchInfo(winfo *watchInfo) error {
t.mux.Lock()

ch := t.chans[winfo.name]
Expand Down Expand Up @@ -206,7 +206,7 @@ func (t *Tracker) removeWatchInfo(winfo *watchInfo) error {
}

// sendEvent sends the input event to the appropriate Tail.
func (t *Tracker) sendEvent(event fsnotify.Event) {
func (t *tracker) sendEvent(event fsnotify.Event) {
name := filepath.Clean(event.Name)

t.mux.Lock()
Expand All @@ -223,7 +223,7 @@ func (t *Tracker) sendEvent(event fsnotify.Event) {
}

// run starts reading from inotify events.
func (t *Tracker) run() {
func (t *tracker) run() {
for {
select {
case winfo := <-t.watch:
Expand Down
4 changes: 2 additions & 2 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type watchHandler func(ctx context.Context, e event) error

// watcher uses newWatcher to monitor file changes.
type watcher struct {
t *Tracker
t Tracker
lg *zap.Logger
name string
size int64
}

func newWatcher(lg *zap.Logger, t *Tracker, filename string) *watcher {
func newWatcher(lg *zap.Logger, t Tracker, filename string) *watcher {
return &watcher{
t: t,
name: filepath.Clean(filename),
Expand Down
123 changes: 123 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package tail

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/fsnotify/fsnotify"
"github.com/go-faster/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)

type wrapTracker struct {
file func(name string)
create func(name string)
t Tracker
}

func (w wrapTracker) watchFile(name string) error {
if w.file != nil {
defer w.file(name)
}
return w.t.watchFile(name)
}

func (w wrapTracker) watchCreate(name string) error {
if w.create != nil {
defer w.create(name)
}
return w.t.watchCreate(name)
}

func (w wrapTracker) removeWatchName(name string) error {
return w.t.removeWatchName(name)
}

func (w wrapTracker) removeWatchCreate(name string) error {
return w.t.removeWatchCreate(name)
}

func (w wrapTracker) listenEvents(name string) <-chan fsnotify.Event {
return w.t.listenEvents(name)
}

func TestCreateAfterWatch(t *testing.T) {
lg := zaptest.NewLogger(t)
g, ctx := errgroup.WithContext(context.Background())
name := filepath.Join(t.TempDir(), "foo.txt")

const lines = 10

started := make(chan struct{})
g.Go(func() error {
select {
case <-started:
case <-ctx.Done():
return ctx.Err()
}

f, err := os.Create(name)
if err != nil {
return err
}
for i := 0; i < lines; i++ {
if _, err := fmt.Fprintln(f, line); err != nil {
return err
}
}
return f.Close()
})

tailer := File(name, Config{
NotifyTimeout: notifyTimeout,
Follow: true,
Logger: lg,
Tracker: wrapTracker{
t: NewTracker(lg),
create: func(name string) {
close(started)
},
},
})

read := make(chan struct{})
g.Go(func() error {
var gotLines int
// Ensure that each tailer got all lines.
h := func(ctx context.Context, l *Line) error {
assert.Equal(t, line, string(l.Data))
gotLines++
if gotLines == lines {
close(read)
}
return nil
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if err := tailer.Tail(ctx, h); !errors.Is(err, errStop) {
return err
}

return nil
})

// Read lines.
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case <-read: // ok
}
return os.Remove(name)
})

require.NoError(t, g.Wait())
}

0 comments on commit 4c49769

Please sign in to comment.