fix(backend): Use an Argo Workflow exit lifecycle hook for exit handlers
As described in #10917, exit handlers were implemented as dependent
tasks that always ran within an Argo Workflow. The issue is that this
caused the pipeline to report a succeeded status regardless of whether
the tasks within the exit handlers all succeeded.

This commit changes exit handlers to be exit lifecycle hooks on an
Argo Workflow so that they no longer affect the overall pipeline status.

Resolves:
#11405

Signed-off-by: mprahl <[email protected]>
mprahl committed Dec 17, 2024
1 parent cb07619 commit eee3e56
Showing 6 changed files with 145 additions and 49 deletions.
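
For context, a minimal sketch (illustrative names, not actual compiler output) of what a DAG task looks like once an exit lifecycle hook is attached, using the Argo Workflows Go API this compiler targets:

```go
package main

import (
	"fmt"

	wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func main() {
	// A task whose exit handler is expressed as a lifecycle hook. Argo runs the
	// hook template when the task finishes, but the hook's outcome does not
	// feed into the workflow's overall phase, which is what lets the pipeline
	// status reflect only the real tasks.
	task := wfapi.DAGTask{
		Name:     "exit-handler-1",      // illustrative task name
		Template: "comp-exit-handler-1", // illustrative template name
		Hooks: wfapi.LifecycleHooks{
			wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
				Template: "exit-hook-root-print-op",
			},
		},
	}
	fmt.Println(task.Name)
}
```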
2 changes: 1 addition & 1 deletion .github/workflows/e2e-test.yml
@@ -224,7 +224,7 @@ jobs:
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output

- name: Basic sample tests - exit_handler
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output

- name: Collect test results
if: always()
27 changes: 26 additions & 1 deletion backend/src/v2/compiler/argocompiler/common.go
@@ -14,7 +14,10 @@

package argocompiler

import k8score "k8s.io/api/core/v1"
import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
k8score "k8s.io/api/core/v1"
)

// env vars in metadata-grpc-configmap is defined in component package
var metadataConfigIsOptional bool = true
@@ -42,3 +45,25 @@ var commonEnvs = []k8score.EnvVar{{
},
},
}}

// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty.
func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) {
if exitTemplate == "" {
return
}

var args wfapi.Arguments

if parentDagID != "" {
args = wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)},
}}
}

task.Hooks = wfapi.LifecycleHooks{
wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
Template: exitTemplate,
Arguments: args,
},
}
}
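
A usage sketch for the helper above, mirroring how this commit calls it from the compiler; exampleAttachHook is hypothetical, while inputParameter and paramParentDagID are the package's existing helpers for referencing the template's parent-dag-id input:

```go
// Hypothetical example within the argocompiler package.
func exampleAttachHook() *wfapi.DAGTask {
	task := &wfapi.DAGTask{Name: "exit-handler-1", Template: "comp-exit-handler-1"}
	// No-op when the exit template is empty, so tasks without exit handlers
	// are left untouched.
	addExitTask(task, "exit-hook-root-print-op", inputParameter(paramParentDagID))
	return task
}
```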
11 changes: 10 additions & 1 deletion backend/src/v2/compiler/argocompiler/container.go
@@ -181,6 +181,11 @@ type containerExecutorInputs struct {
cachedDecision string
// if false, the container will be skipped.
condition string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
}

// containerExecutorTask returns an argo workflows DAGTask.
@@ -192,7 +197,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(refName),
When: when,
@@ -203,6 +208,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
},
},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

// addContainerExecutorTemplate adds a generic container executor template for
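
Roughly, the executor task now comes out like this when an exit template is supplied (a sketch; exampleExecutorWithHook and the template names are assumptions, not verified compiler output):

```go
// Hypothetical shape of the task built by containerExecutorTask with
// inputs.exitTemplate and inputs.hookParentDagID set: the hook rides on the
// executor task itself instead of being a downstream DAG task.
func exampleExecutorWithHook() *wfapi.DAGTask {
	return &wfapi.DAGTask{
		Name:     "print-op",                  // illustrative
		Template: "system-container-executor", // assumed executor template name
		Hooks: wfapi.LifecycleHooks{
			wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
				Template: "exit-hook-root-print-op",
				// Forwarded from inputs.hookParentDagID so the hook can
				// resolve the parent DAG execution.
				Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
					{Name: paramParentDagID, Value: wfapi.AnyStringPtr(inputParameter(paramParentDagID))},
				}},
			},
		},
	}
}
```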
141 changes: 98 additions & 43 deletions backend/src/v2/compiler/argocompiler/dag.go
@@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if err != nil {
return err
}
dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}
tasks := dagSpec.GetTasks()
// Iterate through tasks in deterministic order to facilitate testing.
// Note, order doesn't affect compiler with real effect right now.
@@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
keys = append(keys, key)
}
sort.Strings(keys)

taskToExitTemplate := map[string]string{}

// First process exit tasks since those need to be set as lifecycle hooks on the exit handler sub DAG.
for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil {
@@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if kfpTask.GetArtifactIterator() != nil {
return fmt.Errorf("artifact iterator not implemented yet")
}

if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip tasks that aren't exit tasks.
continue
}

tasks, err := c.task(taskName, kfpTask, taskInputs{
parentDagID: inputParameter(paramParentDagID),
})
if err != nil {
return err
}

// Generate the template name in the format of "exit-hook-<DAG name>-<task name>"
// (e.g. exit-hook-root-print-op).
name := fmt.Sprintf("exit-hook-%s-%s", name, taskName)

deps := kfpTask.GetDependentTasks()

for _, dep := range deps {
taskToExitTemplate[dep] = name
}

exitDag := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{
Tasks: tasks,
},
}

_, err = c.addTemplate(exitDag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
}
}

dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}

for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip already processed exit tasks.
continue
}

exitTemplate := taskToExitTemplate[taskName]

tasks, err := c.task(
taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate},
)
if err != nil {
return err
}
dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
}

// The compilation should fail before this point, but this check is an extra precaution to guard against an
// orphaned exit task.
if len(dag.DAG.Tasks) == 0 {
return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name)
}

_, err = c.addTemplate(dag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
@@ -116,26 +178,37 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
type dagInputs struct {
// placeholder for parent DAG execution ID
parentDagID string
condition string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
condition string
}

// dagTask generates task for a DAG component.
// name: task name
// componentName: DAG component name
func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask {
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.templateName(componentName),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(inputs.parentDagID)},
{Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)},
}},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

type taskInputs struct {
parentDagID string
iterationIndex string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
}

// parentDagID: placeholder for parent DAG execution ID
@@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
return nil, err
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
dag := c.dagTask(name, componentName, dagInputs{
parentDagID: driverOutputs.executionID,
condition: driverOutputs.condition,
parentDagID: driverOutputs.executionID,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
condition: driverOutputs.condition,
})
dag.Depends = depends([]string{driverTaskName})
if task.GetTriggerPolicy().GetCondition() != "" {
@@ -215,23 +291,23 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
driverOutputs.condition = ""
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
// Handle exit handler dependency
if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends_exit_handler(task.GetDependentTasks())
}

// A dummy image means this task is for Kubernetes configs.
// In this case, skip the executor (launcher).
if dummyImages[e.Container.GetImage()] {
driver.Name = name
return []wfapi.DAGTask{*driver}, nil
}
executor := c.containerExecutorTask(name, containerExecutorInputs{
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
@@ -572,24 +648,3 @@ func depends(deps []string) string {
}
return builder.String()
}

// Exit handler task happens no matter the state of the upstream tasks
func depends_exit_handler(deps []string) string {
if len(deps) == 0 {
return ""
}
var builder strings.Builder
for index, dep := range deps {
if index > 0 {
builder.WriteString(" || ")
}
for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} {
if inner_index > 0 {
builder.WriteString(" || ")
}
builder.WriteString(dep)
builder.WriteString(task_status)
}
}
return builder.String()
}
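
To make the two passes in DAG() concrete, a sketch of the bookkeeping for the example used in the code comments, where the exit task print-op guards exit-handler-1 in the root DAG (exampleExitBookkeeping is hypothetical):

```go
func exampleExitBookkeeping() {
	// Pass 1: print-op (trigger strategy ALL_UPSTREAM_TASKS_COMPLETED) is
	// compiled into its own template, exit-hook-root-print-op, which is then
	// mapped onto every task print-op depends on.
	taskToExitTemplate := map[string]string{
		"exit-handler-1": "exit-hook-root-print-op",
	}

	// Pass 2: exit-handler-1 is compiled as a regular DAG task and the lookup
	// attaches the hook; print-op itself is skipped as a DAG task, so it only
	// runs through the hook and cannot flip the workflow's final status.
	_ = taskToExitTemplate["exit-handler-1"] // "exit-hook-root-print-op"
}
```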
7 changes: 5 additions & 2 deletions test/sample-test/run_sample_test.py
@@ -36,7 +36,8 @@ def __init__(self,
result,
experiment_name,
host,
namespace='kubeflow'):
namespace='kubeflow',
expected_result='succeeded'):
"""Util class for checking python sample test running results.
:param testname: test name.
@@ -45,6 +46,7 @@ def __init__(self,
:param result: The path of the test result that will be exported.
:param host: The hostname of KFP API endpoint.
:param namespace: namespace of the deployed pipeline system. Default: kubeflow
:param expected_result: the expected status for the run, default is succeeded.
:param experiment_name: Name of the experiment to monitor
"""
self._testname = testname
@@ -65,6 +67,7 @@ def __init__(self,
self._job_name = None
self._test_args = None
self._run_id = None
self._expected_result = expected_result

def run(self):
"""Run compiled KFP pipeline."""
@@ -154,7 +157,7 @@ def check(self):
###### Monitor Job ######
start_time = datetime.now()
response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
succ = (response.state.lower() == 'succeeded')
succ = (response.state.lower() == self._expected_result)
end_time = datetime.now()
elapsed_time = (end_time - start_time).seconds
utils.add_junit_test(self._test_cases, 'job completion', succ,
6 changes: 5 additions & 1 deletion test/sample-test/sample_test_launcher.py
@@ -42,14 +42,16 @@ def __init__(self,
results_gcs_dir,
host='',
target_image_prefix='',
namespace='kubeflow'):
namespace='kubeflow',
expected_result='succeeded'):
"""Launch a KFP sample_test provided its name.
:param test_name: name of the corresponding sample test.
:param results_gcs_dir: gs dir to store test result.
:param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config.
:param target_image_prefix: prefix of docker image, default is empty.
:param namespace: namespace for kfp, default is kubeflow.
:param expected_result: the expected status for the run, default is succeeded.
"""
self._test_name = test_name
self._results_gcs_dir = results_gcs_dir
@@ -81,6 +83,7 @@ def __init__(self,

self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
self._sample_test_output = self._results_gcs_dir
self._expected_result = expected_result

def _compile(self):

@@ -210,6 +213,7 @@ def run_test(self):
host=self._host,
namespace=self._namespace,
experiment_name=experiment_name,
expected_result=self._expected_result,
)
pysample_checker.run()
pysample_checker.check()
