redis: add the option to use a separate redis pool for per second limits (#41)
junr03 authored Jul 6, 2018
1 parent 1189b6a commit ff278b8
Showing 8 changed files with 390 additions and 202 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
@@ -2,7 +2,6 @@ sudo: required
language: go
go: "1.10"
services: redis-server
env:
- REDIS_SOCKET_TYPE=tcp REDIS_URL="localhost:6379"
install: make bootstrap
before_script: redis-server --port 6380 &
script: make check_format tests
45 changes: 40 additions & 5 deletions README.md
@@ -20,6 +20,9 @@
- [Request Fields](#request-fields)
- [Statistics](#statistics)
- [Debug Port](#debug-port)
- [Redis](#redis)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
- [Contact](#contact)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->
@@ -28,7 +31,7 @@

The rate limit service is a Go/gRPC service designed to enable generic rate limit scenarios from different types of
applications. Applications request a rate limit decision based on a domain and a set of descriptors. The service
reads the configuration from disk via [runtime](https://github.com/lyft/goruntime), composes a cache key, and talks to the redis cache. A
reads the configuration from disk via [runtime](https://github.com/lyft/goruntime), composes a cache key, and talks to the Redis cache. A
decision is then returned to the caller.

# Deprecation of Legacy Ratelimit Proto
@@ -55,13 +58,13 @@ to give time to community members running ratelimit off of `master`.

# Building and Testing

* Install redis-server.
* Install Redis-server.
* Make sure go is setup correctly and checkout rate limit service into your go path. More information about installing
go [here](https://golang.org/doc/install).
* In order to run the integration tests using a local default redis install you will also need these environment variables set:
* In order to run the integration tests using a local Redis install, run two `redis-server` instances: one on port `6379` and another on port `6380`:
```bash
export REDIS_SOCKET_TYPE=tcp
export REDIS_URL=localhost:6379
redis-server --port 6379 &
redis-server --port 6380 &
```
* To setup for the first time (only done once):
```bash
@@ -352,6 +355,38 @@ $ curl 0:6070/

You can specify the debug port with the `DEBUG_PORT` environment variable. It defaults to `6070`.

# Redis

Ratelimit uses Redis as its caching layer. Ratelimit supports two operation modes:

1. One Redis instance for all limits.
1. Two Redis instances: one for per second limits and another one for all other limits.

## One Redis Instance

To configure one Redis instance use the following environment variables:

1. `REDIS_SOCKET_TYPE`
1. `REDIS_URL`
1. `REDIS_POOL_SIZE`

This setup will use the same Redis server for all limits.
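For example, a single-instance setup over TCP might look like the following (the host, port, and pool size are illustrative values, not defaults):

```bash
# One Redis instance handles every limit, regardless of unit.
export REDIS_SOCKET_TYPE=tcp
export REDIS_URL=localhost:6379
export REDIS_POOL_SIZE=10
```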

## Two Redis Instances

To configure two Redis instances use the following environment variables:

1. `REDIS_SOCKET_TYPE`
1. `REDIS_URL`
1. `REDIS_POOL_SIZE`
1. `REDIS_PERSECOND`: set this to `"true"`.
1. `REDIS_PERSECOND_SOCKET_TYPE`
1. `REDIS_PERSECOND_URL`
1. `REDIS_PERSECOND_POOL_SIZE`

This setup will use the Redis server configured with the `_PERSECOND_` vars for
per second limits, and the other Redis server for all other limits.
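As a sketch, a two-instance setup over TCP might look like this (hosts, ports, and pool sizes are illustrative values):

```bash
# Primary Redis instance: all limits except per second ones.
export REDIS_SOCKET_TYPE=tcp
export REDIS_URL=localhost:6379
export REDIS_POOL_SIZE=10

# Dedicated Redis instance for per second limits.
export REDIS_PERSECOND=true
export REDIS_PERSECOND_SOCKET_TYPE=tcp
export REDIS_PERSECOND_URL=localhost:6380
export REDIS_PERSECOND_POOL_SIZE=10
```

Isolating per second limits in their own instance keeps the high-churn per second keys from competing with longer-lived keys for the primary instance's capacity.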

# Contact

* [envoy-announce](https://groups.google.com/forum/#!forum/envoy-announce): Low frequency mailing
119 changes: 99 additions & 20 deletions src/redis/cache_impl.go
@@ -1,6 +1,7 @@
package redis

import (
"bytes"
"math"
"math/rand"
"strconv"
@@ -16,10 +17,17 @@ import (
)

type rateLimitCacheImpl struct {
pool Pool
pool Pool
// Optional Pool for a dedicated cache of per second limits.
// If this pool is nil, then the Cache will use the pool for all
// limits regardless of unit. If this pool is not nil, then it
// is used for limits that have a SECOND unit.
perSecondPool Pool
timeSource TimeSource
jitterRand *rand.Rand
expirationJitterMaxSeconds int64
// bytes.Buffer pool used to efficiently generate cache keys.
bufferPool sync.Pool
}

// Convert a rate limit into a time divider.
@@ -45,23 +53,41 @@ func unitToDivider(unit pb.RateLimitResponse_RateLimit_Unit) int64 {
// @param descriptor supplies the descriptor to generate the key for.
// @param limit supplies the rate limit to generate the key for (may be nil).
// @param now supplies the current unix time.
// @return the cache key.
// @return cacheKey struct.
func (this *rateLimitCacheImpl) generateCacheKey(
domain string, descriptor *pb_struct.RateLimitDescriptor, limit *config.RateLimit, now int64) string {
domain string, descriptor *pb_struct.RateLimitDescriptor, limit *config.RateLimit, now int64) cacheKey {

if limit == nil {
return ""
return cacheKey{
key: "",
perSecond: false,
}
}

var cacheKey string = domain + "_"
b := this.bufferPool.Get().(*bytes.Buffer)
defer this.bufferPool.Put(b)
b.Reset()

b.WriteString(domain)
b.WriteByte('_')

for _, entry := range descriptor.Entries {
cacheKey += entry.Key + "_"
cacheKey += entry.Value + "_"
b.WriteString(entry.Key)
b.WriteByte('_')
b.WriteString(entry.Value)
b.WriteByte('_')
}

divider := unitToDivider(limit.Limit.Unit)
cacheKey += strconv.FormatInt((now/divider)*divider, 10)
return cacheKey
b.WriteString(strconv.FormatInt((now/divider)*divider, 10))

return cacheKey{
key: b.String(),
perSecond: isPerSecondLimit(limit.Limit.Unit)}
}

func isPerSecondLimit(unit pb.RateLimitResponse_RateLimit_Unit) bool {
return unit == pb.RateLimitResponse_RateLimit_SECOND
}

func max(a uint32, b uint32) uint32 {
@@ -71,22 +97,50 @@ func max(a uint32, b uint32) uint32 {
return b
}

type cacheKey struct {
key string
// True if the key corresponds to a limit with a SECOND unit. False otherwise.
perSecond bool
}

func pipelineAppend(conn Connection, key string, hitsAddend uint32, expirationSeconds int64) {
conn.PipeAppend("INCRBY", key, hitsAddend)
conn.PipeAppend("EXPIRE", key, expirationSeconds)
}

func pipelineFetch(conn Connection) uint32 {
ret := uint32(conn.PipeResponse().Int())
// Pop off EXPIRE response and check for error.
conn.PipeResponse()
return ret
}

func (this *rateLimitCacheImpl) DoLimit(
ctx context.Context,
request *pb.RateLimitRequest,
limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {

logger.Debugf("starting cache lookup")

conn := this.pool.Get()
defer this.pool.Put(conn)

// Optional connection for per second limits. If the cache has a perSecondPool setup,
// then use a connection from the pool for per second limits.
var perSecondConn Connection = nil
if this.perSecondPool != nil {
perSecondConn = this.perSecondPool.Get()
defer this.perSecondPool.Put(perSecondConn)
}

// request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request.
hitsAddend := max(1, request.HitsAddend)

// First build a list of all cache keys that we are actually going to hit. generateCacheKey()
// returns "" if there is no limit so that we can keep the arrays all the same size.
// returns an empty string in the key if there is no limit so that we can keep the arrays
// all the same size.
assert.Assert(len(request.Descriptors) == len(limits))
cacheKeys := make([]string, len(request.Descriptors))
cacheKeys := make([]cacheKey, len(request.Descriptors))
now := this.timeSource.UnixNow()
for i := 0; i < len(request.Descriptors); i++ {
cacheKeys[i] = this.generateCacheKey(request.Domain, request.Descriptors[i], limits[i], now)
@@ -99,7 +153,7 @@ func (this *rateLimitCacheImpl) DoLimit(

// Now, actually setup the pipeline, skipping empty cache keys.
for i, cacheKey := range cacheKeys {
if cacheKey == "" {
if cacheKey.key == "" {
continue
}
logger.Debugf("looking up cache key: %s", cacheKey)
@@ -109,15 +163,19 @@
expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
}

conn.PipeAppend("INCRBY", cacheKey, hitsAddend)
conn.PipeAppend("EXPIRE", cacheKey, expirationSeconds)
// Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit.
if perSecondConn != nil && cacheKey.perSecond {
pipelineAppend(perSecondConn, cacheKey.key, hitsAddend, expirationSeconds)
} else {
pipelineAppend(conn, cacheKey.key, hitsAddend, expirationSeconds)
}
}

// Now fetch the pipeline.
responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
len(request.Descriptors))
for i, cacheKey := range cacheKeys {
if cacheKey == "" {
if cacheKey.key == "" {
responseDescriptorStatuses[i] =
&pb.RateLimitResponse_DescriptorStatus{
Code: pb.RateLimitResponse_OK,
@@ -126,16 +184,22 @@
}
continue
}
limitAfterIncrease := uint32(conn.PipeResponse().Int())
conn.PipeResponse() // Pop off EXPIRE response and check for error.

var limitAfterIncrease uint32
// Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit.
if this.perSecondPool != nil && cacheKey.perSecond {
limitAfterIncrease = pipelineFetch(perSecondConn)
} else {
limitAfterIncrease = pipelineFetch(conn)
}

limitBeforeIncrease := limitAfterIncrease - hitsAddend
overLimitThreshold := limits[i].Limit.RequestsPerUnit
// The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio.
// We need to know it in both the OK and OVER_LIMIT scenarios.
nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio)))

logger.Debugf("cache key: %s current: %d", cacheKey, limitAfterIncrease)
logger.Debugf("cache key: %s current: %d", cacheKey.key, limitAfterIncrease)
if limitAfterIncrease > overLimitThreshold {
responseDescriptorStatuses[i] =
&pb.RateLimitResponse_DescriptorStatus{
@@ -184,8 +248,23 @@ func (this *rateLimitCacheImpl) DoLimit(
return responseDescriptorStatuses
}

func NewRateLimitCacheImpl(pool Pool, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64) RateLimitCache {
return &rateLimitCacheImpl{pool, timeSource, jitterRand, expirationJitterMaxSeconds}
func NewRateLimitCacheImpl(pool Pool, perSecondPool Pool, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64) RateLimitCache {
return &rateLimitCacheImpl{
pool: pool,
perSecondPool: perSecondPool,
timeSource: timeSource,
jitterRand: jitterRand,
expirationJitterMaxSeconds: expirationJitterMaxSeconds,
bufferPool: newBufferPool(),
}
}

func newBufferPool() sync.Pool {
return sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
}

type timeSourceImpl struct{}
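The key-generation change in `cache_impl.go` above replaces repeated string concatenation with a `bytes.Buffer` drawn from a `sync.Pool`, so hot-path requests reuse buffers instead of allocating. A standalone sketch of the resulting key layout — domain, descriptor entries, then the window-aligned timestamp (the names and sample values here are illustrative, not from the repo):

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"sync"
)

// bufferPool mirrors the commit's sync.Pool of *bytes.Buffer used to
// build cache keys without a fresh allocation per request.
var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// generateKey sketches the key layout "<domain>_<k1>_<v1>_..._<windowStart>".
// divider is the limit unit's length in seconds (1 for per second limits),
// so (now/divider)*divider truncates now to the start of the current window
// and every hit in that window increments the same Redis key.
func generateKey(domain string, entries [][2]string, divider, now int64) string {
	b := bufferPool.Get().(*bytes.Buffer)
	defer bufferPool.Put(b)
	b.Reset()

	b.WriteString(domain)
	b.WriteByte('_')
	for _, e := range entries {
		b.WriteString(e[0])
		b.WriteByte('_')
		b.WriteString(e[1])
		b.WriteByte('_')
	}
	b.WriteString(strconv.FormatInt((now/divider)*divider, 10))
	// Copy the buffer's contents out before returning it to the pool.
	return b.String()
}

func main() {
	// A per second limit (divider == 1) on an illustrative descriptor.
	fmt.Println(generateKey("mongo_cps", [][2]string{{"database", "users"}}, 1, 1530000000))
}
```

Note that `b.String()` copies the bytes out, which is what makes returning the buffer to the pool via `defer` safe.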
13 changes: 6 additions & 7 deletions src/redis/driver_impl.go
@@ -3,7 +3,6 @@ package redis
import (
"github.com/lyft/gostats"
"github.com/lyft/ratelimit/src/assert"
"github.com/lyft/ratelimit/src/settings"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
logger "github.com/sirupsen/logrus"
@@ -65,13 +64,13 @@ func (this *poolImpl) Put(c Connection) {
}
}

func NewPoolImpl(scope stats.Scope) Pool {
s := settings.NewSettings()

logger.Warnf("connecting to redis on %s %s with pool size %d", s.RedisSocketType, s.RedisUrl, s.RedisPoolSize)
pool, err := pool.New(s.RedisSocketType, s.RedisUrl, s.RedisPoolSize)
func NewPoolImpl(scope stats.Scope, socketType string, url string, poolSize int) Pool {
logger.Warnf("connecting to redis on %s %s with pool size %d", socketType, url, poolSize)
pool, err := pool.New(socketType, url, poolSize)
checkError(err)
return &poolImpl{pool, newPoolStats(scope)}
return &poolImpl{
pool: pool,
stats: newPoolStats(scope)}
}

func (this *connectionImpl) PipeAppend(cmd string, args ...interface{}) {
10 changes: 9 additions & 1 deletion src/service_cmd/runner/runner.go
@@ -20,10 +20,18 @@ func Run() {
srv := server.NewServer("ratelimit", settings.GrpcUnaryInterceptor(nil))

s := settings.NewSettings()

var perSecondPool redis.Pool
if s.RedisPerSecond {
perSecondPool = redis.NewPoolImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondSocketType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize)

}

service := ratelimit.NewService(
srv.Runtime(),
redis.NewRateLimitCacheImpl(
redis.NewPoolImpl(srv.Scope().Scope("redis_pool")),
redis.NewPoolImpl(srv.Scope().Scope("redis_pool"), s.RedisSocketType, s.RedisUrl, s.RedisPoolSize),
perSecondPool,
redis.NewTimeSourceImpl(),
rand.New(redis.NewLockedSource(time.Now().Unix())),
s.ExpirationJitterMaxSeconds),
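The wiring in `runner.go` above only constructs the per-second pool when `REDIS_PERSECOND` is set, leaving it `nil` otherwise; `DoLimit` then falls back to the primary connection for every key. That selection rule can be sketched in isolation (the `Pool` type here is a simplified stand-in for the repo's `redis.Pool` interface):

```go
package main

import "fmt"

// Pool is an illustrative stand-in for the repo's redis.Pool interface,
// reduced to a name so the selection logic can run standalone.
type Pool struct{ name string }

// pickPool mirrors the DoLimit branching: the dedicated per second pool
// is used only when it was configured (non-nil) AND the cache key belongs
// to a limit with a SECOND unit; everything else goes to the primary pool.
func pickPool(primary, perSecond *Pool, isPerSecondKey bool) *Pool {
	if perSecond != nil && isPerSecondKey {
		return perSecond
	}
	return primary
}

func main() {
	primary := &Pool{"redis_pool"}
	perSecond := &Pool{"redis_per_second_pool"}

	fmt.Println(pickPool(primary, perSecond, true).name)  // dedicated pool taken
	fmt.Println(pickPool(primary, perSecond, false).name) // non-second limit: primary
	fmt.Println(pickPool(primary, nil, true).name)        // REDIS_PERSECOND unset: primary
}
```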
4 changes: 4 additions & 0 deletions src/settings/settings.go
@@ -21,6 +21,10 @@ type Settings struct {
RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"`
RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"`
RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"`
RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"`
RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"`
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
}
