diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml
index e564c587de4..2a89d87a97d 100644
--- a/.github/workflows/e2e-test.yml
+++ b/.github/workflows/e2e-test.yml
@@ -224,7 +224,7 @@ jobs:
         run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output
       - name: Basic sample tests - exit_handler
-        run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output
+        run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output
       - name: Collect test results
         if: always()
diff --git a/backend/src/v2/compiler/argocompiler/common.go b/backend/src/v2/compiler/argocompiler/common.go
index 2d203fc7acb..72d4ce389f4 100644
--- a/backend/src/v2/compiler/argocompiler/common.go
+++ b/backend/src/v2/compiler/argocompiler/common.go
@@ -14,7 +14,10 @@
 
 package argocompiler
 
-import k8score "k8s.io/api/core/v1"
+import (
+	wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+	k8score "k8s.io/api/core/v1"
+)
 
 // env vars in metadata-grpc-configmap is defined in component package
 var metadataConfigIsOptional bool = true
@@ -42,3 +45,25 @@ var commonEnvs = []k8score.EnvVar{{
 		},
 	},
 }}
+
+// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty.
+func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) {
+	if exitTemplate == "" {
+		return
+	}
+
+	var args wfapi.Arguments
+
+	if parentDagID != "" {
+		args = wfapi.Arguments{Parameters: []wfapi.Parameter{
+			{Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)},
+		}}
+	}
+
+	task.Hooks = wfapi.LifecycleHooks{
+		wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{
+			Template:  exitTemplate,
+			Arguments: args,
+		},
+	}
+}
diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go
index 989dfffb8c2..77771f13e46 100644
--- a/backend/src/v2/compiler/argocompiler/container.go
+++ b/backend/src/v2/compiler/argocompiler/container.go
@@ -181,6 +181,11 @@ type containerExecutorInputs struct {
 	cachedDecision string
 	// if false, the container will be skipped.
 	condition string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+	exitTemplate string
+	// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
+	// lifecycle hook.
+	hookParentDagID string
 }
 
 // containerExecutorTask returns an argo workflows DAGTask.
@@ -192,7 +197,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
 	if inputs.condition != "" {
 		when = inputs.condition + " != false"
 	}
-	return &wfapi.DAGTask{
+	task := &wfapi.DAGTask{
 		Name:     name,
 		Template: c.addContainerExecutorTemplate(refName),
 		When:     when,
@@ -203,6 +208,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx
 			},
 		},
 	}
+
+	addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)
+
+	return task
 }
 
 // addContainerExecutorTemplate adds a generic container executor template for
diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 7c997ee61d4..d6309d77ea5 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 	if err != nil {
 		return err
 	}
-	dag := &wfapi.Template{
-		Name: c.templateName(name),
-		Inputs: wfapi.Inputs{
-			Parameters: []wfapi.Parameter{
-				{Name: paramParentDagID},
-			},
-		},
-		DAG: &wfapi.DAGTemplate{},
-	}
 	tasks := dagSpec.GetTasks()
 	// Iterate through tasks in deterministic order to facilitate testing.
 	// Note, order doesn't affect compiler with real effect right now.
@@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 		keys = append(keys, key)
 	}
 	sort.Strings(keys)
+
+	taskToExitTemplate := map[string]string{}
+
+	// First process exit tasks since those need to be set as lifecycle hooks on the exit handler sub-DAG.
 	for _, taskName := range keys {
 		kfpTask := dagSpec.GetTasks()[taskName]
 		if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil {
@@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 		if kfpTask.GetArtifactIterator() != nil {
 			return fmt.Errorf("artifact iterator not implemented yet")
 		}
+
+		if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
+			// Skip tasks that aren't exit tasks.
+			continue
+		}
+
 		tasks, err := c.task(taskName, kfpTask, taskInputs{
 			parentDagID: inputParameter(paramParentDagID),
 		})
 		if err != nil {
 			return err
 		}
+
+		// Generate the template name in the format of "exit-hook-<dag name>-<task name>"
+		// (e.g. exit-hook-root-print-op).
+		name := fmt.Sprintf("exit-hook-%s-%s", name, taskName)
+
+		deps := kfpTask.GetDependentTasks()
+
+		for _, dep := range deps {
+			taskToExitTemplate[dep] = name
+		}
+
+		exitDag := &wfapi.Template{
+			Name: name,
+			Inputs: wfapi.Inputs{
+				Parameters: []wfapi.Parameter{
+					{Name: paramParentDagID},
+				},
+			},
+			DAG: &wfapi.DAGTemplate{
+				Tasks: tasks,
+			},
+		}
+
+		_, err = c.addTemplate(exitDag, name)
+		if err != nil {
+			return fmt.Errorf("DAG: %w", err)
+		}
+	}
+
+	dag := &wfapi.Template{
+		Name: c.templateName(name),
+		Inputs: wfapi.Inputs{
+			Parameters: []wfapi.Parameter{
+				{Name: paramParentDagID},
+			},
+		},
+		DAG: &wfapi.DAGTemplate{},
+	}
+
+	for _, taskName := range keys {
+		kfpTask := dagSpec.GetTasks()[taskName]
+		if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
+			// Skip already processed exit tasks.
+			continue
+		}
+
+		exitTemplate := taskToExitTemplate[taskName]
+
+		tasks, err := c.task(
+			taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate},
+		)
+		if err != nil {
+			return err
+		}
 		dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
 	}
+
+	// The compilation should fail before this point, but add it as an extra precaution to guard against an orphaned
+	// exit task.
+	if len(dag.DAG.Tasks) == 0 {
+		return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name)
+	}
+
 	_, err = c.addTemplate(dag, name)
 	if err != nil {
 		return fmt.Errorf("DAG: %w", err)
@@ -116,14 +178,19 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 type dagInputs struct {
 	// placeholder for parent DAG execution ID
 	parentDagID string
-	condition   string
+	// if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit
+	// lifecycle hook.
+	hookParentDagID string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+	exitTemplate string
+	condition    string
 }
 
 // dagTask generates task for a DAG component.
 // name: task name
 // componentName: DAG component name
 func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask {
-	return &wfapi.DAGTask{
+	task := &wfapi.DAGTask{
 		Name:     name,
 		Template: c.templateName(componentName),
 		Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{
@@ -131,11 +198,17 @@ func (c *workflowCompiler) dagTask(name string, componentName string, inputs dag
 			{Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)},
 		}},
 	}
+
+	addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)
+
+	return task
 }
 
 type taskInputs struct {
 	parentDagID    string
 	iterationIndex string
+	// if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+	exitTemplate string
 }
 
 // parentDagID: placeholder for parent DAG execution ID
@@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		return nil, err
 	}
 	// iterations belong to a sub-DAG, no need to add dependent tasks
-	if inputs.iterationIndex == "" {
+	// Also skip adding dependencies when it's an exit hook
+	if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
 		driver.Depends = depends(task.GetDependentTasks())
 	}
 	dag := c.dagTask(name, componentName, dagInputs{
-		parentDagID: driverOutputs.executionID,
-		condition:   driverOutputs.condition,
+		parentDagID:     driverOutputs.executionID,
+		exitTemplate:    inputs.exitTemplate,
+		hookParentDagID: inputs.parentDagID,
+		condition:       driverOutputs.condition,
 	})
 	dag.Depends = depends([]string{driverTaskName})
 	if task.GetTriggerPolicy().GetCondition() != "" {
@@ -215,13 +291,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		driverOutputs.condition = ""
 	}
 	// iterations belong to a sub-DAG, no need to add dependent tasks
-	if inputs.iterationIndex == "" {
+	// Also skip adding dependencies when it's an exit hook
+	if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" {
 		driver.Depends = depends(task.GetDependentTasks())
 	}
-	// Handle exit handler dependency
-	if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
-		driver.Depends = depends_exit_handler(task.GetDependentTasks())
-	}
+
 	// When using a dummy image, this means this task is for Kubernetes configs.
 	// In this case skip executor(launcher).
 	if dummyImages[e.Container.GetImage()] {
@@ -229,9 +303,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 		return []wfapi.DAGTask{*driver}, nil
 	}
 	executor := c.containerExecutorTask(name, containerExecutorInputs{
-		podSpecPatch:   driverOutputs.podSpecPatch,
-		cachedDecision: driverOutputs.cached,
-		condition:      driverOutputs.condition,
+		podSpecPatch:    driverOutputs.podSpecPatch,
+		cachedDecision:  driverOutputs.cached,
+		condition:       driverOutputs.condition,
+		exitTemplate:    inputs.exitTemplate,
+		hookParentDagID: inputs.parentDagID,
 	}, task.GetComponentRef().GetName())
 	executor.Depends = depends([]string{driverTaskName})
 	return []wfapi.DAGTask{*driver, *executor}, nil
@@ -572,24 +648,3 @@ func depends(deps []string) string {
 	}
 	return builder.String()
 }
-
-// Exit handler task happens no matter the state of the upstream tasks
-func depends_exit_handler(deps []string) string {
-	if len(deps) == 0 {
-		return ""
-	}
-	var builder strings.Builder
-	for index, dep := range deps {
-		if index > 0 {
-			builder.WriteString(" || ")
-		}
-		for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} {
-			if inner_index > 0 {
-				builder.WriteString(" || ")
-			}
-			builder.WriteString(dep)
-			builder.WriteString(task_status)
-		}
-	}
-	return builder.String()
-}
diff --git a/test/sample-test/run_sample_test.py b/test/sample-test/run_sample_test.py
index 2c9410fd148..65841fe6d40 100644
--- a/test/sample-test/run_sample_test.py
+++ b/test/sample-test/run_sample_test.py
@@ -36,7 +36,8 @@ def __init__(self,
                  result,
                  experiment_name,
                  host,
-                 namespace='kubeflow'):
+                 namespace='kubeflow',
+                 expected_result='succeeded'):
         """Util class for checking python sample test running results.
 
         :param testname: test name.
@@ -45,6 +46,7 @@ def __init__(self,
         :param result: The path of the test result that will be exported.
         :param host: The hostname of KFP API endpoint.
         :param namespace: namespace of the deployed pipeline system. Default: kubeflow
+        :param expected_result: the expected status for the run, default is succeeded.
         :param experiment_name: Name of the experiment to monitor
         """
         self._testname = testname
@@ -65,6 +67,7 @@ def __init__(self,
         self._job_name = None
         self._test_args = None
         self._run_id = None
+        self._expected_result = expected_result
 
     def run(self):
         """Run compiled KFP pipeline."""
@@ -154,7 +157,7 @@ def check(self):
         ###### Monitor Job ######
         start_time = datetime.now()
         response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
-        succ = (response.state.lower() == 'succeeded')
+        succ = (response.state.lower() == self._expected_result)
         end_time = datetime.now()
         elapsed_time = (end_time - start_time).seconds
         utils.add_junit_test(self._test_cases, 'job completion', succ,
diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py
index dabbc790921..276c5c8d28b 100644
--- a/test/sample-test/sample_test_launcher.py
+++ b/test/sample-test/sample_test_launcher.py
@@ -42,7 +42,8 @@ def __init__(self,
                  results_gcs_dir,
                  host='',
                  target_image_prefix='',
-                 namespace='kubeflow'):
+                 namespace='kubeflow',
+                 expected_result='succeeded'):
         """Launch a KFP sample_test provided its name.
 
         :param test_name: name of the corresponding sample test.
@@ -50,6 +51,7 @@ def __init__(self,
         :param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config.
         :param target_image_prefix: prefix of docker image, default is empty.
         :param namespace: namespace for kfp, default is kubeflow.
+        :param expected_result: the expected status for the run, default is succeeded.
         """
         self._test_name = test_name
         self._results_gcs_dir = results_gcs_dir
@@ -81,6 +83,7 @@ def __init__(self,
         self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
         self._sample_test_output = self._results_gcs_dir
+        self._expected_result = expected_result
 
     def _compile(self):
 
@@ -210,6 +213,7 @@ def run_test(self):
             host=self._host,
             namespace=self._namespace,
             experiment_name=experiment_name,
+            expected_result=self._expected_result,
         )
         pysample_checker.run()
         pysample_checker.check()
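
Not part of the patch itself: a minimal sketch, assuming it sits alongside the compiler in the argocompiler package and that the stretchr/testify assertions already used in the backend tests are available, of the behavior the new addExitTask helper is expected to have. The task and template names below are illustrative only.

package argocompiler

import (
	"testing"

	wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/stretchr/testify/assert"
)

// Sketch only: exercises addExitTask with made-up task and template names.
func TestAddExitTaskSketch(t *testing.T) {
	// Template name follows the "exit-hook-<dag name>-<task name>" convention used by DAG().
	task := &wfapi.DAGTask{Name: "fail-op", Template: "comp-fail-op"}
	addExitTask(task, "exit-hook-root-print-op", "{{inputs.parameters.parent-dag-id}}")

	// The exit template is attached as an Argo Workflows exit lifecycle hook ...
	hook, ok := task.Hooks[wfapi.ExitLifecycleEvent]
	assert.True(t, ok)
	assert.Equal(t, "exit-hook-root-print-op", hook.Template)
	// ... and the parent DAG ID is forwarded as the hook's parent-dag-id parameter.
	assert.Equal(t, paramParentDagID, hook.Arguments.Parameters[0].Name)

	// With an empty exit template, the task is left untouched (no hooks are added).
	plain := &wfapi.DAGTask{Name: "print-op"}
	addExitTask(plain, "", "")
	assert.Nil(t, plain.Hooks)
}

This mirrors how dagTask and containerExecutorTask wire the hook in the diff: both pass inputs.exitTemplate plus inputs.parentDagID, so the exit handler sub-DAG receives the correct parent-dag-id argument.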