diff --git a/pkg/operator/operator/scrape.go b/pkg/operator/operator/scrape.go index aa77c8946..6176a9e6d 100644 --- a/pkg/operator/operator/scrape.go +++ b/pkg/operator/operator/scrape.go @@ -38,7 +38,7 @@ func (s scrapeStat) ID() string { return fmt.Sprintf("%s/%s", s.Namespace, s.MonitorName) } -func (c *Operator) scrapeForce(namespace, monitor string) chan string { +func (c *Operator) scrapeForce(namespace, monitor string, workers int) chan string { statefulset, daemonset := c.collectChildConfigs() childConfigs := make([]*discover.ChildConfig, 0, len(statefulset)+len(daemonset)) childConfigs = append(childConfigs, statefulset...) @@ -55,16 +55,26 @@ func (c *Operator) scrapeForce(namespace, monitor string) chan string { out := make(chan string, 8) if len(cfgs) == 0 { - out <- "warning: no monitor targets found" + out <- fmt.Sprintf("warning: no monitor targets found, namespace=%s, monitor=%s", namespace, monitor) close(out) return out } + if workers <= 0 { + workers = 8 // 默认 workers 数 + } + sem := make(chan struct{}, workers) + logger.Infof("scrape task: namespace=%s, monitor=%s, workers=%d", namespace, monitor, workers) + wg := sync.WaitGroup{} + wg.Add(len(cfgs)) for _, cfg := range cfgs { - wg.Add(1) go func(cfg *discover.ChildConfig) { - defer wg.Done() + sem <- struct{}{} + defer func() { + wg.Done() + <-sem + }() client, err := scraper.New(cfg.Data) if err != nil { logger.Warnf("failed to crate scraper http client: %v", err) diff --git a/pkg/operator/operator/server.go b/pkg/operator/operator/server.go index 03b404edc..b9128a50b 100644 --- a/pkg/operator/operator/server.go +++ b/pkg/operator/operator/server.go @@ -15,6 +15,7 @@ import ( "fmt" "net/http" "net/http/pprof" + "strconv" "strings" "time" @@ -104,7 +105,9 @@ func (c *Operator) CheckScrapeNamespaceMonitorRoute(w http.ResponseWriter, r *ht return } - ch := c.scrapeForce(namespace, monitor) + worker := r.URL.Query().Get("workers") + i, _ := strconv.Atoi(worker) + ch := c.scrapeForce(namespace, monitor, i) const batch = 1000 n := 0 for line := range ch {