From fd930b3c8205ef2ec657fe6b25f2c0e53f272e54 Mon Sep 17 00:00:00 2001 From: bohdanstorozhuk Date: Mon, 27 Jun 2022 21:16:06 +0100 Subject: [PATCH 1/4] Fix test approach for detecting issues --- go.mod | 1 - go.sum | 2 - ratelimit.go | 14 ++++-- ratelimit_test.go | 121 +++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 125 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index ce7cfc9..77c1eda 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module go.uber.org/ratelimit go 1.18 require ( - github.com/benbjohnson/clock v1.3.0 github.com/stretchr/testify v1.6.1 go.uber.org/atomic v1.7.0 ) diff --git a/go.sum b/go.sum index 471801e..8c610cb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/ratelimit.go b/ratelimit.go index 7370526..7afce5b 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -22,8 +22,6 @@ package ratelimit // import "go.uber.org/ratelimit" import ( "time" - - "github.com/benbjohnson/clock" ) // Note: This file is inspired by: @@ -45,6 +43,16 @@ type Clock interface { Sleep(time.Duration) } +type internalClock struct{} + +func (i *internalClock) Now() time.Time { + return time.Now() +} + +func (i *internalClock) Sleep(duration time.Duration) { + time.Sleep(duration) +} + // config configures a limiter. type config struct { clock Clock @@ -60,7 +68,7 @@ func New(rate int, opts ...Option) Limiter { // buildConfig combines defaults with options. 
func buildConfig(opts []Option) config { c := config{ - clock: clock.New(), + clock: &internalClock{}, slack: 10, per: time.Second, } diff --git a/ratelimit_test.go b/ratelimit_test.go index 7b584b5..d118cfc 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -7,10 +7,105 @@ import ( "go.uber.org/atomic" - "github.com/benbjohnson/clock" "github.com/stretchr/testify/assert" ) +// testTime is a fake time used for testing. +type testTime struct { + mu sync.Mutex + cur time.Time // current fake time + timers []testTimer // fake timers +} + +// testTimer is a fake timer. +type testTimer struct { + when time.Time + ch chan<- time.Time +} + +// now returns the current fake time. +func (tt *testTime) now() time.Time { + tt.mu.Lock() + defer tt.mu.Unlock() + return tt.cur +} + +// newTimer creates a fake timer. It returns the channel, +// a function to stop the timer (which we don't care about), +// and a function to advance to the next timer. +func (tt *testTime) newTimer(dur time.Duration) (<-chan time.Time, func() bool, func()) { + tt.mu.Lock() + defer tt.mu.Unlock() + ch := make(chan time.Time, 1) + timer := testTimer{ + when: tt.cur.Add(dur), + ch: ch, + } + tt.timers = append(tt.timers, timer) + return ch, func() bool { return true }, tt.advanceToTimer +} + +// advance advances the fake time. +func (tt *testTime) advance(dur time.Duration) { + tt.mu.Lock() + defer tt.mu.Unlock() + tt.advanceUnlocked(dur) +} + +// advanceUnlock advances the fake time, assuming it is already locked. +func (tt *testTime) advanceUnlocked(dur time.Duration) { + tt.cur = tt.cur.Add(dur) + + i := 0 + j := 0 + for i < len(tt.timers) { + if tt.timers[i].when.After(tt.cur) { + if i != j { + tt.timers[j] = tt.timers[i] + } + i++ + j++ + } else { + tt.timers[i].ch <- tt.cur + time.Sleep(1 * time.Millisecond) + i++ + } + } + tt.timers = tt.timers[0:j] +} + +// advanceToTimer advances the time to the next timer. 
+func (tt *testTime) advanceToTimer() { + tt.mu.Lock() + defer tt.mu.Unlock() + if len(tt.timers) == 0 { + panic("no timer") + } + when := tt.timers[0].when + for _, timer := range tt.timers[1:] { + if timer.when.Before(when) { + when = timer.when + } + } + tt.advanceUnlocked(when.Sub(tt.cur)) +} + +func (tt *testTime) Now() time.Time { + return tt.now() +} + +func (tt *testTime) Sleep(duration time.Duration) { + timer, _, _ := tt.newTimer(duration) + <-timer +} + +// makeTestTime hooks the testTimer into the package. +func makeTestTime() *testTime { + return &testTime{ + cur: time.Now(), + } +} + type testRunner interface { // createLimiter builds a limiter with given options. createLimiter(int, ...Option) Limiter @@ -27,7 +122,7 @@ type testRunner interface { type runnerImpl struct { t *testing.T - clock *clock.Mock + clock *testTime constructor func(int, ...Option) Limiter count atomic.Int32 // maxDuration is the time we need to move into the future for a test. @@ -60,7 +155,7 @@ func runTest(t *testing.T, fn func(testRunner)) { t.Run(tt.name, func(t *testing.T) { r := runnerImpl{ t: t, - clock: clock.NewMock(), + clock: makeTestTime(), constructor: tt.constructor, doneCh: make(chan struct{}), } @@ -68,7 +163,18 @@ func runTest(t *testing.T, fn func(testRunner)) { defer r.wg.Wait() fn(&r) - r.clock.Add(r.maxDuration) + go func() { + move := func() { + defer func() { + _ = recover() + time.Sleep(10 * time.Millisecond) + }() + r.clock.advanceToTimer() + } + for { + move() + } + }() }) } } @@ -86,6 +192,7 @@ func (r *runnerImpl) startTaking(rls ...Limiter) { for _, rl := range rls { rl.Take() } + r.clock.advance(time.Nanosecond) r.count.Inc() select { case <-r.doneCh: @@ -110,14 +217,14 @@ func (r *runnerImpl) afterFunc(d time.Duration, fn func()) { if d > r.maxDuration { r.maxDuration = d } - + timer, _, _ := r.clock.newTimer(d) r.goWait(func() { select { case <-r.doneCh: return - case <-r.clock.After(d): + case <-timer: + fn() } - fn() }) } From 
2dd0d9b3bfa334249baed2c81841c6ae8a1ecd47 Mon Sep 17 00:00:00 2001 From: bohdanstorozhuk Date: Tue, 28 Jun 2022 01:09:26 +0100 Subject: [PATCH 2/4] Move time mocking to a separate file --- Makefile | 2 +- ratelimit_mock_time_test.go | 111 ++++++++++++++++++++++++++++++++++++ ratelimit_test.go | 92 ++---------------------------- 3 files changed, 117 insertions(+), 88 deletions(-) create mode 100644 ratelimit_mock_time_test.go diff --git a/Makefile b/Makefile index 7eb6d24..7ff707d 100644 --- a/Makefile +++ b/Makefile @@ -53,4 +53,4 @@ staticcheck: bin/staticcheck .PHONY: test test: - go test -race ./... + go test -v -race ./... diff --git a/ratelimit_mock_time_test.go b/ratelimit_mock_time_test.go new file mode 100644 index 0000000..5383cba --- /dev/null +++ b/ratelimit_mock_time_test.go @@ -0,0 +1,111 @@ +package ratelimit + +/** +This fake time implementation is a modification of the time mocking +mechanism used by Ian Lance Taylor in the https://github.com/golang/time project: +https://github.com/golang/time/commit/579cf78fd858857c0d766e0d63eb2b0ccf29f436 + +Modified parts: + - advanceUnlocked method uses in-place filtering of timers + instead of a full copy on every remove. + Since we have 100s of timers in our tests, the current linear + complexity of this operation is OK. + If we are going to have 1000s in the future, we can use a heap to store timers. + - advanceUnlocked method yields the processor after every timer triggering, + allowing other goroutines to run. +*/ + +import ( + "runtime" + "sync" + "time" +) + +// testTime is a fake time used for testing. +type testTime struct { + mu sync.Mutex + cur time.Time // current fake time + timers []testTimer // fake timers +} + +// makeTestTime creates a testTime whose fake clock starts at the current wall-clock time. +func makeTestTime() *testTime { + return &testTime{ + cur: time.Now(), + } +} + +// testTimer is a fake timer. +type testTimer struct { + when time.Time + ch chan<- time.Time +} + +// now returns the current fake time. 
+func (tt *testTime) now() time.Time { + tt.mu.Lock() + defer tt.mu.Unlock() + return tt.cur +} + +// newTimer creates a fake timer. It returns the channel, +// a function to stop the timer (which we don't care about), +// and a function to advance to the next timer. +func (tt *testTime) newTimer(dur time.Duration) (<-chan time.Time, func() bool, func()) { + tt.mu.Lock() + defer tt.mu.Unlock() + ch := make(chan time.Time, 1) + timer := testTimer{ + when: tt.cur.Add(dur), + ch: ch, + } + tt.timers = append(tt.timers, timer) + return ch, func() bool { return true }, tt.advanceToTimer +} + +// advance advances the fake time. +func (tt *testTime) advance(dur time.Duration) { + tt.mu.Lock() + defer tt.mu.Unlock() + tt.advanceUnlocked(dur) +} + +// advanceUnlocked advances the fake time, assuming it is already locked. +func (tt *testTime) advanceUnlocked(dur time.Duration) { + tt.cur = tt.cur.Add(dur) + + i := 0 + j := 0 + for i < len(tt.timers) { + if tt.timers[i].when.After(tt.cur) { + if i != j { + tt.timers[j] = tt.timers[i] + } + i++ + j++ + } else { + tt.timers[i].ch <- tt.cur + for i := 0; i < 16; i++ { + runtime.Gosched() + } + i++ + } + } + tt.timers = tt.timers[0:j] +} + +// advanceToTimer advances the time to the next timer. +func (tt *testTime) advanceToTimer() { + tt.mu.Lock() + defer tt.mu.Unlock() + if len(tt.timers) == 0 { + panic("no timer") + } + when := tt.timers[0].when + for _, timer := range tt.timers[1:] { + if timer.when.Before(when) { + when = timer.when + } + } + tt.advanceUnlocked(when.Sub(tt.cur)) +} diff --git a/ratelimit_test.go b/ratelimit_test.go index d118cfc..8a90984 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -10,86 +10,6 @@ import ( "github.com/stretchr/testify/assert" ) -// testTime is a fake time used for testing. -type testTime struct { - mu sync.Mutex - cur time.Time // current fake time - timers []testTimer // fake timers -} - -// testTimer is a fake timer. 
-type testTimer struct { - when time.Time - ch chan<- time.Time -} - -// now returns the current fake time. -func (tt *testTime) now() time.Time { - tt.mu.Lock() - defer tt.mu.Unlock() - return tt.cur -} - -// newTimer creates a fake timer. It returns the channel, -// a function to stop the timer (which we don't care about), -// and a function to advance to the next timer. -func (tt *testTime) newTimer(dur time.Duration) (<-chan time.Time, func() bool, func()) { - tt.mu.Lock() - defer tt.mu.Unlock() - ch := make(chan time.Time, 1) - timer := testTimer{ - when: tt.cur.Add(dur), - ch: ch, - } - tt.timers = append(tt.timers, timer) - return ch, func() bool { return true }, tt.advanceToTimer -} - -// advance advances the fake time. -func (tt *testTime) advance(dur time.Duration) { - tt.mu.Lock() - defer tt.mu.Unlock() - tt.advanceUnlocked(dur) -} - -// advanceUnlock advances the fake time, assuming it is already locked. -func (tt *testTime) advanceUnlocked(dur time.Duration) { - tt.cur = tt.cur.Add(dur) - - i := 0 - j := 0 - for i < len(tt.timers) { - if tt.timers[i].when.After(tt.cur) { - if i != j { - tt.timers[j] = tt.timers[i] - } - i++ - j++ - } else { - tt.timers[i].ch <- tt.cur - time.Sleep(1 * time.Millisecond) - i++ - } - } - tt.timers = tt.timers[0:j] -} - -// advanceToTimer advances the time to the next timer. -func (tt *testTime) advanceToTimer() { - tt.mu.Lock() - defer tt.mu.Unlock() - if len(tt.timers) == 0 { - panic("no timer") - } - when := tt.timers[0].when - for _, timer := range tt.timers[1:] { - if timer.when.Before(when) { - when = timer.when - } - } - tt.advanceUnlocked(when.Sub(tt.cur)) -} - func (tt *testTime) Now() time.Time { return tt.now() } @@ -99,13 +19,6 @@ func (tt *testTime) Sleep(duration time.Duration) { <-timer } -// makeTestTime hooks the testTimer into the package. -func makeTestTime() *testTime { - return &testTime{ - cur: time.Now(), - } -} - type testRunner interface { // createLimiter builds a limiter with given options. 
createLimiter(int, ...Option) Limiter @@ -240,6 +153,7 @@ func (r *runnerImpl) goWait(fn func()) { } func TestUnlimited(t *testing.T) { + t.Parallel() now := time.Now() rl := NewUnlimited() for i := 0; i < 1000; i++ { @@ -249,6 +163,7 @@ func TestUnlimited(t *testing.T) { } func TestRateLimiter(t *testing.T) { + t.Parallel() runTest(t, func(r testRunner) { rl := r.createLimiter(100, WithoutSlack) @@ -265,6 +180,7 @@ func TestRateLimiter(t *testing.T) { } func TestDelayedRateLimiter(t *testing.T) { + t.Parallel() runTest(t, func(r testRunner) { slow := r.createLimiter(10, WithoutSlack) fast := r.createLimiter(100, WithoutSlack) @@ -283,6 +199,7 @@ func TestDelayedRateLimiter(t *testing.T) { } func TestPer(t *testing.T) { + t.Parallel() runTest(t, func(r testRunner) { rl := r.createLimiter(7, WithoutSlack, Per(time.Minute)) @@ -296,6 +213,7 @@ func TestPer(t *testing.T) { } func TestSlack(t *testing.T) { + t.Parallel() // To simulate slack, we combine two limiters. // - First, we start a single goroutine with both of them, // during this time the slow limiter will dominate, From 4d086956e39d501ef8f01f1d638d131b0b0beb2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82?= Date: Mon, 27 Jun 2022 15:18:15 -0700 Subject: [PATCH 3/4] Only benchmark on the latest Go version, take 2 I thought #88 would do this, but apparently it didn't. Attempting to use `go.mod` location since that means we need to update one less place on Go version release. 
Skipping PR process since (0) I don't know how to test this without landing (1) this affects only builds (2) the `main` looks like it's broken, which is sad --- .github/workflows/benchmark.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml index 42dc477..0a3d5c0 100644 --- a/.github/workflows/benchmark.yaml +++ b/.github/workflows/benchmark.yaml @@ -13,6 +13,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: + go-version-file: 'go.mod' check-latest: true cache: true - name: Benchmark From 0e6d083ad5a95027a5d8114b67052b81a55d93f4 Mon Sep 17 00:00:00 2001 From: Bohdan Storozhuk Date: Sat, 2 Jul 2022 06:59:56 +0100 Subject: [PATCH 4/4] Restore int64 based atomic rate limiter (#94) This limiter was introduced and merged in the following PR #85 Later @twelsh-aw found an issue with this implementation #90 So @rabbbit reverted this change in #91 Our tests did not detect this issue, so we have a separate PR #93 that enhances our tests approach to detect potential errors better. With this PR, we want to restore the int64-based atomic rate limiter implementation as a non-default rate limiter and then check that #93 will detect the bug. Right after it, we'll open a subsequent PR to fix this bug. --- limiter_atomic_int64.go | 90 +++++++++++++++++++++++++++++++++++++++++ ratelimit_bench_test.go | 9 +++-- ratelimit_test.go | 6 +++ tools/go.mod | 1 + 4 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 limiter_atomic_int64.go diff --git a/limiter_atomic_int64.go b/limiter_atomic_int64.go new file mode 100644 index 0000000..6588cd0 --- /dev/null +++ b/limiter_atomic_int64.go @@ -0,0 +1,90 @@ +// Copyright (c) 2022 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ratelimit // import "go.uber.org/ratelimit" + +import ( + "time" + + "sync/atomic" +) + +type atomicInt64Limiter struct { + //lint:ignore U1000 Padding is unused but it is crucial to maintain performance + // of this rate limiter in case of collocation with other frequently accessed memory. + prepadding [64]byte // cache line size = 64; created to avoid false sharing. + state int64 // unix nanoseconds of the next permissions issue. + //lint:ignore U1000 like prepadding. + postpadding [56]byte // cache line size - state size = 64 - 8; created to avoid false sharing. + + perRequest time.Duration + maxSlack time.Duration + clock Clock +} + +// newAtomicInt64Based returns a new int64-based atomic limiter. +func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter { + // TODO consider moving config building to the implementation + // independent code. 
+ config := buildConfig(opts) + perRequest := config.per / time.Duration(rate) + l := &atomicInt64Limiter{ + perRequest: perRequest, + maxSlack: time.Duration(config.slack) * perRequest, + clock: config.clock, + } + atomic.StoreInt64(&l.state, 0) + return l +} + +// Take blocks to ensure that the time spent between multiple +// Take calls is on average time.Second/rate. +func (t *atomicInt64Limiter) Take() time.Time { + var ( + newTimeOfNextPermissionIssue int64 + now int64 + ) + for { + now = t.clock.Now().UnixNano() + timeOfNextPermissionIssue := atomic.LoadInt64(&t.state) + + switch { + case timeOfNextPermissionIssue == 0: + // If this is our first request, then we allow it. + newTimeOfNextPermissionIssue = now + case now-timeOfNextPermissionIssue > int64(t.maxSlack): + // a lot of nanoseconds passed since the last Take call + // we will limit max accumulated time to maxSlack + newTimeOfNextPermissionIssue = now - int64(t.maxSlack) + default: + // calculate the time at which our permission was issued + newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest) + } + + if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) { + break + } + } + nanosToSleepUntilOurPermissionIsIssued := newTimeOfNextPermissionIssue - now + if nanosToSleepUntilOurPermissionIsIssued > 0 { + t.clock.Sleep(time.Duration(nanosToSleepUntilOurPermissionIsIssued)) + } + return time.Unix(0, newTimeOfNextPermissionIssue) +} diff --git a/ratelimit_bench_test.go b/ratelimit_bench_test.go index 60b203b..a1125c0 100644 --- a/ratelimit_bench_test.go +++ b/ratelimit_bench_test.go @@ -14,8 +14,9 @@ func BenchmarkRateLimiter(b *testing.B) { for _, procs := range []int{1, 4, 8, 16} { runtime.GOMAXPROCS(procs) for name, limiter := range map[string]Limiter{ - "atomic": New(b.N * 10000000), - "mutex": newMutexBased(b.N * 10000000), + "atomic": newAtomicBased(b.N * 1000000000000), + "atomic_int64": newAtomicInt64Based(b.N * 1000000000000), + 
"mutex": newMutexBased(b.N * 1000000000000), } { for ng := 1; ng < 16; ng++ { runner(b, name, procs, ng, limiter, count) @@ -47,7 +48,9 @@ func BenchmarkRateLimiter(b *testing.B) { } func runner(b *testing.B, name string, procs int, ng int, limiter Limiter, count *atomic.Int64) bool { - return b.Run(fmt.Sprintf("type:%s-procs:%d-goroutines:%d", name, procs, ng), func(b *testing.B) { + return b.Run(fmt.Sprintf("type:%s;max_procs:%d;goroutines:%d", name, procs, ng), func(b *testing.B) { + b.ReportAllocs() + var wg sync.WaitGroup trigger := atomic.NewBool(true) n := b.N diff --git a/ratelimit_test.go b/ratelimit_test.go index 8a90984..30abc43 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -62,6 +62,12 @@ func runTest(t *testing.T, fn func(testRunner)) { return newAtomicBased(rate, opts...) }, }, + { + name: "atomic_int64", + constructor: func(rate int, opts ...Option) Limiter { + return newAtomicInt64Based(rate, opts...) + }, + }, } for _, tt := range impls { diff --git a/tools/go.mod b/tools/go.mod index ba45a9b..c15076e 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -11,6 +11,7 @@ require ( require ( github.com/BurntSushi/toml v1.0.0 // indirect + github.com/storozhukBM/benchart v1.0.0 golang.org/x/exp/typeparams v0.0.0-20220328175248-053ad81199eb // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f // indirect