diff --git a/main.go b/main.go index ae627c8..3fedf06 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "log" + "math" "os" "sync" @@ -16,19 +17,20 @@ import ( var version = "dev" -func runScraper(s *scraper.Scraper, itemsChannel chan map[string]interface{}, globalConfig *scraper.GlobalConfig) { - log.Printf("scraping %s\n", s.Name) - // This could probably be improved. We could pass the channel to - // GetItems instead of waiting for the scraper to finish. - items, err := s.GetItems(globalConfig, false) - if err != nil { - log.Printf("%s ERROR: %s", s.Name, err) - return - } - log.Printf("fetched %d %s items\n", len(items), s.Name) - for _, item := range items { - itemsChannel <- item +func worker(sc chan scraper.Scraper, ic chan map[string]interface{}, gc *scraper.GlobalConfig, threadNr int) { + for s := range sc { + log.Printf("thread %d: scraping %s\n", threadNr, s.Name) + items, err := s.GetItems(gc, false) + if err != nil { + log.Printf("%s ERROR: %s", s.Name, err) + continue + } + log.Printf("thread %d: fetched %d %s items\n", threadNr, len(items), s.Name) + for _, item := range items { + ic <- item + } } + log.Printf("thread %d: done working\n", threadNr) } func main() { @@ -107,9 +109,9 @@ func main() { return } - var scraperWg sync.WaitGroup + var workerWg sync.WaitGroup var writerWg sync.WaitGroup - itemsChannel := make(chan map[string]interface{}, len(config.Scrapers)) + ic := make(chan map[string]interface{}) var writer output.Writer if *toStdout { @@ -130,21 +132,40 @@ func main() { if config.Global.UserAgent == "" { config.Global.UserAgent = "goskyr web scraper (github.com/jakopako/goskyr)" } - for _, s := range config.Scrapers { - if *singleScraper == "" || *singleScraper == s.Name { - scraperWg.Add(1) - go func(scr scraper.Scraper) { - defer scraperWg.Done() - runScraper(&scr, itemsChannel, &config.Global) - }(s) + + sc := make(chan scraper.Scraper) + + // fill worker queue + go func() { + for _, s := range config.Scrapers { + if *singleScraper == "" || *singleScraper == s.Name { + sc <- s + } } + close(sc) + }() + + // start workers + nrWorkers := 1 + if *singleScraper == "" { + nrWorkers = int(math.Min(20, float64(len(config.Scrapers)))) + } + log.Printf("running with %d threads\n", nrWorkers) + workerWg.Add(nrWorkers) + for i := 0; i < nrWorkers; i++ { + go func(j int) { + defer workerWg.Done() + worker(sc, ic, &config.Global, j) + }(i) } + + // start writer writerWg.Add(1) go func() { defer writerWg.Done() - writer.Write(itemsChannel) + writer.Write(ic) }() - scraperWg.Wait() - close(itemsChannel) + workerWg.Wait() + close(ic) writerWg.Wait() }