Fix test approach for detecting issues #93

Open
wants to merge 12 commits into
base: main
2 changes: 1 addition & 1 deletion Makefile
@@ -53,4 +53,4 @@ staticcheck: bin/staticcheck

 .PHONY: test
 test:
-	go test -race ./...
+	go test -v -race ./...
1 change: 0 additions & 1 deletion go.mod
@@ -3,7 +3,6 @@ module go.uber.org/ratelimit
 go 1.18

 require (
-	github.com/benbjohnson/clock v1.3.0
 	github.com/stretchr/testify v1.6.1
 	go.uber.org/atomic v1.7.0
 )
2 changes: 0 additions & 2 deletions go.sum
@@ -1,5 +1,3 @@
-github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
-github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
14 changes: 11 additions & 3 deletions ratelimit.go
@@ -22,8 +22,6 @@ package ratelimit // import "go.uber.org/ratelimit"

 import (
 	"time"
-
-	"github.com/benbjohnson/clock"
 )

 // Note: This file is inspired by:
@@ -45,6 +43,16 @@ type Clock interface {
 	Sleep(time.Duration)
 }

+type internalClock struct{}
+
+func (i *internalClock) Now() time.Time {
+	return time.Now()
+}
+
+func (i *internalClock) Sleep(duration time.Duration) {
+	time.Sleep(duration)
+}
+
 // config configures a limiter.
 type config struct {
 	clock Clock
@@ -60,7 +68,7 @@ func New(rate int, opts ...Option) Limiter {
 // buildConfig combines defaults with options.
 func buildConfig(opts []Option) config {
 	c := config{
-		clock: clock.New(),
+		clock: &internalClock{},
 		slack: 10,
 		per:   time.Second,
 	}
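
The `Clock` interface in this diff only needs `Now` and `Sleep`, so any type with those two methods can stand in for `internalClock`. A minimal sketch of that idea follows, assuming a single-goroutine caller. The names `steppingClock`, `pace`, and `elapsedMillis` are hypothetical and not part of this PR, and the fake here is deliberately simpler than the `testTime` the PR adds.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the two-method interface shown in ratelimit.go in this diff.
type Clock interface {
	Now() time.Time
	Sleep(time.Duration)
}

// steppingClock is a hypothetical fake: Sleep returns immediately and
// moves the fake time forward. It is not safe for concurrent use.
type steppingClock struct {
	cur time.Time
}

func (c *steppingClock) Now() time.Time        { return c.cur }
func (c *steppingClock) Sleep(d time.Duration) { c.cur = c.cur.Add(d) }

// pace sleeps n times for interval on the given clock and reports how much
// clock time passed. It stands in for code that depends only on Clock.
func pace(c Clock, n int, interval time.Duration) time.Duration {
	start := c.Now()
	for i := 0; i < n; i++ {
		c.Sleep(interval)
	}
	return c.Now().Sub(start)
}

// elapsedMillis runs 100 sleeps of 10ms on the fake clock.
func elapsedMillis() int64 {
	c := &steppingClock{cur: time.Unix(0, 0)}
	return pace(c, 100, 10*time.Millisecond).Milliseconds()
}

func main() {
	// One second of fake time passes without any real waiting.
	fmt.Println(elapsedMillis())
}
```

Because the dependency is an interface, swapping the real clock for a fake one needs no third-party mocking package, which is what allows the `benbjohnson/clock` requirement to be dropped.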
111 changes: 111 additions & 0 deletions ratelimit_mock_time_test.go
@@ -0,0 +1,111 @@
package ratelimit

/**
This fake time implementation is a modification of the time-mocking
mechanism used by Ian Lance Taylor in the https://github.com/golang/time project:
https://github.com/golang/time/commit/579cf78fd858857c0d766e0d63eb2b0ccf29f436

Modified parts:
- advanceUnlocked method uses in-place filtering of timers
Contributor

is the goal of in-place filtering just performance of the fake-clock?

I don't think this feels very important (?) - I'd rather have simpler code that's slightly slower?

Contributor Author

To me, this approach looks both simpler and more performant, but check the original and give me your opinion. If you think we should keep the original, I don't mind.

Contributor

It's just that the whole i, j bookkeeping looks like we would need tests for tests (off-by-one errors).

I think I would rather pull the other version as is, and then push a separate PR to fix the copying with clear changes. If we were worried about performance, though, wouldn't we want to sort once on insert, rather than on advance?

But:

  • see my other comment below
  • I understand if fixing the time is not your primary focus for now :)

Contributor Author

OK, I can go back to the previous version.
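
For reference, the off-by-one concern about separate `i` and `j` indices can be sidestepped with the common Go idiom of appending to `timers[:0]`. The sketch below is only an illustration under assumptions: `fakeTimer`, `fireDue`, and `demo` are hypothetical names that mirror the shape of `testTimer`, and the `runtime.Gosched` calls from the PR are left out.

```go
package main

import (
	"fmt"
	"time"
)

// fakeTimer mirrors the shape of testTimer in this PR (hypothetical copy).
type fakeTimer struct {
	when time.Time
	ch   chan<- time.Time
}

// fireDue sends now to every timer that is due and returns the timers that
// are still pending. It reuses the backing array of timers, so no second
// slice is allocated and no index arithmetic is needed.
func fireDue(timers []fakeTimer, now time.Time) []fakeTimer {
	pending := timers[:0]
	for _, t := range timers {
		if t.when.After(now) {
			pending = append(pending, t)
			continue
		}
		t.ch <- now // each channel has capacity 1, so this does not block
	}
	return pending
}

// demo registers timers at 1s, 2s, and 3s, advances to 2s, and reports how
// many timers fired and how many are still pending.
func demo() (fired, pending int) {
	start := time.Unix(0, 0)
	chans := make([]chan time.Time, 3)
	timers := make([]fakeTimer, 0, len(chans))
	for i := range chans {
		chans[i] = make(chan time.Time, 1)
		when := start.Add(time.Duration(i+1) * time.Second)
		timers = append(timers, fakeTimer{when: when, ch: chans[i]})
	}
	timers = fireDue(timers, start.Add(2*time.Second))
	for _, ch := range chans {
		if len(ch) == 1 {
			fired++
		}
	}
	return fired, len(timers)
}

func main() {
	fired, pending := demo()
	fmt.Println(fired, pending)
}
```

The filter stays linear in the number of timers, like the version in the PR, but the only state it tracks is the `pending` slice.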

instead of a full copy on every remove.
Since we have 100s of timers in our tests, the current linear
complexity of this operation is OK.
If we are going to have 1000s in the future, we can use a heap to store timers.
- advanceUnlocked method yields the processor, after every timer triggering,
allowing other goroutines to run
*/

import (
"runtime"
"sync"
"time"
)

// testTime is a fake time used for testing.
type testTime struct {
mu sync.Mutex
cur time.Time // current fake time
timers []testTimer // fake timers
}

// makeTestTime hooks the testTimer into the package.
func makeTestTime() *testTime {
return &testTime{
cur: time.Now(),
}
}

// testTimer is a fake timer.
type testTimer struct {
when time.Time
ch chan<- time.Time
}

// now returns the current fake time.
func (tt *testTime) now() time.Time {
tt.mu.Lock()
defer tt.mu.Unlock()
return tt.cur
}

// newTimer creates a fake timer. It returns the channel,
// a function to stop the timer (which we don't care about),
// and a function to advance to the next timer.
func (tt *testTime) newTimer(dur time.Duration) (<-chan time.Time, func() bool, func()) {
tt.mu.Lock()
defer tt.mu.Unlock()
ch := make(chan time.Time, 1)
timer := testTimer{
when: tt.cur.Add(dur),
ch: ch,
}
tt.timers = append(tt.timers, timer)
return ch, func() bool { return true }, tt.advanceToTimer
}

// advance advances the fake time.
func (tt *testTime) advance(dur time.Duration) {
tt.mu.Lock()
defer tt.mu.Unlock()
tt.advanceUnlocked(dur)
}

// advanceUnlocked advances the fake time, assuming it is already locked.
func (tt *testTime) advanceUnlocked(dur time.Duration) {
tt.cur = tt.cur.Add(dur)

i := 0
j := 0
for i < len(tt.timers) {
if tt.timers[i].when.After(tt.cur) {
if i != j {
tt.timers[j] = tt.timers[i]
}
i++
j++
} else {
tt.timers[i].ch <- tt.cur
for i := 0; i < 16; i++ {
runtime.Gosched()
}
i++
}
}
tt.timers = tt.timers[0:j]
}

// advanceToTimer advances the time to the next timer.
func (tt *testTime) advanceToTimer() {
tt.mu.Lock()
defer tt.mu.Unlock()
if len(tt.timers) == 0 {
panic("no timer")
}
when := tt.timers[0].when
for _, timer := range tt.timers[1:] {
if timer.when.Before(when) {
when = timer.when
}
}
tt.advanceUnlocked(when.Sub(tt.cur))
}
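
The file comment above mentions that a heap could hold the timers if their number grows, and `advanceToTimer` currently scans every timer to find the earliest one. A minimal sketch of that alternative using the standard `container/heap` package is shown here. The names `fakeTimer`, `timerHeap`, and `popOrder` are hypothetical, and nothing in this PR implements it.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// fakeTimer mirrors the shape of testTimer in this PR (hypothetical copy).
type fakeTimer struct {
	when time.Time
	ch   chan<- time.Time
}

// timerHeap implements heap.Interface, ordered by the when field so that
// the earliest timer is always at index 0.
type timerHeap []fakeTimer

func (h timerHeap) Len() int           { return len(h) }
func (h timerHeap) Less(i, j int) bool { return h[i].when.Before(h[j].when) }
func (h timerHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *timerHeap) Push(x any)        { *h = append(*h, x.(fakeTimer)) }
func (h *timerHeap) Pop() any {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

// popOrder pushes timers at the given offsets (in seconds) and returns the
// offsets in the order the heap releases them.
func popOrder(offsets ...int) []int {
	start := time.Unix(0, 0)
	h := &timerHeap{}
	for _, s := range offsets {
		heap.Push(h, fakeTimer{when: start.Add(time.Duration(s) * time.Second)})
	}
	order := make([]int, 0, len(offsets))
	for h.Len() > 0 {
		t := heap.Pop(h).(fakeTimer)
		order = append(order, int(t.when.Sub(start)/time.Second))
	}
	return order
}

func main() {
	fmt.Println(popOrder(5, 1, 3))
}
```

With a heap, inserting a timer and removing the earliest one each cost O(log n), and the next deadline is simply the element at index 0, so no scan is needed.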
43 changes: 32 additions & 11 deletions ratelimit_test.go
@@ -7,10 +7,18 @@ import (

 	"go.uber.org/atomic"

-	"github.com/benbjohnson/clock"
 	"github.com/stretchr/testify/assert"
 )

+func (tt *testTime) Now() time.Time {
+	return tt.now()
+}
+
+func (tt *testTime) Sleep(duration time.Duration) {
+	timer, _, _ := tt.newTimer(duration)
+	<-timer
+}
+
 type testRunner interface {
 	// createLimiter builds a limiter with given options.
 	createLimiter(int, ...Option) Limiter
@@ -27,7 +35,7 @@ type testRunner interface {
 type runnerImpl struct {
 	t *testing.T

-	clock       *clock.Mock
+	clock       *testTime
 	constructor func(int, ...Option) Limiter
 	count       atomic.Int32
 	// maxDuration is the time we need to move into the future for a test.
@@ -64,21 +72,28 @@ func runTest(t *testing.T, fn func(testRunner)) {

 	for _, tt := range impls {
 		t.Run(tt.name, func(t *testing.T) {
-			// Set a non-default time.Time since some limiters (int64 in particular) use
-			// the default value as "non-initialized" state.
-			clockMock := clock.NewMock()
-			clockMock.Set(time.Now())
 			r := runnerImpl{
 				t:           t,
-				clock:       clockMock,
+				clock:       makeTestTime(),
 				constructor: tt.constructor,
 				doneCh:      make(chan struct{}),
 			}
 			defer close(r.doneCh)
 			defer r.wg.Wait()

 			fn(&r)
-			r.clock.Add(r.maxDuration)
+			go func() {
+				move := func() {
+					defer func() {
+						_ = recover()
+						time.Sleep(10 * time.Millisecond)
+					}()
+					r.clock.advanceToTimer()
+				}
+				for {
+					move()
+				}
Contributor

Sorry, but I think this needs a much larger explanation - perhaps as comments in code?

Am I reading this right that this is an infinite loop, trying to move to the next timer indefinitely ... by recovering from a panic?

Irrespective of goroutine leaks, using panics for signaling - do you think the maxDuration is not useful here? It seems we should be able to just pass the duration into advance, theoretically?

Contributor Author

@storozhukBM, Jul 11, 2022

> Irrespective of goroutine leaks

I think 1 goroutine leak per test case is not a big deal, but if you want, I can add some exit logic here.

> using panics for signaling

Code from Ian just panics when you try to run the next timer, and there is nothing to run. We can potentially change that behavior to not handle panics, but I wanted to limit changes in that code.

> Sorry, but I think this needs a much larger explanation - perhaps as comments in code?

I'll write some description here first, and when we clarify all questions about it, I'll move it into code comments:

testTime internally has a queue of all timers currently registered within the clock. In our case, there will be timers for goroutines that try to consume permissions and timers for goroutines that should run test asserts.
When we call testTime.advanceToTimer(), it shifts time to the next registered timer, runs it, and also runs runtime.Gosched() multiple times to give other goroutines a chance to run and react to what happened in the current timer. Usually, when we run runtime.Gosched(), other goroutines will observe the new state, make their moves, and potentially create new timers inside this clock to wait for some other state change.

If testTime.advanceToTimer() panics, it means that there are no timers to run. That can happen if the test has already finished or has not yet managed to set up the full test case, so we just sleep a bit and try again to run timers.

> Do you think the maxDuration is not useful here?

No, we can use it if we want to get out of the currently infinite loop: on every iteration we can check whether testTime has already passed maxDuration.

> It seems we should be able to just pass the duration into advance, theoretically?

Yes, but since we advance time in different goroutines in this approach, it is necessary for us to do it one step at a time and give other goroutines the opportunity to make their steps as well. There can also be situations where we start to advance time in this goroutine before the test goroutines have spawned all permission consumers and asserting goroutines, so we need to just wait for everything to be set up; that is what we do when the panic happens.

Contributor

So perhaps naively, but do we need both

  • advanceToTimer
  • advance(Unlocked?)

Our test runner knows exactly how much time needs to move (maxDuration). We can just pass it into advance. advanceUnlocked already does all the runtime.Gosched needed.

And so it seems to me that advance should theoretically be enough?
If the problem is the initial goroutine setup, should we just add the 16*runtime.Gosched at the beginning of advance to let the scheduler kick in?
This would also help with the r.clock.advance(time.Nanosecond) you have below.

I understand this kinda modifies the code we copy-paste, but I'd feel much more comfortable if we decreased the API by 50% here.

Contributor Author

@rabbbit

Right now, it is hard for me to make changes because tests do not pass until we merge #95

I agree with the comments and changes that you propose right now, but they don't change the approach in general.
I propose to merge #95, and then I can apply your suggested changes, or we can merge this PR as it is right now, and I'll make these improvements in subsequent PRs.

Contributor

Merged #95. I might still want to re-review it later on; it feels like some simplification is possible.

Contributor Author

I'll update this branch with the new version, and this should potentially fix all tests, even though I'm not sure how your new test for initial calls will work 🤞
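
One possible shape for the exit logic discussed in this thread is sketched below, purely as an illustration. It assumes a variant of `advanceToTimer` that reports `false` instead of panicking when no timer is runnable, a driver that never moves past `maxDuration`, and a `done` channel that ends the loop once the clock is idle. The names `fakeClock`, `advanceToNext`, `drive`, and `demo` are hypothetical, and the timer channels are omitted for brevity.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// fakeClock is a cut-down stand-in for testTime: it tracks only the
// deadlines of pending timers.
type fakeClock struct {
	mu     sync.Mutex
	cur    time.Time
	timers []time.Time
}

func (c *fakeClock) now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.cur
}

// advanceToNext moves time to the earliest pending timer that is not after
// limit and removes it. It reports false, rather than panicking, when no
// such timer exists.
func (c *fakeClock) advanceToNext(limit time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	next := -1
	for i, when := range c.timers {
		if when.After(limit) {
			continue
		}
		if next == -1 || when.Before(c.timers[next]) {
			next = i
		}
	}
	if next == -1 {
		return false
	}
	if c.timers[next].After(c.cur) {
		c.cur = c.timers[next]
	}
	c.timers = append(c.timers[:next], c.timers[next+1:]...)
	return true
}

// drive steps the clock one timer at a time, never past start+maxDuration.
// When nothing is runnable it waits briefly so other goroutines can register
// timers, and it returns the number of steps taken once done is closed.
func drive(c *fakeClock, maxDuration time.Duration, done <-chan struct{}) int {
	limit := c.now().Add(maxDuration)
	steps := 0
	for {
		if c.advanceToNext(limit) {
			steps++
			runtime.Gosched() // let goroutines woken by the timer react
			continue
		}
		select {
		case <-done:
			return steps
		case <-time.After(time.Millisecond):
		}
	}
}

// demo registers timers at 1s, 2s, and 5s and drives the clock for 3s.
func demo() (steps int, elapsedSeconds int) {
	start := time.Unix(0, 0)
	c := &fakeClock{cur: start}
	for _, s := range []int{1, 2, 5} {
		c.timers = append(c.timers, start.Add(time.Duration(s)*time.Second))
	}
	done := make(chan struct{})
	close(done) // the test body has already finished in this demo
	steps = drive(c, 3*time.Second, done)
	return steps, int(c.now().Sub(start) / time.Second)
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the driver goroutine ends when the test signals completion, so nothing leaks per test case, and the idle wait replaces the recover-and-sleep step without changing the one-timer-at-a-time stepping the author describes.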

+			}()
 		})
 	}
 }
@@ -96,6 +111,7 @@ func (r *runnerImpl) startTaking(rls ...Limiter) {
 			for _, rl := range rls {
 				rl.Take()
 			}
+			r.clock.advance(time.Nanosecond)
 			r.count.Inc()
 			select {
 			case <-r.doneCh:
@@ -120,14 +136,14 @@ func (r *runnerImpl) afterFunc(d time.Duration, fn func()) {
 	if d > r.maxDuration {
 		r.maxDuration = d
 	}
-
+	timer, _, _ := r.clock.newTimer(d)
 	r.goWait(func() {
 		select {
 		case <-r.doneCh:
 			return
-		case <-r.clock.After(d):
+		case <-timer:
+			fn()
 		}
-		fn()
 	})
 }

@@ -143,6 +159,7 @@ func (r *runnerImpl) goWait(fn func()) {
 }

 func TestUnlimited(t *testing.T) {
+	t.Parallel()
 	now := time.Now()
 	rl := NewUnlimited()
 	for i := 0; i < 1000; i++ {
@@ -152,6 +169,7 @@ func TestUnlimited(t *testing.T) {
 }

 func TestRateLimiter(t *testing.T) {
+	t.Parallel()
 	runTest(t, func(r testRunner) {
 		rl := r.createLimiter(100, WithoutSlack)

@@ -168,6 +186,7 @@ func TestRateLimiter(t *testing.T) {
 }

 func TestDelayedRateLimiter(t *testing.T) {
+	t.Parallel()
 	runTest(t, func(r testRunner) {
 		slow := r.createLimiter(10, WithoutSlack)
 		fast := r.createLimiter(100, WithoutSlack)
@@ -186,6 +205,7 @@ func TestDelayedRateLimiter(t *testing.T) {
 }

 func TestPer(t *testing.T) {
+	t.Parallel()
 	runTest(t, func(r testRunner) {
 		rl := r.createLimiter(7, WithoutSlack, Per(time.Minute))

@@ -199,6 +219,7 @@ func TestPer(t *testing.T) {
 }

 func TestSlack(t *testing.T) {
+	t.Parallel()
 	// To simulate slack, we combine two limiters.
 	// - First, we start a single goroutine with both of them,
 	//   during this time the slow limiter will dominate,