Skip to content

Commit

Permalink
Bug fixes (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
lackstein authored Feb 9, 2023
1 parent c56528a commit c1bd539
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 7 deletions.
3 changes: 1 addition & 2 deletions batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *clien
defer os.RemoveAll(dir)
opts := &cli.CliOptions{
CmdBinding: "localhost:7416",
WebBinding: "localhost:7420",
Environment: "development",
ConfigDirectory: ".",
LogLevel: "debug",
Expand Down Expand Up @@ -458,10 +457,10 @@ func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *clien
}()

cl, err := getClient()
defer cl.Close()
if err != nil {
panic(err)
}
defer cl.Close()

runner(cl)
close(s.Stopper())
Expand Down
1 change: 1 addition & 0 deletions expire/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (e *ExpireSubsystem) parseExpiration(next func() error, ctx manager.Context
return fmt.Errorf("expire: Could not parse expires_in: %w", err)
}
ctx.Job().SetExpiresIn(duration)
delete(ctx.Job().Custom, "expires_in")
}

return next()
Expand Down
147 changes: 147 additions & 0 deletions expire/expire_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package expire

import (
"fmt"
"math/rand"
"os"
"strconv"
"testing"
"time"

"github.com/contribsys/faktory/cli"
"github.com/contribsys/faktory/client"
"github.com/contribsys/faktory/server"
"github.com/stretchr/testify/assert"
)

func TestExpiresAt(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
// Push a job that has already expired
j1 := client.NewJob("JobOne", 1)
j1.SetExpiresAt(time.Now().Add(time.Minute * -1))
err := cl.Push(j1)
assert.Nil(t, err)

// Push a job that will expire
j2 := client.NewJob("JobOne", 1)
j2.SetExpiresAt(time.Now().Add(time.Minute * 1))
err = cl.Push(j2)
assert.Nil(t, err)

// The fetched job should be j2 because j1 is expired
job, err := cl.Fetch("default")
assert.Nil(t, err)
assert.Equal(t, j2.Jid, job.Jid)

// Verify that the queue is empty
job, err = cl.Fetch("default")
assert.Nil(t, err)
assert.Nil(t, job)
})
}

func TestExpiresIn(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
// Push a job that has already expired
j1 := client.NewJob("JobOne", 1)
j1.SetExpiresIn(time.Minute * -1)
err := cl.Push(j1)
assert.Nil(t, err)

// Push a job that will expire
j2 := client.NewJob("JobOne", 1)
j2.SetExpiresIn(time.Minute * 1)
err = cl.Push(j2)
assert.Nil(t, err)

// The fetched job should be j2 because j1 is expired
job, err := cl.Fetch("default")
assert.Nil(t, err)
assert.Equal(t, j2.Jid, job.Jid)

// Verify that the queue is empty
job, err = cl.Fetch("default")
assert.Nil(t, err)
assert.Nil(t, job)
})
}

// The `expires_in` value is converted to a timestamp and saved as `expires_at`
func TestRemovesExpiresIn(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
// Push a job that has already expired
j1 := client.NewJob("JobOne", 1)
j1.SetCustom("expires_in", (time.Duration(1) * time.Minute).String())
err := cl.Push(j1)
assert.Nil(t, err)

// The fetched job should not have `expires_in` set
job, err := cl.Fetch("default")
assert.Nil(t, err)
assert.Equal(t, j1.Jid, job.Jid)
jobExpIn, _ := job.GetCustom("expires_in")
assert.Nil(t, jobExpIn)
jobExpAt, _ := job.GetCustom("expires_at")
assert.NotNil(t, jobExpAt)
})
}

func withServer(runner func(s *server.Server, cl *client.Client)) {
dir := fmt.Sprintf("/tmp/expire_test_%d.db", rand.Int())
defer os.RemoveAll(dir)

opts := &cli.CliOptions{
CmdBinding: "localhost:7414",
Environment: "development",
ConfigDirectory: ".",
LogLevel: "debug",
StorageDirectory: dir,
}
s, stopper, err := cli.BuildServer(opts)
if err != nil {
panic(err)
}
defer stopper()

go cli.HandleSignals(s)

err = s.Boot()
if err != nil {
panic(err)
}
s.Register(new(ExpireSubsystem))

go func() {
err := s.Run()
if err != nil {
panic(err)
}
}()

cl, err := getClient()
if err != nil {
panic(err)
}
defer cl.Close()

runner(s, cl)
close(s.Stopper())
s.Stop(nil)
}

func getClient() (*client.Client, error) {
// this is a worker process so we need to set the global WID before connecting
client.RandomProcessWid = strconv.FormatInt(rand.Int63(), 32)

srv := client.DefaultServer()
srv.Address = "localhost:7414"
cl, err := client.Dial(srv, "123456")
if err != nil {
return nil, err
}
if _, err = cl.Beat(); err != nil {
return nil, err
}

return cl, nil
}
3 changes: 1 addition & 2 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,10 @@ func createJob(queue string, jobtype string, args ...interface{}) *client.Job {
}

func runSystem(configDir string, runner func(s *server.Server, cl *client.Client)) {
dir := "/tmp/batching_system.db"
dir := fmt.Sprintf("/tmp/metrics_test_%d.db", rand.Int())
defer os.RemoveAll(dir)
opts := &cli.CliOptions{
CmdBinding: "localhost:7418",
WebBinding: "localhost:7420",
Environment: "development",
ConfigDirectory: configDir,
LogLevel: "debug",
Expand Down
6 changes: 6 additions & 0 deletions requeue/requeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (r *RequeueSubsystem) requeueCommand(c *server.Connection, s *server.Server
_ = c.Error(cmd, err)
return
}
// s.Manager().Acknowledge() will return (nil, nil) if there's no in-memory
// reservation for the given jid.
if job == nil {
_ = c.Error(cmd, fmt.Errorf("requeue: Can't requeue job with no reservation"))
return
}

q, err := s.Store().GetQueue(job.Queue)
if err != nil {
Expand Down
23 changes: 20 additions & 3 deletions requeue/requeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,29 @@ func TestPushMiddleware(t *testing.T) {
})
}

func TestNoReservation(t *testing.T) {
withServer(func(s *server.Server, cl *client.Client) {
j1 := client.NewJob("JobOne", 1)
err := cl.Push(j1)
assert.Nil(t, err)

job, _ := cl.Fetch("default")

// Acknowledge the job so that `manager.clearReservation` is called
s.Manager().Acknowledge(job.Jid)

// Attempting to requeue should return an error
_, err = cl.Generic(fmt.Sprintf(`REQUEUE {"jid":%q}`, job.Jid))
assert.ErrorContains(t, err, "requeue: Can't requeue job with no reservation")
})
}

func withServer(runner func(s *server.Server, cl *client.Client)) {
dir := "/tmp/requeue_test.db"
dir := fmt.Sprintf("/tmp/requeue_test_%d.db", rand.Int())
defer os.RemoveAll(dir)

opts := &cli.CliOptions{
CmdBinding: "localhost:7414",
CmdBinding: "localhost:7412",
Environment: "development",
ConfigDirectory: ".",
LogLevel: "debug",
Expand Down Expand Up @@ -124,7 +141,7 @@ func getClient() (*client.Client, error) {
client.RandomProcessWid = strconv.FormatInt(rand.Int63(), 32)

srv := client.DefaultServer()
srv.Address = "localhost:7414"
srv.Address = "localhost:7412"
cl, err := client.Dial(srv, "123456")
if err != nil {
return nil, err
Expand Down

0 comments on commit c1bd539

Please sign in to comment.