WIP: increase ratelimit-burst-size for global use #6277

Draft · wants to merge 3 commits into base: master
24 changes: 20 additions & 4 deletions common/quotas/dynamicratelimiter.go
@@ -24,6 +24,7 @@ package quotas

import (
"context"
"time"

"golang.org/x/time/rate"

@@ -33,15 +34,30 @@ import (
// DynamicRateLimiter implements a dynamic config wrapper around the rate limiter,
// checks for updates to the dynamic config and updates the rate limiter accordingly
type DynamicRateLimiter struct {
rps RPSFunc
rl *RateLimiter
rps RPSFunc
rl *RateLimiter
burstWindow time.Duration
}

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
// NewDynamicRateLimiter returns a rate limiter which handles dynamic config.
// This limiter allows one second of requests to burst.
func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
initialRps := rps()
rl := NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
return &DynamicRateLimiter{rps, rl}
return &DynamicRateLimiter{
rps: rps,
rl: rl,
}
}

// NewWindowedDynamicRateLimiter allows a configurable burst window, rather than one second.
func NewWindowedDynamicRateLimiter(rps RPSFunc, burstWindow time.Duration) *DynamicRateLimiter {
initialRPS := rps()
rl := NewRateLimiterWithBurstWindow(&initialRPS, _defaultRPSTTL, burstWindow)
return &DynamicRateLimiter{
rps: rps,
rl: rl,
}
}

// Allow immediately returns with true or false indicating if a rate limit
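Note (not part of the diff): a minimal sketch of the burst-window idea on top of golang.org/x/time/rate, which these limiters wrap. The helper name and numbers are illustrative only, not the PR's API.

package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// newWindowedLimiter sizes the burst bucket to hold burstWindow's worth of tokens
// at the configured RPS, instead of exactly one second's worth.
func newWindowedLimiter(rps float64, burstWindow time.Duration) *rate.Limiter {
    burst := int(rps * burstWindow.Seconds())
    if burst < 1 {
        burst = 1 // a zero burst rejects every request, so allow at least one
    }
    return rate.NewLimiter(rate.Limit(rps), burst)
}

func main() {
    oneSecond := newWindowedLimiter(5, time.Second)     // burst of 5, the historical behavior
    tenSeconds := newWindowedLimiter(5, 10*time.Second) // burst of 50, absorbs occasional small bursts
    fmt.Println(oneSecond.Burst(), tenSeconds.Burst())  // prints: 5 50
}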
51 changes: 49 additions & 2 deletions common/quotas/global/collection/collection.go
@@ -453,6 +453,13 @@
// should not happen outside pretty major errors, but may recover next time.
c.logger.Error("aggregator update error", tag.Error(res.Err))
}

// currently hardcoded, but may be useful to make configurable?
// 10s could be larger than desired.
const (
burstWindow = 10 * time.Second
)

// either way, process all weights we did successfully retrieve.
for gkey, info := range res.Weights {
delete(usage, gkey) // clean up the list so we know what was missed
@@ -474,8 +481,9 @@
target := rate.Limit(c.targetRPS(lkey))
limiter := c.global.Load(lkey)
fallbackTarget := limiter.FallbackLimit()
boosted := boostRPS(target, fallbackTarget, info.Weight, info.UsedRPS)
limiter.Update(boosted)
boostedRPS := boostRPS(target, fallbackTarget, info.Weight, info.UsedRPS)
boostedBurst := boostBurst(target, boostedRPS, burstWindow)
limiter.Update(boostedRPS, boostedBurst)
}
}

@@ -498,6 +506,45 @@
}
}

func boostBurst(target, boosted rate.Limit, window time.Duration) int {
// the default behavior of "rps == burst" is quite bad for low-volume bursty use,
// especially when RPS or weight is low. a configured limit of 100 RPS or lower can easily
// lead to only ~1 request being allowed, rejecting most of a 10-request burst that occurs every minute or so.
//
// obviously there are tradeoffs here, but low-RPS behaves so badly that we feel the risk
// of allowing large multi-host bursts beyond our capacity is worth it.
// they're rare enough in practice, and the burst size will decrease as utilization increases.
//
// basic logic is that the burst value should:
// - never exceed the full target RPS as a safety limit
// - allow at least a few seconds in most cases
// - be based on update-frequency, to allow buffering traffic shifts while weights rebalance
//
// this way:
// - heavily-weighted hosts trend towards burst==RPS, forcing imbalanced load to be smooth-ish
// - low-weighted hosts allow large bursts, and will re-balance "soon" if load has just shifted
// - well-distributed requests allow large bursts, but not exceeding RPS per window
// - low-weight but high-cluster-usage may be substantially below target, which seems fair,
// and probably won't be noticed in the high-usage flood.
//
// currently the window is ~10s, as this matches our metrics aggregation window, and the
// mismatch between metrics and limiting behavior is frequently confusing to our users.

burst := float64(boosted) * (float64(window) / float64(time.Second)) // rps * num seconds in window, without flooring
if burst > float64(target) {
return int(target) // do not exceed cluster RPS, that seems overly dangerous
}

return int(burst)
}

func max[T ~int64](a, b T) T {
if a > b {
return a
}
return b
}

func boostRPS(target, fallback rate.Limit, weight float64, usedRPS float64) rate.Limit {
baseline := target * rate.Limit(weight)

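Aside (not part of the diff): a small worked example of the burst values this logic produces, assuming the 10s window above; the boostBurst below mirrors the added function purely for illustration, and the RPS figures are hypothetical.

package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// boostBurst mirrors the diff's logic: the burst holds one window's worth of the
// boosted RPS, capped at the full cluster target as a safety limit.
func boostBurst(target, boosted rate.Limit, window time.Duration) int {
    burst := float64(boosted) * window.Seconds()
    if burst > float64(target) {
        return int(target)
    }
    return int(burst)
}

func main() {
    window := 10 * time.Second
    fmt.Println(boostBurst(1000, 50, window))  // 500: a low-weighted host keeps roughly a 10s buffer
    fmt.Println(boostBurst(1000, 400, window)) // 1000: capped at the cluster target
    fmt.Println(boostBurst(100, 100, window))  // 100: a heavily-weighted host trends toward burst == RPS
}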
2 changes: 1 addition & 1 deletion common/quotas/global/collection/internal/counted_test.go
@@ -172,7 +172,7 @@ func TestRegression_ReserveCountsCorrectly(t *testing.T) {
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
l := NewFallbackLimiter(allowlimiter{})
l.Update(1) // allows using primary, else it calls the fallback
l.Update(1, 1) // allows using primary, else it calls the fallback
l.primary = wrapped // cheat, just swap it out

run(t, l, ts.Advance, func() UsageMetrics {
4 changes: 2 additions & 2 deletions common/quotas/global/collection/internal/fallback.go
@@ -151,7 +151,7 @@ func (b *FallbackLimiter) useFallback() bool {

// Update adjusts the underlying "primary" ratelimit, and resets the fallback fuse.
// This implies switching to the "primary" limiter - if that is not desired, call Reset() immediately after.
func (b *FallbackLimiter) Update(lim rate.Limit) {
func (b *FallbackLimiter) Update(lim rate.Limit, burst int) {
// caution: order here matters, to prevent potentially-old limiter values from being used
// before they are updated.
//
@@ -170,7 +170,7 @@
}

b.primary.SetLimit(lim)
b.primary.SetBurst(max(1, int(lim))) // 0 burst blocks all requests, so allow at least 1 and rely on rps to fill sanely
b.primary.SetBurst(max(1, burst)) // 0 burst blocks all requests, so allow at least 1 and rely on rps to fill sanely
}

// FailedUpdate should be called when a limit fails to update from an aggregator,
12 changes: 6 additions & 6 deletions common/quotas/global/collection/internal/fallback_test.go
@@ -55,7 +55,7 @@ func TestLimiter(t *testing.T) {
})
t.Run("uses primary after update", func(t *testing.T) {
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1_000_000) // large enough to allow millisecond sleeps to refill
lim.Update(1_000_000, 10) // large enough to allow millisecond sleeps to refill

time.Sleep(time.Millisecond) // allow some tokens to fill
assert.True(t, lim.Allow(), "limiter allows after enough time has passed")
@@ -69,7 +69,7 @@

t.Run("collecting usage data resets counts", func(t *testing.T) {
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1)
lim.Update(1, 1)
lim.Allow()
limit, _, _ := lim.Collect()
assert.Equal(t, 1, limit.Allowed+limit.Rejected, "should count one request")
@@ -88,7 +88,7 @@

t.Run("falls back after too many failures", func(t *testing.T) {
lim := NewFallbackLimiter(allowlimiter{}) // fallback behavior is ignored
lim.Update(1)
lim.Update(1, 1)
_, startup, failing := lim.Collect()
require.False(t, failing, "should not be using fallback")
require.False(t, startup, "should not be starting up, has had an update")
@@ -124,8 +124,8 @@ t.Run("coverage", func(t *testing.T) {
t.Run("coverage", func(t *testing.T) {
// easy line to cover to bring to 100%
lim := NewFallbackLimiter(nil)
lim.Update(1)
lim.Update(1) // should go down "no changes needed, return early" path
lim.Update(1, 1)
lim.Update(1, 1) // should go down "no changes needed, return early" path
})
}

@@ -145,7 +145,7 @@ func TestLimiterNotRacy(t *testing.T) {
// this should randomly clear occasionally via failures.
if rand.Intn(10) == 0 {
g.Go(func() error {
lim.Update(rate.Limit(1 / rand.Float64())) // essentially never exercises "same value, do nothing" logic
lim.Update(rate.Limit(1/rand.Float64()), 1) // essentially never exercises "same value, do nothing" logic
return nil
})
} else {
13 changes: 11 additions & 2 deletions common/quotas/permember.go
@@ -22,6 +22,7 @@ package quotas

import (
"math"
"time"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/membership"
@@ -52,12 +53,14 @@ func NewPerMemberDynamicRateLimiterFactory(
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter,
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter,
resolver membership.Resolver,
burstWindow time.Duration,
) LimiterFactory {
return perMemberFactory{
service: service,
globalRPS: globalRPS,
instanceRPS: instanceRPS,
resolver: resolver,
burstWindow: burstWindow,
}
}

@@ -66,15 +69,21 @@ type perMemberFactory struct {
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter
resolver membership.Resolver

// burstWindow is how much time's worth of RPS the burst bucket may hold.
//
// By default this should be 1 second, which is what most non-user-facing
// limiters use, and the only value we have used historically.
burstWindow time.Duration
}

func (f perMemberFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 {
return NewWindowedDynamicRateLimiter(func() float64 {
return PerMember(
f.service,
float64(f.globalRPS(domain)),
float64(f.instanceRPS(domain)),
f.resolver,
)
})
}, f.burstWindow)
}
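Aside (not part of the diff): rough arithmetic for why a one-second burst hurts low per-host limits, which is what the configurable burstWindow addresses; the host count and RPS figures are hypothetical.

package main

import (
    "fmt"
    "time"
)

func main() {
    // e.g. a 100 RPS domain limit spread evenly over 40 frontend hosts
    perHostRPS := 100.0 / 40.0 // 2.5 RPS per host

    oneSecondBurst := int(perHostRPS * time.Second.Seconds())        // 2 tokens: most of a 10-request burst is rejected
    tenSecondBurst := int(perHostRPS * (10 * time.Second).Seconds()) // 25 tokens: the same burst is absorbed

    fmt.Println(oneSecondBurst, tenSecondBurst) // prints: 2 25
}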
2 changes: 2 additions & 0 deletions common/quotas/permember_test.go
@@ -22,6 +22,7 @@ package quotas

import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@@ -62,6 +63,7 @@ func Test_PerMemberFactory(t *testing.T) {
func(string) int { return 20 },
func(string) int { return 3 },
resolver,
time.Second,
)

limiter := factory.GetLimiter("TestDomainName")
24 changes: 21 additions & 3 deletions common/quotas/ratelimiter.go
@@ -52,9 +52,10 @@ type RateLimiter struct {
goRateLimiter atomic.Value
// TTL is used to determine whether to update the limit. Until TTL, pick
// lower(existing TTL, input TTL). After TTL, pick input TTL if different from existing TTL
ttlTimer *time.Timer
ttl time.Duration
minBurst int
ttlTimer *time.Timer
ttl time.Duration
minBurst int
burstWindow time.Duration
}

// NewSimpleRateLimiter returns a new rate limiter backed by the golang rate
Expand All @@ -73,6 +74,22 @@ func NewRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration, minBurst i
ttl: ttl,
ttlTimer: time.NewTimer(ttl),
minBurst: minBurst,
burstWindow: time.Second,
}
rl.storeLimiter(maxDispatchPerSecond)
return rl
}

// NewRateLimiterWithBurstWindow returns a ratelimiter that updates itself periodically, and allows bursts to consume up to burstWindow worth of tokens.
// This is primarily intended to be used by user-facing limits, to average across a larger window of time, and allow lower RPS limits to send small bursts
// in large clusters without unfairly limiting them to only a few requests per frontend host.
func NewRateLimiterWithBurstWindow(maxDispatchPerSecond *float64, ttl time.Duration, burstWindow time.Duration) *RateLimiter {
rl := &RateLimiter{
maxDispatchPerSecond: maxDispatchPerSecond,
ttl: ttl,
ttlTimer: time.NewTimer(ttl),
minBurst: 1,
burstWindow: burstWindow,
}
rl.storeLimiter(maxDispatchPerSecond)
return rl
@@ -119,6 +136,7 @@ func (rl *RateLimiter) Limit() rate.Limit {

func (rl *RateLimiter) storeLimiter(maxDispatchPerSecond *float64) {
burst := int(*maxDispatchPerSecond)
burst = int(float64(burst) * (float64(rl.burstWindow) / float64(time.Second))) // scale by burst window
// If throttling is zero, burst also has to be 0
if *maxDispatchPerSecond != 0 && burst <= rl.minBurst {
burst = rl.minBurst
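Aside (not part of the diff): a sketch of the burst arithmetic storeLimiter now applies, with minBurst assumed to be 1. Note the RPS is truncated to an integer before the window scaling, so fractional limits still fall back to the minimum burst.

package main

import (
    "fmt"
    "time"
)

// burstFor mirrors the calculation above, for illustration only.
func burstFor(rps float64, window time.Duration, minBurst int) int {
    burst := int(rps)                              // truncate first, as storeLimiter does
    burst = int(float64(burst) * window.Seconds()) // then scale by the burst window
    if rps != 0 && burst <= minBurst {
        burst = minBurst // zero RPS keeps a zero burst; anything else gets at least minBurst
    }
    return burst
}

func main() {
    fmt.Println(burstFor(100, time.Second, 1))    // 100: unchanged one-second behavior
    fmt.Println(burstFor(100, 10*time.Second, 1)) // 1000: ten seconds' worth of tokens
    fmt.Println(burstFor(0.5, 10*time.Second, 1)) // 1: the fraction is dropped before scaling
}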
6 changes: 4 additions & 2 deletions service/frontend/service.go
@@ -240,7 +240,8 @@ func (s *Service) createGlobalQuotaCollections() (globalRatelimiterCollections,
// to safely shadow global ratelimits, we must make duplicate *quota.Collection collections
// so they do not share data when the global limiter decides to use its local fallback.
// these are then combined into the global/algorithm.Collection to handle all limiting calls
local, global := s.createBaseLimiters(), s.createBaseLimiters()
local := s.createBaseLimiters(time.Second) // historical value, matches disabled mode: burst == rps
global := s.createBaseLimiters(10 * time.Second) // new value: burst == 10*rps for fallback, dynamic for truly global

user, err := create("user", local.user, global.user, s.config.GlobalDomainUserRPS)
combinedErr = multierr.Combine(combinedErr, err)
@@ -261,13 +262,14 @@
async: async,
}, combinedErr
}
func (s *Service) createBaseLimiters() ratelimiterCollections {
func (s *Service) createBaseLimiters(burstWindow time.Duration) ratelimiterCollections {
create := func(shared, perInstance dynamicconfig.IntPropertyFnWithDomainFilter) *quotas.Collection {
return quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
shared,
perInstance,
s.GetMembershipResolver(),
burstWindow,
))
}
return ratelimiterCollections{