fix(backend): Use an Argo Workflow exit lifecycle hook for exit handlers #11470

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion .github/workflows/e2e-test.yml
@@ -224,7 +224,7 @@ jobs:
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output

- name: Basic sample tests - exit_handler
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output
run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output

- name: Collect test results
if: always()
27 changes: 26 additions & 1 deletion backend/src/v2/compiler/argocompiler/common.go
@@ -14,7 +14,10 @@

package argocompiler

import k8score "k8s.io/api/core/v1"
import (
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
k8score "k8s.io/api/core/v1"
)

// env vars in metadata-grpc-configmap are defined in the component package
var metadataConfigIsOptional bool = true
@@ -42,3 +45,25 @@ var commonEnvs = []k8score.EnvVar{{
},
},
}}

// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty.
func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) {
if exitTemplate == "" {
return
}

var args wfapi.Arguments

if parentDagID != "" {
args = wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)},
}}
}

task.Hooks = wfapi.LifecycleHooks{
wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
Template: exitTemplate,
Arguments: args,
},
}
}
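To illustrate the intended behavior, here is a minimal unit-test sketch (not part of this PR; the task and template names are hypothetical):

package argocompiler

import (
	"testing"

	wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func TestAddExitTask(t *testing.T) {
	task := &wfapi.DAGTask{Name: "print-op", Template: "comp-print-op"}

	// Hypothetical template name following the "exit-hook-<DAG name>-<task name>" convention.
	addExitTask(task, "exit-hook-root-cleanup", "{{inputs.parameters.parent-dag-id}}")

	hook, ok := task.Hooks[wfapi.ExitLifecycleEvent]
	if !ok || hook.Template != "exit-hook-root-cleanup" {
		t.Fatalf("expected an exit lifecycle hook, got %+v", task.Hooks)
	}

	// An empty exitTemplate must leave the task untouched.
	noop := &wfapi.DAGTask{Name: "noop"}
	addExitTask(noop, "", "")
	if len(noop.Hooks) != 0 {
		t.Errorf("expected no hooks, got %d", len(noop.Hooks))
	}
}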
11 changes: 10 additions & 1 deletion backend/src/v2/compiler/argocompiler/container.go
@@ -181,6 +181,11 @@ type containerExecutorInputs struct {
cachedDecision string
// if false, the container will be skipped.
condition string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
}

// containerExecutorTask returns an argo workflows DAGTask.
@@ -192,7 +197,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
if inputs.condition != "" {
when = inputs.condition + " != false"
}
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.addContainerExecutorTemplate(refName),
When: when,
@@ -203,6 +208,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
},
},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

// addContainerExecutorTemplate adds a generic container executor template for
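For reference, a rough sketch of the DAGTask this produces when the exit inputs are set (field values are illustrative placeholders, not verbatim compiler output):

task := wfapi.DAGTask{
	Name:     "print-op",
	Template: "system-container-executor",
	When:     "{{tasks.print-op-driver.outputs.parameters.condition}} != false",
	Hooks: wfapi.LifecycleHooks{
		// The exit event fires once the task completes, regardless of success or failure.
		wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
			Template: "exit-hook-root-cleanup",
			Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
				{Name: "parent-dag-id", Value: wfapi.AnyStringPtr("{{inputs.parameters.parent-dag-id}}")},
			}},
		},
	},
}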
141 changes: 98 additions & 43 deletions backend/src/v2/compiler/argocompiler/dag.go
@@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if err != nil {
return err
}
dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}
tasks := dagSpec.GetTasks()
// Iterate through tasks in deterministic order to facilitate testing.
// Note: the order doesn't currently have a real effect on the compiler.
@@ -58,6 +49,10 @@
keys = append(keys, key)
}
sort.Strings(keys)

taskToExitTemplate := map[string]string{}

// First process exit tasks, since those need to be set as lifecycle hooks on the exit handler sub-DAG.
for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil {
@@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if kfpTask.GetArtifactIterator() != nil {
return fmt.Errorf("artifact iterator not implemented yet")
}

if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip tasks that aren't exit tasks.
continue
}

tasks, err := c.task(taskName, kfpTask, taskInputs{
parentDagID: inputParameter(paramParentDagID),
})
if err != nil {
return err
}

// Generate the template name in the format of "exit-hook-<DAG name>-<task name>"
// (e.g. exit-hook-root-print-op).
name := fmt.Sprintf("exit-hook-%s-%s", name, taskName)

deps := kfpTask.GetDependentTasks()

for _, dep := range deps {
taskToExitTemplate[dep] = name
}

exitDag := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{
Tasks: tasks,
},
}

_, err = c.addTemplate(exitDag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
}
}

dag := &wfapi.Template{
Name: c.templateName(name),
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
{Name: paramParentDagID},
},
},
DAG: &wfapi.DAGTemplate{},
}

for _, taskName := range keys {
kfpTask := dagSpec.GetTasks()[taskName]
if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
// Skip already processed exit tasks.
continue
}

exitTemplate := taskToExitTemplate[taskName]

tasks, err := c.task(
taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate},
)
if err != nil {
return err
}
dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
}

// Compilation should already have failed before this point, but guard here as an extra precaution against an
// orphaned exit task.
if len(dag.DAG.Tasks) == 0 {
return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name)
}

_, err = c.addTemplate(dag, name)
if err != nil {
return fmt.Errorf("DAG: %w", err)
@@ -116,26 +178,37 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
type dagInputs struct {
// placeholder for parent DAG execution ID
parentDagID string
condition string
// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
// lifecycle hook.
hookParentDagID string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
condition string
}

// dagTask generates task for a DAG component.
// name: task name
// componentName: DAG component name
func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask {
return &wfapi.DAGTask{
task := &wfapi.DAGTask{
Name: name,
Template: c.templateName(componentName),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
{Name: paramParentDagID, Value: wfapi.AnyStringPtr(inputs.parentDagID)},
{Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)},
}},
}

addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)

return task
}

type taskInputs struct {
parentDagID string
iterationIndex string
// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
exitTemplate string
}

// parentDagID: placeholder for parent DAG execution ID
@@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
return nil, err
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
dag := c.dagTask(name, componentName, dagInputs{
parentDagID: driverOutputs.executionID,
condition: driverOutputs.condition,
parentDagID: driverOutputs.executionID,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
condition: driverOutputs.condition,
})
dag.Depends = depends([]string{driverTaskName})
if task.GetTriggerPolicy().GetCondition() != "" {
@@ -215,23 +291,23 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
driverOutputs.condition = ""
}
// iterations belong to a sub-DAG, no need to add dependent tasks
if inputs.iterationIndex == "" {
// Also skip adding dependencies when it's an exit hook
if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends(task.GetDependentTasks())
}
// Handle exit handler dependency
if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
driver.Depends = depends_exit_handler(task.GetDependentTasks())
}

// When a dummy image is used, this task is for Kubernetes configs only.
// In this case, skip the executor (launcher).
if dummyImages[e.Container.GetImage()] {
driver.Name = name
return []wfapi.DAGTask{*driver}, nil
}
executor := c.containerExecutorTask(name, containerExecutorInputs{
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
podSpecPatch: driverOutputs.podSpecPatch,
cachedDecision: driverOutputs.cached,
condition: driverOutputs.condition,
exitTemplate: inputs.exitTemplate,
hookParentDagID: inputs.parentDagID,
}, task.GetComponentRef().GetName())
executor.Depends = depends([]string{driverTaskName})
return []wfapi.DAGTask{*driver, *executor}, nil
@@ -572,24 +648,3 @@ func depends(deps []string) string {
}
return builder.String()
}

// The exit handler task runs regardless of the state of the upstream tasks
func depends_exit_handler(deps []string) string {
if len(deps) == 0 {
return ""
}
var builder strings.Builder
for index, dep := range deps {
if index > 0 {
builder.WriteString(" || ")
}
for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} {
if inner_index > 0 {
builder.WriteString(" || ")
}
builder.WriteString(dep)
builder.WriteString(task_status)
}
}
return builder.String()
}
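For comparison: given deps = ["train", "eval"], the removed depends_exit_handler built the expression

train.Succeeded || train.Skipped || train.Failed || train.Errored || eval.Succeeded || eval.Skipped || eval.Failed || eval.Errored

whereas the exit lifecycle hook fires on task completion regardless of outcome, so no such expression is needed anymore.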
7 changes: 5 additions & 2 deletions test/sample-test/run_sample_test.py
@@ -36,7 +36,8 @@ def __init__(self,
result,
experiment_name,
host,
namespace='kubeflow'):
namespace='kubeflow',
expected_result='succeeded'):
"""Util class for checking python sample test running results.
:param testname: test name.
@@ -45,6 +46,7 @@ def __init__(self,
:param result: The path of the test result that will be exported.
:param host: The hostname of KFP API endpoint.
:param namespace: namespace of the deployed pipeline system. Default: kubeflow
:param expected_result: the expected status for the run, default is succeeded.
:param experiment_name: Name of the experiment to monitor
"""
self._testname = testname
@@ -65,6 +67,7 @@ def __init__(self,
self._job_name = None
self._test_args = None
self._run_id = None
self._expected_result = expected_result

def run(self):
"""Run compiled KFP pipeline."""
@@ -154,7 +157,7 @@ def check(self):
###### Monitor Job ######
start_time = datetime.now()
response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
succ = (response.state.lower() == 'succeeded')
succ = (response.state.lower() == self._expected_result)
end_time = datetime.now()
elapsed_time = (end_time - start_time).seconds
utils.add_junit_test(self._test_cases, 'job completion', succ,
6 changes: 5 additions & 1 deletion test/sample-test/sample_test_launcher.py
@@ -42,14 +42,16 @@ def __init__(self,
results_gcs_dir,
host='',
target_image_prefix='',
namespace='kubeflow'):
namespace='kubeflow',
expected_result='succeeded'):
"""Launch a KFP sample_test provided its name.

:param test_name: name of the corresponding sample test.
:param results_gcs_dir: gs dir to store test result.
:param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config.
:param target_image_prefix: prefix of docker image, default is empty.
:param namespace: namespace for kfp, default is kubeflow.
:param expected_result: the expected status for the run, default is succeeded.
"""
self._test_name = test_name
self._results_gcs_dir = results_gcs_dir
@@ -81,6 +83,7 @@ def __init__(self,

self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
self._sample_test_output = self._results_gcs_dir
self._expected_result = expected_result

def _compile(self):

@@ -210,6 +213,7 @@ def run_test(self):
host=self._host,
namespace=self._namespace,
experiment_name=experiment_name,
expected_result=self._expected_result,
)
pysample_checker.run()
pysample_checker.check()