From c1bd539ed5337e0726479bf503fec3921e368d21 Mon Sep 17 00:00:00 2001 From: Noah Lackstein Date: Thu, 9 Feb 2023 12:53:29 -0800 Subject: [PATCH] Bug fixes (#30) --- batch/batch_test.go | 3 +- expire/expire.go | 1 + expire/expire_test.go | 147 ++++++++++++++++++++++++++++++++++++++++ metrics/metrics_test.go | 3 +- requeue/requeue.go | 6 ++ requeue/requeue_test.go | 23 ++++++- 6 files changed, 176 insertions(+), 7 deletions(-) create mode 100644 expire/expire_test.go diff --git a/batch/batch_test.go b/batch/batch_test.go index 3bd9bfe..bd9b78d 100644 --- a/batch/batch_test.go +++ b/batch/batch_test.go @@ -424,7 +424,6 @@ func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *clien defer os.RemoveAll(dir) opts := &cli.CliOptions{ CmdBinding: "localhost:7416", - WebBinding: "localhost:7420", Environment: "development", ConfigDirectory: ".", LogLevel: "debug", @@ -458,10 +457,10 @@ func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *clien }() cl, err := getClient() - defer cl.Close() if err != nil { panic(err) } + defer cl.Close() runner(cl) close(s.Stopper()) diff --git a/expire/expire.go b/expire/expire.go index ebe136d..27b41a4 100644 --- a/expire/expire.go +++ b/expire/expire.go @@ -66,6 +66,7 @@ func (e *ExpireSubsystem) parseExpiration(next func() error, ctx manager.Context return fmt.Errorf("expire: Could not parse expires_in: %w", err) } ctx.Job().SetExpiresIn(duration) + delete(ctx.Job().Custom, "expires_in") } return next() diff --git a/expire/expire_test.go b/expire/expire_test.go new file mode 100644 index 0000000..8cc6e52 --- /dev/null +++ b/expire/expire_test.go @@ -0,0 +1,147 @@ +package expire + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "testing" + "time" + + "github.com/contribsys/faktory/cli" + "github.com/contribsys/faktory/client" + "github.com/contribsys/faktory/server" + "github.com/stretchr/testify/assert" +) + +func TestExpiresAt(t *testing.T) { + withServer(func(s *server.Server, cl *client.Client) { + // Push a job that has already expired + j1 := client.NewJob("JobOne", 1) + j1.SetExpiresAt(time.Now().Add(time.Minute * -1)) + err := cl.Push(j1) + assert.Nil(t, err) + + // Push a job that will expire + j2 := client.NewJob("JobOne", 1) + j2.SetExpiresAt(time.Now().Add(time.Minute * 1)) + err = cl.Push(j2) + assert.Nil(t, err) + + // The fetched job should be j2 because j1 is expired + job, err := cl.Fetch("default") + assert.Nil(t, err) + assert.Equal(t, j2.Jid, job.Jid) + + // Verify that the queue is empty + job, err = cl.Fetch("default") + assert.Nil(t, err) + assert.Nil(t, job) + }) +} + +func TestExpiresIn(t *testing.T) { + withServer(func(s *server.Server, cl *client.Client) { + // Push a job that has already expired + j1 := client.NewJob("JobOne", 1) + j1.SetExpiresIn(time.Minute * -1) + err := cl.Push(j1) + assert.Nil(t, err) + + // Push a job that will expire + j2 := client.NewJob("JobOne", 1) + j2.SetExpiresIn(time.Minute * 1) + err = cl.Push(j2) + assert.Nil(t, err) + + // The fetched job should be j2 because j1 is expired + job, err := cl.Fetch("default") + assert.Nil(t, err) + assert.Equal(t, j2.Jid, job.Jid) + + // Verify that the queue is empty + job, err = cl.Fetch("default") + assert.Nil(t, err) + assert.Nil(t, job) + }) +} + +// The `expires_in` value is converted to a timestamp and saved as `expires_at` +func TestRemovesExpiresIn(t *testing.T) { + withServer(func(s *server.Server, cl *client.Client) { + // Push a job that has already expired + j1 := client.NewJob("JobOne", 1) + j1.SetCustom("expires_in", (time.Duration(1) * time.Minute).String()) + err := cl.Push(j1) + assert.Nil(t, err) + + // The fetched job should not have `expires_in` set + job, err := cl.Fetch("default") + assert.Nil(t, err) + assert.Equal(t, j1.Jid, job.Jid) + jobExpIn, _ := job.GetCustom("expires_in") + assert.Nil(t, jobExpIn) + jobExpAt, _ := job.GetCustom("expires_at") + assert.NotNil(t, jobExpAt) + }) +} + +func withServer(runner func(s *server.Server, cl *client.Client)) { + dir := fmt.Sprintf("/tmp/expire_test_%d.db", rand.Int()) + defer os.RemoveAll(dir) + + opts := &cli.CliOptions{ + CmdBinding: "localhost:7414", + Environment: "development", + ConfigDirectory: ".", + LogLevel: "debug", + StorageDirectory: dir, + } + s, stopper, err := cli.BuildServer(opts) + if err != nil { + panic(err) + } + defer stopper() + + go cli.HandleSignals(s) + + err = s.Boot() + if err != nil { + panic(err) + } + s.Register(new(ExpireSubsystem)) + + go func() { + err := s.Run() + if err != nil { + panic(err) + } + }() + + cl, err := getClient() + if err != nil { + panic(err) + } + defer cl.Close() + + runner(s, cl) + close(s.Stopper()) + s.Stop(nil) +} + +func getClient() (*client.Client, error) { + // this is a worker process so we need to set the global WID before connecting + client.RandomProcessWid = strconv.FormatInt(rand.Int63(), 32) + + srv := client.DefaultServer() + srv.Address = "localhost:7414" + cl, err := client.Dial(srv, "123456") + if err != nil { + return nil, err + } + if _, err = cl.Beat(); err != nil { + return nil, err + } + + return cl, nil +} diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 71d039a..0e3f4cb 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -244,11 +244,10 @@ func createJob(queue string, jobtype string, args ...interface{}) *client.Job { } func runSystem(configDir string, runner func(s *server.Server, cl *client.Client)) { - dir := "/tmp/batching_system.db" + dir := fmt.Sprintf("/tmp/metrics_test_%d.db", rand.Int()) defer os.RemoveAll(dir) opts := &cli.CliOptions{ CmdBinding: "localhost:7418", - WebBinding: "localhost:7420", Environment: "development", ConfigDirectory: configDir, LogLevel: "debug", diff --git a/requeue/requeue.go b/requeue/requeue.go index a62fc1c..11f1313 100644 --- a/requeue/requeue.go +++ b/requeue/requeue.go @@ -58,6 +58,12 @@ func (r *RequeueSubsystem) requeueCommand(c *server.Connection, s *server.Server _ = c.Error(cmd, err) return } + // s.Manager().Acknowledge() will return (nil, nil) if there's no in-memory + // reservation for the given jid. + if job == nil { + _ = c.Error(cmd, fmt.Errorf("requeue: Can't requeue job with no reservation")) + return + } q, err := s.Store().GetQueue(job.Queue) if err != nil { diff --git a/requeue/requeue_test.go b/requeue/requeue_test.go index 1ad0f48..3674f4d 100644 --- a/requeue/requeue_test.go +++ b/requeue/requeue_test.go @@ -76,12 +76,29 @@ func TestPushMiddleware(t *testing.T) { }) } +func TestNoReservation(t *testing.T) { + withServer(func(s *server.Server, cl *client.Client) { + j1 := client.NewJob("JobOne", 1) + err := cl.Push(j1) + assert.Nil(t, err) + + job, _ := cl.Fetch("default") + + // Acknowledge the job so that `manager.clearReservation` is called + s.Manager().Acknowledge(job.Jid) + + // Attempting to requeue should return an error + _, err = cl.Generic(fmt.Sprintf(`REQUEUE {"jid":%q}`, job.Jid)) + assert.ErrorContains(t, err, "requeue: Can't requeue job with no reservation") + }) +} + func withServer(runner func(s *server.Server, cl *client.Client)) { - dir := "/tmp/requeue_test.db" + dir := fmt.Sprintf("/tmp/requeue_test_%d.db", rand.Int()) defer os.RemoveAll(dir) opts := &cli.CliOptions{ - CmdBinding: "localhost:7414", + CmdBinding: "localhost:7412", Environment: "development", ConfigDirectory: ".", LogLevel: "debug", @@ -124,7 +141,7 @@ func getClient() (*client.Client, error) { client.RandomProcessWid = strconv.FormatInt(rand.Int63(), 32) srv := client.DefaultServer() - srv.Address = "localhost:7414" + srv.Address = "localhost:7412" cl, err := client.Dial(srv, "123456") if err != nil { return nil, err