From d5ee564a2f244460a9dad4fba9a3e91659939b1b Mon Sep 17 00:00:00 2001 From: Jan Broer Date: Wed, 13 Jan 2016 23:01:06 +0100 Subject: [PATCH] Throttle Route53 API calls Route53 API enforces a rate limit of max 5 req/s --- Godeps/Godeps.json | 4 + .../src/github.com/juju/ratelimit/LICENSE | 191 +++++++++ .../src/github.com/juju/ratelimit/README.md | 117 ++++++ .../github.com/juju/ratelimit/ratelimit.go | 245 +++++++++++ .../juju/ratelimit/ratelimit_test.go | 389 ++++++++++++++++++ external-dns.go | 9 - providers/route53.go | 8 +- 7 files changed, 953 insertions(+), 10 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE create mode 100644 Godeps/_workspace/src/github.com/juju/ratelimit/README.md create mode 100644 Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go create mode 100644 Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 54f9bc1..4ec623d 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -28,6 +28,10 @@ "ImportPath": "github.com/gorilla/websocket", "Rev": "5ed2f4547d7b9c036541b27a67ecf54e41756997" }, + { + "ImportPath": "github.com/juju/ratelimit", + "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" + }, { "ImportPath": "github.com/mitchellh/goamz/aws", "Rev": "caaaea8b30ee15616494ee68abd5d8ebbbef05cf" diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE new file mode 100644 index 0000000..ade9307 --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE @@ -0,0 +1,191 @@ +All files in this repository are licensed as follows. If you contribute +to this repository, it is assumed that you license your contribution +under the same license unless you state otherwise. + +All files Copyright (C) 2015 Canonical Ltd. unless otherwise specified in the file. + +This software is licensed under the LGPLv3, included below. + +As a special exception to the GNU Lesser General Public License version 3 +("LGPL3"), the copyright holders of this Library give you permission to +convey to a third party a Combined Work that links statically or dynamically +to this Library without providing any Minimal Corresponding Source or +Minimal Application Code as set out in 4d or providing the installation +information set out in section 4e, provided that you comply with the other +provisions of LGPL3 and provided that you meet, for the Application the +terms and conditions of the license(s) which apply to the Application. + +Except as stated in this special exception, the provisions of LGPL3 will +continue to comply in full to this Library. If you modify this Library, you +may apply this exception to your version of this Library, but you are not +obliged to do so. If you do not wish to do so, delete this exception +statement from your version. This exception does not (and cannot) modify any +license terms which apply to the Application, with which you must still +comply. + + + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/README.md b/Godeps/_workspace/src/github.com/juju/ratelimit/README.md new file mode 100644 index 0000000..a0fdfe2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/README.md @@ -0,0 +1,117 @@ +# ratelimit +-- + import "github.com/juju/ratelimit" + +The ratelimit package provides an efficient token bucket implementation. See +http://en.wikipedia.org/wiki/Token_bucket. + +## Usage + +#### func Reader + +```go +func Reader(r io.Reader, bucket *Bucket) io.Reader +``` +Reader returns a reader that is rate limited by the given token bucket. Each +token in the bucket represents one byte. + +#### func Writer + +```go +func Writer(w io.Writer, bucket *Bucket) io.Writer +``` +Writer returns a writer that is rate limited by the given token bucket. Each +token in the bucket represents one byte. + +#### type Bucket + +```go +type Bucket struct { +} +``` + +Bucket represents a token bucket that fills at a predetermined rate. Methods on +Bucket may be called concurrently. + +#### func NewBucket + +```go +func NewBucket(fillInterval time.Duration, capacity int64) *Bucket +``` +NewBucket returns a new token bucket that fills at the rate of one token every +fillInterval, up to the given maximum capacity. Both arguments must be positive. +The bucket is initially full. + +#### func NewBucketWithQuantum + +```go +func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket +``` +NewBucketWithQuantum is similar to NewBucket, but allows the specification of +the quantum size - quantum tokens are added every fillInterval. + +#### func NewBucketWithRate + +```go +func NewBucketWithRate(rate float64, capacity int64) *Bucket +``` +NewBucketWithRate returns a token bucket that fills the bucket at the rate of +rate tokens per second up to the given maximum capacity. Because of limited +clock resolution, at high rates, the actual rate may be up to 1% different from +the specified rate. + +#### func (*Bucket) Rate + +```go +func (tb *Bucket) Rate() float64 +``` +Rate returns the fill rate of the bucket, in tokens per second. + +#### func (*Bucket) Take + +```go +func (tb *Bucket) Take(count int64) time.Duration +``` +Take takes count tokens from the bucket without blocking. It returns the time +that the caller should wait until the tokens are actually available. + +Note that if the request is irrevocable - there is no way to return tokens to +the bucket once this method commits us to taking them. + +#### func (*Bucket) TakeAvailable + +```go +func (tb *Bucket) TakeAvailable(count int64) int64 +``` +TakeAvailable takes up to count immediately available tokens from the bucket. It +returns the number of tokens removed, or zero if there are no available tokens. +It does not block. + +#### func (*Bucket) TakeMaxDuration + +```go +func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) +``` +TakeMaxDuration is like Take, except that it will only take tokens from the +bucket if the wait time for the tokens is no greater than maxWait. + +If it would take longer than maxWait for the tokens to become available, it does +nothing and reports false, otherwise it returns the time that the caller should +wait until the tokens are actually available, and reports true. + +#### func (*Bucket) Wait + +```go +func (tb *Bucket) Wait(count int64) +``` +Wait takes count tokens from the bucket, waiting until they are available. + +#### func (*Bucket) WaitMaxDuration + +```go +func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool +``` +WaitMaxDuration is like Wait except that it will only take tokens from the +bucket if it needs to wait for no greater than maxWait. It reports whether any +tokens have been removed from the bucket If no tokens have been removed, it +returns immediately. diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go new file mode 100644 index 0000000..3ef32fb --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go @@ -0,0 +1,245 @@ +// Copyright 2014 Canonical Ltd. +// Licensed under the LGPLv3 with static-linking exception. +// See LICENCE file for details. + +// The ratelimit package provides an efficient token bucket implementation +// that can be used to limit the rate of arbitrary things. +// See http://en.wikipedia.org/wiki/Token_bucket. +package ratelimit + +import ( + "math" + "strconv" + "sync" + "time" +) + +// Bucket represents a token bucket that fills at a predetermined rate. +// Methods on Bucket may be called concurrently. +type Bucket struct { + startTime time.Time + capacity int64 + quantum int64 + fillInterval time.Duration + + // The mutex guards the fields following it. + mu sync.Mutex + + // avail holds the number of available tokens + // in the bucket, as of availTick ticks from startTime. + // It will be negative when there are consumers + // waiting for tokens. + avail int64 + availTick int64 +} + +// NewBucket returns a new token bucket that fills at the +// rate of one token every fillInterval, up to the given +// maximum capacity. Both arguments must be +// positive. The bucket is initially full. +func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { + return NewBucketWithQuantum(fillInterval, capacity, 1) +} + +// rateMargin specifes the allowed variance of actual +// rate from specified rate. 1% seems reasonable. +const rateMargin = 0.01 + +// NewBucketWithRate returns a token bucket that fills the bucket +// at the rate of rate tokens per second up to the given +// maximum capacity. Because of limited clock resolution, +// at high rates, the actual rate may be up to 1% different from the +// specified rate. +func NewBucketWithRate(rate float64, capacity int64) *Bucket { + for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) { + fillInterval := time.Duration(1e9 * float64(quantum) / rate) + if fillInterval <= 0 { + continue + } + tb := NewBucketWithQuantum(fillInterval, capacity, quantum) + if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin { + return tb + } + } + panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64)) +} + +// nextQuantum returns the next quantum to try after q. +// We grow the quantum exponentially, but slowly, so we +// get a good fit in the lower numbers. +func nextQuantum(q int64) int64 { + q1 := q * 11 / 10 + if q1 == q { + q1++ + } + return q1 +} + +// NewBucketWithQuantum is similar to NewBucket, but allows +// the specification of the quantum size - quantum tokens +// are added every fillInterval. +func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket { + if fillInterval <= 0 { + panic("token bucket fill interval is not > 0") + } + if capacity <= 0 { + panic("token bucket capacity is not > 0") + } + if quantum <= 0 { + panic("token bucket quantum is not > 0") + } + return &Bucket{ + startTime: time.Now(), + capacity: capacity, + quantum: quantum, + avail: capacity, + fillInterval: fillInterval, + } +} + +// Wait takes count tokens from the bucket, waiting until they are +// available. +func (tb *Bucket) Wait(count int64) { + if d := tb.Take(count); d > 0 { + time.Sleep(d) + } +} + +// WaitMaxDuration is like Wait except that it will +// only take tokens from the bucket if it needs to wait +// for no greater than maxWait. It reports whether +// any tokens have been removed from the bucket +// If no tokens have been removed, it returns immediately. +func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool { + d, ok := tb.TakeMaxDuration(count, maxWait) + if d > 0 { + time.Sleep(d) + } + return ok +} + +const infinityDuration time.Duration = 0x7fffffffffffffff + +// Take takes count tokens from the bucket without blocking. It returns +// the time that the caller should wait until the tokens are actually +// available. +// +// Note that if the request is irrevocable - there is no way to return +// tokens to the bucket once this method commits us to taking them. +func (tb *Bucket) Take(count int64) time.Duration { + d, _ := tb.take(time.Now(), count, infinityDuration) + return d +} + +// TakeMaxDuration is like Take, except that +// it will only take tokens from the bucket if the wait +// time for the tokens is no greater than maxWait. +// +// If it would take longer than maxWait for the tokens +// to become available, it does nothing and reports false, +// otherwise it returns the time that the caller should +// wait until the tokens are actually available, and reports +// true. +func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) { + return tb.take(time.Now(), count, maxWait) +} + +// TakeAvailable takes up to count immediately available tokens from the +// bucket. It returns the number of tokens removed, or zero if there are +// no available tokens. It does not block. +func (tb *Bucket) TakeAvailable(count int64) int64 { + return tb.takeAvailable(time.Now(), count) +} + +// takeAvailable is the internal version of TakeAvailable - it takes the +// current time as an argument to enable easy testing. +func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { + if count <= 0 { + return 0 + } + tb.mu.Lock() + defer tb.mu.Unlock() + + tb.adjust(now) + if tb.avail <= 0 { + return 0 + } + if count > tb.avail { + count = tb.avail + } + tb.avail -= count + return count +} + +// Available returns the number of available tokens. It will be negative +// when there are consumers waiting for tokens. Note that if this +// returns greater than zero, it does not guarantee that calls that take +// tokens from the buffer will succeed, as the number of available +// tokens could have changed in the meantime. This method is intended +// primarily for metrics reporting and debugging. +func (tb *Bucket) Available() int64 { + return tb.available(time.Now()) +} + +// available is the internal version of available - it takes the current time as +// an argument to enable easy testing. +func (tb *Bucket) available(now time.Time) int64 { + tb.mu.Lock() + defer tb.mu.Unlock() + tb.adjust(now) + return tb.avail +} + +// Capacity returns the capacity that the bucket was created with. +func (tb *Bucket) Capacity() int64 { + return tb.capacity +} + +// Rate returns the fill rate of the bucket, in tokens per second. +func (tb *Bucket) Rate() float64 { + return 1e9 * float64(tb.quantum) / float64(tb.fillInterval) +} + +// take is the internal version of Take - it takes the current time as +// an argument to enable easy testing. +func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { + if count <= 0 { + return 0, true + } + tb.mu.Lock() + defer tb.mu.Unlock() + + currentTick := tb.adjust(now) + avail := tb.avail - count + if avail >= 0 { + tb.avail = avail + return 0, true + } + // Round up the missing tokens to the nearest multiple + // of quantum - the tokens won't be available until + // that tick. + endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum + endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) + waitTime := endTime.Sub(now) + if waitTime > maxWait { + return 0, false + } + tb.avail = avail + return waitTime, true +} + +// adjust adjusts the current bucket capacity based on the current time. +// It returns the current tick. +func (tb *Bucket) adjust(now time.Time) (currentTick int64) { + currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval) + + if tb.avail >= tb.capacity { + return + } + tb.avail += (currentTick - tb.availTick) * tb.quantum + if tb.avail > tb.capacity { + tb.avail = tb.capacity + } + tb.availTick = currentTick + return +} diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go new file mode 100644 index 0000000..62d88de --- /dev/null +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go @@ -0,0 +1,389 @@ +// Copyright 2014 Canonical Ltd. +// Licensed under the LGPLv3 with static-linking exception. +// See LICENCE file for details. + +package ratelimit + +import ( + "math" + "testing" + "time" + + gc "gopkg.in/check.v1" +) + +func TestPackage(t *testing.T) { + gc.TestingT(t) +} + +type rateLimitSuite struct{} + +var _ = gc.Suite(rateLimitSuite{}) + +type takeReq struct { + time time.Duration + count int64 + expectWait time.Duration +} + +var takeTests = []struct { + about string + fillInterval time.Duration + capacity int64 + reqs []takeReq +}{{ + about: "serial requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 0, + expectWait: 0, + }, { + time: 0, + count: 10, + expectWait: 0, + }, { + time: 0, + count: 1, + expectWait: 250 * time.Millisecond, + }, { + time: 250 * time.Millisecond, + count: 1, + expectWait: 250 * time.Millisecond, + }}, +}, { + about: "concurrent requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 0, + count: 2, + expectWait: 500 * time.Millisecond, + }, { + time: 0, + count: 2, + expectWait: 1000 * time.Millisecond, + }, { + time: 0, + count: 1, + expectWait: 1250 * time.Millisecond, + }}, +}, { + about: "more than capacity", + fillInterval: 1 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 20 * time.Millisecond, + count: 15, + expectWait: 5 * time.Millisecond, + }}, +}, { + about: "sub-quantum time", + fillInterval: 10 * time.Millisecond, + capacity: 10, + reqs: []takeReq{{ + time: 0, + count: 10, + expectWait: 0, + }, { + time: 7 * time.Millisecond, + count: 1, + expectWait: 3 * time.Millisecond, + }, { + time: 8 * time.Millisecond, + count: 1, + expectWait: 12 * time.Millisecond, + }}, +}, { + about: "within capacity", + fillInterval: 10 * time.Millisecond, + capacity: 5, + reqs: []takeReq{{ + time: 0, + count: 5, + expectWait: 0, + }, { + time: 60 * time.Millisecond, + count: 5, + expectWait: 0, + }, { + time: 60 * time.Millisecond, + count: 1, + expectWait: 10 * time.Millisecond, + }, { + time: 80 * time.Millisecond, + count: 2, + expectWait: 10 * time.Millisecond, + }}, +}} + +var availTests = []struct { + about string + capacity int64 + fillInterval time.Duration + take int64 + sleep time.Duration + + expectCountAfterTake int64 + expectCountAfterSleep int64 +}{{ + about: "should fill tokens after interval", + capacity: 5, + fillInterval: time.Second, + take: 5, + sleep: time.Second, + expectCountAfterTake: 0, + expectCountAfterSleep: 1, +}, { + about: "should fill tokens plus existing count", + capacity: 2, + fillInterval: time.Second, + take: 1, + sleep: time.Second, + expectCountAfterTake: 1, + expectCountAfterSleep: 2, +}, { + about: "shouldn't fill before interval", + capacity: 2, + fillInterval: 2 * time.Second, + take: 1, + sleep: time.Second, + expectCountAfterTake: 1, + expectCountAfterSleep: 1, +}, { + about: "should fill only once after 1*interval before 2*interval", + capacity: 2, + fillInterval: 2 * time.Second, + take: 1, + sleep: 3 * time.Second, + expectCountAfterTake: 1, + expectCountAfterSleep: 2, +}} + +func (rateLimitSuite) TestTake(c *gc.C) { + for i, test := range takeTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + d, ok := tb.take(tb.startTime.Add(req.time), req.count, infinityDuration) + c.Assert(ok, gc.Equals, true) + if d != req.expectWait { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) + } + } + } +} + +func (rateLimitSuite) TestTakeMaxDuration(c *gc.C) { + for i, test := range takeTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + if req.expectWait > 0 { + d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait-1) + c.Assert(ok, gc.Equals, false) + c.Assert(d, gc.Equals, time.Duration(0)) + } + d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait) + c.Assert(ok, gc.Equals, true) + if d != req.expectWait { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait) + } + } + } +} + +type takeAvailableReq struct { + time time.Duration + count int64 + expect int64 +} + +var takeAvailableTests = []struct { + about string + fillInterval time.Duration + capacity int64 + reqs []takeAvailableReq +}{{ + about: "serial requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 0, + expect: 0, + }, { + time: 0, + count: 10, + expect: 10, + }, { + time: 0, + count: 1, + expect: 0, + }, { + time: 250 * time.Millisecond, + count: 1, + expect: 1, + }}, +}, { + about: "concurrent requests", + fillInterval: 250 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 5, + expect: 5, + }, { + time: 0, + count: 2, + expect: 2, + }, { + time: 0, + count: 5, + expect: 3, + }, { + time: 0, + count: 1, + expect: 0, + }}, +}, { + about: "more than capacity", + fillInterval: 1 * time.Millisecond, + capacity: 10, + reqs: []takeAvailableReq{{ + time: 0, + count: 10, + expect: 10, + }, { + time: 20 * time.Millisecond, + count: 15, + expect: 10, + }}, +}, { + about: "within capacity", + fillInterval: 10 * time.Millisecond, + capacity: 5, + reqs: []takeAvailableReq{{ + time: 0, + count: 5, + expect: 5, + }, { + time: 60 * time.Millisecond, + count: 5, + expect: 5, + }, { + time: 70 * time.Millisecond, + count: 1, + expect: 1, + }}, +}} + +func (rateLimitSuite) TestTakeAvailable(c *gc.C) { + for i, test := range takeAvailableTests { + tb := NewBucket(test.fillInterval, test.capacity) + for j, req := range test.reqs { + d := tb.takeAvailable(tb.startTime.Add(req.time), req.count) + if d != req.expect { + c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expect) + } + } + } +} + +func (rateLimitSuite) TestPanics(c *gc.C) { + c.Assert(func() { NewBucket(0, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") + c.Assert(func() { NewBucket(-2, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0") + c.Assert(func() { NewBucket(1, 0) }, gc.PanicMatches, "token bucket capacity is not > 0") + c.Assert(func() { NewBucket(1, -2) }, gc.PanicMatches, "token bucket capacity is not > 0") +} + +func isCloseTo(x, y, tolerance float64) bool { + return math.Abs(x-y)/y < tolerance +} + +func (rateLimitSuite) TestRate(c *gc.C) { + tb := NewBucket(1, 1) + if !isCloseTo(tb.Rate(), 1e9, 0.00001) { + c.Fatalf("got %v want 1e9", tb.Rate()) + } + tb = NewBucket(2*time.Second, 1) + if !isCloseTo(tb.Rate(), 0.5, 0.00001) { + c.Fatalf("got %v want 0.5", tb.Rate()) + } + tb = NewBucketWithQuantum(100*time.Millisecond, 1, 5) + if !isCloseTo(tb.Rate(), 50, 0.00001) { + c.Fatalf("got %v want 50", tb.Rate()) + } +} + +func checkRate(c *gc.C, rate float64) { + tb := NewBucketWithRate(rate, 1<<62) + if !isCloseTo(tb.Rate(), rate, rateMargin) { + c.Fatalf("got %g want %v", tb.Rate(), rate) + } + d, ok := tb.take(tb.startTime, 1<<62, infinityDuration) + c.Assert(ok, gc.Equals, true) + c.Assert(d, gc.Equals, time.Duration(0)) + + // Check that the actual rate is as expected by + // asking for a not-quite multiple of the bucket's + // quantum and checking that the wait time + // correct. + d, ok = tb.take(tb.startTime, tb.quantum*2-tb.quantum/2, infinityDuration) + c.Assert(ok, gc.Equals, true) + expectTime := 1e9 * float64(tb.quantum) * 2 / rate + if !isCloseTo(float64(d), expectTime, rateMargin) { + c.Fatalf("rate %g: got %g want %v", rate, float64(d), expectTime) + } +} + +func (rateLimitSuite) TestNewWithRate(c *gc.C) { + for rate := float64(1); rate < 1e6; rate += 7 { + checkRate(c, rate) + } + for _, rate := range []float64{ + 1024 * 1024 * 1024, + 1e-5, + 0.9e-5, + 0.5, + 0.9, + 0.9e8, + 3e12, + 4e18, + } { + checkRate(c, rate) + checkRate(c, rate/3) + checkRate(c, rate*1.3) + } +} + +func TestAvailable(t *testing.T) { + for i, tt := range availTests { + tb := NewBucket(tt.fillInterval, tt.capacity) + if c := tb.takeAvailable(tb.startTime, tt.take); c != tt.take { + t.Fatalf("#%d: %s, take = %d, want = %d", i, tt.about, c, tt.take) + } + if c := tb.available(tb.startTime); c != tt.expectCountAfterTake { + t.Fatalf("#%d: %s, after take, available = %d, want = %d", i, tt.about, c, tt.expectCountAfterTake) + } + if c := tb.available(tb.startTime.Add(tt.sleep)); c != tt.expectCountAfterSleep { + t.Fatalf("#%d: %s, after some time it should fill in new tokens, available = %d, want = %d", + i, tt.about, c, tt.expectCountAfterSleep) + } + } + +} + +func BenchmarkWait(b *testing.B) { + tb := NewBucket(1, 16*1024) + for i := b.N - 1; i >= 0; i-- { + tb.Wait(1) + } +} diff --git a/external-dns.go b/external-dns.go index a08d82d..922c8d3 100644 --- a/external-dns.go +++ b/external-dns.go @@ -5,7 +5,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/rancher/external-dns/dns" "strings" - "time" ) func UpdateProviderDnsRecords(metadataRecs map[string]dns.DnsRecord) ([]dns.DnsRecord, error) { @@ -41,7 +40,6 @@ func addMissingRecords(metadataRecs map[string]dns.DnsRecord, providerRecs map[s } func updateRecords(toChange []dns.DnsRecord, op *Op) []dns.DnsRecord { - count := 0 var changed []dns.DnsRecord for _, value := range toChange { switch *op { @@ -65,13 +63,6 @@ func updateRecords(toChange []dns.DnsRecord, op *Op) []dns.DnsRecord { changed = append(changed, value) } } - - // to workaround rate limit on Amazon (5 requests per second) - count = count + 1 - if count == 5 { - time.Sleep(1000 * time.Millisecond) - count = 0 - } } return changed } diff --git a/providers/route53.go b/providers/route53.go index 876bc92..ba03e26 100644 --- a/providers/route53.go +++ b/providers/route53.go @@ -2,6 +2,7 @@ package providers import ( "fmt" + "github.com/juju/ratelimit" "github.com/Sirupsen/logrus" "github.com/mitchellh/goamz/aws" "github.com/mitchellh/goamz/route53" @@ -18,6 +19,7 @@ var ( client *route53.Route53 hostedZone *route53.HostedZone region aws.Region + limiter *ratelimit.Bucket ) func init() { @@ -49,6 +51,9 @@ func init() { logrus.Fatalf("Failed to set hosted zone for root domain %s: %v", dns.RootDomainName, err) } + // Throttle Route53 API calls to 5 req/s + limiter = ratelimit.NewBucketWithRate(5.0, 1) + logrus.Infof("Configured %s with hosted zone \"%s\" in region \"%s\" ", route53Handler.GetName(), dns.RootDomainName, region.Name) } @@ -110,6 +115,7 @@ func (*Route53Handler) changeRecord(record dns.DnsRecord, action string) error { update := route53.Change{action, recordSet} changes := []route53.Change{update} req := route53.ChangeResourceRecordSetsRequest{Comment: "Updated by Rancher", Changes: changes} + limiter.Wait(1) _, err := client.ChangeResourceRecordSets(hostedZone.ID, &req) return err } @@ -117,7 +123,7 @@ func (*Route53Handler) changeRecord(record dns.DnsRecord, action string) error { func (*Route53Handler) GetRecords() ([]dns.DnsRecord, error) { var records []dns.DnsRecord opts := route53.ListOpts{} - + limiter.Wait(1) resp, err := client.ListResourceRecordSets(hostedZone.ID, &opts) if err != nil { return records, fmt.Errorf("Route53 API call has failed: %v", err)