Skip to content

Commit

Permalink
Add support for expiring jobs (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
lackstein authored Oct 28, 2022
1 parent 32e6055 commit 175eed9
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 18 deletions.
39 changes: 21 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Specify the host with `DD_AGENT_HOST`

Enables enqueueing of jobs with a cron format

### Configuration
#### Configuration

```
[cron_plugin]
Expand Down Expand Up @@ -94,29 +94,28 @@ Each `[[ cron ]]` entry must have:

`type`: Faktory Job type to be queued

### Cron expressions

Field name | Mandatory? | Allowed values | Allowed special characters
---------- | ---------- | -------------- | --------------------------
Seconds | Yes | 0-59 | * / , -
Minutes | Yes | 0-59 | * / , -
Hours | Yes | 0-23 | * / , -
Day of month | Yes | 1-31 | * / , - ?
Month | Yes | 1-12 or JAN-DEC | * / , -
Day of week | Yes | 0-6 or SUN-SAT | * / , - ?
#### Cron expressions

| Field name | Mandatory? | Allowed values | Allowed special characters |
| ------------ | ---------- | --------------- | -------------------------- |
| Seconds | Yes | 0-59 | \* / , - |
| Minutes | Yes | 0-59 | \* / , - |
| Hours | Yes | 0-23 | \* / , - |
| Day of month | Yes | 1-31 | \* / , - ? |
| Month | Yes | 1-12 or JAN-DEC | \* / , - |
| Day of week | Yes | 0-6 or SUN-SAT | \* / , - ? |

Predefined schedules

You may use one of several pre-defined schedules in place of a cron expression.

Entry | Description | Equivalent To
----- | ----------- | -------------
@yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 *
@monthly | Run once a month, midnight, first of month | 0 0 0 1 * *
@weekly | Run once a week, midnight between Sat/Sun | 0 0 0 * * 0
@daily (or @midnight) | Run once a day, midnight | 0 0 0 * * *
@hourly | Run once an hour, beginning of hour | 0 0 * * * *
| Entry | Description | Equivalent To |
| ---------------------- | ------------------------------------------ | --------------- |
| @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 \* |
| @monthly | Run once a month, midnight, first of month | 0 0 0 1 \* \* |
| @weekly | Run once a week, midnight between Sat/Sun | 0 0 0 \* \* 0 |
| @daily (or @midnight) | Run once a day, midnight | 0 0 0 \* \* \* |
| @hourly | Run once an hour, beginning of hour | 0 0 \* \* \* \* |

Intervals

Expand All @@ -130,6 +129,10 @@ Passing job arguments

arguments for jobs can be passed using `args = [1,2,3]` or using `[[cron.job.args]]` and adding key, pair values.

### Expiring Jobs

Jobs can be configured to automatically expire after a preset time has passed by setting the `expires_at` custom attribute to an ISO8601 timestamp. Alternatively, you can set the `expires_in` custom attribute to a Golang Duration string to expire a job a given duration after it was enqueued.

## License

All code in this repository is licensed under the AGPL.
2 changes: 2 additions & 0 deletions cmd/faktory/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/contribsys/faktory/webui"
"github.com/fossas/faktory-plugins/batch"
"github.com/fossas/faktory-plugins/cron"
"github.com/fossas/faktory-plugins/expire"
"github.com/fossas/faktory-plugins/metrics"
"github.com/fossas/faktory-plugins/uniq"
)
Expand Down Expand Up @@ -56,6 +57,7 @@ func main() {
s.Register(new(metrics.MetricsSubsystem))
s.Register(new(cron.CronSubsystem))
s.Register(new(batch.BatchSubsystem))
s.Register(new(expire.ExpireSubsystem))
go cli.HandleSignals(s)

go func() {
Expand Down
91 changes: 91 additions & 0 deletions expire/expire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package expire

import (
"errors"
"fmt"
"time"

"github.com/contribsys/faktory/manager"
"github.com/contribsys/faktory/server"
"github.com/contribsys/faktory/util"
)

var _ server.Subsystem = &ExpireSubsystem{}

// ExpireSubsystem allows jobs to expire.
// It's implementation is similar to that of https://github.com/contribsys/faktory/wiki/Ent-Expiring-Jobs
// except that `expires_in` takes a duration string rather than seconds.
type ExpireSubsystem struct{}

// Start loads the plugin by installing the middlewares.
func (e *ExpireSubsystem) Start(s *server.Server) error {
s.Manager().AddMiddleware("push", e.parseExpiration)
s.Manager().AddMiddleware("fetch", e.skipExpiredJobs)
util.Info("Loaded expiring jobs plugin")
return nil
}

// Name returns the name of the plugin.
func (e *ExpireSubsystem) Name() string {
return "ExpiringJobs"
}

// Reload does not do anything.
func (e *ExpireSubsystem) Reload(s *server.Server) error {
return nil
}

// parseExpiration ensures that `expires_at` and `expires_in` are valid.
// If `expires_in` is set it will be parsed and used to set `expires_at`.
func (e *ExpireSubsystem) parseExpiration(next func() error, ctx manager.Context) error {
// If `expires_at` is set then validate that it's a proper ISO 8601 timestamp.
ea, aok := ctx.Job().GetCustom("expires_at")
if aok {
expires_at, ok := ea.(string)
if !ok {
return errors.New("expire: Could not cast expires_at")
}
if _, err := time.Parse(time.RFC3339Nano, expires_at); err != nil {
return fmt.Errorf("expire: Could not parse expires_at: %w", err)
}
}

// Set `expires_at` if `expires_in` is set
if ei, iok := ctx.Job().GetCustom("expires_in"); iok {
// Error out if `expires_at` is already set.
if aok {
return errors.New("expire: Can not queue job with both expires_at and expires_in set")
}

expires_in, ok := ei.(string)
if !ok {
return errors.New("expire: Could not cast expires_in")
}
duration, err := time.ParseDuration(expires_in)
if err != nil {
return fmt.Errorf("expire: Could not parse expires_in: %w", err)
}
ctx.Job().SetExpiresIn(duration)
}

return next()
}

// skipExpiredJobs is a FETCH chain middleware that ensures a job is not expired.
// If the job is expired it will return an error that instructs Faktory to
// restart the fetch.
func (e *ExpireSubsystem) skipExpiredJobs(next func() error, ctx manager.Context) error {
if ea, ok := ctx.Job().GetCustom("expires_at"); ok {
expires_at, err := time.Parse(time.RFC3339Nano, ea.(string))
if err != nil {
util.Warnf("expire: error parsing expires_at: %v", err)
return next()
}

if time.Now().After(expires_at) {
return manager.Discard("job expired")
}
}

return next()
}

0 comments on commit 175eed9

Please sign in to comment.