Skip to content

Commit

Permalink
Implement a REQUEUE command (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
lackstein authored Dec 2, 2022
1 parent 87d22d1 commit c56528a
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ arguments for jobs can be passed using `args = [1,2,3]` or using `[[cron.job.arg

Jobs can be configured to automatically expire after a preset time has passed by setting the `expires_at` custom attribute to an ISO8601 timestamp. Alternatively, you can set the `expires_in` custom attribute to a Golang Duration string to expire a job a given duration after it was enqueued.

### Requeue Jobs

Implements a `REQUEUE {"jid": "..."}` command that ACKs the given job and then immediately requeues it to the beginning of the queue. This can be useful if you need a worker to give up execution of a job without impacting the retry count, for example if the worker is scaling down.

## License

All code in this repository is licensed under the AGPL.
2 changes: 2 additions & 0 deletions cmd/faktory/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/fossas/faktory-plugins/cron"
"github.com/fossas/faktory-plugins/expire"
"github.com/fossas/faktory-plugins/metrics"
"github.com/fossas/faktory-plugins/requeue"
"github.com/fossas/faktory-plugins/uniq"
)

Expand Down Expand Up @@ -58,6 +59,7 @@ func main() {
s.Register(new(cron.CronSubsystem))
s.Register(new(batch.BatchSubsystem))
s.Register(new(expire.ExpireSubsystem))
s.Register(new(requeue.RequeueSubsystem))
go cli.HandleSignals(s)

go func() {
Expand Down
137 changes: 137 additions & 0 deletions requeue/requeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package requeue

import (
"context"
"encoding/json"
"fmt"
"reflect"

"github.com/contribsys/faktory/client"
"github.com/contribsys/faktory/manager"
"github.com/contribsys/faktory/server"
"github.com/contribsys/faktory/util"
)

var _ server.Subsystem = &RequeueSubsystem{}

// RequeueSubsystem allows a client to indicate that it is not able to complete
// the job and that it should be retried.
type RequeueSubsystem struct{}

// Start loads the plugin.
func (r *RequeueSubsystem) Start(s *server.Server) error {
server.CommandSet["REQUEUE"] = r.requeueCommand
util.Info("Loaded requeue jobs plugin")
return nil
}

// Name returns the name of the plugin.
func (r *RequeueSubsystem) Name() string {
return "Requeue"
}

// Reload does not do anything.
func (r *RequeueSubsystem) Reload(s *server.Server) error {
return nil
}

// requeueCommand implements the REQUEUE client command which
// - ACKs the job
// - Requeues the job to the _front_ of the queue
// REQUEUE {"jid":"123456789"}
func (r *RequeueSubsystem) requeueCommand(c *server.Connection, s *server.Server, cmd string) {
data := cmd[8:]

var hash map[string]string
err := json.Unmarshal([]byte(data), &hash)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid REQUEUE %s", data))
return
}
jid, ok := hash["jid"]
if !ok {
_ = c.Error(cmd, fmt.Errorf("invalid REQUEUE %s", data))
return
}
job, err := s.Manager().Acknowledge(jid)
if err != nil {
_ = c.Error(cmd, err)
return
}

q, err := s.Store().GetQueue(job.Queue)
if err != nil {
_ = c.Error(cmd, err)
return
}

pushChain := pushChain(s.Manager())
ctx := newMiddlewareCtx(context.Background(), job, s.Manager())
// Our final function in the middleware chain is based off of the
// manager.Push -> manager.enqueue -> redisQueue.Push implementation upstream.
// https://github.com/contribsys/faktory/blob/v1.6.2/manager/manager.go#L233
// https://github.com/contribsys/faktory/blob/v1.6.2/manager/manager.go#L254
// https://github.com/contribsys/faktory/blob/v1.6.2/storage/queue_redis.go#L104
err = callMiddleware(pushChain, ctx, func() error {
job.EnqueuedAt = util.Nows()
jdata, err := json.Marshal(job)
if err != nil {
return err
}
// redisQueue.Push does an LPUSH and redisQueue.Pop does an RPOP, so to
// insert a job at the front of the queue we want to do an RPUSH.
s.Manager().Redis().RPush(q.Name(), jdata)
return nil
})
if err != nil {
_ = c.Error(cmd, err)
return
}

_ = c.Ok()
}

// callMiddleware runs the given job through the given middleware chain.
// `final` is the function called if the entire chain passes the job along.
// Copied from https://github.com/contribsys/faktory/blob/v1.6.2/manager/manager.go#L182
func callMiddleware(chain manager.MiddlewareChain, ctx manager.Context, final func() error) error {
if len(chain) == 0 {
return final()
}

link := chain[0]
rest := chain[1:]
return link(func() error { return callMiddleware(rest, ctx, final) }, ctx)
}

// pushChain pulls the PUSH middleware chain out of a manager.manager
// https://github.com/contribsys/faktory/blob/v1.6.2/manager/manager.go#L183
func pushChain(mgr manager.Manager) manager.MiddlewareChain {
rm := reflect.ValueOf(mgr).Elem()
rf := rm.FieldByName("pushChain")
rf = reflect.NewAt(rf.Type(), rf.Addr().UnsafePointer()).Elem()
return rf.Interface().(manager.MiddlewareChain)
}

// newMiddlewareCtx builds a manager.Ctx.
// https://github.com/contribsys/faktory/blob/v1.6.2/manager/middleware.go#L20
// We need to use reflection here to set the job and mgr because they're
// unexported fields that middlewares will expect to be set.
// We need to create new Values for each field because reflect does not allow
// reading or setting unexported fields.
func newMiddlewareCtx(ctx context.Context, job *client.Job, mgr manager.Manager) manager.Ctx {
mctx := manager.Ctx{Context: ctx}
rmctx := reflect.ValueOf(&mctx).Elem()

rjob := rmctx.FieldByName("job")
rjob = reflect.NewAt(rjob.Type(), rjob.Addr().UnsafePointer()).Elem()
rjob.Set(reflect.ValueOf(job))

rmgr := rmctx.FieldByName("mgr")
rmgr = reflect.NewAt(rmgr.Type(), rmgr.Addr().UnsafePointer()).Elem()
// s.Manager()'s type is the interface manager.Manager but we need a *manager.manager
// so we're getting the underlying value's address.
rmgr.Set(reflect.ValueOf(mgr).Elem().Addr())

return mctx
}
137 changes: 137 additions & 0 deletions requeue/requeue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package requeue

import (
"fmt"
"math/rand"
"os"
"strconv"
"testing"

"github.com/contribsys/faktory/cli"
"github.com/contribsys/faktory/client"
"github.com/contribsys/faktory/manager"
"github.com/contribsys/faktory/server"
"github.com/stretchr/testify/assert"
)

func TestRequeue(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
j1 := client.NewJob("JobOne", 1)
err := cl.Push(j1)
assert.Nil(t, err)

j2 := client.NewJob("JobTwo", 2)
err = cl.Push(j2)
assert.Nil(t, err)

job, _ := cl.Fetch("default")
_, err = cl.Generic(fmt.Sprintf(`REQUEUE {"jid":%q}`, job.Jid))
assert.Nil(t, err)

// j2 would be at the front of the queue if j1 had ACKed without REQUEUE, so
// if the fetched job is j1 then REQUEUE successfully put j1 at the front of
// the queue.
job, _ = cl.Fetch("default")
assert.Equal(t, j1.Jid, job.Jid)
// Verify the j2 is next
job, _ = cl.Fetch("default")
assert.Equal(t, j2.Jid, job.Jid)
// Verify that the queue is empty
job, _ = cl.Fetch("default")
assert.Nil(t, job)
})
}

func TestPushMiddleware(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
// Sentinel values
attr := "push_chain"
val := "hello world"

// Queue a job without any custom attributes being set
j1 := client.NewJob("JobOne", 1)
err := cl.Push(j1)
assert.Nil(t, err)
_, ok := j1.GetCustom(attr)
assert.False(t, ok)

// Add a PUSH middleware that will run when the job is REQUEUEd
// This sets the custom attribute
s.Manager().AddMiddleware("push", func(next func() error, ctx manager.Context) error {
ctx.Job().SetCustom(attr, val)
return next()
})

// Fetch and REQUEUE the job
cl.Fetch("default")
_, err = cl.Generic(fmt.Sprintf(`REQUEUE {"jid":%q}`, j1.Jid))
assert.Nil(t, err)

// Check that the middleware ran and added the custom attribute
j2, _ := cl.Fetch("default")
assert.Equal(t, j1.Jid, j2.Jid)
v, ok := j2.GetCustom(attr)
assert.True(t, ok)
assert.Equal(t, val, v)
})
}

func withServer(runner func(s *server.Server, cl *client.Client)) {
dir := "/tmp/requeue_test.db"
defer os.RemoveAll(dir)

opts := &cli.CliOptions{
CmdBinding: "localhost:7414",
Environment: "development",
ConfigDirectory: ".",
LogLevel: "debug",
StorageDirectory: dir,
}
s, stopper, err := cli.BuildServer(opts)
if err != nil {
panic(err)
}
defer stopper()

go cli.HandleSignals(s)

err = s.Boot()
if err != nil {
panic(err)
}
s.Register(new(RequeueSubsystem))

go func() {
err := s.Run()
if err != nil {
panic(err)
}
}()

cl, err := getClient()
if err != nil {
panic(err)
}
defer cl.Close()

runner(s, cl)
close(s.Stopper())
s.Stop(nil)
}

func getClient() (*client.Client, error) {
// this is a worker process so we need to set the global WID before connecting
client.RandomProcessWid = strconv.FormatInt(rand.Int63(), 32)

srv := client.DefaultServer()
srv.Address = "localhost:7414"
cl, err := client.Dial(srv, "123456")
if err != nil {
return nil, err
}
if _, err = cl.Beat(); err != nil {
return nil, err
}

return cl, nil
}

0 comments on commit c56528a

Please sign in to comment.