Skip to content

Commit

Permalink
Add concurrency limit for remote write
Browse files Browse the repository at this point in the history
Prior to this commit, unbounded number of write would have been trigged from avalanche based on number of samples and batch size. Adding a new flag to limit the concurrency will be useful to simulate prometheus remote write sharding.

Signed-off-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
arajkumar committed Sep 22, 2022
1 parent 19b624b commit ca0b35e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
40 changes: 21 additions & 19 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,26 @@ import (
)

var (
metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int()
labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int()
seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration()
remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int()
remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
metricCount = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int()
labelCount = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int()
seriesCount = kingpin.Flag("series-count", "Number of series per-metric.").Default("10").Int()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration()
remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int()
remoteConcurrencyLimit = kingpin.Flag("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").Int()
remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int()
remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration()
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
)

func main() {
Expand Down Expand Up @@ -77,6 +78,7 @@ func main() {
RequestInterval: *remoteReqsInterval,
BatchSize: *remoteBatchSize,
RequestCount: *remoteRequestCount,
Concurrency: *remoteConcurrencyLimit,
UpdateNotify: updateNotify,
Tenant: *remoteTenant,
TLSClientConfig: tls.Config{
Expand Down
6 changes: 6 additions & 0 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ConfigWrite struct {
Tenant string
TLSClientConfig tls.Config
TenantHeader string
Concurrency int
}

// Client for the remote write requests.
Expand Down Expand Up @@ -126,6 +127,7 @@ func (c *Client) write() error {
log.Printf("Sending: %v timeseries, %v samples, %v timeseries per request, %v delay between requests\n", len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval)
ticker := time.NewTicker(c.config.RequestInterval)
defer ticker.Stop()
concurrencyLimitCh := make(chan struct{}, c.config.Concurrency)
for ii := 0; ii < c.config.RequestCount; ii++ {
// Download the pprofs during half of the iteration to get avarege readings.
// Do that only when it is not set to take profiles at a given interval.
Expand All @@ -151,7 +153,11 @@ func (c *Client) write() error {
start := time.Now()
for i := 0; i < len(tss); i += c.config.BatchSize {
wgMetrics.Add(1)
concurrencyLimitCh <- struct{}{}
go func(i int) {
defer func() {
<-concurrencyLimitCh
}()
defer wgMetrics.Done()
end := i + c.config.BatchSize
if end > len(tss) {
Expand Down

0 comments on commit ca0b35e

Please sign in to comment.