Skip to content

Commit

Permalink
TraceByID: don't allow concurrent_shards greater than query_shards (#…
Browse files Browse the repository at this point in the history
…4074)

concurrent shards grater then query shards should not be allowed because it would create more goroutines then the jobs to send these jobs to queriers.

if that happens, set concurrent_shards=query_shards and log a warning.
  • Loading branch information
electron0zero authored Sep 12, 2024
1 parent 89f3d79 commit 7756ca8
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
* [CHANGE] **BREAKING CHANGE** Migrate from OpenTracing to OpenTelemetry instrumentation. Removed the `use_otel_tracer` configuration option. Use the OpenTelemetry environment variables to configure the span exporter [#3646](https://github.com/grafana/tempo/pull/3646) (@andreasgerstmayr)
Expand Down
9 changes: 9 additions & 0 deletions cmd/tempo/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (c *Config) CheckConfig() []ConfigWarning {
warnings = append(warnings, warnConfiguredLegacyCache)
}

if c.Frontend.TraceByID.ConcurrentShards > c.Frontend.TraceByID.QueryShards {
warnings = append(warnings, warnTraceByIDConcurrentShards)
}

return warnings
}

Expand Down Expand Up @@ -293,6 +297,11 @@ var (
Message: "c.StorageConfig.Trace.Cache is deprecated and will be removed in a future release.",
Explain: "Please migrate to the top level cache settings config.",
}

warnTraceByIDConcurrentShards = ConfigWarning{
Message: "c.Frontend.TraceByID.ConcurrentShards greater than query_shards is invalid. concurrent_shards will be set to query_shards",
Explain: "Please remove ConcurrentShards or set it to a value less than or equal to QueryShards",
}
)

func newV2Warning(setting string) ConfigWarning {
Expand Down
8 changes: 8 additions & 0 deletions cmd/tempo/app/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/grafana/tempo/modules/frontend"
"github.com/grafana/tempo/tempodb/backend/s3"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -52,6 +53,12 @@ func TestConfig_CheckConfig(t *testing.T) {
Enabled: true,
},
},
Frontend: frontend.Config{
TraceByID: frontend.TraceByIDConfig{
QueryShards: 100,
ConcurrentShards: 200,
},
},
},
expect: []ConfigWarning{
warnCompleteBlockTimeout,
Expand All @@ -63,6 +70,7 @@ func TestConfig_CheckConfig(t *testing.T) {
warnLogDiscardedTraces,
warnNativeAWSAuthEnabled,
warnConfiguredLegacyCache,
warnTraceByIDConcurrentShards,
},
},
{
Expand Down
8 changes: 8 additions & 0 deletions modules/frontend/traceid_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,18 @@ func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline

// execute requests
concurrentShards := uint(s.cfg.QueryShards)
// if concurrent shards is set, respect that value
if s.cfg.ConcurrentShards > 0 {
concurrentShards = uint(s.cfg.ConcurrentShards)
}

// concurrent_shards grater then query_shards should not be allowed because it would create
// more goroutines then the jobs to send these jobs to queriers.
if concurrentShards > uint(s.cfg.QueryShards) {
// set the concurrent shards to the total shards
concurrentShards = uint(s.cfg.QueryShards)
}

return pipeline.NewAsyncSharderFunc(ctx, int(concurrentShards), len(reqs), func(i int) pipeline.Request {
return pipeline.NewHTTPRequest(reqs[i])
}, s.next), nil
Expand Down

0 comments on commit 7756ca8

Please sign in to comment.