Skip to content

Commit

Permalink
Merge pull request #59 from nlnwa/frankness
Browse files Browse the repository at this point in the history
Run indexing concurrently in serve command
  • Loading branch information
Avokadoen authored Apr 7, 2022
2 parents b008d0e + 4f6783d commit 3a83a15
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 25 deletions.
10 changes: 9 additions & 1 deletion cmd/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/nlnwa/gowarcserver/internal/config"
"github.com/nlnwa/gowarcserver/internal/database"
"github.com/nlnwa/gowarcserver/internal/index"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"regexp"
Expand Down Expand Up @@ -137,7 +138,7 @@ func indexCmd(_ *cobra.Command, args []string) error {
}
}

indexer, err := index.NewAutoIndexer(indexWorker, dirs,
indexer, err := index.NewAutoIndexer(indexWorker,
index.WithMaxDepth(viper.GetInt("max-depth")),
index.WithIncludes(includes...),
index.WithExcludes(excludes...),
Expand All @@ -147,5 +148,12 @@ func indexCmd(_ *cobra.Command, args []string) error {
}
defer indexer.Close()

for _, dir := range dirs {
err := indexer.Index(dir)
if err != nil {
log.Warn().Msgf(`Error indexing "%s": %v`, dir, err)
}
}

return nil
}
13 changes: 11 additions & 2 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func serveCmd(cmd *cobra.Command, args []string) error {
indexWorker := index.Worker(cdxDb, viper.GetInt("workers"))
defer indexWorker.Close()

autoIndexer, err := index.NewAutoIndexer(indexWorker, dirs,
indexer, err := index.NewAutoIndexer(indexWorker,
index.WithWatch(viper.GetBool("watch")),
index.WithMaxDepth(viper.GetInt("max-depth")),
index.WithIncludes(includes...),
Expand All @@ -158,7 +158,16 @@ func serveCmd(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
defer autoIndexer.Close()
defer indexer.Close()

for _, dir := range dirs {
dir := dir
go func() {
if err := indexer.Index(dir); err != nil {
log.Warn().Msgf(`Error indexing "%s": %v`, dir, err)
}
}()
}
}

// create record loader
Expand Down
76 changes: 55 additions & 21 deletions internal/index/autoindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,49 @@ import (
)

type autoIndexer struct {
watcher *watcher
done chan struct{}
perFileFn func(string)
perDirFn func(string) bool
watcher *watcher
settings *Options
}

type Scheduler interface {
Schedule(job string, batchWindow time.Duration)
}

func NewAutoIndexer(s Scheduler, paths []string, opts ...Option) (*autoIndexer, error) {
func NewAutoIndexer(s Scheduler, opts ...Option) (*autoIndexer, error) {
a := new(autoIndexer)

settings := defaultOptions()
for _, opt := range opts {
opt(settings)
}
a.settings = settings

a := new(autoIndexer)
done := make(chan struct{})
a.done = done

isDone := func() bool {
select {
case <-done:
return true
default:
return false
}
}

perDirFn := func(name string) bool {
if isDone() {
return false
}
return !settings.isExcluded(name)
}

perFileFn := func(name string) {
if isDone() {
return
}
if settings.filter(name) {
s.Schedule(name, 0)
}
Expand All @@ -57,6 +80,9 @@ func NewAutoIndexer(s Scheduler, paths []string, opts ...Option) (*autoIndexer,
a.watcher = w

perDirFn = func(name string) bool {
if isDone() {
return false
}
if settings.isExcluded(name) {
return false
}
Expand All @@ -65,50 +91,58 @@ func NewAutoIndexer(s Scheduler, paths []string, opts ...Option) (*autoIndexer,
}

onFileChanged := func(name string) {
if isDone() {
return
}
if settings.filter(name) {
s.Schedule(name, 10*time.Second)
}
}
go w.Watch(onFileChanged)
}

for _, path := range paths {
_ = index(path, settings.MaxDepth, perFileFn, perDirFn)
}
a.perFileFn = perFileFn
a.perDirFn = perDirFn

return a, nil
}

func (a *autoIndexer) Close() {
a.watcher.Close()
}

func index(path string, maxDepth int, perFileFn func(string), perDirFn func(string) bool) error {
func (a *autoIndexer) Index(path string) error {
info, err := os.Stat(path)
if err != nil {
return fmt.Errorf(`: %s: %w`, path, err)
return fmt.Errorf("failed to get file info: %w", err)
}
if info.IsDir() {
walk(path, 0, maxDepth, perFileFn, perDirFn)
if err := a.walk(path, 0); err != nil {
return err
}
} else {
perFileFn(path)
a.perFileFn(path)
}
return nil
}

func walk(dir string, currentDepth int, maxDepth int, perFileFn func(string), perDirFn func(string) bool) {
func (a *autoIndexer) Close() {
close(a.done)
a.watcher.Close()
}

func (a *autoIndexer) walk(dir string, currentDepth int) error {
entries, err := os.ReadDir(dir)
if err != nil {
return
return fmt.Errorf(`failed to read directory "%s": %w`, dir, err)
}
for _, entry := range entries {
name := filepath.Join(dir, entry.Name())
if !entry.IsDir() {
perFileFn(name)
} else if currentDepth < maxDepth {
if perDirFn(name) {
walk(name, currentDepth+1, maxDepth, perFileFn, perDirFn)
a.perFileFn(name)
} else if currentDepth < a.settings.MaxDepth {
if a.perDirFn(name) {
err = a.walk(name, currentDepth+1)
if err != nil {
return err
}
}
}
}
return nil
}
2 changes: 1 addition & 1 deletion internal/index/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func indexFile(fileName string, r RecordWriter) error {
gowarc.WithSyntaxErrorPolicy(gowarc.ErrIgnore),
gowarc.WithSpecViolationPolicy(gowarc.ErrIgnore),
)
log.Debug().Msgf("Indexed %5d of %5d records in %10v: %s\n", count, total, time.Since(start), fileName)
log.Info().Msgf("Indexed %5d of %5d records in %10v: %s\n", count, total, time.Since(start), fileName)
return err
}

Expand Down

0 comments on commit 3a83a15

Please sign in to comment.