From 3059f7c124dc95f867e6f755f7c0720aaa32d48b Mon Sep 17 00:00:00 2001 From: Matt Prahl Date: Fri, 10 Jan 2025 11:05:27 -0500 Subject: [PATCH] fix(backend): Use an Argo Workflow exit lifecycle hook for exit handlers (#11470) As described in #10917, exit handlers were implemented as dependent tasks that always ran within an Argo Workflow. The issue is that this caused the pipeline to have a succeeded status regardless of if the tasks within the exit handlers all succeeded. This commit changes exit handlers to be exit lifecycle hooks on an Argo Workflow so that the overall pipeline status is not impacted. Resolves: https://github.com/kubeflow/pipelines/issues/11405 Signed-off-by: mprahl --- .github/workflows/e2e-test.yml | 2 +- .../src/v2/compiler/argocompiler/argo_test.go | 5 + .../src/v2/compiler/argocompiler/common.go | 21 +- .../src/v2/compiler/argocompiler/container.go | 10 +- backend/src/v2/compiler/argocompiler/dag.go | 141 ++++-- .../argocompiler/testdata/exit_handler.yaml | 408 ++++++++++++++++++ backend/src/v2/compiler/testdata/Makefile | 10 +- .../v2/compiler/testdata/exit_handler.json | 218 ++++++++++ test/sample-test/run_sample_test.py | 7 +- test/sample-test/sample_test_launcher.py | 6 +- 10 files changed, 775 insertions(+), 53 deletions(-) create mode 100644 backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml create mode 100644 backend/src/v2/compiler/testdata/exit_handler.json diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index e564c587de4..2a89d87a97d 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -224,7 +224,7 @@ jobs: run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output - name: Basic sample tests - exit_handler - run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output + run: python3 
./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output - name: Collect test results if: always() diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index cd802b5f38b..7ebf98ff49f 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -62,6 +62,11 @@ func Test_argo_compiler(t *testing.T) { platformSpecPath: "../testdata/create_pod_metadata.json", argoYAMLPath: "testdata/create_pod_metadata.yaml", }, + { + jobPath: "../testdata/exit_handler.json", + platformSpecPath: "", + argoYAMLPath: "testdata/exit_handler.yaml", + }, } for _, tt := range tests { t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) { diff --git a/backend/src/v2/compiler/argocompiler/common.go b/backend/src/v2/compiler/argocompiler/common.go index 2d203fc7acb..eab3b5304f5 100644 --- a/backend/src/v2/compiler/argocompiler/common.go +++ b/backend/src/v2/compiler/argocompiler/common.go @@ -14,7 +14,10 @@ package argocompiler -import k8score "k8s.io/api/core/v1" +import ( + wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + k8score "k8s.io/api/core/v1" +) // env vars in metadata-grpc-configmap is defined in component package var metadataConfigIsOptional bool = true @@ -42,3 +45,19 @@ var commonEnvs = []k8score.EnvVar{{ }, }, }} + +// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty. 
+func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) { + if exitTemplate == "" { + return + } + + task.Hooks = wfapi.LifecycleHooks{ + wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{ + Template: exitTemplate, + Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{ + {Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)}, + }}, + }, + } +} diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 03f0e3a119a..9409afd7aef 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -191,6 +191,10 @@ type containerExecutorInputs struct { cachedDecision string // if false, the container will be skipped. condition string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute. + exitTemplate string + // this will be provided as the parent-dag-id input to the Argo Workflow exit lifecycle hook. + hookParentDagID string } // containerExecutorTask returns an argo workflows DAGTask. 
@@ -202,7 +206,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx if inputs.condition != "" { when = inputs.condition + " != false" } - return &wfapi.DAGTask{ + task := &wfapi.DAGTask{ Name: name, Template: c.addContainerExecutorTemplate(refName), When: when, @@ -213,6 +217,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx }, }, } + + addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID) + + return task } // addContainerExecutorTemplate adds a generic container executor template for diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index 854ddd3bdaa..9944f5be1b5 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen if err != nil { return err } - dag := &wfapi.Template{ - Name: c.templateName(name), - Inputs: wfapi.Inputs{ - Parameters: []wfapi.Parameter{ - {Name: paramParentDagID}, - }, - }, - DAG: &wfapi.DAGTemplate{}, - } tasks := dagSpec.GetTasks() // Iterate through tasks in deterministic order to facilitate testing. // Note, order doesn't affect compiler with real effect right now. @@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen keys = append(keys, key) } sort.Strings(keys) + + taskToExitTemplate := map[string]string{} + + // First process exit tasks since those need to be set as lifecycle hooks on the exit handler sub DAG. 
for _, taskName := range keys { kfpTask := dagSpec.GetTasks()[taskName] if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil { @@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen if kfpTask.GetArtifactIterator() != nil { return fmt.Errorf("artifact iterator not implemented yet") } + + if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { + // Skip tasks that aren't exit tasks. + continue + } + tasks, err := c.task(taskName, kfpTask, taskInputs{ parentDagID: inputParameter(paramParentDagID), }) if err != nil { return err } + + // Generate the template name in the format of "exit-hook-<DAG name>-<task name>" + // (e.g. exit-hook-root-print-op). + name := fmt.Sprintf("exit-hook-%s-%s", name, taskName) + + deps := kfpTask.GetDependentTasks() + + for _, dep := range deps { + taskToExitTemplate[dep] = name + } + + exitDag := &wfapi.Template{ + Name: name, + Inputs: wfapi.Inputs{ + Parameters: []wfapi.Parameter{ + {Name: paramParentDagID}, + }, + }, + DAG: &wfapi.DAGTemplate{ + Tasks: tasks, + }, + } + + _, err = c.addTemplate(exitDag, name) + if err != nil { + return fmt.Errorf("DAG: %w", err) + } + } + + dag := &wfapi.Template{ + Name: c.templateName(name), + Inputs: wfapi.Inputs{ + Parameters: []wfapi.Parameter{ + {Name: paramParentDagID}, + }, + }, + DAG: &wfapi.DAGTemplate{}, + } + + for _, taskName := range keys { + kfpTask := dagSpec.GetTasks()[taskName] + if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { + // Skip already processed exit tasks. + continue + } + + exitTemplate := taskToExitTemplate[taskName] + + tasks, err := c.task( + taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate}, + ) + if err != nil { + return err + } dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
} + + // The compilation should fail before this point, but add it as an extra precaution to guard against an orphaned + // exit task. + if len(dag.DAG.Tasks) == 0 { + return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name) + } + _, err = c.addTemplate(dag, name) if err != nil { return fmt.Errorf("DAG: %w", err) @@ -116,14 +178,19 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen type dagInputs struct { // placeholder for parent DAG execution ID parentDagID string - condition string + // if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit + // lifecycle hook. + hookParentDagID string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute. + exitTemplate string + condition string } // dagTask generates task for a DAG component. // name: task name // componentName: DAG component name func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask { - return &wfapi.DAGTask{ + task := &wfapi.DAGTask{ Name: name, Template: c.templateName(componentName), Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{ @@ -131,11 +198,17 @@ func (c *workflowCompiler) dagTask(name string, componentName string, inputs dag {Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)}, }}, } + + addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID) + + return task } type taskInputs struct { parentDagID string iterationIndex string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+ exitTemplate string } // parentDagID: placeholder for parent DAG execution ID @@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec return nil, err } // iterations belong to a sub-DAG, no need to add dependent tasks - if inputs.iterationIndex == "" { + // Also skip adding dependencies when it's an exit hook + if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { driver.Depends = depends(task.GetDependentTasks()) } dag := c.dagTask(name, componentName, dagInputs{ - parentDagID: driverOutputs.executionID, - condition: driverOutputs.condition, + parentDagID: driverOutputs.executionID, + exitTemplate: inputs.exitTemplate, + hookParentDagID: inputs.parentDagID, + condition: driverOutputs.condition, }) dag.Depends = depends([]string{driverTaskName}) if task.GetTriggerPolicy().GetCondition() != "" { @@ -215,13 +291,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec driverOutputs.condition = "" } // iterations belong to a sub-DAG, no need to add dependent tasks - if inputs.iterationIndex == "" { + // Also skip adding dependencies when it's an exit hook + if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { driver.Depends = depends(task.GetDependentTasks()) } - // Handle exit handler dependency - if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { - driver.Depends = depends_exit_handler(task.GetDependentTasks()) - } + // When using a dummy image, this means this task is for Kubernetes configs. // In this case skip executor(launcher). 
if dummyImages[e.Container.GetImage()] { @@ -229,9 +303,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec return []wfapi.DAGTask{*driver}, nil } executor := c.containerExecutorTask(name, containerExecutorInputs{ - podSpecPatch: driverOutputs.podSpecPatch, - cachedDecision: driverOutputs.cached, - condition: driverOutputs.condition, + podSpecPatch: driverOutputs.podSpecPatch, + cachedDecision: driverOutputs.cached, + condition: driverOutputs.condition, + exitTemplate: inputs.exitTemplate, + hookParentDagID: inputs.parentDagID, }, task.GetComponentRef().GetName()) executor.Depends = depends([]string{driverTaskName}) return []wfapi.DAGTask{*driver, *executor}, nil @@ -572,24 +648,3 @@ func depends(deps []string) string { } return builder.String() } - -// Exit handler task happens no matter the state of the upstream tasks -func depends_exit_handler(deps []string) string { - if len(deps) == 0 { - return "" - } - var builder strings.Builder - for index, dep := range deps { - if index > 0 { - builder.WriteString(" || ") - } - for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} { - if inner_index > 0 { - builder.WriteString(" || ") - } - builder.WriteString(dep) - builder.WriteString(task_status) - } - } - return builder.String() -} diff --git a/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml new file mode 100644 index 00000000000..4754f957c93 --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml @@ -0,0 +1,408 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: pipeline-with-exit-handler- +spec: + arguments: + parameters: + - name: components-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8 + value: '{"executorLabel":"exec-fail-op","inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + - name: 
implementations-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","fail_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==1.8.22'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\nprintf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\npython3 -m + kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import + *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n print(message)\n sys.exit(1)\n\n"],"image":"python:3.7"}' + - name: components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a + value: '{"executorLabel":"exec-print-op-2","inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + - name: implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a + value: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==1.8.22'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\nprintf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\npython3 -m + kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import + *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n"],"image":"python:3.7"}' + - name: components-comp-exit-handler-1 + value: '{"dag":{"tasks":{"fail-op":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-fail-op"},"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Task + failed."}}}}},"taskInfo":{"name":"fail-op"}},"print-op-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"message":{"componentInputParameter":"pipelineparam--message"}}},"taskInfo":{"name":"print-op-2"}}}},"inputDefinitions":{"parameters":{"pipelineparam--message":{"type":"STRING"}}}}' + - name: components-root + value: '{"dag":{"tasks":{"exit-handler-1":{"componentRef":{"name":"comp-exit-handler-1"},"inputs":{"parameters":{"pipelineparam--message":{"componentInputParameter":"message"}}},"taskInfo":{"name":"exit-handler-1"}},"print-op":{"componentRef":{"name":"comp-print-op"},"dependentTasks":["exit-handler-1"],"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Exit + handler has worked!"}}}}},"taskInfo":{"name":"print-op"},"triggerPolicy":{"strategy":"ALL_UPSTREAM_TASKS_COMPLETED"}}}},"inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + 
labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - pipeline-with-exit-handler + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + 
valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - command: + - launcher-v2 + - --copy + - /kfp-launcher/launch + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-fail-op"},"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Task + failed."}}}}},"taskInfo":{"name":"fail-op"}}' + - name: container + value: '{{workflow.parameters.implementations-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + 
name: fail-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.fail-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.fail-op-driver.outputs.parameters.cached-decision}}' + depends: fail-op-driver.Succeeded + name: fail-op + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"message":{"componentInputParameter":"pipelineparam--message"}}},"taskInfo":{"name":"print-op-2"}}' + - name: container + value: '{{workflow.parameters.implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-2-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-2-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-2-driver.outputs.parameters.cached-decision}}' + depends: print-op-2-driver.Succeeded + name: print-op-2 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-exit-handler-1 + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: task + value: '{"componentRef":{"name":"comp-print-op"},"dependentTasks":["exit-handler-1"],"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Exit + handler has worked!"}}}}},"taskInfo":{"name":"print-op"},"triggerPolicy":{"strategy":"ALL_UPSTREAM_TASKS_COMPLETED"}}' + - name: 
container + value: '{{workflow.parameters.implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-driver.outputs.parameters.cached-decision}}' + depends: print-op-driver.Succeeded + name: print-op + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: exit-hook-root-print-op + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - pipeline-with-exit-handler + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: 
condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-exit-handler-1}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-exit-handler-1"},"inputs":{"parameters":{"pipelineparam--message":{"componentInputParameter":"message"}}},"taskInfo":{"name":"exit-handler-1"}}' + name: exit-handler-1-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.exit-handler-1-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.exit-handler-1-driver.outputs.parameters.condition}}' + depends: exit-handler-1-driver.Succeeded + hooks: + exit: + arguments: + parameters: + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + template: exit-hook-root-print-op + name: exit-handler-1 + template: comp-exit-handler-1 + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"message":{"stringValue":"Hello World!"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/testdata/Makefile b/backend/src/v2/compiler/testdata/Makefile index c22df9a6ec0..36e8f0b16ad 100644 --- a/backend/src/v2/compiler/testdata/Makefile +++ b/backend/src/v2/compiler/testdata/Makefile @@ -1,5 +1,6 @@ -REPO_ROOT=../../.. 
+REPO_ROOT=../../../../.. V2_SAMPLES=$(REPO_ROOT)/samples/v2 +CORE_SAMPLES=$(REPO_ROOT)/samples/core # Bobgy: I decided to commit compiled samples into the repo, because they are # used by compiler unit tests. Even if there are updates to v2 DSL compiler @@ -14,8 +15,9 @@ V2_SAMPLES=$(REPO_ROOT)/samples/v2 # Add more samples here when needed .PHONY: update update: - dsl-compile-v2 --py "$(V2_SAMPLES)/hello_world.py" --out "hello_world.json" - dsl-compile-v2 --py "$(V2_SAMPLES)/producer_consumer_param.py" --out "producer_consumer_param.json" + dsl-compile-v2 --py "$(V2_SAMPLES)/hello_world.py" --output "hello_world.json" + dsl-compile-v2 --py "$(V2_SAMPLES)/producer_consumer_param.py" --output "producer_consumer_param.json" + dsl-compile-v2 --py "$(CORE_SAMPLES)/exit_handler/exit_handler.py" --output "exit_handler.json" # currently commented, because v2 compiler generates duplicate component definitions # the commited component_used_twice.json file is hand edited. - # dsl-compile-v2 --py "component_used_twice.py" --out "component_used_twice.json" + # dsl-compile-v2 --py "component_used_twice.py" --output "component_used_twice.json" diff --git a/backend/src/v2/compiler/testdata/exit_handler.json b/backend/src/v2/compiler/testdata/exit_handler.json new file mode 100644 index 00000000000..becf09f390e --- /dev/null +++ b/backend/src/v2/compiler/testdata/exit_handler.json @@ -0,0 +1,218 @@ +{ + "pipelineSpec": { + "components": { + "comp-exit-handler-1": { + "dag": { + "tasks": { + "fail-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-fail-op" + }, + "inputs": { + "parameters": { + "message": { + "runtimeValue": { + "constantValue": { + "stringValue": "Task failed." 
+ } + } + } + } + }, + "taskInfo": { + "name": "fail-op" + } + }, + "print-op-2": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-2" + }, + "inputs": { + "parameters": { + "message": { + "componentInputParameter": "pipelineparam--message" + } + } + }, + "taskInfo": { + "name": "print-op-2" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelineparam--message": { + "type": "STRING" + } + } + } + }, + "comp-fail-op": { + "executorLabel": "exec-fail-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "comp-print-op-2": { + "executorLabel": "exec-print-op-2", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-fail-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "fail_op" + ], + "command": [ + "sh", + "-c", + "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n print(message)\n sys.exit(1)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-2": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "pipeline-with-exit-handler" + }, + "root": { + "dag": { + "tasks": { + "exit-handler-1": { + "componentRef": { + "name": "comp-exit-handler-1" + }, + "inputs": { + "parameters": { + "pipelineparam--message": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "exit-handler-1" + } + }, + "print-op": { + "componentRef": { + "name": "comp-print-op" + }, + "dependentTasks": [ + "exit-handler-1" + ], + "inputs": { + "parameters": { + "message": { + "runtimeValue": { + "constantValue": { + "stringValue": "Exit handler has worked!" + } + } + } + } + }, + "taskInfo": { + "name": "print-op" + }, + "triggerPolicy": { + "strategy": "ALL_UPSTREAM_TASKS_COMPLETED" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.22" + }, + "runtimeConfig": { + "parameters": { + "message": { + "stringValue": "Hello World!" 
+ } + } + } +} \ No newline at end of file diff --git a/test/sample-test/run_sample_test.py b/test/sample-test/run_sample_test.py index 2c9410fd148..65841fe6d40 100644 --- a/test/sample-test/run_sample_test.py +++ b/test/sample-test/run_sample_test.py @@ -36,7 +36,8 @@ def __init__(self, result, experiment_name, host, - namespace='kubeflow'): + namespace='kubeflow', + expected_result='succeeded'): """Util class for checking python sample test running results. :param testname: test name. @@ -45,6 +46,7 @@ def __init__(self, :param result: The path of the test result that will be exported. :param host: The hostname of KFP API endpoint. :param namespace: namespace of the deployed pipeline system. Default: kubeflow + :param expected_result: the expected status for the run, default is succeeded. :param experiment_name: Name of the experiment to monitor """ self._testname = testname @@ -65,6 +67,7 @@ def __init__(self, self._job_name = None self._test_args = None self._run_id = None + self._expected_result = expected_result def run(self): """Run compiled KFP pipeline.""" @@ -154,7 +157,7 @@ def check(self): ###### Monitor Job ###### start_time = datetime.now() response = self._client.wait_for_run_completion(self._run_id, self._test_timeout) - succ = (response.state.lower() == 'succeeded') + succ = (response.state.lower() == self._expected_result) end_time = datetime.now() elapsed_time = (end_time - start_time).seconds utils.add_junit_test(self._test_cases, 'job completion', succ, diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py index dabbc790921..276c5c8d28b 100644 --- a/test/sample-test/sample_test_launcher.py +++ b/test/sample-test/sample_test_launcher.py @@ -42,7 +42,8 @@ def __init__(self, results_gcs_dir, host='', target_image_prefix='', - namespace='kubeflow'): + namespace='kubeflow', + expected_result='succeeded'): """Launch a KFP sample_test provided its name. :param test_name: name of the corresponding sample test. 
@@ -50,6 +51,7 @@ def __init__(self, :param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config. :param target_image_prefix: prefix of docker image, default is empty. :param namespace: namespace for kfp, default is kubeflow. + :param expected_result: the expected status for the run, default is succeeded. """ self._test_name = test_name self._results_gcs_dir = results_gcs_dir @@ -81,6 +83,7 @@ def __init__(self, self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name self._sample_test_output = self._results_gcs_dir + self._expected_result = expected_result def _compile(self): @@ -210,6 +213,7 @@ def run_test(self): host=self._host, namespace=self._namespace, experiment_name=experiment_name, + expected_result=self._expected_result, ) pysample_checker.run() pysample_checker.check()