From eee3e56dc77dec5e0d527771556c090b2d0cdb67 Mon Sep 17 00:00:00 2001
From: mprahl
Date: Tue, 17 Dec 2024 14:18:09 -0500
Subject: [PATCH] fix(backend): Use an Argo Workflow exit lifecycle hook for
 exit handlers

As described in #10917, exit handlers were implemented as dependent tasks
that always ran within an Argo Workflow. The issue is that this caused the
pipeline to have a succeeded status regardless of whether the tasks within
the exit handlers all succeeded.

This commit changes exit handlers to be exit lifecycle hooks on an Argo
Workflow so that the overall pipeline status is not impacted.

Resolves:
https://github.com/kubeflow/pipelines/issues/11405

Signed-off-by: mprahl
---
 .github/workflows/e2e-test.yml                |   2 +-
 .../src/v2/compiler/argocompiler/common.go    |  27 +++-
 .../src/v2/compiler/argocompiler/container.go |  11 +-
 backend/src/v2/compiler/argocompiler/dag.go   | 141 ++++++++++++------
 test/sample-test/run_sample_test.py           |   7 +-
 test/sample-test/sample_test_launcher.py      |   6 +-
 6 files changed, 145 insertions(+), 49 deletions(-)

diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml
index e564c587de4..2a89d87a97d 100644
--- a/.github/workflows/e2e-test.yml
+++ b/.github/workflows/e2e-test.yml
@@ -224,7 +224,7 @@ jobs:
         run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output

       - name: Basic sample tests - exit_handler
-        run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output
+        run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output

       - name: Collect test results
         if: always()
diff --git a/backend/src/v2/compiler/argocompiler/common.go b/backend/src/v2/compiler/argocompiler/common.go
index 2d203fc7acb..72d4ce389f4 100644
--- a/backend/src/v2/compiler/argocompiler/common.go
+++ b/backend/src/v2/compiler/argocompiler/common.go
@@ -14,7 +14,10 @@

 package argocompiler

-import k8score "k8s.io/api/core/v1"
+import (
+	wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+	k8score "k8s.io/api/core/v1"
+)

 // env vars in metadata-grpc-configmap is defined in component package
 var metadataConfigIsOptional bool = true
@@ -42,3 +45,25 @@ var commonEnvs = []k8score.EnvVar{{
 		},
 	},
 }}
+
+// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty.
+func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) {
+	if exitTemplate == "" {
+		return
+	}
+
+	var args wfapi.Arguments
+
+	if parentDagID != "" {
+		args = wfapi.Arguments{Parameters: []wfapi.Parameter{
+			{Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)},
+		}}
+	}
+
+	task.Hooks = wfapi.LifecycleHooks{
+		wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
+			Template:  exitTemplate,
+			Arguments: args,
+		},
+	}
+}
diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go
index 989dfffb8c2..77771f13e46 100644
--- a/backend/src/v2/compiler/argocompiler/container.go
+++ b/backend/src/v2/compiler/argocompiler/container.go
@@ -181,6 +181,11 @@ type containerExecutorInputs struct {
 	cachedDecision string
 	// if false, the container will be skipped.
 	condition string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
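+	// The hook runs when this executor task completes, whether the task succeeded or failed.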
+	exitTemplate string
+	// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
+	// lifecycle hook.
+	hookParentDagID string
 }

 // containerExecutorTask returns an argo workflows DAGTask.
@@ -192,7 +197,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
 	if inputs.condition != "" {
 		when = inputs.condition + " != false"
 	}
-	return &wfapi.DAGTask{
+	task := &wfapi.DAGTask{
 		Name:     name,
 		Template: c.addContainerExecutorTemplate(refName),
 		When:     when,
@@ -203,6 +208,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
 			},
 		},
 	}
+
+	addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)
+
+	return task
 }

 // addContainerExecutorTemplate adds a generic container executor template for
diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 7c997ee61d4..d6309d77ea5 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 	if err != nil {
 		return err
 	}
-	dag := &wfapi.Template{
-		Name: c.templateName(name),
-		Inputs: wfapi.Inputs{
-			Parameters: []wfapi.Parameter{
-				{Name: paramParentDagID},
-			},
-		},
-		DAG: &wfapi.DAGTemplate{},
-	}
 	tasks := dagSpec.GetTasks()
 	// Iterate through tasks in deterministic order to facilitate testing.
 	// Note, order doesn't affect compiler with real effect right now.
@@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 		keys = append(keys, key)
 	}
 	sort.Strings(keys)
+
+	taskToExitTemplate := map[string]string{}
+
+	// First process exit tasks since those need to be set as lifecycle hooks on the exit handler sub-DAG.
 	for _, taskName := range keys {
 		kfpTask := dagSpec.GetTasks()[taskName]
 		if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil {
@@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 		if kfpTask.GetArtifactIterator() != nil {
 			return fmt.Errorf("artifact iterator not implemented yet")
 		}
+
+		if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
+			// Skip tasks that aren't exit tasks.
+			continue
+		}
+
 		tasks, err := c.task(taskName, kfpTask, taskInputs{
 			parentDagID: inputParameter(paramParentDagID),
 		})
 		if err != nil {
 			return err
 		}
+
+		// Generate the template name in the format of "exit-hook-<DAG name>-<exit task name>"
+		// (e.g. exit-hook-root-print-op).
+		name := fmt.Sprintf("exit-hook-%s-%s", name, taskName)
+
+		deps := kfpTask.GetDependentTasks()
+
+		for _, dep := range deps {
+			taskToExitTemplate[dep] = name
+		}
+
+		exitDag := &wfapi.Template{
+			Name: name,
+			Inputs: wfapi.Inputs{
+				Parameters: []wfapi.Parameter{
+					{Name: paramParentDagID},
+				},
+			},
+			DAG: &wfapi.DAGTemplate{
+				Tasks: tasks,
+			},
+		}
+
+		_, err = c.addTemplate(exitDag, name)
+		if err != nil {
+			return fmt.Errorf("DAG: %w", err)
+		}
+	}
+
+	dag := &wfapi.Template{
+		Name: c.templateName(name),
+		Inputs: wfapi.Inputs{
+			Parameters: []wfapi.Parameter{
+				{Name: paramParentDagID},
+			},
+		},
+		DAG: &wfapi.DAGTemplate{},
+	}
+
+	for _, taskName := range keys {
+		kfpTask := dagSpec.GetTasks()[taskName]
+		if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
+			// Skip already processed exit tasks.
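+			// They were compiled into exit hook templates in the first pass above and are attached as
+			// exit lifecycle hooks to the tasks they depend on rather than scheduled as DAG tasks.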
+			continue
+		}
+
+		exitTemplate := taskToExitTemplate[taskName]
+
+		tasks, err := c.task(
+			taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate},
+		)
+		if err != nil {
+			return err
+		}
 		dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
 	}
+
+	// Compilation should have failed before reaching this point, but keep this as an extra precaution to guard
+	// against an orphaned exit task.
+	if len(dag.DAG.Tasks) == 0 {
+		return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name)
+	}
+
 	_, err = c.addTemplate(dag, name)
 	if err != nil {
 		return fmt.Errorf("DAG: %w", err)
 	}
@@ -116,14 +178,19 @@
 type dagInputs struct {
 	// placeholder for parent DAG execution ID
 	parentDagID string
-	condition   string
+	// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
+	// lifecycle hook.
+	hookParentDagID string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+	exitTemplate string
+	condition    string
 }

 // dagTask generates task for a DAG component.
 // name: task name
 // componentName: DAG component name
 func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask {
-	return &wfapi.DAGTask{
+	task := &wfapi.DAGTask{
 		Name:     name,
 		Template: c.templateName(componentName),
 		Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
@@ -131,11 +198,17 @@ func (c *workflowCompiler) dagTask(name string, componentName string, inputs dag
 			{Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)},
 		}},
 	}
+
+	addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)
+
+	return task
 }

 type taskInputs struct {
 	parentDagID    string
 	iterationIndex string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
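+	// The hook is attached to the generated DAG or executor task (not the driver task), so it fires
+	// when that task completes regardless of its final status.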
+	exitTemplate string
 }

 // parentDagID: placeholder for parent DAG execution ID
@@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		return nil, err
 	}
 	// iterations belong to a sub-DAG, no need to add dependent tasks
-	if inputs.iterationIndex == "" {
+	// Also skip adding dependencies when it's an exit hook
+	if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
 		driver.Depends = depends(task.GetDependentTasks())
 	}
 	dag := c.dagTask(name, componentName, dagInputs{
-		parentDagID: driverOutputs.executionID,
-		condition:   driverOutputs.condition,
+		parentDagID:     driverOutputs.executionID,
+		exitTemplate:    inputs.exitTemplate,
+		hookParentDagID: inputs.parentDagID,
+		condition:       driverOutputs.condition,
 	})
 	dag.Depends = depends([]string{driverTaskName})
 	if task.GetTriggerPolicy().GetCondition() != "" {
@@ -215,13 +291,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		driverOutputs.condition = ""
 	}
 	// iterations belong to a sub-DAG, no need to add dependent tasks
-	if inputs.iterationIndex == "" {
+	// Also skip adding dependencies when it's an exit hook
+	if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
 		driver.Depends = depends(task.GetDependentTasks())
 	}
-	// Handle exit handler dependency
-	if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
-		driver.Depends = depends_exit_handler(task.GetDependentTasks())
-	}
+
 	// When using a dummy image, this means this task is for Kubernetes configs.
 	// In this case skip executor(launcher).
 	if dummyImages[e.Container.GetImage()] {
@@ -229,9 +303,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		return []wfapi.DAGTask{*driver}, nil
 	}
 	executor := c.containerExecutorTask(name, containerExecutorInputs{
-		podSpecPatch:   driverOutputs.podSpecPatch,
-		cachedDecision: driverOutputs.cached,
-		condition:      driverOutputs.condition,
+		podSpecPatch:    driverOutputs.podSpecPatch,
+		cachedDecision:  driverOutputs.cached,
+		condition:       driverOutputs.condition,
+		exitTemplate:    inputs.exitTemplate,
+		hookParentDagID: inputs.parentDagID,
 	}, task.GetComponentRef().GetName())
 	executor.Depends = depends([]string{driverTaskName})
 	return []wfapi.DAGTask{*driver, *executor}, nil
@@ -572,24 +648,3 @@ func depends(deps []string) string {
 	}
 	return builder.String()
 }
-
-// Exit handler task happens no matter the state of the upstream tasks
-func depends_exit_handler(deps []string) string {
-	if len(deps) == 0 {
-		return ""
-	}
-	var builder strings.Builder
-	for index, dep := range deps {
-		if index > 0 {
-			builder.WriteString(" || ")
-		}
-		for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} {
-			if inner_index > 0 {
-				builder.WriteString(" || ")
-			}
-			builder.WriteString(dep)
-			builder.WriteString(task_status)
-		}
-	}
-	return builder.String()
-}
diff --git a/test/sample-test/run_sample_test.py b/test/sample-test/run_sample_test.py
index 2c9410fd148..65841fe6d40 100644
--- a/test/sample-test/run_sample_test.py
+++ b/test/sample-test/run_sample_test.py
@@ -36,7 +36,8 @@ def __init__(self,
                  result,
                  experiment_name,
                  host,
-                 namespace='kubeflow'):
+                 namespace='kubeflow',
+                 expected_result='succeeded'):
         """Util class for checking python sample test running results.

         :param testname: test name.
@@ -45,6 +46,7 @@ def __init__(self,
         :param result: The path of the test result that will be exported.
         :param host: The hostname of KFP API endpoint.
         :param namespace: namespace of the deployed pipeline system. Default: kubeflow
+        :param expected_result: the expected status for the run, default is succeeded.
         :param experiment_name: Name of the experiment to monitor
         """
         self._testname = testname
@@ -65,6 +67,7 @@ def __init__(self,
         self._job_name = None
         self._test_args = None
         self._run_id = None
+        self._expected_result = expected_result

     def run(self):
         """Run compiled KFP pipeline."""
@@ -154,7 +157,7 @@ def check(self):
         ###### Monitor Job ######
         start_time = datetime.now()
         response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
-        succ = (response.state.lower() == 'succeeded')
+        succ = (response.state.lower() == self._expected_result)
         end_time = datetime.now()
         elapsed_time = (end_time - start_time).seconds
         utils.add_junit_test(self._test_cases, 'job completion', succ,
diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py
index dabbc790921..276c5c8d28b 100644
--- a/test/sample-test/sample_test_launcher.py
+++ b/test/sample-test/sample_test_launcher.py
@@ -42,7 +42,8 @@ def __init__(self,
                  results_gcs_dir,
                  host='',
                  target_image_prefix='',
-                 namespace='kubeflow'):
+                 namespace='kubeflow',
+                 expected_result='succeeded'):
         """Launch a KFP sample_test provided its name.

         :param test_name: name of the corresponding sample test.
@@ -50,6 +51,7 @@ def __init__(self,
         :param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config.
         :param target_image_prefix: prefix of docker image, default is empty.
         :param namespace: namespace for kfp, default is kubeflow.
+        :param expected_result: the expected status for the run, default is succeeded.
         """
         self._test_name = test_name
         self._results_gcs_dir = results_gcs_dir
@@ -81,6 +83,7 @@ def __init__(self,
         self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
         self._sample_test_output = self._results_gcs_dir
+        self._expected_result = expected_result

     def _compile(self):
@@ -210,6 +213,7 @@ def run_test(self):
                 host=self._host,
                 namespace=self._namespace,
                 experiment_name=experiment_name,
+                expected_result=self._expected_result,
             )
             pysample_checker.run()
             pysample_checker.check()
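
For illustration, a minimal sketch (not part of this patch) of a KFP v2 pipeline that
exercises the new behavior, using the standard kfp.dsl API; the component and pipeline
names here are hypothetical:

    from kfp import dsl

    @dsl.component
    def fail_op():
        # Stands in for a failing step inside the exit handler's scope.
        raise RuntimeError('intentional failure')

    @dsl.component
    def cleanup_op():
        # Runs after the wrapped tasks finish, whether they succeeded or failed.
        print('cleaning up')

    @dsl.pipeline(name='exit-handler-demo')
    def exit_handler_demo():
        exit_task = cleanup_op()
        with dsl.ExitHandler(exit_task):
            fail_op()

Previously the compiler scheduled cleanup_op as an always-run DAG task, so a run of
this pipeline reported succeeded even though fail_op failed. With cleanup_op compiled
as an Argo Workflow exit lifecycle hook, the run should now report failed, which is
why the exit_handler e2e test above passes --expected-result failed.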