Skip to content

Commit

Permalink
Merge pull request gocelery#42 from akkumar/redis_pool
Browse files Browse the repository at this point in the history
refactor: use standardized redis uri for dialer arguments
  • Loading branch information
yoonsio authored Aug 7, 2018
2 parents 442c6c1 + 8a490e4 commit b2f0dd7
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 36 deletions.
4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
name = "github.com/satori/go.uuid"

[[constraint]]
name = "github.com/garyburd/redigo"
version = "1.6.0"
name = "github.com/gomodule/redigo"
version = "2.0.0"

[[constraint]]
branch = "master"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func add(a int, b int) int {

func main() {
// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")
celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

// use AMQP instead
// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
Expand Down Expand Up @@ -155,8 +155,8 @@ Submit Task from Go Client
```go
func main() {
// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")
celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

// use AMQP instead
// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
Expand Down
6 changes: 3 additions & 3 deletions backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (

func getBackends() []CeleryBackend {
return []CeleryBackend{
NewRedisCeleryBackend("localhost:6379", ""),
NewRedisCeleryBackend("redis://localhost:6379"),
NewAMQPCeleryBackend("amqp://"),
}
}

// TestGetResult is Redis specific test to get result from backend
func TestGetResult(t *testing.T) {
backend := NewRedisCeleryBackend("localhost:6379", "")
backend := NewRedisCeleryBackend("redis://localhost:6379")
taskID := generateUUID()

// value must be float64 for testing due to json limitation
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestGetResult(t *testing.T) {

// TestSetResult is Redis specific test to set result to backend
func TestSetResult(t *testing.T) {
backend := NewRedisCeleryBackend("localhost:6379", "")
backend := NewRedisCeleryBackend("redis://localhost:6379")
taskID := generateUUID()
value := reflect.ValueOf(rand.Float64())
resultMessage := getReflectionResultMessage(&value)
Expand Down
6 changes: 3 additions & 3 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func makeCeleryMessage() (*CeleryMessage, error) {
// test all brokers
func getBrokers() []CeleryBroker {
return []CeleryBroker{
NewRedisCeleryBroker("localhost:6379", ""),
NewRedisCeleryBroker("redis://localhost:6379"),
//NewAMQPCeleryBroker("amqp://"),
}
}

// TestSend is Redis specific test that sets CeleryMessage to queue
func TestSend(t *testing.T) {
broker := NewRedisCeleryBroker("localhost:6379", "")
broker := NewRedisCeleryBroker("redis://localhost:6379")
celeryMessage, err := makeCeleryMessage()
if err != nil || celeryMessage == nil {
t.Errorf("failed to construct celery message: %v", err)
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestSend(t *testing.T) {

// TestGet is Redis specific test that gets CeleryMessage from queue
func TestGet(t *testing.T) {
broker := NewRedisCeleryBroker("localhost:6379", "")
broker := NewRedisCeleryBroker("redis://localhost:6379")
celeryMessage, err := makeCeleryMessage()
if err != nil || celeryMessage == nil {
t.Errorf("failed to construct celery message: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions example/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
func main() {

// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")
celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

// AMQP example
//celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
Expand Down
4 changes: 2 additions & 2 deletions example/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (a *AddTask) RunTask() (interface{}, error) {

func main() {
// create broker and backend
celeryBroker := gocelery.NewRedisCeleryBroker("localhost:6379", "")
celeryBackend := gocelery.NewRedisCeleryBackend("localhost:6379", "")
celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

// AMQP example
//celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
Expand Down
4 changes: 2 additions & 2 deletions gocelery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func getAMQPClient() (*CeleryClient, error) {
}

func getRedisClient() (*CeleryClient, error) {
redisBroker := NewRedisCeleryBroker("localhost:6379", "")
redisBackend := NewRedisCeleryBackend("localhost:6379", "")
redisBroker := NewRedisCeleryBroker("redis://localhost:6379")
redisBackend := NewRedisCeleryBackend("redis://localhost:6379")
return NewCeleryClient(redisBroker, redisBackend, 1)
}

Expand Down
6 changes: 3 additions & 3 deletions redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"

"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)

// RedisCeleryBackend is CeleryBackend for Redis
Expand All @@ -13,9 +13,9 @@ type RedisCeleryBackend struct {
}

// NewRedisCeleryBackend creates new RedisCeleryBackend
func NewRedisCeleryBackend(host, pass string) *RedisCeleryBackend {
func NewRedisCeleryBackend(uri string) *RedisCeleryBackend {
return &RedisCeleryBackend{
Pool: NewRedisPool(host, pass),
Pool: NewRedisPool(uri),
}
}

Expand Down
20 changes: 7 additions & 13 deletions redis_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)

// RedisCeleryBroker is CeleryBroker for Redis
Expand All @@ -17,22 +17,16 @@ type RedisCeleryBroker struct {
workWG sync.WaitGroup
}

// NewRedisPool creates pool of redis connections
func NewRedisPool(host, pass string) *redis.Pool {
// NewRedisPool creates pool of redis connections from given uri
func NewRedisPool(uri string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", host)
c, err := redis.DialURL(uri)
if err != nil {
return nil, err
}
if pass != "" {
if _, err = c.Do("AUTH", pass); err != nil {
c.Close()
return nil, err
}
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
Expand All @@ -42,10 +36,10 @@ func NewRedisPool(host, pass string) *redis.Pool {
}
}

// NewRedisCeleryBroker creates new RedisCeleryBroker
func NewRedisCeleryBroker(host, pass string) *RedisCeleryBroker {
// NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri
func NewRedisCeleryBroker(uri string) *RedisCeleryBroker {
return &RedisCeleryBroker{
Pool: NewRedisPool(host, pass),
Pool: NewRedisPool(uri),
queueName: "celery",
}
}
Expand Down
4 changes: 2 additions & 2 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func add(a int, b int) int {

// newCeleryWorker creates celery worker
func newCeleryWorker(numWorkers int) *CeleryWorker {
broker := NewRedisCeleryBroker("localhost:6379", "")
backend := NewRedisCeleryBackend("localhost:6379", "")
broker := NewRedisCeleryBroker("redis://localhost:6379")
backend := NewRedisCeleryBackend("redis://localhost:6379")
celeryWorker := NewCeleryWorker(broker, backend, numWorkers)
return celeryWorker
}
Expand Down

0 comments on commit b2f0dd7

Please sign in to comment.