Skip to content

Commit

Permalink
[BREAKINGCHANGE] Support cron job syntax and change the actual WithCr…
Browse files Browse the repository at this point in the history
…on method

Signed-off-by: Augustin Husson <[email protected]>
  • Loading branch information
Nexucis committed Mar 6, 2024
1 parent 0e7fc53 commit 4aa1584
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 103 deletions.
18 changes: 9 additions & 9 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ func init() {
flag.StringVar(&addr, "web.listen-address", ":8080", "The address to listen on for HTTP requests, web interface and telemetry.")
}

type cron struct {
type timerTask struct {
task interface{}
duration time.Duration
}

type Runner struct {
// waitTimeout is the amount of time to wait before killing the application once it received a cancellation order.
waitTimeout time.Duration
// cronTasks is the different task that are executed periodically.
cronTasks []cron
// timerTasks is the different task that are executed periodically.
timerTasks []timerTask
// tasks is the different task that are executed asynchronously only once time.
// for each task a async.TaskRunner will be created
tasks []interface{}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (r *Runner) SetTimeout(timeout time.Duration) *Runner {
return r
}

// SetBanner is setting a string (ideally the logo of the project) that would be printed when the runner is started.
// SetBanner is setting a string (ideally the logo of the project) that would be printed when the runner is started.
// Additionally, you can also print the Version, the BuildTime and the Commit.
// You just have to add '%s' in your banner where you want to print each information (one '%s' per additional information).
// If set, then the main header won't be printed. The main header is printing the Version, the Commit and the BuildTime.
Expand All @@ -134,10 +134,10 @@ func (r *Runner) WithTasks(t ...interface{}) *Runner {
return r
}

// WithCronTasks is the way to add different tasks that will be executed periodically at the frequency defined with the duration.
func (r *Runner) WithCronTasks(duration time.Duration, t ...interface{}) *Runner {
// WithTimerTask is the way to add different tasks that will be executed periodically at the frequency defined with the duration.
func (r *Runner) WithTimerTask(duration time.Duration, t ...interface{}) *Runner {
for _, ts := range t {
r.cronTasks = append(r.cronTasks, cron{
r.timerTasks = append(r.timerTasks, timerTask{
task: ts,
duration: duration,
})
Expand Down Expand Up @@ -233,8 +233,8 @@ func (r *Runner) buildTask() {
signalsListener := async.NewSignalListener(syscall.SIGINT, syscall.SIGTERM)
r.tasks = append(r.tasks, signalsListener)

for _, c := range r.cronTasks {
if taskHelper, err := taskhelper.NewCron(c.task, c.duration); err != nil {
for _, c := range r.timerTasks {
if taskHelper, err := taskhelper.NewTick(c.task, c.duration); err != nil {
logrus.WithError(err).Fatal("unable to create the taskhelper.Helper to handle a cron set")
} else {
r.helpers = append(r.helpers, taskHelper)
Expand Down
100 changes: 100 additions & 0 deletions async/taskhelper/basic_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The Perses Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package taskhelper

import (
"context"
"fmt"
"time"

"github.com/perses/common/async"
"github.com/sirupsen/logrus"
)

type runner struct {
Helper
// interval is used when the runner is used as a Cron
interval time.Duration
// task can be a SimpleTask or a Task
task interface{}
isSimpleTask bool
done chan struct{}
}

func (r *runner) Done() <-chan struct{} {
return r.done
}

func (r *runner) String() string {
return r.task.(async.SimpleTask).String()
}

func (r *runner) Start(ctx context.Context, cancelFunc context.CancelFunc) (err error) {
// closing this channel will highlight the caller that the task is done.
defer close(r.done)
childCtx := ctx
if !r.isSimpleTask {
// childCancelFunc will be used to stop any sub go-routing using the childCtx when the current task is stopped.
// it's just to be sure that every sub go-routing created by the task will be stopped without stopping the whole application.
var childCancelFunc context.CancelFunc
childCtx, childCancelFunc = context.WithCancel(ctx)
t := r.task.(async.Task)
// then we have to call the finalize method of the task
defer func() {
childCancelFunc()
if finalErr := t.Finalize(); finalErr != nil {
if err == nil {
err = finalErr
} else {
logrus.WithError(finalErr).Error("error occurred when calling the method Finalize of the task")
}
}
}()

// and the initialize method
if initError := t.Initialize(); initError != nil {
err = fmt.Errorf("unable to call the initialize method of the task: %w", initError)
return
}
}

// then run the task
if executeErr := r.task.(async.SimpleTask).Execute(childCtx, cancelFunc); executeErr != nil {
err = fmt.Errorf("unable to call the execute method of the task: %w", executeErr)
return
}

// in case the runner has an interval properly set, then we can create a ticker and periodically call the method that executes the task
return r.tick(childCtx, cancelFunc)
}

func (r *runner) tick(ctx context.Context, cancelFunc context.CancelFunc) error {
simpleTask := r.task.(async.SimpleTask)
if r.interval <= 0 {
return nil
}
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if executeErr := simpleTask.Execute(ctx, cancelFunc); executeErr != nil {
return fmt.Errorf("unable to call the execute method of the task %s: %w", simpleTask.String(), executeErr)
}
case <-ctx.Done():
logrus.Debugf("task %s has been canceled", simpleTask.String())
return nil
}
}
}
96 changes: 96 additions & 0 deletions async/taskhelper/cron_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright The Perses Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package taskhelper

import (
"context"
"fmt"
"time"

"github.com/perses/common/async"
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
)

type cronRunner struct {
Helper
// schedule is used to now when calling the task
schedule cron.Schedule
// task can be a SimpleTask or a Task
task interface{}
isSimpleTask bool
done chan struct{}
}

func (r *cronRunner) Done() <-chan struct{} {
return r.done
}

func (r *cronRunner) String() string {
return r.task.(async.SimpleTask).String()
}

func (r *cronRunner) Start(ctx context.Context, cancelFunc context.CancelFunc) (err error) {
// closing this channel will highlight the caller that the task is done.
defer close(r.done)
childCtx := ctx
if !r.isSimpleTask {
// childCancelFunc will be used to stop any sub go-routing using the childCtx when the current task is stopped.
// it's just to be sure that every sub go-routing created by the task will be stopped without stopping the whole application.
var childCancelFunc context.CancelFunc
childCtx, childCancelFunc = context.WithCancel(ctx)
t := r.task.(async.Task)
// then we have to call the finalize method of the task
defer func() {
childCancelFunc()
if finalErr := t.Finalize(); finalErr != nil {
if err == nil {
err = finalErr
} else {
logrus.WithError(finalErr).Error("error occurred when calling the method Finalize of the task")
}
}
}()

// and the initialize method
if initError := t.Initialize(); initError != nil {
err = fmt.Errorf("unable to call the initialize method of the task: %w", initError)
return
}
}
return r.cron(childCtx, cancelFunc)
}

func (r *cronRunner) cron(ctx context.Context, cancelFunc context.CancelFunc) error {
simpleTask := r.task.(async.SimpleTask)
now := time.Now()
next := r.schedule.Next(now)
for {
timer := time.NewTimer(next.Sub(now))
for {
select {
case now = <-timer.C:
// then run the task
if executeErr := r.task.(async.SimpleTask).Execute(ctx, cancelFunc); executeErr != nil {
return fmt.Errorf("unable to call the execute method of the task: %w", executeErr)
}
next = r.schedule.Next(now)
case <-ctx.Done():
logrus.Debugf("task %s has been canceled", simpleTask.String())
return nil
}
break
}
}
}
Loading

0 comments on commit 4aa1584

Please sign in to comment.