Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): retry strategy support on daemon containers, fixes #13705 #13738

Open
wants to merge 57 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a929797
feat: add retry strategy support to daemon containers
Oct 7, 2024
f3ef097
feat: add retry strategy support to daemon containers
Oct 7, 2024
693f502
feat: daemon retry works, but node.IP doesn't get updated
Oct 10, 2024
2e1c501
feat: style is better
Oct 11, 2024
cbf168b
feat: now works, need to squash
Oct 12, 2024
a7faecc
feat: now works, need to squash
Oct 12, 2024
c0ad779
feat(docs): added daemon retry strategy examples for steps and dags
Oct 12, 2024
a5861f5
feat(docs): added daemon retry strategy examples for steps and dags
Oct 12, 2024
fa8ced4
fix(tests): func TestParametrizableLimit, exit code is int instead of…
Oct 13, 2024
b126f92
fix(controller): retry logic failed to retry nodes that failed fast e…
Oct 13, 2024
6f4efc7
fix(controller): retry logic failed to retry nodes that failed fast e…
Oct 13, 2024
0eb5824
fix(docs): codegen
Oct 13, 2024
880a261
fix(docs): codegen
Oct 13, 2024
2ef248d
fix(docs): codegen
Oct 13, 2024
53eca43
fix(docs): codegen
Oct 13, 2024
a0b2aa9
feat(controller): daemon retries now works on dag
Oct 13, 2024
12814d5
fix(docs): withSequence string instead of int
Oct 14, 2024
1e74858
fix(tests): test featured names not complying with RFC-1123
Oct 14, 2024
ad863f7
fix(tests): retry node will now be pending if the last retry child no…
Oct 14, 2024
d5480e7
fix(tests): retry node will now be pending if the last retry child no…
Oct 14, 2024
06f80ea
fix(tests): retry node will now be pending if the last retry child no…
Oct 14, 2024
63e868d
fix(controller): simplified if statement
Oct 14, 2024
fa12577
fix(tests): daemoned is unecessary in this test
Oct 14, 2024
67a087e
feat(tests): E2E test for daemon retry
Oct 18, 2024
c4da80e
feat(tests): E2E test for daemon retry
Oct 18, 2024
8ba2ffc
feat(tests): E2E test for daemon retry
Oct 18, 2024
b250d55
feat(tests): E2E test for daemon retry
Oct 18, 2024
9c8e638
feat(tests): E2E test for daemon retry
Oct 18, 2024
8e18791
feat(tests): E2E test for daemon retry
Oct 18, 2024
c83082a
feat(tests): E2E test for daemon retry
Oct 19, 2024
8cb45d3
feat(tests): E2E test for daemon retry
Oct 19, 2024
5bcd528
feat(tests): E2E test for daemon retry
Oct 19, 2024
874c688
feat(tests): E2E test for daemon retry
Oct 19, 2024
dcc036e
feat(tests): E2E test for daemon retry
Oct 19, 2024
72bd016
feat(tests): E2E test for daemon retry
Oct 19, 2024
f5645ee
feat(tests): E2E test for daemon retry
Oct 19, 2024
c0cce20
feat(tests): E2E test for daemon retry
Oct 19, 2024
d1bf8fd
fix(tests): debuggin daemon E2E
Oct 19, 2024
ea105b8
fix(tests): debuggin daemon E2E
Oct 19, 2024
c60697b
fix(tests): debuggin daemon E2E
Oct 19, 2024
fccfcdc
fix(tests): removed e2e test for now, because it only works locally
Oct 19, 2024
8292890
fix(tests): removed e2e test for now, because it only works locally
Oct 19, 2024
837c461
fix(tests): e2e test for daemon retry
Oct 19, 2024
551562f
fix(tests): e2e test for daemon retry
Oct 19, 2024
2bb5c64
fix(tests): e2e test for daemon retry
Oct 19, 2024
a76980f
fix(tests): e2e test for daemon retry
Oct 19, 2024
8278b72
fix(tests): e2e test for daemon retry
Oct 19, 2024
a5eba4d
fix(tests): e2e test for daemon retry
Oct 19, 2024
02171da
fix(tests): e2e test for daemon retry
Oct 19, 2024
8e30a71
fix(tests): e2e test for daemon retry
Oct 19, 2024
80ffbb5
fix(tests): lint does not allow yoda syntax
Oct 19, 2024
d7c602f
Merge branch 'main' into feat/daemon-retry-strategy
MenD32 Nov 2, 2024
6fb567d
Merge branch 'main' into feat/daemon-retry-strategy
MenD32 Nov 29, 2024
49063a3
Merge branch 'main' into feat/daemon-retry-strategy
MenD32 Dec 2, 2024
aa954af
Merge branch 'main' into feat/daemon-retry-strategy
MenD32 Dec 3, 2024
2695fdf
Merge branch 'main' into feat/daemon-retry-strategy
MenD32 Dec 8, 2024
ec06bdb
fix: fields.md
Dec 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions docs/fields.md

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions examples/dag-daemon-retry-strategy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-daemon-retry-
spec:
entrypoint: main

