diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index a5cfed5faef..1f1c19ed3ec 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -15,12 +15,16 @@ package argocompiler import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" "fmt" "strings" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/kubeflow/pipelines/backend/src/v2/compiler" + log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" k8score "k8s.io/api/core/v1" @@ -63,7 +67,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S if err != nil { return nil, err } - // fill root component default paramters to PipelineJob + // fill root component default parameters to PipelineJob specParams := spec.GetRoot().GetInputDefinitions().GetParameters() for name, param := range specParams { _, ok := job.RuntimeConfig.ParameterValues[name] @@ -108,6 +112,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S "pipelines.kubeflow.org/v2_component": "true", }, }, + Arguments: wfapi.Arguments{ + Parameters: []wfapi.Parameter{}, + }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, }, @@ -180,69 +187,134 @@ func (c *workflowCompiler) templateName(componentName string) string { return componentName } -// WIP: store component spec, task spec and executor spec in annotations - const ( - annotationComponents = "pipelines.kubeflow.org/components-" - annotationContainers = "pipelines.kubeflow.org/implementations-" - annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-" + argumentsComponents = "components-" + argumentsContainers = "implementations-" + argumentsKubernetesSpec = "kubernetes-" ) func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error { - return c.saveProtoToAnnotation(annotationComponents+name, spec) + hashedComponent := c.hashComponentContainer(name) + + return c.saveProtoToArguments(argumentsComponents+hashedComponent, spec) } // useComponentSpec returns a placeholder we can refer to the component spec // in argo workflow fields. func (c *workflowCompiler) useComponentSpec(name string) (string, error) { - return c.annotationPlaceholder(annotationComponents + name) + hashedComponent := c.hashComponentContainer(name) + + return c.argumentsPlaceholder(argumentsComponents + hashedComponent) } func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error { - return c.saveProtoToAnnotation(annotationContainers+name, msg) + hashedComponent := c.hashComponentContainer(name) + + return c.saveProtoToArguments(argumentsContainers+hashedComponent, msg) } func (c *workflowCompiler) useComponentImpl(name string) (string, error) { - return c.annotationPlaceholder(annotationContainers + name) + hashedComponent := c.hashComponentContainer(name) + + return c.argumentsPlaceholder(argumentsContainers + hashedComponent) } func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error { - return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec) + return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec) } func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) { - return c.annotationPlaceholder(annotationKubernetesSpec + name) + return c.argumentsPlaceholder(argumentsKubernetesSpec + name) } -// TODO(Bobgy): sanitize component name -func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error { +// saveProtoToArguments saves a proto message to the workflow arguments. The +// message is serialized to JSON and stored in the workflow arguments and then +// referenced by the workflow templates using AWF templating syntax. The reason +// for storing it in the workflow arguments is because there is a 1-many +// relationship between components and tasks that reference them. The workflow +// arguments allow us to deduplicate the component logic (implementation & spec +// in IR), significantly reducing the size of the argo workflow manifest. +func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error { if c == nil { return fmt.Errorf("compiler is nil") } - if c.wf.Annotations == nil { - c.wf.Annotations = make(map[string]string) + if c.wf.Spec.Arguments.Parameters == nil { + c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}} } - if _, alreadyExists := c.wf.Annotations[name]; alreadyExists { - return fmt.Errorf("annotation %q already exists", name) + if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil { + return nil } json, err := stablyMarshalJSON(msg) if err != nil { - return fmt.Errorf("saving component spec of %q to annotations: %w", name, err) + return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err) } - // TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set - c.wf.Annotations[name] = json + c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{ + Name: componentName, + Value: wfapi.AnyStringPtr(json), + }) return nil } -func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) { +// argumentsPlaceholder checks for the unique component name within the workflow +// arguments and returns a template tag that references the component in the +// workflow arguments. +func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) { if c == nil { return "", fmt.Errorf("compiler is nil") } - if _, exists := c.wf.Annotations[name]; !exists { - return "", fmt.Errorf("using component spec: failed to find annotation %q", name) + if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil { + return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName) + } + + return workflowParameter(componentName), nil +} + +// hashComponentContainer serializes and hashes the container field of a given +// component. +func (c *workflowCompiler) hashComponentContainer(componentName string) string { + log.Debug("componentName: ", componentName) + // Return early for root component since it has no command and args. + if componentName == "root" { + return componentName } - // Reference: https://argoproj.github.io/argo-workflows/variables/ - return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil + if c.executors != nil { // Don't bother if there are no executors in the pipeline spec. + // Look up the executorLabel for the component in question. + executorLabel := c.spec.Components[componentName].GetExecutorLabel() + log.Debug("executorLabel: ", executorLabel) + // Iterate through the list of executors. + for executorName, executorValue := range c.executors { + log.Debug("executorName: ", executorName) + // If one of them matches the executorLabel we extracted earlier... + if executorName == executorLabel { + // Get the corresponding container. + container := executorValue.GetContainer() + if container != nil { + containerHash, err := hashValue(container) + if err != nil { + // Do not bubble up since this is not a breaking error + // and we can just return the componentName in full. + log.Debug("Error hashing container: ", err) + } + + return containerHash + } + } + } + } + + return componentName +} + +// hashValue serializes and hashes a provided value. +func hashValue(value interface{}) (string, error) { + bytes, err := json.Marshal(value) + if err != nil { + return "", err + } + h := sha256.New() + h.Write([]byte(bytes)) + + return hex.EncodeToString(h.Sum(nil)), nil } const ( diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 14ed4f70679..b04adc58f8a 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { }, } // Update pod metadata if it defined in the Kubernetes Spec - if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok { + kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName) + + if kubernetesConfigParam != nil { k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{} - if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil { + if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil { extendPodMetadata(&executor.Metadata, k8sExecCfg) } } diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml index e3b427d2455..7a5565595ed 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml @@ -1,36 +1,36 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-comp: '{"executorLabel":"exec-comp"}' - pipelines.kubeflow.org/components-comp-comp-2: '{"executorLabel":"exec-comp-2"}' - pipelines.kubeflow.org/components-comp-createpvc: '{"executorLabel":"exec-createpvc","inputDefinitions":{"parameters":{"access_modes":{"parameterType":"LIST"},"annotations":{"isOptional":true,"parameterType":"STRUCT"},"pvc_name":{"isOptional":true,"parameterType":"STRING"},"pvc_name_suffix":{"isOptional":true,"parameterType":"STRING"},"size":{"parameterType":"STRING"},"storage_class_name":{"defaultValue":"","isOptional":true,"parameterType":"STRING"},"volume_name":{"isOptional":true,"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"name":{"parameterType":"STRING"}}}}' - pipelines.kubeflow.org/components-comp-deletepvc: '{"executorLabel":"exec-deletepvc","inputDefinitions":{"parameters":{"pvc_name":{"parameterType":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"comp":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}},"comp-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}},"createpvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}},"deletepvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}}}}' - pipelines.kubeflow.org/implementations-comp-comp: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif - ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m - ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' - \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf \"%s\" - \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" - ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import - *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/implementations-comp-comp-2: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif - ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m - ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' - \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf \"%s\" - \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" - ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import - *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/implementations-comp-createpvc: '{"image":"argostub/createpvc"}' - pipelines.kubeflow.org/implementations-comp-deletepvc: '{"image":"argostub/deletepvc"}' - pipelines.kubeflow.org/kubernetes-comp-comp: '{"pvcMount":[{"mountPath":"/data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' - pipelines.kubeflow.org/kubernetes-comp-comp-2: '{"pvcMount":[{"mountPath":"/reused_data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' creationTimestamp: null generateName: my-pipeline- spec: - arguments: {} + arguments: + parameters: + - name: kubernetes-comp-comp + value: '{"pvcMount":[{"mountPath":"/data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' + - name: components-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080 + value: '{"executorLabel":"exec-comp"}' + - name: implementations-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.0.0-beta.16'' + \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d) printf + \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\" python3 -m kfp.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\" + ","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import + *\n\ndef comp():\n pass\n\n"],"image":"python:3.7"}' + - name: kubernetes-comp-comp-2 + value: '{"pvcMount":[{"mountPath":"/reused_data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}' + - name: components-98f254581598234b59377784d6cbf209de79e0bcda8013fe4c4397b5d3a26767 + value: '{"executorLabel":"exec-createpvc","inputDefinitions":{"parameters":{"access_modes":{"parameterType":"LIST"},"annotations":{"isOptional":true,"parameterType":"STRUCT"},"pvc_name":{"isOptional":true,"parameterType":"STRING"},"pvc_name_suffix":{"isOptional":true,"parameterType":"STRING"},"size":{"parameterType":"STRING"},"storage_class_name":{"defaultValue":"","isOptional":true,"parameterType":"STRING"},"volume_name":{"isOptional":true,"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"name":{"parameterType":"STRING"}}}}' + - name: implementations-98f254581598234b59377784d6cbf209de79e0bcda8013fe4c4397b5d3a26767 + value: '{"image":"argostub/createpvc"}' + - name: components-ecfc655dce17b0d317707d37fc226fb7de858cc93d45916945122484a13ef725 + value: '{"executorLabel":"exec-deletepvc","inputDefinitions":{"parameters":{"pvc_name":{"parameterType":"STRING"}}}}' + - name: implementations-ecfc655dce17b0d317707d37fc226fb7de858cc93d45916945122484a13ef725 + value: '{"image":"argostub/deletepvc"}' + - name: components-root + value: '{"dag":{"tasks":{"comp":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}},"comp-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}},"createpvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}},"deletepvc":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -180,32 +180,32 @@ spec: volumes: - emptyDir: {} name: kfp-launcher - - emptyDir: { } + - emptyDir: {} name: gcs-scratch - - emptyDir: { } + - emptyDir: {} name: s3-scratch - - emptyDir: { } + - emptyDir: {} name: minio-scratch - - emptyDir: { } + - emptyDir: {} name: dot-local-scratch - - emptyDir: { } + - emptyDir: {} name: dot-cache-scratch - - emptyDir: { } + - emptyDir: {} name: dot-config-scratch - dag: tasks: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-comp}}' + value: '{{workflow.parameters.components-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-comp}}' + value: '{{workflow.parameters.implementations-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-comp}}' + value: '{{workflow.parameters.kubernetes-comp-comp}}' depends: createpvc.Succeeded name: comp-driver template: system-container-driver @@ -222,15 +222,15 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-comp-2}}' + value: '{{workflow.parameters.components-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-comp-2}}' + value: '{{workflow.parameters.implementations-95f802401136aebf1bf728a6675d7adba5513b53673a3698e00a6d8744638080}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-comp-2}}' + value: '{{workflow.parameters.kubernetes-comp-comp-2}}' depends: comp.Succeeded && createpvc.Succeeded name: comp-2-driver template: system-container-driver @@ -247,11 +247,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-createpvc}}' + value: '{{workflow.parameters.components-98f254581598234b59377784d6cbf209de79e0bcda8013fe4c4397b5d3a26767}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-createpvc"},"inputs":{"parameters":{"access_modes":{"runtimeValue":{"constant":["ReadWriteOnce"]}},"pvc_name_suffix":{"runtimeValue":{"constant":"-my-pvc"}},"size":{"runtimeValue":{"constant":"5Gi"}},"storage_class_name":{"runtimeValue":{"constant":"standard"}}}},"taskInfo":{"name":"createpvc"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-createpvc}}' + value: '{{workflow.parameters.implementations-98f254581598234b59377784d6cbf209de79e0bcda8013fe4c4397b5d3a26767}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: createpvc @@ -259,11 +259,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-deletepvc}}' + value: '{{workflow.parameters.components-ecfc655dce17b0d317707d37fc226fb7de858cc93d45916945122484a13ef725}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-deletepvc"},"dependentTasks":["comp-2","createpvc"],"inputs":{"parameters":{"pvc_name":{"taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}}},"taskInfo":{"name":"deletepvc"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-deletepvc}}' + value: '{{workflow.parameters.implementations-ecfc655dce17b0d317707d37fc226fb7de858cc93d45916945122484a13ef725}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' depends: comp-2.Succeeded && createpvc.Succeeded @@ -343,7 +343,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml index 450aed6aeca..67308ba33dd 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml @@ -1,20 +1,24 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-hello-world: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-hello-world: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf - \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def - hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser - = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", - dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args - = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' - pipelines.kubeflow.org/kubernetes-comp-hello-world: '{"podMetadata":{"annotations":{"experiment_id":"234567","run_id":"123456"},"labels":{"kubeflow.com/common":"test","kubeflow.com/kfp":"pipeline-node"}}}' creationTimestamp: null generateName: hello-world- spec: - arguments: {} + arguments: + parameters: + - name: kubernetes-comp-hello-world + value: '{"podMetadata":{"annotations":{"experiment_id":"234567","run_id":"123456"},"labels":{"kubeflow.com/common":"test","kubeflow.com/kfp":"pipeline-node"}}}' + - name: components-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -124,20 +128,20 @@ spec: name: "" resources: {} volumeMounts: - - mountPath: /kfp-launcher - name: kfp-launcher - - mountPath: /gcs - name: gcs-scratch - - mountPath: /s3 - name: s3-scratch - - mountPath: /minio - name: minio-scratch - - mountPath: /.local - name: dot-local-scratch - - mountPath: /.cache - name: dot-cache-scratch - - mountPath: /.config - name: dot-config-scratch + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch initContainers: - command: - launcher-v2 @@ -168,34 +172,34 @@ spec: outputs: {} podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' volumes: - - emptyDir: {} - name: kfp-launcher - - emptyDir: { } - name: gcs-scratch - - emptyDir: { } - name: s3-scratch - - emptyDir: { } - name: minio-scratch - - emptyDir: { } - name: dot-local-scratch - - emptyDir: { } - name: dot-cache-scratch - - emptyDir: { } - name: dot-config-scratch + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch - dag: tasks: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-hello-world}}' + value: '{{workflow.parameters.components-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-hello-world}}' + value: '{{workflow.parameters.implementations-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - name: kubernetes-config - value: '{{workflow.annotations.pipelines.kubeflow.org/kubernetes-comp-hello-world}}' + value: '{{workflow.parameters.kubernetes-comp-hello-world}}' name: hello-world-driver template: system-container-driver - arguments: @@ -282,7 +286,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{"parameters":{"text":{"stringValue":"hi there"}}}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml index 5685ece5de5..6bc59366d8d 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml @@ -1,19 +1,22 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-hello-world: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-hello-world: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf - \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def - hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser - = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", - dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args - = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' creationTimestamp: null generateName: hello-world- spec: - arguments: {} + arguments: + parameters: + - name: components-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.7"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -180,11 +183,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-hello-world}}' + value: '{{workflow.parameters.components-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' - name: container - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-hello-world}}' + value: '{{workflow.parameters.implementations-34e222d692a0573c88000b8cb02ad24423491a53e061e9bba36d3718dd4c3390}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: hello-world-driver @@ -273,7 +276,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{"parameters":{"text":{"stringValue":"hi there"}}}' - name: driver-type diff --git a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml index d0e6ef6eaae..23ec461c307 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/importer.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/importer.yaml @@ -1,14 +1,17 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - annotations: - pipelines.kubeflow.org/components-comp-importer: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"type":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset"}}}}}' - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}}},"inputDefinitions":{"parameters":{"dataset2":{"type":"STRING"}}}}' - pipelines.kubeflow.org/implementations-comp-importer: '{"artifactUri":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}},"typeSchema":{"schemaTitle":"system.Dataset"}}' creationTimestamp: null generateName: pipeline-with-importer- spec: - arguments: {} + arguments: + parameters: + - name: components-comp-importer + value: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"type":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset"}}}}}' + - name: implementations-comp-importer + value: '{"artifactUri":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}},"typeSchema":{"schemaTitle":"system.Dataset"}}' + - name: components-root + value: '{"dag":{"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}}},"inputDefinitions":{"parameters":{"dataset2":{"type":"STRING"}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -81,9 +84,9 @@ spec: - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}' - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-importer}}' + value: '{{workflow.parameters.components-comp-importer}}' - name: importer - value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-importer}}' + value: '{{workflow.parameters.implementations-comp-importer}}' - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' name: importer @@ -162,7 +165,7 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + value: '{{workflow.parameters.components-root}}' - name: runtime-config value: '{}' - name: driver-type