diff --git a/README.md b/README.md index 8577500..c228b64 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ Specify the host with `DD_AGENT_HOST` Enables enqueueing of jobs with a cron format -### Configuration +#### Configuration ``` [cron_plugin] @@ -94,29 +94,28 @@ Each `[[ cron ]]` entry must have: `type`: Faktory Job type to be queued -### Cron expressions - - Field name | Mandatory? | Allowed values | Allowed special characters ----------- | ---------- | -------------- | -------------------------- -Seconds | Yes | 0-59 | * / , - -Minutes | Yes | 0-59 | * / , - -Hours | Yes | 0-23 | * / , - -Day of month | Yes | 1-31 | * / , - ? -Month | Yes | 1-12 or JAN-DEC | * / , - -Day of week | Yes | 0-6 or SUN-SAT | * / , - ? +#### Cron expressions +| Field name | Mandatory? | Allowed values | Allowed special characters | +| ------------ | ---------- | --------------- | -------------------------- | +| Seconds | Yes | 0-59 | \* / , - | +| Minutes | Yes | 0-59 | \* / , - | +| Hours | Yes | 0-23 | \* / , - | +| Day of month | Yes | 1-31 | \* / , - ? | +| Month | Yes | 1-12 or JAN-DEC | \* / , - | +| Day of week | Yes | 0-6 or SUN-SAT | \* / , - ? | Predefined schedules You may use one of several pre-defined schedules in place of a cron expression. -Entry | Description | Equivalent To ------ | ----------- | ------------- -@yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 * -@monthly | Run once a month, midnight, first of month | 0 0 0 1 * * -@weekly | Run once a week, midnight between Sat/Sun | 0 0 0 * * 0 -@daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * -@hourly | Run once an hour, beginning of hour | 0 0 * * * * +| Entry | Description | Equivalent To | +| ---------------------- | ------------------------------------------ | --------------- | +| @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 \* | +| @monthly | Run once a month, midnight, first of month | 0 0 0 1 \* \* | +| @weekly | Run once a week, midnight between Sat/Sun | 0 0 0 \* \* 0 | +| @daily (or @midnight) | Run once a day, midnight | 0 0 0 \* \* \* | +| @hourly | Run once an hour, beginning of hour | 0 0 \* \* \* \* | Intervals @@ -130,6 +129,10 @@ Passing job arguments arguments for jobs can be passed using `args = [1,2,3]` or using `[[cron.job.args]]` and adding key, pair values. +### Expiring Jobs + +Jobs can be configured to automatically expire after a preset time has passed by setting the `expires_at` custom attribute to an ISO8601 timestamp. Alternatively, you can set the `expires_in` custom attribute to a Golang Duration string to expire a job a given duration after it was enqueued. + ## License All code in this repository is licensed under the AGPL. diff --git a/cmd/faktory/daemon.go b/cmd/faktory/daemon.go index 773fe7c..76878dd 100644 --- a/cmd/faktory/daemon.go +++ b/cmd/faktory/daemon.go @@ -11,6 +11,7 @@ import ( "github.com/contribsys/faktory/webui" "github.com/fossas/faktory-plugins/batch" "github.com/fossas/faktory-plugins/cron" + "github.com/fossas/faktory-plugins/expire" "github.com/fossas/faktory-plugins/metrics" "github.com/fossas/faktory-plugins/uniq" ) @@ -56,6 +57,7 @@ func main() { s.Register(new(metrics.MetricsSubsystem)) s.Register(new(cron.CronSubsystem)) s.Register(new(batch.BatchSubsystem)) + s.Register(new(expire.ExpireSubsystem)) go cli.HandleSignals(s) go func() { diff --git a/expire/expire.go b/expire/expire.go new file mode 100644 index 0000000..ebe136d --- /dev/null +++ b/expire/expire.go @@ -0,0 +1,91 @@ +package expire + +import ( + "errors" + "fmt" + "time" + + "github.com/contribsys/faktory/manager" + "github.com/contribsys/faktory/server" + "github.com/contribsys/faktory/util" +) + +var _ server.Subsystem = &ExpireSubsystem{} + +// ExpireSubsystem allows jobs to expire. +// It's implementation is similar to that of https://github.com/contribsys/faktory/wiki/Ent-Expiring-Jobs +// except that `expires_in` takes a duration string rather than seconds. +type ExpireSubsystem struct{} + +// Start loads the plugin by installing the middlewares. +func (e *ExpireSubsystem) Start(s *server.Server) error { + s.Manager().AddMiddleware("push", e.parseExpiration) + s.Manager().AddMiddleware("fetch", e.skipExpiredJobs) + util.Info("Loaded expiring jobs plugin") + return nil +} + +// Name returns the name of the plugin. +func (e *ExpireSubsystem) Name() string { + return "ExpiringJobs" +} + +// Reload does not do anything. +func (e *ExpireSubsystem) Reload(s *server.Server) error { + return nil +} + +// parseExpiration ensures that `expires_at` and `expires_in` are valid. +// If `expires_in` is set it will be parsed and used to set `expires_at`. +func (e *ExpireSubsystem) parseExpiration(next func() error, ctx manager.Context) error { + // If `expires_at` is set then validate that it's a proper ISO 8601 timestamp. + ea, aok := ctx.Job().GetCustom("expires_at") + if aok { + expires_at, ok := ea.(string) + if !ok { + return errors.New("expire: Could not cast expires_at") + } + if _, err := time.Parse(time.RFC3339Nano, expires_at); err != nil { + return fmt.Errorf("expire: Could not parse expires_at: %w", err) + } + } + + // Set `expires_at` if `expires_in` is set + if ei, iok := ctx.Job().GetCustom("expires_in"); iok { + // Error out if `expires_at` is already set. + if aok { + return errors.New("expire: Can not queue job with both expires_at and expires_in set") + } + + expires_in, ok := ei.(string) + if !ok { + return errors.New("expire: Could not cast expires_in") + } + duration, err := time.ParseDuration(expires_in) + if err != nil { + return fmt.Errorf("expire: Could not parse expires_in: %w", err) + } + ctx.Job().SetExpiresIn(duration) + } + + return next() +} + +// skipExpiredJobs is a FETCH chain middleware that ensures a job is not expired. +// If the job is expired it will return an error that instructs Faktory to +// restart the fetch. +func (e *ExpireSubsystem) skipExpiredJobs(next func() error, ctx manager.Context) error { + if ea, ok := ctx.Job().GetCustom("expires_at"); ok { + expires_at, err := time.Parse(time.RFC3339Nano, ea.(string)) + if err != nil { + util.Warnf("expire: error parsing expires_at: %v", err) + return next() + } + + if time.Now().After(expires_at) { + return manager.Discard("job expired") + } + } + + return next() +}