Skip to content

Commit

Permalink
test: fix and optimize flaky cron tests. Partial fix for #10807 (#14024)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasonM authored Dec 22, 2024
1 parent 1d85e68 commit 9d1efe9
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 121 deletions.
3 changes: 3 additions & 0 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,9 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
podGC:
strategy: OnPodCompletion
Expand Down
137 changes: 64 additions & 73 deletions test/e2e/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"testing"
"time"

"github.com/argoproj/pkg/humanize"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -54,10 +52,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -67,11 +65,11 @@ spec:
image: argoproj/argosay:v2`).
When().
CreateCronWorkflow().
Wait(1 * time.Minute).
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, cronWf.Spec.GetScheduleWithTimezoneString(), cronWf.GetLatestSchedule())
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, cronWf.Status.LastScheduledTime.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestBasicTimezone", func() {
Expand All @@ -97,6 +95,9 @@ metadata:
spec:
schedule: "%s"
timezone: "%s"
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
entrypoint: whalesay
templates:
Expand All @@ -105,11 +106,11 @@ spec:
image: argoproj/argosay:v2`, scheduleInTestTimezone, testTimezone)).
When().
CreateCronWorkflow().
Wait(1 * time.Minute).
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, cronWf.Spec.GetScheduleWithTimezoneString(), cronWf.GetLatestSchedule())
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, cronWf.Status.LastScheduledTime.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestSuspend", func() {
Expand All @@ -124,10 +125,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand Down Expand Up @@ -155,10 +156,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand Down Expand Up @@ -186,10 +187,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -200,11 +201,12 @@ spec:
args: ["sleep", "300s"]`).
When().
CreateCronWorkflow().
Wait(2 * time.Minute).
WaitForWorkflow(fixtures.ToBeRunning).
Wait(time.Minute). // wait for next scheduled time to ensure it's skipped by concurrencyPolicy
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Len(t, cronWf.Status.Active, 1)
assert.True(t, cronWf.Status.LastScheduledTime.Time.Before(time.Now().Add(-1*time.Minute)))
assert.Less(t, cronWf.Status.LastScheduledTime.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestBasicAllow", func() {
Expand All @@ -221,10 +223,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -235,7 +237,7 @@ spec:
args: ["sleep", "300s"]`).
When().
CreateCronWorkflow().
Wait(2 * time.Minute).
WaitForWorkflowListCount(3*time.Minute, 2).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Len(t, cronWf.Status.Active, 2)
Expand All @@ -253,10 +255,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -267,17 +269,17 @@ spec:
args: ["sleep", "300s"]`).
When().
CreateCronWorkflow().
Wait(2*time.Minute + 20*time.Second).
WaitForWorkflow(fixtures.ToBeRunning).
WaitForNewWorkflow(fixtures.ToBeRunning).
WaitForCronWorkflow().
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Len(t, cronWf.Status.Active, 1)
require.NotNil(t, cronWf.Status.LastScheduledTime)
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, cronWf.Status.LastScheduledTime.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestSuccessfulJobHistoryLimit", func() {
var listOptions v1.ListOptions
wfInformerListOptionsFunc(&listOptions, "test-cron-wf-succeed-1")
s.Given().
CronWorkflow(`apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
Expand All @@ -289,10 +291,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -302,16 +304,16 @@ spec:
image: argoproj/argosay:v2`).
When().
CreateCronWorkflow().
Wait(2*time.Minute+25*time.Second).
WaitForWorkflow(fixtures.ToBeSucceeded).
WaitForNewWorkflow(fixtures.ToBeRunning).
WaitForWorkflowListCount(2*time.Minute, 1).
Then().
ExpectWorkflowList(listOptions, func(t *testing.T, wfList *wfv1.WorkflowList) {
ExpectWorkflowListFromCronWorkflow(func(t *testing.T, wfList *wfv1.WorkflowList) {
assert.Len(t, wfList.Items, 1)
assert.True(t, wfList.Items[0].Status.FinishedAt.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, wfList.Items[0].Status.FinishedAt.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestFailedJobHistoryLimit", func() {
var listOptions v1.ListOptions
wfInformerListOptionsFunc(&listOptions, "test-cron-wf-fail-1")
s.Given().
CronWorkflow(`apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
Expand All @@ -323,10 +325,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 1
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -337,11 +339,13 @@ spec:
args: ["exit", "1"]`).
When().
CreateCronWorkflow().
Wait(2*time.Minute+25*time.Second).
WaitForWorkflow(fixtures.ToBeFailed).
WaitForNewWorkflow(fixtures.ToBeRunning).
WaitForWorkflowListCount(2*time.Minute, 1).
Then().
ExpectWorkflowList(listOptions, func(t *testing.T, wfList *wfv1.WorkflowList) {
ExpectWorkflowListFromCronWorkflow(func(t *testing.T, wfList *wfv1.WorkflowList) {
assert.Len(t, wfList.Items, 1)
assert.True(t, wfList.Items[0].Status.FinishedAt.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, wfList.Items[0].Status.FinishedAt.Time, time.Now().Add(-1*time.Minute))
})
})
s.Run("TestStoppingConditionWithSucceeded", func() {
Expand All @@ -358,10 +362,10 @@ spec:
failedJobsHistoryLimit: 2
stopStrategy:
expression: "cronworkflow.succeeded >= 1"
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -372,7 +376,7 @@ spec:
command: [/argosay]`).
When().
CreateCronWorkflow().
Wait(2 * time.Minute). // wait to be scheduled 2 times, but only runs once
WaitForCronWorkflowCompleted(3 * time.Minute).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, int64(0), cronWf.Status.Failed)
Expand All @@ -392,10 +396,10 @@ spec:
concurrencyPolicy: "Allow"
stopStrategy:
expression: "cronworkflow.failed >= 1"
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -406,7 +410,7 @@ spec:
args: ["exit", "1"]`).
When().
CreateCronWorkflow().
Wait(2 * time.Minute). // wait to be scheduled 2 times, but only runs once
WaitForCronWorkflowCompleted(3 * time.Minute).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, int64(0), cronWf.Status.Succeeded)
Expand All @@ -430,10 +434,10 @@ spec:
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowMetadata:
labels:
workflows.argoproj.io/test: "true"
workflowSpec:
metadata:
labels:
workflows.argoproj.io/test: "true"
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
Expand All @@ -443,24 +447,21 @@ spec:
image: argoproj/argosay:v2`).
When().
CreateCronWorkflow().
Wait(1 * time.Minute).
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectCron(func(t *testing.T, cronWf *wfv1.CronWorkflow) {
assert.Equal(t, cronWf.Spec.GetScheduleWithTimezoneString(), cronWf.GetLatestSchedule())
assert.True(t, cronWf.Status.LastScheduledTime.Time.After(time.Now().Add(-1*time.Minute)))
assert.Greater(t, cronWf.Status.LastScheduledTime.Time, time.Now().Add(-1*time.Minute))
})
})
}

