Skip to content

Commit

Permalink
issue-654: allow setting a stopTime for job. (go-co-op#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
Higan authored Jul 19, 2024
1 parent 256265f commit 3b2dcd8
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 0 deletions.
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil")
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
ErrWithStopDateTimePast = fmt.Errorf("gocron: WithStopDateTime: end must not be in the past")
ErrStartTimeLaterThanEndTime = fmt.Errorf("gocron: WithStartDateTime: start must not be later than end")
ErrStopTimeEarlierThanStartTime = fmt.Errorf("gocron: WithStopDateTime: end must not be earlier than start")
ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0")
)

Expand Down
4 changes: 4 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
default:
}

if j.stopTimeReached(e.clock.Now()) {
return
}

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutForRescheduling(&jIn)
Expand Down
38 changes: 38 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type internalJob struct {
limitRunsTo *limitRunsTo
startTime time.Time
startImmediately bool
stopTime time.Time
// event listeners
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
Expand All @@ -60,6 +61,13 @@ func (j *internalJob) stop() {
j.cancel()
}

func (j *internalJob) stopTimeReached(now time.Time) bool {
if j.stopTime.IsZero() {
return false
}
return j.stopTime.Before(now)
}

// task stores the function and parameters
// that are actually run when the job is executed.
type task struct {
Expand Down Expand Up @@ -594,11 +602,41 @@ func WithStartDateTime(start time.Time) StartAtOption {
if start.IsZero() || start.Before(now) {
return ErrWithStartDateTimePast
}
if !j.stopTime.IsZero() && j.stopTime.Before(start) {
return ErrStartTimeLaterThanEndTime
}
j.startTime = start
return nil
}
}

// WithStopAt sets the option for stopping the job from running
// after the specified time.
func WithStopAt(option StopAtOption) JobOption {
return func(j *internalJob, now time.Time) error {
return option(j, now)
}
}

// StopAtOption defines options for stopping the job
type StopAtOption func(*internalJob, time.Time) error

// WithStopDateTime sets the final date & time after which the job should stop.
// This must be in the future and should be after the startTime (if specified).
// The job's final run may be at the stop time, but not after.
func WithStopDateTime(end time.Time) StopAtOption {
return func(j *internalJob, now time.Time) error {
if end.IsZero() || end.Before(now) {
return ErrWithStopDateTimePast
}
if end.Before(j.startTime) {
return ErrStopTimeEarlierThanStartTime
}
j.stopTime = end
return nil
}
}

// WithTags sets the tags for the job. Tags provide
// a way to identify jobs by a set of tags and remove
// multiple jobs by tag.
Expand Down
4 changes: 4 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
return
}

if j.stopTimeReached(s.now()) {
return
}

scheduleFrom := j.lastRun
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
Expand Down
31 changes: 31 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ func TestScheduler_LongRunningJobs(t *testing.T) {
options []SchedulerOption
expectedRuns int
}{
{
"duration with stop time between executions",
durationCh,
DurationJob(
time.Millisecond * 500,
),
NewTask(
func() {
time.Sleep(1 * time.Second)
durationCh <- struct{}{}
}),
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Millisecond * 1100)))},
[]SchedulerOption{WithStopTimeout(time.Second * 2)},
2,
},
{
"duration",
durationCh,
Expand Down Expand Up @@ -755,6 +770,22 @@ func TestScheduler_NewJobErrors(t *testing.T) {
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))},
ErrWithStartDateTimePast,
},
{
"WithStartDateTime is later than the end",
DurationJob(
time.Second,
),
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Second))), WithStartAt(WithStartDateTime(time.Now().Add(time.Hour)))},
ErrStartTimeLaterThanEndTime,
},
{
"WithStopDateTime is earlier than the start",
DurationJob(
time.Second,
),
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(time.Hour))), WithStopAt(WithStopDateTime(time.Now().Add(time.Second)))},
ErrStopTimeEarlierThanStartTime,
},
{
"oneTimeJob start at is zero",
OneTimeJob(OneTimeJobStartDateTime(time.Time{})),
Expand Down

0 comments on commit 3b2dcd8

Please sign in to comment.