Skip to content

Commit

Permalink
Introduce redis replication wait option for enqueuer (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
chopraanmol1 authored May 25, 2023
1 parent d85e706 commit 075fbb8
Show file tree
Hide file tree
Showing 5 changed files with 679 additions and 4 deletions.
63 changes: 60 additions & 3 deletions enqueue.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package work

import (
"errors"
"sync"
"time"

"github.com/gomodule/redigo/redis"
)

var ErrReplicationFailed = errors.New("replication failed")

// Enqueuer can enqueue jobs.
type Enqueuer struct {
Namespace string // eg, "myapp-work"
Pool *redis.Pool
Option EnqueuerOption

queuePrefix string // eg, "myapp-work:jobs:"
knownJobs map[string]int64
Expand All @@ -19,15 +23,27 @@ type Enqueuer struct {
mtx sync.RWMutex
}

// EnqueuerOption can be passed to NewEnqueuerWithOptions.
type EnqueuerOption struct {
MinWaitReplicas int // MinWaitReplicas is passed as numreplicas in redis wait command, if zero then skips wait command altogether
MaxWaitTimeoutMS int // MaxWaitTimeoutMS is passed as timeout in redis wait command
}

// NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.
func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {
return NewEnqueuerWithOptions(namespace, pool, EnqueuerOption{})
}

// NewEnqueuerWithOptions creates a new enqueuer with the specified Redis namespace, Redis pool and Enqueuer option.
func NewEnqueuerWithOptions(namespace string, pool *redis.Pool, opt EnqueuerOption) *Enqueuer {
if pool == nil {
panic("NewEnqueuer needs a non-nil *redis.Pool")
}

return &Enqueuer{
Namespace: namespace,
Pool: pool,
Option: opt,
queuePrefix: redisKeyJobsPrefix(namespace),
knownJobs: make(map[string]int64),
enqueueUniqueScript: redis.NewScript(2, redisLuaEnqueueUnique),
Expand All @@ -53,7 +69,7 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e
conn := e.Pool.Get()
defer conn.Close()

if _, err := conn.Do("LPUSH", e.queuePrefix+jobName, rawJSON); err != nil {
if _, err := e.redisDoHelper(conn, "LPUSH", e.queuePrefix+jobName, rawJSON); err != nil {
return nil, err
}

Expand Down Expand Up @@ -86,7 +102,7 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
Job: job,
}

_, err = conn.Do("ZADD", redisKeyScheduled(e.Namespace), scheduledJob.RunAt, rawJSON)
_, err = e.redisDoHelper(conn, "ZADD", redisKeyScheduled(e.Namespace), scheduledJob.RunAt, rawJSON)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -238,8 +254,49 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{},
script = e.enqueueUniqueInScript
}

return redis.String(script.Do(conn, scriptArgs...))
status, err := redis.String(script.Do(conn, scriptArgs...))
if err != nil {
return "", err
}
if e.Option.MinWaitReplicas > 0 {
numReplicas, err := redis.Int(conn.Do("WAIT", e.Option.MinWaitReplicas, e.Option.MaxWaitTimeoutMS))
if err != nil {
return "", err
}
if numReplicas < e.Option.MinWaitReplicas {
return "", ErrReplicationFailed
}
}
return status, err
}

return enqueueFn, job, nil
}

func (e *Enqueuer) redisDoHelper(c redis.Conn, cmdName string, args ...interface{}) (reply interface{}, err error) {
if err = c.Send(cmdName, args...); err != nil {
return
}
if e.Option.MinWaitReplicas > 0 {
if err = c.Send("WAIT", e.Option.MinWaitReplicas, e.Option.MaxWaitTimeoutMS); err != nil {
return
}
}

c.Flush()

reply, err = c.Receive()
if err != nil {
return
}
if e.Option.MinWaitReplicas > 0 {
var numReplicas int
if numReplicas, err = redis.Int(c.Receive()); err != nil {
return
}
if numReplicas < e.Option.MinWaitReplicas {
err = ErrReplicationFailed
}
}
return
}
Loading

0 comments on commit 075fbb8

Please sign in to comment.