Skip to content

Commit

Permalink
feat: operator scrape 增加并发控制 --story=119027463 (#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Aug 5, 2024
1 parent 0462fd7 commit 0b10d1a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
18 changes: 14 additions & 4 deletions pkg/operator/operator/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s scrapeStat) ID() string {
return fmt.Sprintf("%s/%s", s.Namespace, s.MonitorName)
}

func (c *Operator) scrapeForce(namespace, monitor string) chan string {
func (c *Operator) scrapeForce(namespace, monitor string, workers int) chan string {
statefulset, daemonset := c.collectChildConfigs()
childConfigs := make([]*discover.ChildConfig, 0, len(statefulset)+len(daemonset))
childConfigs = append(childConfigs, statefulset...)
Expand All @@ -55,16 +55,26 @@ func (c *Operator) scrapeForce(namespace, monitor string) chan string {

out := make(chan string, 8)
if len(cfgs) == 0 {
out <- "warning: no monitor targets found"
out <- fmt.Sprintf("warning: no monitor targets found, namespace=%s, monitor=%s", namespace, monitor)
close(out)
return out
}

if workers <= 0 {
workers = 8 // 默认 workers 数
}
sem := make(chan struct{}, workers)
logger.Infof("scrape task: namespace=%s, monitor=%s, workers=%d", namespace, monitor, workers)

wg := sync.WaitGroup{}
wg.Add(len(cfgs))
for _, cfg := range cfgs {
wg.Add(1)
go func(cfg *discover.ChildConfig) {
defer wg.Done()
sem <- struct{}{}
defer func() {
wg.Done()
<-sem
}()
client, err := scraper.New(cfg.Data)
if err != nil {
logger.Warnf("failed to crate scraper http client: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/operator/operator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"net/http"
"net/http/pprof"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -104,7 +105,9 @@ func (c *Operator) CheckScrapeNamespaceMonitorRoute(w http.ResponseWriter, r *ht
return
}

ch := c.scrapeForce(namespace, monitor)
worker := r.URL.Query().Get("workers")
i, _ := strconv.Atoi(worker)
ch := c.scrapeForce(namespace, monitor, i)
const batch = 1000
n := 0
for line := range ch {
Expand Down

0 comments on commit 0b10d1a

Please sign in to comment.