-
Notifications
You must be signed in to change notification settings - Fork 0
/
ratelimit.go
181 lines (148 loc) · 3.72 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package ratelimit
import (
"errors"
"fmt"
"sync/atomic"
"time"
)
type Opts struct {
BurstRate int64 // max concurrent commands
CMDLimitPerTime uint64 // max commands per {time}
Commands <-chan Command
ReturnChan chan<- Command
// per sec/min/...
RateLimitTime *time.Duration
// how often check for new tokens
NewTokenIssueFreq *time.Duration
}
type Command interface {
Do()
Cancel()
}
type rateLimit struct {
burstRate int64
cmdsLimitPerTime uint64
rateLimitTime time.Duration
newTokenIssueFreq uint64
tokenPerTime uint64
commands <-chan Command
returnChanBuf chan Command
returnChanReal chan<- Command
}
type state struct {
tokens uint64
leftover uint64
lastUpd time.Time
}
// Rate limit implementation of leaky token algorithm
// By default limits per minute and "checks" every ms for new tokens
//
// NB: LEAKS IF NOT CLOSE Commands chan
func Run(o *Opts) error {
name := errors.New("rate limiter")
switch {
case o == nil:
return fmt.Errorf("%s: Opts is nil", name)
case o.Commands == nil:
return fmt.Errorf("%s: Commands is nil chan", name)
case o.BurstRate < 1:
return fmt.Errorf("%s: BurstRate must be > 0", name)
case o.CMDLimitPerTime < 1:
return fmt.Errorf("%s: CMDLimitPerTime must be > 0", name)
case o.NewTokenIssueFreq != nil && *o.NewTokenIssueFreq < 1:
return fmt.Errorf("%s: NewTokenIssueFreq must be > 0", name)
case o.RateLimitTime != nil && *o.RateLimitTime < 1:
return fmt.Errorf("%s: RateLimitTime must be > 0", name)
case o.RateLimitTime != nil && o.NewTokenIssueFreq != nil &&
*o.RateLimitTime < *o.NewTokenIssueFreq:
return fmt.Errorf("%s: NewTokenIssueFreq must be < RateLimitTime", name)
}
rt := initRates(*o)
go rt.run()
return nil
}
func initRates(o Opts) *rateLimit {
rt := &rateLimit{
burstRate: o.BurstRate,
cmdsLimitPerTime: o.CMDLimitPerTime,
commands: o.Commands,
returnChanReal: o.ReturnChan,
}
if o.ReturnChan != nil {
rt.returnChanBuf = make(chan Command, 1024)
go returning(rt.returnChanBuf, rt.returnChanReal)
}
rt.rateLimitTime = time.Minute
if o.RateLimitTime != nil {
rt.rateLimitTime = *o.RateLimitTime
}
rt.newTokenIssueFreq = uint64(time.Millisecond)
if o.NewTokenIssueFreq != nil {
rt.newTokenIssueFreq = uint64(*o.NewTokenIssueFreq)
}
rt.tokenPerTime = uint64(rt.rateLimitTime) / rt.cmdsLimitPerTime
return rt
}
func (rl *rateLimit) run() {
var nGOing int64
state := state{
tokens: rl.cmdsLimitPerTime,
lastUpd: time.Now(),
leftover: 0,
}
for cmd := range rl.commands {
if atomic.LoadInt64(&nGOing) >= rl.burstRate {
rl.returnCase(cmd)
continue
}
state = rl.updState(state)
if state.tokens > 0 {
state.tokens--
atomic.AddInt64(&nGOing, 1)
go func(f Command) {
f.Do()
atomic.AddInt64(&nGOing, -1)
}(cmd)
} else {
rl.returnCase(cmd)
}
} //end for
if rl.returnChanBuf != nil {
close(rl.returnChanBuf)
}
}
func (rl *rateLimit) updState(s state) state {
if s.tokens == rl.cmdsLimitPerTime {
return s
}
now := time.Now()
elapsed := uint64(now.Sub(s.lastUpd)) + s.leftover
// count elapsed time with fraction of newTokenIssueFreq,
// ie do we must upd tokens now or not
if (elapsed / rl.newTokenIssueFreq) < 1 {
return s
}
newTokens := elapsed / rl.tokenPerTime
s.leftover = elapsed - (newTokens * rl.tokenPerTime)
s.tokens += newTokens
if s.tokens >= rl.cmdsLimitPerTime {
s.tokens = rl.cmdsLimitPerTime
s.leftover = 0
}
s.lastUpd = now
return s
}
func (rl *rateLimit) returnCase(cmd Command) {
switch {
case rl.returnChanReal != nil:
rl.returnChanBuf <- cmd
default:
cmd.Cancel()
}
}
func returning(buf <-chan Command, real chan<- Command) {
for cmd := range buf {
real <- cmd
}
close(real)
}