From 4aa15843c65c8e039a9b4b512a9022fc9bf30245 Mon Sep 17 00:00:00 2001 From: Augustin Husson Date: Wed, 6 Mar 2024 16:09:10 +0100 Subject: [PATCH] [BREAKINGCHANGE] Support cron job syntax and change the actual WithCron method Signed-off-by: Augustin Husson --- app/app.go | 18 ++-- async/taskhelper/basic_runner.go | 100 ++++++++++++++++++++++ async/taskhelper/cron_runner.go | 96 ++++++++++++++++++++++ async/taskhelper/helper.go | 137 ++++++++++--------------------- async/taskhelper/helper_test.go | 8 +- go.mod | 1 + go.sum | 2 + 7 files changed, 259 insertions(+), 103 deletions(-) create mode 100644 async/taskhelper/basic_runner.go create mode 100644 async/taskhelper/cron_runner.go diff --git a/app/app.go b/app/app.go index 5645652..bf66b55 100644 --- a/app/app.go +++ b/app/app.go @@ -80,7 +80,7 @@ func init() { flag.StringVar(&addr, "web.listen-address", ":8080", "The address to listen on for HTTP requests, web interface and telemetry.") } -type cron struct { +type timerTask struct { task interface{} duration time.Duration } @@ -88,8 +88,8 @@ type cron struct { type Runner struct { // waitTimeout is the amount of time to wait before killing the application once it received a cancellation order. waitTimeout time.Duration - // cronTasks is the different task that are executed periodically. - cronTasks []cron + // timerTasks is the different task that are executed periodically. + timerTasks []timerTask // tasks is the different task that are executed asynchronously only once time. // for each task a async.TaskRunner will be created tasks []interface{} @@ -118,7 +118,7 @@ func (r *Runner) SetTimeout(timeout time.Duration) *Runner { return r } -// SetBanner is setting a string (ideally the logo of the project) that would be printed when the runner is started. +// SetBanner is setting a string (ideally the logo of the project) that would be printed when the runner is started. // Additionally, you can also print the Version, the BuildTime and the Commit. // You just have to add '%s' in your banner where you want to print each information (one '%s' per additional information). // If set, then the main header won't be printed. The main header is printing the Version, the Commit and the BuildTime. @@ -134,10 +134,10 @@ func (r *Runner) WithTasks(t ...interface{}) *Runner { return r } -// WithCronTasks is the way to add different tasks that will be executed periodically at the frequency defined with the duration. -func (r *Runner) WithCronTasks(duration time.Duration, t ...interface{}) *Runner { +// WithTimerTask is the way to add different tasks that will be executed periodically at the frequency defined with the duration. +func (r *Runner) WithTimerTask(duration time.Duration, t ...interface{}) *Runner { for _, ts := range t { - r.cronTasks = append(r.cronTasks, cron{ + r.timerTasks = append(r.timerTasks, timerTask{ task: ts, duration: duration, }) @@ -233,8 +233,8 @@ func (r *Runner) buildTask() { signalsListener := async.NewSignalListener(syscall.SIGINT, syscall.SIGTERM) r.tasks = append(r.tasks, signalsListener) - for _, c := range r.cronTasks { - if taskHelper, err := taskhelper.NewCron(c.task, c.duration); err != nil { + for _, c := range r.timerTasks { + if taskHelper, err := taskhelper.NewTick(c.task, c.duration); err != nil { logrus.WithError(err).Fatal("unable to create the taskhelper.Helper to handle a cron set") } else { r.helpers = append(r.helpers, taskHelper) diff --git a/async/taskhelper/basic_runner.go b/async/taskhelper/basic_runner.go new file mode 100644 index 0000000..05e42a7 --- /dev/null +++ b/async/taskhelper/basic_runner.go @@ -0,0 +1,100 @@ +// Copyright The Perses Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskhelper + +import ( + "context" + "fmt" + "time" + + "github.com/perses/common/async" + "github.com/sirupsen/logrus" +) + +type runner struct { + Helper + // interval is used when the runner is used as a Cron + interval time.Duration + // task can be a SimpleTask or a Task + task interface{} + isSimpleTask bool + done chan struct{} +} + +func (r *runner) Done() <-chan struct{} { + return r.done +} + +func (r *runner) String() string { + return r.task.(async.SimpleTask).String() +} + +func (r *runner) Start(ctx context.Context, cancelFunc context.CancelFunc) (err error) { + // closing this channel will highlight the caller that the task is done. + defer close(r.done) + childCtx := ctx + if !r.isSimpleTask { + // childCancelFunc will be used to stop any sub go-routing using the childCtx when the current task is stopped. + // it's just to be sure that every sub go-routing created by the task will be stopped without stopping the whole application. + var childCancelFunc context.CancelFunc + childCtx, childCancelFunc = context.WithCancel(ctx) + t := r.task.(async.Task) + // then we have to call the finalize method of the task + defer func() { + childCancelFunc() + if finalErr := t.Finalize(); finalErr != nil { + if err == nil { + err = finalErr + } else { + logrus.WithError(finalErr).Error("error occurred when calling the method Finalize of the task") + } + } + }() + + // and the initialize method + if initError := t.Initialize(); initError != nil { + err = fmt.Errorf("unable to call the initialize method of the task: %w", initError) + return + } + } + + // then run the task + if executeErr := r.task.(async.SimpleTask).Execute(childCtx, cancelFunc); executeErr != nil { + err = fmt.Errorf("unable to call the execute method of the task: %w", executeErr) + return + } + + // in case the runner has an interval properly set, then we can create a ticker and periodically call the method that executes the task + return r.tick(childCtx, cancelFunc) +} + +func (r *runner) tick(ctx context.Context, cancelFunc context.CancelFunc) error { + simpleTask := r.task.(async.SimpleTask) + if r.interval <= 0 { + return nil + } + ticker := time.NewTicker(r.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if executeErr := simpleTask.Execute(ctx, cancelFunc); executeErr != nil { + return fmt.Errorf("unable to call the execute method of the task %s: %w", simpleTask.String(), executeErr) + } + case <-ctx.Done(): + logrus.Debugf("task %s has been canceled", simpleTask.String()) + return nil + } + } +} diff --git a/async/taskhelper/cron_runner.go b/async/taskhelper/cron_runner.go new file mode 100644 index 0000000..c8d247e --- /dev/null +++ b/async/taskhelper/cron_runner.go @@ -0,0 +1,96 @@ +// Copyright The Perses Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskhelper + +import ( + "context" + "fmt" + "time" + + "github.com/perses/common/async" + "github.com/robfig/cron" + "github.com/sirupsen/logrus" +) + +type cronRunner struct { + Helper + // schedule is used to now when calling the task + schedule cron.Schedule + // task can be a SimpleTask or a Task + task interface{} + isSimpleTask bool + done chan struct{} +} + +func (r *cronRunner) Done() <-chan struct{} { + return r.done +} + +func (r *cronRunner) String() string { + return r.task.(async.SimpleTask).String() +} + +func (r *cronRunner) Start(ctx context.Context, cancelFunc context.CancelFunc) (err error) { + // closing this channel will highlight the caller that the task is done. + defer close(r.done) + childCtx := ctx + if !r.isSimpleTask { + // childCancelFunc will be used to stop any sub go-routing using the childCtx when the current task is stopped. + // it's just to be sure that every sub go-routing created by the task will be stopped without stopping the whole application. + var childCancelFunc context.CancelFunc + childCtx, childCancelFunc = context.WithCancel(ctx) + t := r.task.(async.Task) + // then we have to call the finalize method of the task + defer func() { + childCancelFunc() + if finalErr := t.Finalize(); finalErr != nil { + if err == nil { + err = finalErr + } else { + logrus.WithError(finalErr).Error("error occurred when calling the method Finalize of the task") + } + } + }() + + // and the initialize method + if initError := t.Initialize(); initError != nil { + err = fmt.Errorf("unable to call the initialize method of the task: %w", initError) + return + } + } + return r.cron(childCtx, cancelFunc) +} + +func (r *cronRunner) cron(ctx context.Context, cancelFunc context.CancelFunc) error { + simpleTask := r.task.(async.SimpleTask) + now := time.Now() + next := r.schedule.Next(now) + for { + timer := time.NewTimer(next.Sub(now)) + for { + select { + case now = <-timer.C: + // then run the task + if executeErr := r.task.(async.SimpleTask).Execute(ctx, cancelFunc); executeErr != nil { + return fmt.Errorf("unable to call the execute method of the task: %w", executeErr) + } + next = r.schedule.Next(now) + case <-ctx.Done(): + logrus.Debugf("task %s has been canceled", simpleTask.String()) + return nil + } + break + } + } +} diff --git a/async/taskhelper/helper.go b/async/taskhelper/helper.go index d4151db..38fba39 100644 --- a/async/taskhelper/helper.go +++ b/async/taskhelper/helper.go @@ -22,6 +22,7 @@ import ( "time" "github.com/perses/common/async" + "github.com/robfig/cron" "github.com/sirupsen/logrus" ) @@ -35,14 +36,9 @@ type Helper interface { } func New(task interface{}) (Helper, error) { - isSimpleTask := true - switch task.(type) { - case async.Task: - isSimpleTask = false - case async.SimpleTask: - // just here as sanity check - default: - return nil, fmt.Errorf("task is not a SimpleTask or a Task") + isSimpleTask, err := isSimpleTask(task) + if err != nil { + return nil, err } return &runner{ interval: 0, @@ -52,20 +48,15 @@ func New(task interface{}) (Helper, error) { }, nil } -// NewCron is returning a Helper that will execute the task periodically. +// NewTick is returning a Helper that will execute the task periodically. // The task can be a SimpleTask or a Task. It returns an error if it's something different -func NewCron(task interface{}, interval time.Duration) (Helper, error) { +func NewTick(task interface{}, interval time.Duration) (Helper, error) { if interval <= 0 { return nil, fmt.Errorf("interval cannot be negative or equal to 0 when creating a cron") } - isSimpleTask := true - switch task.(type) { - case async.Task: - isSimpleTask = false - case async.SimpleTask: - // just here as sanity check - default: - return nil, fmt.Errorf("task is not a SimpleTask or a Task") + isSimpleTask, err := isSimpleTask(task) + if err != nil { + return nil, err } return &runner{ interval: interval, @@ -75,82 +66,31 @@ func NewCron(task interface{}, interval time.Duration) (Helper, error) { }, nil } -type runner struct { - Helper - // interval is used when the runner is used as a Cron - interval time.Duration - // task can be a SimpleTask or a Task - task interface{} - isSimpleTask bool - done chan struct{} -} - -func (r *runner) Done() <-chan struct{} { - return r.done -} - -func (r *runner) String() string { - return r.task.(async.SimpleTask).String() -} - -func (r *runner) Start(ctx context.Context, cancelFunc context.CancelFunc) (err error) { - // closing this channel will highlight the caller that the task is done. - defer close(r.done) - childCtx := ctx - if !r.isSimpleTask { - // childCancelFunc will be used to stop any sub go-routing using the childCtx when the current task is stopped. - // it's just to be sure that every sub go-routing created by the task will be stopped without stopping the whole application. - var childCancelFunc context.CancelFunc - childCtx, childCancelFunc = context.WithCancel(ctx) - t := r.task.(async.Task) - // then we have to call the finalise method of the task - defer func() { - childCancelFunc() - if finalErr := t.Finalize(); finalErr != nil { - if err == nil { - err = finalErr - } else { - logrus.WithError(finalErr).Error("error occurred when calling the method Finalize of the task") - } - } - }() - - // and the initialize method - if initError := t.Initialize(); initError != nil { - err = fmt.Errorf("unable to call the initialize method of the task: %w", initError) - return - } - } - - // then run the task - if executeErr := r.task.(async.SimpleTask).Execute(childCtx, cancelFunc); executeErr != nil { - err = fmt.Errorf("unable to call the execute method of the task: %w", executeErr) - return - } - - // in case the runner has an interval properly set, then we can create a ticker and call periodically the method execute of the task - return r.tick(childCtx, cancelFunc) -} - -func (r *runner) tick(ctx context.Context, cancelFunc context.CancelFunc) error { - simpleTask := r.task.(async.SimpleTask) - if r.interval <= 0 { - return nil +// NewCron is returning a Helper that will execute the task according to the schedule. +// cronSchedule is following the standard syntax described here: https://en.wikipedia.org/wiki/Cron. +// It also supports the predefined scheduling definitions: +// - @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 * +// - @monthly | Run once a month, midnight, first of month | 0 0 0 1 * * +// - @weekly | Run once a week, midnight between Sat/Sun | 0 0 0 * * 0 +// - @daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * +// - @hourly | Run once an hour, beginning of hour | 0 0 * * * * +// +// We are directly relying on what the library https://pkg.go.dev/github.com/robfig/cron is supporting. +func NewCron(task interface{}, cronSchedule string) (Helper, error) { + sch, err := cron.ParseStandard(cronSchedule) + if err != nil { + return nil, err } - - ticker := time.NewTicker(r.interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if executeErr := simpleTask.Execute(ctx, cancelFunc); executeErr != nil { - return fmt.Errorf("unable to call the execute method of the task %s: %w", simpleTask.String(), executeErr) - } - case <-ctx.Done(): - logrus.Debugf("task %s has been canceled", simpleTask.String()) - return nil - } + isSimpleTask, err := isSimpleTask(task) + if err != nil { + return nil, err } + return &cronRunner{ + schedule: sch, + task: task, + isSimpleTask: isSimpleTask, + done: make(chan struct{}), + }, nil } // Run is executing in a go-routing the Helper that handles a unique task @@ -188,3 +128,16 @@ func waitAll(timeout time.Duration, helpers []Helper) { } waitGroup.Wait() } + +func isSimpleTask(task interface{}) (bool, error) { + result := true + switch task.(type) { + case async.Task: + result = false + case async.SimpleTask: + // just here as sanity check + default: + return false, fmt.Errorf("task is not a SimpleTask or a Task") + } + return result, nil +} diff --git a/async/taskhelper/helper_test.go b/async/taskhelper/helper_test.go index 262ba4b..31034cc 100644 --- a/async/taskhelper/helper_test.go +++ b/async/taskhelper/helper_test.go @@ -78,12 +78,16 @@ func TestJoinAll(t *testing.T) { t1, err := New(&simpleTaskImpl{}) assert.NoError(t, err) complexTask := &complexTaskImpl{} - t2, err := NewCron(complexTask, 5*time.Second) + anotherComplexTask := &complexTaskImpl{} + t2, err := NewTick(complexTask, 5*time.Second) + assert.NoError(t, err) + t3, err := NewCron(anotherComplexTask, "@hourly") assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) // start all runner Run(ctx, cancel, t1) Run(ctx, cancel, t2) - JoinAll(ctx, 30*time.Second, []Helper{t1, t2}) + Run(ctx, cancel, t3) + JoinAll(ctx, 30*time.Second, []Helper{t1, t2, t3}) assert.True(t, complexTask.counter >= 2) } diff --git a/go.mod b/go.mod index ce96952..97ddffc 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/nexucis/lamenv v0.5.2 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.45.0 + github.com/robfig/cron v1.2.0 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/otel v1.24.0 diff --git a/go.sum b/go.sum index 9d219f8..e779638 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=