From fa7fd17268492165cbd11dd6cface15107389a34 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 15 Jan 2025 14:19:18 +0900 Subject: [PATCH] Add max tenant config to tenant federation Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 4 + pkg/cortex/modules.go | 2 +- pkg/frontend/frontend_test.go | 3 +- pkg/frontend/transport/handler.go | 28 +++-- pkg/frontend/transport/handler_test.go | 106 +++++++++++++++++- pkg/frontend/v1/frontend_test.go | 5 +- .../tenantfederation/tenant_federation.go | 3 + 8 files changed, 140 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3c309dae9..e4820d90b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455 * [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491 * [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470 +* [ENHANCEMENT] Query Frontend: Add a flag `-tenant-federation.max-tenant` to limit the number of tenants for federated query. #6493 * [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 548c9c6798..28154b7b18 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -161,6 +161,10 @@ tenant_federation: # CLI flag: -tenant-federation.max-concurrent [max_concurrent: | default = 16] + # A maximum number of tenants to query at once. 0 means no limit. + # CLI flag: -tenant-federation.max-tenant + [max_tenant: | default = 0] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 795c588af5..390212a313 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -536,7 +536,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { // Wrap roundtripper into Tripperware. roundTripper = t.QueryFrontendTripperware(roundTripper) - handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer) + handler := transport.NewHandler(t.Cfg.Frontend.Handler, t.Cfg.TenantFederation, roundTripper, util_log.Logger, prometheus.DefaultRegisterer) t.API.RegisterQueryFrontendHandler(handler) if frontendV1 != nil { diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index 77694689bd..08251a2f2e 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -27,6 +27,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/tenantfederation" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -279,7 +280,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, middleware.Tracer{}, - ).Wrap(transport.NewHandler(config.Handler, rt, logger, nil))) + ).Wrap(transport.NewHandler(config.Handler, tenantfederation.Config{}, rt, logger, nil))) httpServer := http.Server{ Handler: r, diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 68a0ae6144..7cd8375136 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/cortexproject/cortex/pkg/querier/tenantfederation" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -33,6 +34,8 @@ const ( // StatusClientClosedRequest is the status code for when a client request cancellation of a http request StatusClientClosedRequest = 499 ServiceTimingHeaderName = "Server-Timing" + + errTooManyTenants = "too many tenants, max: %d, actual: %d" ) var ( @@ -84,9 +87,10 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) { // Handler accepts queries and forwards them to RoundTripper. It can log slow queries, // but all other logic is inside the RoundTripper. type Handler struct { - cfg HandlerConfig - log log.Logger - roundTripper http.RoundTripper + cfg HandlerConfig + tenantFederationCfg tenantfederation.Config + log log.Logger + roundTripper http.RoundTripper // Metrics. querySeconds *prometheus.CounterVec @@ -101,11 +105,12 @@ type Handler struct { } // NewHandler creates a new frontend handler. -func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler { +func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler { h := &Handler{ - cfg: cfg, - log: log, - roundTripper: roundTripper, + cfg: cfg, + tenantFederationCfg: tenantFederationCfg, + log: log, + roundTripper: roundTripper, } if cfg.QueryStatsEnabled { @@ -185,6 +190,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { return } + + if f.tenantFederationCfg.Enabled { + maxTenant := f.tenantFederationCfg.MaxTenant + if maxTenant > 0 && len(tenantIDs) > maxTenant { + http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(tenantIDs)).Error(), http.StatusBadRequest) + return + } + } + userID := tenant.JoinTenantIDs(tenantIDs) // Initialise the stats in the context and make sure it's propagated diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index c21e1bf18e..92e0b59fd4 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -20,11 +20,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "google.golang.org/grpc/codes" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/querier/tenantfederation" "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/tenant" util_api "github.com/cortexproject/cortex/pkg/util/api" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -178,6 +181,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }) userID := "12345" + tenantFederationCfg := tenantfederation.Config{} for _, tt := range []struct { name string cfg HandlerConfig @@ -379,7 +383,7 @@ func TestHandler_ServeHTTP(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - handler := NewHandler(tt.cfg, tt.roundTripperFunc, log.NewNopLogger(), reg) + handler := NewHandler(tt.cfg, tenantFederationCfg, tt.roundTripperFunc, log.NewNopLogger(), reg) ctx := user.InjectOrgID(context.Background(), userID) req := httptest.NewRequest("GET", "/", nil) @@ -413,7 +417,7 @@ func TestHandler_ServeHTTP(t *testing.T) { func TestReportQueryStatsFormat(t *testing.T) { outputBuf := bytes.NewBuffer(nil) logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf)) - handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil) + handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil) userID := "fake" req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil) resp := &http.Response{ContentLength: 1000} @@ -506,3 +510,101 @@ func TestReportQueryStatsFormat(t *testing.T) { }) } } + +func Test_TenantFederation_MaxTenant(t *testing.T) { + // set a multi tenant resolver + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("{}")), + }, nil + }) + + tests := []struct { + name string + cfg tenantfederation.Config + orgId string + expectedStatusCode int + expectedErrMsg string + }{ + { + name: "one tenant", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 0, + }, + orgId: "org1", + expectedStatusCode: http.StatusOK, + }, + { + name: "less than max tenant", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 3, + }, + orgId: "org1|org2", + expectedStatusCode: http.StatusOK, + }, + { + name: "equal to max tenant", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 2, + }, + orgId: "org1|org2", + expectedStatusCode: http.StatusOK, + }, + { + name: "exceeds max tenant", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 2, + }, + orgId: "org1|org2|org3", + expectedStatusCode: http.StatusBadRequest, + expectedErrMsg: "too many tenants, max: 2, actual: 3", + }, + { + name: "no org Id", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 0, + }, + orgId: "", + expectedStatusCode: http.StatusUnauthorized, + expectedErrMsg: "no org id", + }, + { + name: "no limit", + cfg: tenantfederation.Config{ + Enabled: true, + MaxTenant: 0, + }, + orgId: "org1|org2|org3", + expectedStatusCode: http.StatusOK, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + handler := NewHandler(HandlerConfig{}, test.cfg, roundTripper, log.NewNopLogger(), nil) + handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler) + + req := httptest.NewRequest("GET", "http://fake", nil) + req.Header.Set("X-Scope-OrgId", test.orgId) + resp := httptest.NewRecorder() + + handlerWithAuth.ServeHTTP(resp, req) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, test.expectedStatusCode, resp.Code) + + if test.expectedErrMsg != "" { + require.Contains(t, string(body), test.expectedErrMsg) + } + }) + } +} diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index ef7cd705f2..7ae0c97c29 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" + "github.com/cortexproject/cortex/pkg/querier/tenantfederation" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -264,6 +265,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a // Default HTTP handler config. handlerCfg := transport.HandlerConfig{} + tenantFederationCfg := tenantfederation.Config{} + flagext.DefaultValues(&handlerCfg) rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1) @@ -271,7 +274,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a r.PathPrefix("/").Handler(middleware.Merge( middleware.AuthenticateUser, middleware.Tracer{}, - ).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil))) + ).Wrap(transport.NewHandler(handlerCfg, tenantFederationCfg, rt, logger, nil))) httpServer := http.Server{ Handler: r, diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 56e5fb59db..4b161ab732 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -9,9 +9,12 @@ type Config struct { Enabled bool `yaml:"enabled"` // MaxConcurrent The number of workers used for processing federated query. MaxConcurrent int `yaml:"max_concurrent"` + // MaxTenant A maximum number of tenants to query at once. + MaxTenant int `yaml:"max_tenant"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.") + f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.") }