diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index e564c587de4..2a89d87a97d 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -224,7 +224,7 @@ jobs: run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name sequential --results-gcs-dir output - name: Basic sample tests - exit_handler - run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --results-gcs-dir output + run: python3 ./test/sample-test/sample_test_launcher.py sample_test run_test --namespace kubeflow --test-name exit_handler --expected-result failed --results-gcs-dir output - name: Collect test results if: always() diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index cd802b5f38b..7ebf98ff49f 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -62,6 +62,11 @@ func Test_argo_compiler(t *testing.T) { platformSpecPath: "../testdata/create_pod_metadata.json", argoYAMLPath: "testdata/create_pod_metadata.yaml", }, + { + jobPath: "../testdata/exit_handler.json", + platformSpecPath: "", + argoYAMLPath: "testdata/exit_handler.yaml", + }, } for _, tt := range tests { t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) { diff --git a/backend/src/v2/compiler/argocompiler/common.go b/backend/src/v2/compiler/argocompiler/common.go index 2d203fc7acb..eab3b5304f5 100644 --- a/backend/src/v2/compiler/argocompiler/common.go +++ b/backend/src/v2/compiler/argocompiler/common.go @@ -14,7 +14,10 @@ package argocompiler -import k8score "k8s.io/api/core/v1" +import ( + wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + k8score "k8s.io/api/core/v1" +) // env vars in metadata-grpc-configmap is defined in component package var metadataConfigIsOptional bool = true @@ -42,3 +45,19 @@ var 
commonEnvs = []k8score.EnvVar{{ }, }, }} + +// addExitTask adds an exit lifecycle hook to a task if exitTemplate is not empty. +func addExitTask(task *wfapi.DAGTask, exitTemplate string, parentDagID string) { + if exitTemplate == "" { + return + } + + task.Hooks = wfapi.LifecycleHooks{ + wfapi.ExitLifecycleEvent: wfapi.LifecycleHook{ + Template: exitTemplate, + Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{ + {Name: paramParentDagID, Value: wfapi.AnyStringPtr(parentDagID)}, + }}, + }, + } +} diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 03f0e3a119a..9409afd7aef 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -191,6 +191,10 @@ type containerExecutorInputs struct { cachedDecision string // if false, the container will be skipped. condition string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute. + exitTemplate string + // this will be provided as the parent-dag-id input to the Argo Workflow exit lifecycle hook. + hookParentDagID string } // containerExecutorTask returns an argo workflows DAGTask. 
@@ -202,7 +206,7 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx if inputs.condition != "" { when = inputs.condition + " != false" } - return &wfapi.DAGTask{ + task := &wfapi.DAGTask{ Name: name, Template: c.addContainerExecutorTemplate(refName), When: when, @@ -213,6 +217,10 @@ func (c *workflowCompiler) containerExecutorTask(name string, inputs containerEx }, }, } + + addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID) + + return task } // addContainerExecutorTemplate adds a generic container executor template for diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index 854ddd3bdaa..9944f5be1b5 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -39,15 +39,6 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen if err != nil { return err } - dag := &wfapi.Template{ - Name: c.templateName(name), - Inputs: wfapi.Inputs{ - Parameters: []wfapi.Parameter{ - {Name: paramParentDagID}, - }, - }, - DAG: &wfapi.DAGTemplate{}, - } tasks := dagSpec.GetTasks() // Iterate through tasks in deterministic order to facilitate testing. // Note, order doesn't affect compiler with real effect right now. @@ -58,6 +49,10 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen keys = append(keys, key) } sort.Strings(keys) + + taskToExitTemplate := map[string]string{} + + // First process exit tasks since those need to be set as lifecycle hooks on the exit handler sub DAG. 
for _, taskName := range keys { kfpTask := dagSpec.GetTasks()[taskName] if kfpTask.GetParameterIterator() != nil && kfpTask.GetArtifactIterator() != nil { @@ -66,14 +61,81 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen if kfpTask.GetArtifactIterator() != nil { return fmt.Errorf("artifact iterator not implemented yet") } + + if kfpTask.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { + // Skip tasks that aren't exit tasks. + continue + } + tasks, err := c.task(taskName, kfpTask, taskInputs{ parentDagID: inputParameter(paramParentDagID), }) if err != nil { return err } + + // Generate the template name in the format of "exit-hook-<DAG name>-<task name>" + // (e.g. exit-hook-root-print-op). + name := fmt.Sprintf("exit-hook-%s-%s", name, taskName) + + deps := kfpTask.GetDependentTasks() + + for _, dep := range deps { + taskToExitTemplate[dep] = name + } + + exitDag := &wfapi.Template{ + Name: name, + Inputs: wfapi.Inputs{ + Parameters: []wfapi.Parameter{ + {Name: paramParentDagID}, + }, + }, + DAG: &wfapi.DAGTemplate{ + Tasks: tasks, + }, + } + + _, err = c.addTemplate(exitDag, name) + if err != nil { + return fmt.Errorf("DAG: %w", err) + } + } + + dag := &wfapi.Template{ + Name: c.templateName(name), + Inputs: wfapi.Inputs{ + Parameters: []wfapi.Parameter{ + {Name: paramParentDagID}, + }, + }, + DAG: &wfapi.DAGTemplate{}, + } + + for _, taskName := range keys { + kfpTask := dagSpec.GetTasks()[taskName] + if kfpTask.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { + // Skip already processed exit tasks. + continue + } + + exitTemplate := taskToExitTemplate[taskName] + + tasks, err := c.task( + taskName, kfpTask, taskInputs{parentDagID: inputParameter(paramParentDagID), exitTemplate: exitTemplate}, + ) + if err != nil { + return err + } dag.DAG.Tasks = append(dag.DAG.Tasks, tasks...)
} + + // The compilation should fail before this point, but add it as an extra precaution to guard against an orphaned + // exit task. + if len(dag.DAG.Tasks) == 0 { + return fmt.Errorf("DAG %s must contain one or more non-exit tasks", name) + } + _, err = c.addTemplate(dag, name) if err != nil { return fmt.Errorf("DAG: %w", err) @@ -116,14 +178,19 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen type dagInputs struct { // placeholder for parent DAG execution ID parentDagID string - condition string + // if provided along with exitTemplate, this will be provided as the parent-dag-id input to the Argo Workflow exit + // lifecycle hook. + hookParentDagID string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute. + exitTemplate string + condition string } // dagTask generates task for a DAG component. // name: task name // componentName: DAG component name func (c *workflowCompiler) dagTask(name string, componentName string, inputs dagInputs) *wfapi.DAGTask { - return &wfapi.DAGTask{ + task := &wfapi.DAGTask{ Name: name, Template: c.templateName(componentName), Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{ @@ -131,11 +198,17 @@ func (c *workflowCompiler) dagTask(name string, componentName string, inputs dag {Name: paramCondition, Value: wfapi.AnyStringPtr(inputs.condition)}, }}, } + + addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID) + + return task } type taskInputs struct { parentDagID string iterationIndex string + // if provided, this will be the template the Argo Workflow exit lifecycle hook will execute.
+ exitTemplate string } // parentDagID: placeholder for parent DAG execution ID @@ -177,12 +250,15 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec return nil, err } // iterations belong to a sub-DAG, no need to add dependent tasks - if inputs.iterationIndex == "" { + // Also skip adding dependencies when it's an exit hook + if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { driver.Depends = depends(task.GetDependentTasks()) } dag := c.dagTask(name, componentName, dagInputs{ - parentDagID: driverOutputs.executionID, - condition: driverOutputs.condition, + parentDagID: driverOutputs.executionID, + exitTemplate: inputs.exitTemplate, + hookParentDagID: inputs.parentDagID, + condition: driverOutputs.condition, }) dag.Depends = depends([]string{driverTaskName}) if task.GetTriggerPolicy().GetCondition() != "" { @@ -215,13 +291,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec driverOutputs.condition = "" } // iterations belong to a sub-DAG, no need to add dependent tasks - if inputs.iterationIndex == "" { + // Also skip adding dependencies when it's an exit hook + if inputs.iterationIndex == "" && task.GetTriggerPolicy().GetStrategy().String() != "ALL_UPSTREAM_TASKS_COMPLETED" { driver.Depends = depends(task.GetDependentTasks()) } - // Handle exit handler dependency - if task.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { - driver.Depends = depends_exit_handler(task.GetDependentTasks()) - } + // When using a dummy image, this means this task is for Kubernetes configs. // In this case skip executor(launcher). 
if dummyImages[e.Container.GetImage()] { @@ -229,9 +303,11 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec return []wfapi.DAGTask{*driver}, nil } executor := c.containerExecutorTask(name, containerExecutorInputs{ - podSpecPatch: driverOutputs.podSpecPatch, - cachedDecision: driverOutputs.cached, - condition: driverOutputs.condition, + podSpecPatch: driverOutputs.podSpecPatch, + cachedDecision: driverOutputs.cached, + condition: driverOutputs.condition, + exitTemplate: inputs.exitTemplate, + hookParentDagID: inputs.parentDagID, }, task.GetComponentRef().GetName()) executor.Depends = depends([]string{driverTaskName}) return []wfapi.DAGTask{*driver, *executor}, nil @@ -572,24 +648,3 @@ func depends(deps []string) string { } return builder.String() } - -// Exit handler task happens no matter the state of the upstream tasks -func depends_exit_handler(deps []string) string { - if len(deps) == 0 { - return "" - } - var builder strings.Builder - for index, dep := range deps { - if index > 0 { - builder.WriteString(" || ") - } - for inner_index, task_status := range []string{".Succeeded", ".Skipped", ".Failed", ".Errored"} { - if inner_index > 0 { - builder.WriteString(" || ") - } - builder.WriteString(dep) - builder.WriteString(task_status) - } - } - return builder.String() -} diff --git a/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml new file mode 100644 index 00000000000..4754f957c93 --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/exit_handler.yaml @@ -0,0 +1,408 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: pipeline-with-exit-handler- +spec: + arguments: + parameters: + - name: components-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8 + value: '{"executorLabel":"exec-fail-op","inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + - name: 
implementations-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","fail_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==1.8.22'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\nprintf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\npython3 -m + kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import + *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n print(message)\n sys.exit(1)\n\n"],"image":"python:3.7"}' + - name: components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a + value: '{"executorLabel":"exec-print-op-2","inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + - name: implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a + value: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==1.8.22'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\nprintf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\npython3 -m + kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import + *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n"],"image":"python:3.7"}' + - name: components-comp-exit-handler-1 + value: '{"dag":{"tasks":{"fail-op":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-fail-op"},"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Task + failed."}}}}},"taskInfo":{"name":"fail-op"}},"print-op-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"message":{"componentInputParameter":"pipelineparam--message"}}},"taskInfo":{"name":"print-op-2"}}}},"inputDefinitions":{"parameters":{"pipelineparam--message":{"type":"STRING"}}}}' + - name: components-root + value: '{"dag":{"tasks":{"exit-handler-1":{"componentRef":{"name":"comp-exit-handler-1"},"inputs":{"parameters":{"pipelineparam--message":{"componentInputParameter":"message"}}},"taskInfo":{"name":"exit-handler-1"}},"print-op":{"componentRef":{"name":"comp-print-op"},"dependentTasks":["exit-handler-1"],"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Exit + handler has worked!"}}}}},"taskInfo":{"name":"print-op"},"triggerPolicy":{"strategy":"ALL_UPSTREAM_TASKS_COMPLETED"}}}},"inputDefinitions":{"parameters":{"message":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + 
labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - pipeline-with-exit-handler + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + 
valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - command: + - launcher-v2 + - --copy + - /kfp-launcher/launch + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-fail-op"},"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Task + failed."}}}}},"taskInfo":{"name":"fail-op"}}' + - name: container + value: '{{workflow.parameters.implementations-8444a6bac7a3d81cc54291a13166a74231d98f9a98861815f15a055edde30ed8}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + 
name: fail-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.fail-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.fail-op-driver.outputs.parameters.cached-decision}}' + depends: fail-op-driver.Succeeded + name: fail-op + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"message":{"componentInputParameter":"pipelineparam--message"}}},"taskInfo":{"name":"print-op-2"}}' + - name: container + value: '{{workflow.parameters.implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-2-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-2-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-2-driver.outputs.parameters.cached-decision}}' + depends: print-op-2-driver.Succeeded + name: print-op-2 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-exit-handler-1 + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: task + value: '{"componentRef":{"name":"comp-print-op"},"dependentTasks":["exit-handler-1"],"inputs":{"parameters":{"message":{"runtimeValue":{"constantValue":{"stringValue":"Exit + handler has worked!"}}}}},"taskInfo":{"name":"print-op"},"triggerPolicy":{"strategy":"ALL_UPSTREAM_TASKS_COMPLETED"}}' + - name: 
container + value: '{{workflow.parameters.implementations-f192dae3a3c4616f7637be7d0414bcffbff11a78dc03bf428f05490caa678f8a}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-driver.outputs.parameters.cached-decision}}' + depends: print-op-driver.Succeeded + name: print-op + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: exit-hook-root-print-op + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - pipeline-with-exit-handler + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: 
condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-exit-handler-1}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-exit-handler-1"},"inputs":{"parameters":{"pipelineparam--message":{"componentInputParameter":"message"}}},"taskInfo":{"name":"exit-handler-1"}}' + name: exit-handler-1-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.exit-handler-1-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.exit-handler-1-driver.outputs.parameters.condition}}' + depends: exit-handler-1-driver.Succeeded + hooks: + exit: + arguments: + parameters: + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + template: exit-hook-root-print-op + name: exit-handler-1 + template: comp-exit-handler-1 + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"message":{"stringValue":"Hello World!"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/testdata/Makefile b/backend/src/v2/compiler/testdata/Makefile index c22df9a6ec0..36e8f0b16ad 100644 --- a/backend/src/v2/compiler/testdata/Makefile +++ b/backend/src/v2/compiler/testdata/Makefile @@ -1,5 +1,6 @@ -REPO_ROOT=../../.. 
+REPO_ROOT=../../../../.. V2_SAMPLES=$(REPO_ROOT)/samples/v2 +CORE_SAMPLES=$(REPO_ROOT)/samples/core # Bobgy: I decided to commit compiled samples into the repo, because they are # used by compiler unit tests. Even if there are updates to v2 DSL compiler @@ -14,8 +15,9 @@ V2_SAMPLES=$(REPO_ROOT)/samples/v2 # Add more samples here when needed .PHONY: update update: - dsl-compile-v2 --py "$(V2_SAMPLES)/hello_world.py" --out "hello_world.json" - dsl-compile-v2 --py "$(V2_SAMPLES)/producer_consumer_param.py" --out "producer_consumer_param.json" + dsl-compile-v2 --py "$(V2_SAMPLES)/hello_world.py" --output "hello_world.json" + dsl-compile-v2 --py "$(V2_SAMPLES)/producer_consumer_param.py" --output "producer_consumer_param.json" + dsl-compile-v2 --py "$(CORE_SAMPLES)/exit_handler/exit_handler.py" --output "exit_handler.json" # currently commented, because v2 compiler generates duplicate component definitions # the commited component_used_twice.json file is hand edited. - # dsl-compile-v2 --py "component_used_twice.py" --out "component_used_twice.json" + # dsl-compile-v2 --py "component_used_twice.py" --output "component_used_twice.json" diff --git a/backend/src/v2/compiler/testdata/exit_handler.json b/backend/src/v2/compiler/testdata/exit_handler.json new file mode 100644 index 00000000000..becf09f390e --- /dev/null +++ b/backend/src/v2/compiler/testdata/exit_handler.json @@ -0,0 +1,218 @@ +{ + "pipelineSpec": { + "components": { + "comp-exit-handler-1": { + "dag": { + "tasks": { + "fail-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-fail-op" + }, + "inputs": { + "parameters": { + "message": { + "runtimeValue": { + "constantValue": { + "stringValue": "Task failed." 
+ } + } + } + } + }, + "taskInfo": { + "name": "fail-op" + } + }, + "print-op-2": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-2" + }, + "inputs": { + "parameters": { + "message": { + "componentInputParameter": "pipelineparam--message" + } + } + }, + "taskInfo": { + "name": "print-op-2" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelineparam--message": { + "type": "STRING" + } + } + } + }, + "comp-fail-op": { + "executorLabel": "exec-fail-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "comp-print-op-2": { + "executorLabel": "exec-print-op-2", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-fail-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "fail_op" + ], + "command": [ + "sh", + "-c", + "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n print(message)\n sys.exit(1)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-2": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.22' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "pipeline-with-exit-handler" + }, + "root": { + "dag": { + "tasks": { + "exit-handler-1": { + "componentRef": { + "name": "comp-exit-handler-1" + }, + "inputs": { + "parameters": { + "pipelineparam--message": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "exit-handler-1" + } + }, + "print-op": { + "componentRef": { + "name": "comp-print-op" + }, + "dependentTasks": [ + "exit-handler-1" + ], + "inputs": { + "parameters": { + "message": { + "runtimeValue": { + "constantValue": { + "stringValue": "Exit handler has worked!" + } + } + } + } + }, + "taskInfo": { + "name": "print-op" + }, + "triggerPolicy": { + "strategy": "ALL_UPSTREAM_TASKS_COMPLETED" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.22" + }, + "runtimeConfig": { + "parameters": { + "message": { + "stringValue": "Hello World!" 
+ } + } + } +} \ No newline at end of file diff --git a/test/sample-test/run_sample_test.py b/test/sample-test/run_sample_test.py index 2c9410fd148..65841fe6d40 100644 --- a/test/sample-test/run_sample_test.py +++ b/test/sample-test/run_sample_test.py @@ -36,7 +36,8 @@ def __init__(self, result, experiment_name, host, - namespace='kubeflow'): + namespace='kubeflow', + expected_result='succeeded'): """Util class for checking python sample test running results. :param testname: test name. @@ -45,6 +46,7 @@ def __init__(self, :param result: The path of the test result that will be exported. :param host: The hostname of KFP API endpoint. :param namespace: namespace of the deployed pipeline system. Default: kubeflow + :param expected_result: the expected status for the run, default is succeeded. :param experiment_name: Name of the experiment to monitor """ self._testname = testname @@ -65,6 +67,7 @@ def __init__(self, self._job_name = None self._test_args = None self._run_id = None + self._expected_result = expected_result def run(self): """Run compiled KFP pipeline.""" @@ -154,7 +157,7 @@ def check(self): ###### Monitor Job ###### start_time = datetime.now() response = self._client.wait_for_run_completion(self._run_id, self._test_timeout) - succ = (response.state.lower() == 'succeeded') + succ = (response.state.lower() == self._expected_result) end_time = datetime.now() elapsed_time = (end_time - start_time).seconds utils.add_junit_test(self._test_cases, 'job completion', succ, diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py index dabbc790921..276c5c8d28b 100644 --- a/test/sample-test/sample_test_launcher.py +++ b/test/sample-test/sample_test_launcher.py @@ -42,7 +42,8 @@ def __init__(self, results_gcs_dir, host='', target_image_prefix='', - namespace='kubeflow'): + namespace='kubeflow', + expected_result='succeeded'): """Launch a KFP sample_test provided its name. :param test_name: name of the corresponding sample test. 
@@ -50,6 +51,7 @@ def __init__(self, :param host: host of KFP API endpoint, default is auto-discovery from inverse-proxy-config. :param target_image_prefix: prefix of docker image, default is empty. :param namespace: namespace for kfp, default is kubeflow. + :param expected_result: the expected status for the run, default is succeeded. """ self._test_name = test_name self._results_gcs_dir = results_gcs_dir @@ -81,6 +83,7 @@ def __init__(self, self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name self._sample_test_output = self._results_gcs_dir + self._expected_result = expected_result def _compile(self): @@ -210,6 +213,7 @@ def run_test(self): host=self._host, namespace=self._namespace, experiment_name=experiment_name, + expected_result=self._expected_result, ) pysample_checker.run() pysample_checker.check()