Skip to content

Commit

Permalink
fix: CronWorkflows startingDeadlineSeconds with timezone fix
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Oct 21, 2024
1 parent ea6dae9 commit 561bbda
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 27 deletions.
24 changes: 5 additions & 19 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,27 +320,13 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun(ctx context.Conte
}
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
if woc.cronWf.Status.LastScheduledTime != nil {
for _, schedule := range woc.cronWf.Spec.GetSchedules(ctx) {
for _, schedule := range woc.cronWf.Spec.GetSchedulesWithTimezone(ctx) {
var now time.Time
var cronSchedule cron.Schedule
if woc.cronWf.Spec.Timezone != "" {
loc, err := time.LoadLocation(woc.cronWf.Spec.Timezone)
if err != nil {
return time.Time{}, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
}
now = time.Now().In(loc)

cronSchedule, err = cron.ParseStandard(schedule)
if err != nil {
return time.Time{}, fmt.Errorf("unable to form timezone schedule '%s': %s", schedule, err)
}
} else {
var err error
now = time.Now()
cronSchedule, err = cron.ParseStandard(schedule)
if err != nil {
return time.Time{}, err
}
now = time.Now()
cronSchedule, err := cron.ParseStandard(schedule)
if err != nil {
return time.Time{}, err
}

var missedExecutionTime time.Time
Expand Down
91 changes: 83 additions & 8 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -10,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -72,8 +74,7 @@ func TestRunOutstandingWorkflows(t *testing.T) {

cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now().Add(-1 * time.Minute)}
// StartingDeadlineSeconds is after the current second, so cron should be run
startingDeadlineSeconds := int64(35)
cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
cronWf.Spec.StartingDeadlineSeconds = ptr.To(int64(35))
woc := &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
Expand All @@ -85,8 +86,7 @@ func TestRunOutstandingWorkflows(t *testing.T) {
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())

// StartingDeadlineSeconds is not after the current second, so cron should not be run
startingDeadlineSeconds = int64(25)
cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
cronWf.Spec.StartingDeadlineSeconds = ptr.To(int64(25))
woc = &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
Expand All @@ -112,8 +112,7 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf.Status.LastScheduledTime = &v1.Time{Time: cronWf.Status.LastScheduledTime.In(testLocation)}

// StartingDeadlineSeconds is after the current second, so cron should be run
startingDeadlineSeconds = int64(35)
cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
cronWf.Spec.StartingDeadlineSeconds = ptr.To(int64(35))
woc = &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
Expand All @@ -126,8 +125,7 @@ func TestRunOutstandingWorkflows(t *testing.T) {
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())

// StartingDeadlineSeconds is not after the current second, so cron should not be run
startingDeadlineSeconds = int64(25)
cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
cronWf.Spec.StartingDeadlineSeconds = ptr.To(int64(25))
woc = &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
Expand All @@ -144,6 +142,83 @@ func TestRunOutstandingWorkflows(t *testing.T) {
assert.True(t, missedExecutionTime.IsZero())
}

func getCWFShouldJustHaveStarted(locationStr string, loc *time.Location) v1alpha1.CronWorkflow {
oneMinuteAgo := time.Now().Add(-1 * time.Minute).In(loc)
cwf := fmt.Sprintf(`apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: start
spec:
schedules:
- "%d %d * * *"
timezone: "%s"
startingDeadlineSeconds: 120
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: argoproj/argosay:v2
status:
lastScheduledTime: "2020-02-28T19:05:00Z"`,
oneMinuteAgo.Minute(),
oneMinuteAgo.Hour(),
locationStr,
)
fmt.Printf("%s\n", cwf)
var cronWf v1alpha1.CronWorkflow
v1alpha1.MustUnmarshal([]byte(cwf), &cronWf)
return cronWf
}

func TestRunOutstandingWorkflowsAcrossTimezones(t *testing.T) {
// To ensure consistency, always start at the next 30 second mark
_, _, sec := time.Now().Clock()
ctx := context.Background()
var toWait time.Duration
if sec <= 30 {
toWait = time.Duration(30-sec) * time.Second
} else {
toWait = time.Duration(90-sec) * time.Second
}
logrus.Infof("Waiting %s to start", humanize.Duration(toWait))
time.Sleep(toWait)

const testLocation = "Pacific/Auckland"
locAuckland, err := time.LoadLocation(testLocation)
require.NoError(t, err)
cronWf := getCWFShouldJustHaveStarted(testLocation, locAuckland)
// Second value at runtime should be 30-31

cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now().Add(-24*time.Hour + -1*time.Minute)}
woc := &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString())
missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx)
require.NoError(t, err)
// The missedExecutionTime should be the current complete minute mark, which we can get with inferScheduledTime
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix()+60)

// We are assuming local time is not Auckland here
locHere := time.Now().Local().Location()
assert.NotEqual(t, locHere, locAuckland, "If you are in New Zealand and this test fails you'll need to modify the test, it's not a real failure")
cronWf = getCWFShouldJustHaveStarted(testLocation, locHere)
// Second value at runtime should be 30-31

cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now().Add(-24*time.Hour + -1*time.Minute)}
woc = &cronWfOperationCtx{
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString())
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun(ctx)
require.NoError(t, err)
// We're outside the window for execution now
assert.True(t, missedExecutionTime.IsZero())
}

type fakeLister struct{}

func (f fakeLister) List() ([]*v1alpha1.Workflow, error) {
Expand Down

0 comments on commit 561bbda

Please sign in to comment.