Skip to content

Commit

Permalink
Add max tenant config to tenant federation
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 15, 2025
1 parent 68a2993 commit fa7fd17
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
* [ENHANCEMENT] Query Frontend: Add a flag `-tenant-federation.max-tenant` to limit the number of tenants for federated query. #6493
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ tenant_federation:
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

# A maximum number of tenants to query at once. 0 means no limit.
# CLI flag: -tenant-federation.max-tenant
[max_tenant: <int> | default = 0]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
// Wrap roundtripper into Tripperware.
roundTripper = t.QueryFrontendTripperware(roundTripper)

handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
handler := transport.NewHandler(t.Cfg.Frontend.Handler, t.Cfg.TenantFederation, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
t.API.RegisterQueryFrontendHandler(handler)

if frontendV1 != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -279,7 +280,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))
).Wrap(transport.NewHandler(config.Handler, tenantfederation.Config{}, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
28 changes: 21 additions & 7 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"syscall"
"time"

"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,6 +34,8 @@ const (
// StatusClientClosedRequest is the status code for when a client request cancellation of a http request
StatusClientClosedRequest = 499
ServiceTimingHeaderName = "Server-Timing"

errTooManyTenants = "too many tenants, max: %d, actual: %d"
)

var (
Expand Down Expand Up @@ -84,9 +87,10 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
cfg HandlerConfig
tenantFederationCfg tenantfederation.Config
log log.Logger
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
Expand All @@ -101,11 +105,12 @@ type Handler struct {
}

// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
cfg: cfg,
tenantFederationCfg: tenantFederationCfg,
log: log,
roundTripper: roundTripper,
}

if cfg.QueryStatsEnabled {
Expand Down Expand Up @@ -185,6 +190,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}

if f.tenantFederationCfg.Enabled {
maxTenant := f.tenantFederationCfg.MaxTenant
if maxTenant > 0 && len(tenantIDs) > maxTenant {
http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(tenantIDs)).Error(), http.StatusBadRequest)
return
}
}

userID := tenant.JoinTenantIDs(tenantIDs)

// Initialise the stats in the context and make sure it's propagated
Expand Down
106 changes: 104 additions & 2 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/codes"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
util_api "github.com/cortexproject/cortex/pkg/util/api"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
Expand Down Expand Up @@ -178,6 +181,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
})
userID := "12345"
tenantFederationCfg := tenantfederation.Config{}
for _, tt := range []struct {
name string
cfg HandlerConfig
Expand Down Expand Up @@ -379,7 +383,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
handler := NewHandler(tt.cfg, tt.roundTripperFunc, log.NewNopLogger(), reg)
handler := NewHandler(tt.cfg, tenantFederationCfg, tt.roundTripperFunc, log.NewNopLogger(), reg)

ctx := user.InjectOrgID(context.Background(), userID)
req := httptest.NewRequest("GET", "/", nil)
Expand Down Expand Up @@ -413,7 +417,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
func TestReportQueryStatsFormat(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil)
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
userID := "fake"
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
resp := &http.Response{ContentLength: 1000}
Expand Down Expand Up @@ -506,3 +510,101 @@ func TestReportQueryStatsFormat(t *testing.T) {
})
}
}

func Test_TenantFederation_MaxTenant(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())

roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
})

tests := []struct {
name string
cfg tenantfederation.Config
orgId string
expectedStatusCode int
expectedErrMsg string
}{
{
name: "one tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "org1",
expectedStatusCode: http.StatusOK,
},
{
name: "less than max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 3,
},
orgId: "org1|org2",
expectedStatusCode: http.StatusOK,
},
{
name: "equal to max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 2,
},
orgId: "org1|org2",
expectedStatusCode: http.StatusOK,
},
{
name: "exceeds max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 2,
},
orgId: "org1|org2|org3",
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "too many tenants, max: 2, actual: 3",
},
{
name: "no org Id",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "",
expectedStatusCode: http.StatusUnauthorized,
expectedErrMsg: "no org id",
},
{
name: "no limit",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "org1|org2|org3",
expectedStatusCode: http.StatusOK,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
handler := NewHandler(HandlerConfig{}, test.cfg, roundTripper, log.NewNopLogger(), nil)
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)

req := httptest.NewRequest("GET", "http://fake", nil)
req.Header.Set("X-Scope-OrgId", test.orgId)
resp := httptest.NewRecorder()

handlerWithAuth.ServeHTTP(resp, req)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, test.expectedStatusCode, resp.Code)

if test.expectedErrMsg != "" {
require.Contains(t, string(body), test.expectedErrMsg)
}
})
}
}
5 changes: 4 additions & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -264,14 +265,16 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a

// Default HTTP handler config.
handlerCfg := transport.HandlerConfig{}
tenantFederationCfg := tenantfederation.Config{}

flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))
).Wrap(transport.NewHandler(handlerCfg, tenantFederationCfg, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/tenantfederation/tenant_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ type Config struct {
Enabled bool `yaml:"enabled"`
// MaxConcurrent The number of workers used for processing federated query.
MaxConcurrent int `yaml:"max_concurrent"`
// MaxTenant A maximum number of tenants to query at once.
MaxTenant int `yaml:"max_tenant"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.")
f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.")
}

0 comments on commit fa7fd17

Please sign in to comment.