Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed May 26, 2020
1 parent d3bcfb8 commit 02544ec
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 51 deletions.
90 changes: 52 additions & 38 deletions internal/file/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package watch
import (
"context"
"reflect"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/vdaas/vald/internal/errgroup"
Expand All @@ -29,15 +30,16 @@ import (

type Watcher interface {
Start(ctx context.Context) (<-chan error, error)
Add(dir string) error
Remove(dir string) error
Add(dirs ...string) error
Remove(dirs ...string) error
Stop(ctx context.Context) error
}

type watch struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -66,20 +68,26 @@ func (w *watch) init() (*watch, error) {
if err != nil {
return nil, err
}
if w.w != nil {
err = w.w.Close()
w.mu.RLock()
dirs := w.dirs
w.mu.RUnlock()
for dir := range dirs {
err = watcher.Add(dir)
if err != nil {
return nil, err
}
}
w.w = watcher

for _, dir := range w.dirs {
err = w.w.Add(dir)
w.mu.Lock()
defer w.mu.Unlock()
if w.w != nil {
err = w.w.Close()
if err != nil {
return nil, err
}
}
w.w = watcher

return w, nil
}

Expand Down Expand Up @@ -132,6 +140,7 @@ func (w *watch) Start(ctx context.Context) (<-chan error, error) {
err = w.onChmod(ctx, event.Name)
}
}
case err, ok = <-w.w.Errors:
}
if !ok {
w, err = w.init()
Expand All @@ -140,47 +149,52 @@ func (w *watch) Start(ctx context.Context) (<-chan error, error) {
handleErr(ctx, err)
}
}
return nil
}))
w.eg.Go(safety.RecoverFunc(func() (err error) {
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok := <-w.w.Errors:
if !ok {
w, err = w.init()
} else {
err = w.onError(ctx, err)
}
if err != nil {
log.Error(err)
select {
case <-ctx.Done():
case ech <- err:
}
}
}
}
return nil
}))
return ech, nil
}
func (w *watch) Add(dir string) error {
if w.w != nil {
return w.w.Add(dir)

func (w *watch) Add(dirs ...string) (err error) {
w.mu.Lock()
defer w.mu.Unlock()
for _, dir := range dirs {
w.dirs[dir] = struct{}{}
if w.w != nil {
err = w.w.Add(dir)
if err != nil {
return err
}
}
}
return nil
}

func (w *watch) Remove(dir string) error {
if w.w != nil {
return w.w.Remove(dir)
func (w *watch) Remove(dirs ...string) (err error) {
w.mu.Lock()
defer w.mu.Unlock()
for _, dir := range dirs {
delete(w.dirs, dir)
if w.w != nil {
err = w.w.Remove(dir)
if err != nil {
return err
}
}
}
return nil
}

func (w *watch) Stop(ctx context.Context) error {
func (w *watch) Stop(ctx context.Context) (err error) {
w.mu.Lock()
defer w.mu.Unlock()
for dir := range w.dirs {
delete(w.dirs, dir)
if w.w != nil {
err = w.w.Remove(dir)
if err != nil {
return err
}
}
}
if w.w != nil {
return w.w.Close()
}
Expand Down
47 changes: 34 additions & 13 deletions internal/file/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package watch
import (
"context"
"reflect"
"sync"
"testing"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -106,7 +107,8 @@ func Test_watch_init(t *testing.T) {
type fields struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -145,6 +147,7 @@ func Test_watch_init(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand All @@ -167,6 +170,7 @@ func Test_watch_init(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand Down Expand Up @@ -198,6 +202,7 @@ func Test_watch_init(t *testing.T) {
w: test.fields.w,
eg: test.fields.eg,
dirs: test.fields.dirs,
mu: test.fields.mu,
onChange: test.fields.onChange,
onCreate: test.fields.onCreate,
onRename: test.fields.onRename,
Expand All @@ -223,7 +228,8 @@ func Test_watch_Start(t *testing.T) {
type fields struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -266,6 +272,7 @@ func Test_watch_Start(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand All @@ -291,6 +298,7 @@ func Test_watch_Start(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand Down Expand Up @@ -322,6 +330,7 @@ func Test_watch_Start(t *testing.T) {
w: test.fields.w,
eg: test.fields.eg,
dirs: test.fields.dirs,
mu: test.fields.mu,
onChange: test.fields.onChange,
onCreate: test.fields.onCreate,
onRename: test.fields.onRename,
Expand All @@ -342,12 +351,13 @@ func Test_watch_Start(t *testing.T) {

func Test_watch_Add(t *testing.T) {
type args struct {
dir string
dirs []string
}
type fields struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -380,12 +390,13 @@ func Test_watch_Add(t *testing.T) {
{
name: "test_case_1",
args: args {
dir: "",
dirs: nil,
},
fields: fields {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand All @@ -405,12 +416,13 @@ func Test_watch_Add(t *testing.T) {
return test {
name: "test_case_2",
args: args {
dir: "",
dirs: nil,
},
fields: fields {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand Down Expand Up @@ -442,6 +454,7 @@ func Test_watch_Add(t *testing.T) {
w: test.fields.w,
eg: test.fields.eg,
dirs: test.fields.dirs,
mu: test.fields.mu,
onChange: test.fields.onChange,
onCreate: test.fields.onCreate,
onRename: test.fields.onRename,
Expand All @@ -451,7 +464,7 @@ func Test_watch_Add(t *testing.T) {
onError: test.fields.onError,
}

err := w.Add(test.args.dir)
err := w.Add(test.args.dirs...)
if err := test.checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}
Expand All @@ -462,12 +475,13 @@ func Test_watch_Add(t *testing.T) {

func Test_watch_Remove(t *testing.T) {
type args struct {
dir string
dirs []string
}
type fields struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -500,12 +514,13 @@ func Test_watch_Remove(t *testing.T) {
{
name: "test_case_1",
args: args {
dir: "",
dirs: nil,
},
fields: fields {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand All @@ -525,12 +540,13 @@ func Test_watch_Remove(t *testing.T) {
return test {
name: "test_case_2",
args: args {
dir: "",
dirs: nil,
},
fields: fields {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand Down Expand Up @@ -562,6 +578,7 @@ func Test_watch_Remove(t *testing.T) {
w: test.fields.w,
eg: test.fields.eg,
dirs: test.fields.dirs,
mu: test.fields.mu,
onChange: test.fields.onChange,
onCreate: test.fields.onCreate,
onRename: test.fields.onRename,
Expand All @@ -571,7 +588,7 @@ func Test_watch_Remove(t *testing.T) {
onError: test.fields.onError,
}

err := w.Remove(test.args.dir)
err := w.Remove(test.args.dirs...)
if err := test.checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}
Expand All @@ -587,7 +604,8 @@ func Test_watch_Stop(t *testing.T) {
type fields struct {
w *fsnotify.Watcher
eg errgroup.Group
dirs []string
dirs map[string]struct{}
mu sync.RWMutex
onChange func(ctx context.Context, name string) error
onCreate func(ctx context.Context, name string) error
onRename func(ctx context.Context, name string) error
Expand Down Expand Up @@ -626,6 +644,7 @@ func Test_watch_Stop(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand All @@ -651,6 +670,7 @@ func Test_watch_Stop(t *testing.T) {
w: nil,
eg: nil,
dirs: nil,
mu: sync.RWMutex{},
onChange: nil,
onCreate: nil,
onRename: nil,
Expand Down Expand Up @@ -682,6 +702,7 @@ func Test_watch_Stop(t *testing.T) {
w: test.fields.w,
eg: test.fields.eg,
dirs: test.fields.dirs,
mu: test.fields.mu,
onChange: test.fields.onChange,
onCreate: test.fields.onCreate,
onRename: test.fields.onRename,
Expand Down

0 comments on commit 02544ec

Please sign in to comment.