func wfInformerListOptionsFunc(options *v1.ListOptions, cronWfName string) {
options.LabelSelector = common.LabelKeyCronWorkflow + "=" + cronWfName
}

func (s *CronSuite) TestMalformedCronWorkflow() {
s.Given().
Exec("kubectl", []string{"apply", "-f", "testdata/malformed/malformed-cronworkflow.yaml"}, fixtures.NoError).
Exec("kubectl", []string{"apply", "-f", "testdata/wellformed/wellformed-cronworkflow.yaml"}, fixtures.NoError).
CronWorkflow("@testdata/wellformed/wellformed-cronworkflow.yaml").
When().
CreateCronWorkflow().
WaitForWorkflow(2*time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
Expand All @@ -479,15 +480,5 @@ func (s *CronSuite) TestMalformedCronWorkflow() {
}

func TestCronSuite(t *testing.T) {
// To ensure consistency, always start at the next 30 second mark
_, _, sec := time.Now().Clock()
var toWait time.Duration
if sec <= 30 {
toWait = time.Duration(30-sec) * time.Second
} else {
toWait = time.Duration(90-sec) * time.Second
}
log.Infof("Waiting %s to start", humanize.Duration(toWait))
time.Sleep(toWait)
suite.Run(t, new(CronSuite))
}
9 changes: 9 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *
return t
}

func (t *Then) ExpectWorkflowListFromCronWorkflow(block func(t *testing.T, wfList *wfv1.WorkflowList)) *Then {
t.t.Helper()
if t.cronWf == nil {
t.t.Fatal("No cron workflow to match against")
}
labelSelector := common.LabelKeyCronWorkflow + "=" + t.cronWf.Name
return t.ExpectWorkflowList(metav1.ListOptions{LabelSelector: labelSelector}, block)
}

func (t *Then) ExpectWorkflowList(listOptions metav1.ListOptions, block func(t *testing.T, wfList *wfv1.WorkflowList)) *Then {
t.t.Helper()
_, _ = fmt.Println("Listing workflows")
Expand Down
Loading

0 comments on commit 9d1efe9

Please sign in to comment.