Skip to content

Commit

Permalink
refactor rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
jacky committed Aug 25, 2022
1 parent 24d05eb commit 254d0f3
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 30 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/armon/go-proxyproto v0.0.0-20200108142055-f0b8253b1507
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/jinzhu/gorm v1.9.16
github.com/juju/ratelimit v1.0.2
github.com/mattn/go-sqlite3 v1.14.4
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.2.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ github.com/jinzhu/now v1.0.1 h1:HjfetcXq097iXP0uoPCdnM4Efp5/9MsM0/M+XOTeR3M=
github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
Expand Down
72 changes: 50 additions & 22 deletions internal/rate_limit.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,67 @@
package internal

import (
"container/list"
"sync"
"time"

"github.com/juju/ratelimit"
)

const (
OnceTakeCount = 1
DefaultFillInterval = time.Second
)
type token struct {
enableTime time.Time
}

type RateLimiter struct {
ratePerSecond int64
bucket *ratelimit.Bucket
type LimitBucket struct {
fillInterval time.Duration
queue *list.List
mutex sync.Mutex
}

func NewRateLimiter(ratePerSecond int64) *RateLimiter {
rateLimiter := &RateLimiter{
ratePerSecond: ratePerSecond,
func New(fillInterval time.Duration, limit uint64) *LimitBucket {
bucket := &LimitBucket{
fillInterval: fillInterval,
queue: list.New(),
}
if rateLimiter.RateLimitIsOpen() {
rateLimiter.bucket = ratelimit.NewBucketWithQuantum(DefaultFillInterval, ratePerSecond, ratePerSecond)
now := time.Now()
for i := 0; i < int(limit); i++ {
bucket.queue.PushBack(&token{
enableTime: now,
})
}
return rateLimiter
return bucket
}

func (l *RateLimiter) RateLimitIsOpen() bool {
return l.ratePerSecond > 0
func (bucket *LimitBucket) Wait() {
for {
if bucket.tryTakeToken() {
return
}
}
}

func (bucket *LimitBucket) WaitForTimeout(timeout time.Duration) {
begin := time.Now()
for {
if time.Now().After(begin.Add(timeout)) {
return
}
if bucket.tryTakeToken() {
return
}
}
}

func (l *RateLimiter) WaitRateLimit() {
//if set ratePerSecond zero, not control send rate
if !l.RateLimitIsOpen() {
return
func (bucket *LimitBucket) tryTakeToken() bool {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()
front := bucket.queue.Front()
fristToken := front.Value.(*token)
now := time.Now()
if now.After(fristToken.enableTime) {
bucket.queue.Remove(front)
bucket.queue.PushBack(&token{
enableTime: now.Add(bucket.fillInterval),
})
return true
}
l.bucket.Wait(OnceTakeCount)
return false
}
103 changes: 103 additions & 0 deletions internal/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package internal

import (
"fmt"
"math/rand"
"testing"
"time"
)

func TestWaitNormal(t *testing.T) {
bucket := New(time.Second*1, 2)

for i := 0; i < 10; i++ {
bucket.Wait()
t.Log(time.Now(), "|", i)
}
}

func TestWaitForTimeout(t *testing.T) {
bucket := New(time.Second*3, 3)

for i := 0; i < 10; i++ {
bucket.WaitForTimeout(time.Second)
t.Log(time.Now(), "|", i)
}

for i := 0; i < 10; i++ {
bucket.WaitForTimeout(time.Second * 5)
t.Log(time.Now(), "|", i)
}
}

func TestWaitRandom(t *testing.T) {
bucket := New(time.Second*1, 3)
for i := 0; i < 10; i++ {
bucket.Wait()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
t.Log(time.Now(), "|", i)
}
}

// func TestWaitConcurrenyOld(t *testing.T) {
// bucket := NewRateLimiter(8)

// for i := 0; i < 5; i++ {
// bucket.WaitRateLimit()
// t.Log("1:", time.Now(), "|", i)
// }
// time.Sleep(time.Millisecond * 900)
// go func() {
// for i := 0; i < 55; i++ {
// bucket.WaitRateLimit()
// t.Log("2:", time.Now(), "|", i)
// }
// }()

// time.Sleep(time.Second * 30)
// }

func TestWaitConcurreny1(t *testing.T) {
bucket := New(time.Second, 8)

for i := 0; i < 5; i++ {
bucket.Wait()
t.Log("1:", time.Now(), "|", i)
}
time.Sleep(time.Millisecond * 900)
go func() {
for i := 0; i < 55; i++ {
bucket.Wait()
t.Log("1:", time.Now(), "|", i)
}
}()

time.Sleep(time.Second * 30)
}

func write(messageOut chan string) {
for {
text := <-messageOut
fmt.Println(time.Now(), text)
time.Sleep(time.Millisecond * time.Duration(100))
}
}

func TestWaitConcurreny2(t *testing.T) {
bucket := New(time.Second*2, 5)
messageOut := make(chan string)
go write(messageOut)
go func() {
for i := 0; i < 20; i++ {
bucket.Wait()
messageOut <- fmt.Sprintf("No.1 Worker, task:%v", i)
}
}()
go func() {
for i := 0; i < 20; i++ {
bucket.Wait()
messageOut <- fmt.Sprintf("No.2 Worker, task:%v", i)
}
}()
time.Sleep(time.Second * 30)
}
3 changes: 2 additions & 1 deletion internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ type SessionSettings struct {
SocketConnectAddress []string

//rate limt
SendRateLimiter *RateLimiter
//SendRateLimiter *RateLimiter
LimitBucket *LimitBucket
}
4 changes: 3 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,9 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {

func (s *session) sendQueued() {
for _, msgBytes := range s.toSend {
s.SendRateLimiter.WaitRateLimit()
if s.LimitBucket != nil {
s.LimitBucket.Wait()
}
s.sendBytes(msgBytes)
}

Expand Down
2 changes: 1 addition & 1 deletion session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ func (f sessionFactory) newSession(
if sendRatePerSecond, err = settings.IntSetting(config.SendRatePerSecond); err != nil {
return
}
s.LimitBucket = internal.New(time.Second, uint64(sendRatePerSecond))
}
s.SendRateLimiter = internal.NewRateLimiter(int64(sendRatePerSecond))

s.sessionEvent = make(chan internal.Event)
s.messageEvent = make(chan bool, 1)
Expand Down
2 changes: 0 additions & 2 deletions session_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ func (s *SessionFactorySuite) TestNewSessionBuildInitiators() {
s.Equal(10*time.Second, session.LogonTimeout)
s.Equal(2*time.Second, session.LogoutTimeout)
s.Equal("127.0.0.1:5000", session.SocketConnectAddress[0])
s.False(session.SendRateLimiter.RateLimitIsOpen())
}

func (s *SessionFactorySuite) TestNewSessionBuildInitiatorsValidHeartBtInt() {
Expand Down Expand Up @@ -592,5 +591,4 @@ func (s *SessionFactorySuite) TestSendRatePerSecond() {
s.Equal(10*time.Second, session.LogonTimeout)
s.Equal(2*time.Second, session.LogoutTimeout)
s.Equal("127.0.0.1:5000", session.SocketConnectAddress[0])
s.True(session.SendRateLimiter.RateLimitIsOpen())
}

0 comments on commit 254d0f3

Please sign in to comment.