templates:
- name: main
dag:
tasks:
- name: server
template: server
- name: client
depends: server
template: client
arguments:
parameters:
- name: server-ip
value: "{{tasks.server.ip}}"
withSequence:
count: "10"

- name: server
retryStrategy:
limit: "10"
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
path: /
port: 80
initialDelaySeconds: 2
timeoutSeconds: 1

- name: client
inputs:
parameters:
- name: server-ip
synchronization:
mutex:
name: client-{{workflow.uid}}
container:
image: appropriate/curl:latest
command: ["/bin/sh", "-c"]
args: ["echo curl --silent -G http://{{inputs.parameters.server-ip}}:80/ && curl --silent -G http://{{inputs.parameters.server-ip}}:80/"]

46 changes: 46 additions & 0 deletions examples/steps-daemon-retry-strategy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-daemon-retry-
spec:
entrypoint: main

templates:
- name: main
steps:
- - name: server
template: server
- - name: client
template: client
arguments:
parameters:
- name: server-ip
value: "{{steps.server.ip}}"
withSequence:
count: "10"

- name: server
retryStrategy:
limit: "10"
daemon: true
container:
image: nginx:1.13
readinessProbe:
httpGet:
path: /
port: 80
initialDelaySeconds: 2
timeoutSeconds: 1

- name: client
inputs:
parameters:
- name: server-ip
synchronization:
mutex:
name: client-{{workflow.uid}}
container:
image: appropriate/curl:latest
command: ["/bin/sh", "-c"]
args: ["echo curl --silent -G http://{{inputs.parameters.server-ip}}:80/ && curl --silent -G http://{{inputs.parameters.server-ip}}:80/"]

47 changes: 47 additions & 0 deletions test/e2e/daemon_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

Expand Down Expand Up @@ -177,6 +179,51 @@ func (s *DaemonPodSuite) TestMarkDaemonedPodSucceeded() {
})
}

func (s *DaemonPodSuite) TestDaemonPodRetry() {
s.Given().
Workflow(`
metadata:
name: daemon-retry
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: daemoned
template: daemoned
- name: whale
dependencies: [daemoned]
template: whale-tmpl
- name: daemoned
retryStrategy:
limit: 2
daemon: true
container:
image: argoproj/argosay:v2
command: ["bash"]
args: ["-c", "sleep 10 && exit 1"]
- name: whale-tmpl
container:
image: argoproj/argosay:v2
command: ["bash"]
args: ["-c", "echo hi & sleep 15 && echo bye"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, md *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
failedNode := status.Nodes.FindByDisplayName("daemoned(0)")
succeededNode := status.Nodes.FindByDisplayName("daemoned(1)")
require.NotNil(t, failedNode)
require.NotNil(t, succeededNode)
assert.Equal(t, wfv1.NodeFailed, failedNode.Phase)
assert.Equal(t, wfv1.NodeSucceeded, succeededNode.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

func TestDaemonPodSuite(t *testing.T) {
suite.Run(t, new(DaemonPodSuite))
}
2 changes: 1 addition & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ spec:
limit: "{{inputs.parameters.limit}}"
container:
image: argoproj/argosay:v2
args: [exit, 1]
args: [exit, "1"]
`).
When().
SubmitWorkflow().
Expand Down
13 changes: 12 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,17 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
targetTasks = strings.Split(tmpl.DAG.Target, " ")
}

// pre-execute daemoned tasks
for _, task := range tmpl.DAG.Tasks {
taskNode := dagCtx.getTaskNode(task.Name)
if err != nil {
continue
}
if taskNode != nil && taskNode.IsDaemoned() {
woc.executeDAGTask(ctx, dagCtx, task.Name)
}
}

// kick off execution of each target task asynchronously
onExitCompleted := true
for _, taskName := range targetTasks {
Expand Down Expand Up @@ -429,7 +440,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}
}

if node != nil && node.Fulfilled() {
if node != nil && node.Phase.Fulfilled() {
// Collect the completed task metrics
_, tmpl, _, tmplErr := dagCtx.tmplCtx.ResolveTemplate(task)
if tmplErr != nil {
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3465,8 +3465,8 @@ spec:
templates:
- name: linuxExitHandler
steps:
- - name: printExit
template: printExit
- - name: print-exit
template: print-exit
- container:
args:
- echo
Expand All @@ -3475,7 +3475,7 @@ spec:
- /argosay
image: argoproj/argosay:v2
name: ""
name: printExit
name: print-exit
- container:
args:
- echo
Expand Down Expand Up @@ -3593,7 +3593,7 @@ func TestRetryTypeDagTaskRunExitNodeAfterCompleted(t *testing.T) {
woc.operate(ctx)
nextDAGTaskNode := woc.wf.Status.Nodes.FindByDisplayName("dependencyTesting")
assert.NotNil(t, nextDAGTaskNode)
assert.Equal(t, wfv1.NodeRunning, nextDAGTaskNode.Phase)
assert.Equal(t, wfv1.NodePending, nextDAGTaskNode.Phase)
}

func TestDagParallelism(t *testing.T) {
Expand Down Expand Up @@ -3662,7 +3662,7 @@ func TestDagWftmplHookWithRetry(t *testing.T) {
assert.Equal(t, wfv1.NodeFailed, taskNode.Phase)
failHookRetryNode := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure")
failHookChild0Node := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(0)")
assert.Equal(t, wfv1.NodeRunning, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodePending, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodePending, failHookChild0Node.Phase)

// onFailure retry hook(0) failed
Expand All @@ -3675,7 +3675,7 @@ func TestDagWftmplHookWithRetry(t *testing.T) {
failHookRetryNode = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure")
failHookChild0Node = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(0)")
failHookChild1Node := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(1)")
assert.Equal(t, wfv1.NodeRunning, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodePending, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodeFailed, failHookChild0Node.Phase)
assert.Equal(t, wfv1.NodePending, failHookChild1Node.Phase)

Expand Down
68 changes: 52 additions & 16 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var (
// maxOperationTime is the maximum time a workflow operation is allowed to run
// for before requeuing the workflow onto the workqueue.
var (
maxOperationTime = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
maxOperationTime = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 1000*time.Second)
)

// failedNodeStatus is a subset of NodeStatus that is only used to Marshal certain fields into a JSON of failed nodes
Expand Down Expand Up @@ -955,7 +955,7 @@ func (woc *wfOperationCtx) requeue() {

// processNodeRetries updates the retry node state based on the child node state and the retry strategy and returns the node.
func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy, opts *executeTemplateOpts) (*wfv1.NodeStatus, bool, error) {
if node.Fulfilled() {
if node.Phase.Fulfilled() {
return node, true, nil
}

Expand All @@ -968,8 +968,16 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return node, true, nil
}

if !lastChildNode.Fulfilled() {
if lastChildNode.IsDaemoned() {
node.Daemoned = ptr.To(true)
}

if !lastChildNode.Phase.Fulfilled() {
// last child node is still running.
node = woc.markNodePhase(node.Name, lastChildNode.Phase)
if lastChildNode.IsDaemoned() { // markNodePhase doesn't pass the Daemoned field
node.Daemoned = ptr.To(true)
}
return node, true, nil
}

Expand Down Expand Up @@ -1072,7 +1080,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
}
woc.log.Infof("Retry Policy: %s (onFailed: %v, onError %v)", retryStrategy.RetryPolicyActual(), retryOnFailed, retryOnError)

if (lastChildNode.Phase == wfv1.NodeFailed && !retryOnFailed) || (lastChildNode.Phase == wfv1.NodeError && !retryOnError) {
if ((lastChildNode.Phase == wfv1.NodeFailed || lastChildNode.IsDaemoned() && (lastChildNode.Phase == wfv1.NodeSucceeded)) && !retryOnFailed) || (lastChildNode.Phase == wfv1.NodeError && !retryOnError) {
woc.log.Infof("Node not set to be retried after status: %s", lastChildNode.Phase)
return woc.markNodePhase(node.Name, lastChildNode.Phase, lastChildNode.Message), true, nil
}
Expand Down Expand Up @@ -1347,17 +1355,21 @@ func (woc *wfOperationCtx) assessNodeStatus(ctx context.Context, pod *apiv1.Pod,
woc.controller.metrics.ChangePodPending(ctx, new.Message, pod.ObjectMeta.Namespace)
}
case apiv1.PodSucceeded:
new.Phase = wfv1.NodeSucceeded
// if the pod is succeeded, we need to check if it is a daemoned step or not
// if it is daemoned, we need to mark it as failed, since daemon pods should run indefinitely
if tmpl.IsDaemon() {
woc.log.Debugf("Daemoned pod %s succeeded. Marking it as failed", pod.Name)
new.Phase = wfv1.NodeFailed
} else {
new.Phase = wfv1.NodeSucceeded
}

new.Daemoned = nil
case apiv1.PodFailed:
// ignore pod failure for daemoned steps
if tmpl != nil && tmpl.IsDaemon() {
new.Phase = wfv1.NodeSucceeded
} else {
new.Phase, new.Message = woc.inferFailedReason(pod, tmpl)
woc.log.WithField("displayName", old.DisplayName).WithField("templateName", wfutil.GetTemplateFromNode(*old)).
WithField("pod", pod.Name).Infof("Pod failed: %s", new.Message)
}
new.Phase, new.Message = woc.inferFailedReason(pod, tmpl)
woc.log.WithField("displayName", old.DisplayName).WithField("templateName", wfutil.GetTemplateFromNode(*old)).
WithField("pod", pod.Name).Infof("Pod failed: %s", new.Message)
new.Daemoned = nil
case apiv1.PodRunning:
// Daemons are a special case we need to understand the rules:
Expand Down Expand Up @@ -2104,7 +2116,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
childNodeIDs, lastChildNode := getChildNodeIdsAndLastRetriedNode(retryParentNode, woc.wf.Status.Nodes)

// The retry node might have completed by now.
if retryParentNode.Fulfilled() {
if retryParentNode.Fulfilled() && woc.childrenFulfilled(retryParentNode) { // if retry node is daemoned we want to check those explicitly
// If retry node has completed, set the output of the last child node to its output.
// Runtime parameters (e.g., `status`, `resourceDuration`) in the output will be used to emit metrics.
if lastChildNode != nil {
Expand Down Expand Up @@ -2137,7 +2149,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}

var retryNum int
if lastChildNode != nil && !lastChildNode.Fulfilled() {
if lastChildNode != nil && !lastChildNode.Phase.Fulfilled() {
// Last child node is either still running, or in some cases the corresponding Pod hasn't even been
// created yet, for example if it exceeded the ResourceQuota
nodeName = lastChildNode.Name
Expand Down Expand Up @@ -2240,12 +2252,19 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return node, err
}

if !retryNode.Fulfilled() && node.Fulfilled() { // if the retry child has completed we need to update outself
if !retryNode.Phase.Fulfilled() && node.Phase.Fulfilled() { // if the retry child has completed we need to update outself
retryNode, err = woc.executeTemplate(ctx, retryNodeName, orgTmpl, tmplCtx, args, opts)
if err != nil {
return woc.markNodeError(node.Name, err), err
}
}

if !node.Phase.Fulfilled() {
retryNode = woc.markNodePhase(retryNodeName, node.Phase)
if node.IsDaemoned() { // markNodePhase doesn't pass the Daemoned field
retryNode.Daemoned = ptr.To(true)
}
}
node = retryNode
}

Expand Down Expand Up @@ -2275,7 +2294,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}

func (woc *wfOperationCtx) handleNodeFulfilled(ctx context.Context, nodeName string, node *wfv1.NodeStatus, processedTmpl *wfv1.Template) *wfv1.NodeStatus {
if node == nil || !node.Fulfilled() {
if node == nil || !node.Phase.Fulfilled() {
return nil
}

Expand Down Expand Up @@ -2449,6 +2468,23 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool {
return false
}

// check if all of the nodes children are fulffilled
func (woc *wfOperationCtx) childrenFulfilled(node *wfv1.NodeStatus) bool {
if len(node.Children) == 0 {
return node.Fulfilled()
}
for _, childID := range node.Children {
childNode, err := woc.wf.Status.Nodes.Get(childID)
if err != nil {
continue
}
if !woc.childrenFulfilled(childNode) {
return false
}
}
return true
}

func (woc *wfOperationCtx) GetNodeTemplate(node *wfv1.NodeStatus) (*wfv1.Template, error) {
if node.TemplateRef != nil {
tmplCtx, err := woc.createTemplateContext(node.GetTemplateScope())
Expand Down
1 change: 0 additions & 1 deletion workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@ spec:
- - name: hello2
template: whalesay
- name: whalesay
daemon: true
synchronization:
semaphore:
configMapKeyRef:
Expand Down
Loading
Loading