diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 72f7864bf91..77ece32b911 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -1,6 +1,7 @@ # Current Version (in development) ## Features +* Support dynamic machine type parameters in CustomTrainingJobOp. [\#10883](https://github.com/kubeflow/pipelines/pull/10883) ## Breaking changes * Drop support for Python 3.7 since it has reached end-of-life. [\#10750](https://github.com/kubeflow/pipelines/pull/10750) diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 029b93c802e..421bef1ad2c 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -772,3 +772,34 @@ def get_dependencies( dependencies[downstream_names[0]].add(upstream_names[0]) return dependencies + + +def recursive_replace_placeholders(data: Union[Dict, List], old_value: str, + new_value: str) -> Union[Dict, List]: + """Recursively replaces values in a nested dict/list object. + + This method is used to replace PipelineChannel objects with input parameter + placeholders in a nested object like worker_pool_specs for custom jobs. + + Args: + data: A nested object that can contain dictionaries and/or lists. + old_value: The value that will be replaced. + new_value: The value to replace the old value with. + + Returns: + A copy of data with all occurrences of old_value replaced by new_value. 
+ """ + if isinstance(data, dict): + return { + k: recursive_replace_placeholders(v, old_value, new_value) + for k, v in data.items() + } + elif isinstance(data, list): + return [ + recursive_replace_placeholders(i, old_value, new_value) + for i in data + ] + else: + if isinstance(data, pipeline_channel.PipelineChannel): + data = str(data) + return new_value if data == old_value else data diff --git a/sdk/python/kfp/compiler/compiler_utils_test.py b/sdk/python/kfp/compiler/compiler_utils_test.py index ec20833b30f..4763bf965d4 100644 --- a/sdk/python/kfp/compiler/compiler_utils_test.py +++ b/sdk/python/kfp/compiler/compiler_utils_test.py @@ -53,6 +53,67 @@ def test_additional_input_name_for_pipeline_channel(self, channel, expected, compiler_utils.additional_input_name_for_pipeline_channel(channel)) + @parameterized.parameters( + { + 'data': [{ + 'container_spec': { + 'image_uri': + 'gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0', + 'command': ['echo'], + 'args': ['foo'] + }, + 'machine_spec': { + 'machine_type': + pipeline_channel.PipelineParameterChannel( + name='Output', + channel_type='String', + task_name='machine-type'), + 'accelerator_type': + pipeline_channel.PipelineParameterChannel( + name='Output', + channel_type='String', + task_name='accelerator-type'), + 'accelerator_count': + 1 + }, + 'replica_count': 1 + }], + 'old_value': + '{{channel:task=machine-type;name=Output;type=String;}}', + 'new_value': + '{{$.inputs.parameters[' + 'pipelinechannel--machine-type-Output' + ']}}', + 'expected': [{ + 'container_spec': { + 'image_uri': + 'gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0', + 'command': ['echo'], + 'args': ['foo'] + }, + 'machine_spec': { + 'machine_type': + '{{$.inputs.parameters[' + 'pipelinechannel--machine-type-Output' + ']}}', + 'accelerator_type': + pipeline_channel.PipelineParameterChannel( + name='Output', + channel_type='String', + task_name='accelerator-type'), + 'accelerator_count': + 1 + }, + 'replica_count': 1 + 
}], + },) + def test_recursive_replace_placeholders(self, data, old_value, new_value, + expected): + self.assertEqual( + expected, + compiler_utils.recursive_replace_placeholders( + data, old_value, new_value)) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 86e446673e2..6d2a0cfa9d2 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -238,12 +238,14 @@ def build_task_spec_for_task( input_name].component_input_parameter = ( component_input_parameter) - elif isinstance(input_value, str): - # Handle extra input due to string concat + elif isinstance(input_value, (str, int, float, bool, dict, list)): pipeline_channels = ( pipeline_channel.extract_pipeline_channels_from_any(input_value) ) for channel in pipeline_channels: + # NOTE: case like this p3 = print_and_return_str(s='Project = {}'.format(project)) + # triggers this code + # value contains PipelineChannel placeholders which needs to be # replaced. And the input needs to be added to the task spec. @@ -265,8 +267,14 @@ def build_task_spec_for_task( additional_input_placeholder = placeholders.InputValuePlaceholder( additional_input_name)._to_string() - input_value = input_value.replace(channel.pattern, - additional_input_placeholder) + + if isinstance(input_value, str): + input_value = input_value.replace( + channel.pattern, additional_input_placeholder) + else: + input_value = compiler_utils.recursive_replace_placeholders( + input_value, channel.pattern, + additional_input_placeholder) if channel.task_name: # Value is produced by an upstream task. 
@@ -299,11 +307,6 @@ def build_task_spec_for_task( additional_input_name].component_input_parameter = ( component_input_parameter) - pipeline_task_spec.inputs.parameters[ - input_name].runtime_value.constant.string_value = input_value - - elif isinstance(input_value, (str, int, float, bool, dict, list)): - pipeline_task_spec.inputs.parameters[ input_name].runtime_value.constant.CopyFrom( to_protobuf_value(input_value)) diff --git a/sdk/python/kfp/dsl/pipeline_channel.py b/sdk/python/kfp/dsl/pipeline_channel.py index 4731030709f..ed1ab0fdb6d 100644 --- a/sdk/python/kfp/dsl/pipeline_channel.py +++ b/sdk/python/kfp/dsl/pipeline_channel.py @@ -586,7 +586,7 @@ def extract_pipeline_channels_from_string( def extract_pipeline_channels_from_any( - payload: Union[PipelineChannel, str, list, tuple, dict] + payload: Union[PipelineChannel, str, int, float, bool, list, tuple, dict] ) -> List[PipelineChannel]: """Recursively extract PipelineChannels from any object or list of objects. diff --git a/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.py b/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.py new file mode 100644 index 00000000000..d112232478f --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.py @@ -0,0 +1,64 @@ +import google_cloud_pipeline_components.v1.custom_job as custom_job +from kfp import dsl + + +@dsl.component +def flip_biased_coin_op() -> str: + """Flip a coin and output heads.""" + return 'heads' + + +@dsl.component +def machine_type() -> str: + return 'n1-standard-4' + + +@dsl.component +def accelerator_type() -> str: + return 'NVIDIA_TESLA_P4' + + +@dsl.component +def accelerator_count() -> int: + return 1 + + +@dsl.pipeline +def pipeline( + project: str, + location: str, + encryption_spec_key_name: str = '', +): + flip1 = flip_biased_coin_op().set_caching_options(False) + machine_type_task = 
machine_type() + accelerator_type_task = accelerator_type() + accelerator_count_task = accelerator_count() + + with dsl.Condition(flip1.output == 'heads'): + custom_job.CustomTrainingJobOp( + display_name='add-numbers', + worker_pool_specs=[{ + 'container_spec': { + 'image_uri': ( + 'gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0' + ), + 'command': ['echo'], + 'args': ['foo'], + }, + 'machine_spec': { + 'machine_type': machine_type_task.output, + 'accelerator_type': accelerator_type_task.output, + 'accelerator_count': accelerator_count_task.output, + }, + 'replica_count': 1, + }], + project=project, + location=location, + encryption_spec_key_name=encryption_spec_key_name, + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.yaml b/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.yaml new file mode 100644 index 00000000000..9c233c63dde --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_condition_dynamic_task_output_custom_training_job.yaml @@ -0,0 +1,419 @@ +# PIPELINE DEFINITION +# Name: pipeline +# Inputs: +# encryption_spec_key_name: str [Default: ''] +# location: str +# project: str +components: + comp-accelerator-count: + executorLabel: exec-accelerator-count + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER + comp-accelerator-type: + executorLabel: exec-accelerator-type + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-condition-1: + dag: + tasks: + custom-training-job: + cachingOptions: + enableCache: true + componentRef: + name: comp-custom-training-job + inputs: + parameters: + display_name: + runtimeValue: + constant: add-numbers + encryption_spec_key_name: + componentInputParameter: 
pipelinechannel--encryption_spec_key_name + location: + componentInputParameter: pipelinechannel--location + pipelinechannel--accelerator-count-Output: + componentInputParameter: pipelinechannel--accelerator-count-Output + pipelinechannel--accelerator-type-Output: + componentInputParameter: pipelinechannel--accelerator-type-Output + pipelinechannel--machine-type-Output: + componentInputParameter: pipelinechannel--machine-type-Output + project: + componentInputParameter: pipelinechannel--project + worker_pool_specs: + runtimeValue: + constant: + - container_spec: + args: + - foo + command: + - echo + image_uri: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0 + machine_spec: + accelerator_count: '{{$.inputs.parameters[''pipelinechannel--accelerator-count-Output'']}}' + accelerator_type: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}' + machine_type: '{{$.inputs.parameters[''pipelinechannel--machine-type-Output'']}}' + replica_count: 1.0 + taskInfo: + name: custom-training-job + inputDefinitions: + parameters: + pipelinechannel--accelerator-count-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--accelerator-type-Output: + parameterType: STRING + pipelinechannel--encryption_spec_key_name: + parameterType: STRING + pipelinechannel--flip-biased-coin-op-Output: + parameterType: STRING + pipelinechannel--location: + parameterType: STRING + pipelinechannel--machine-type-Output: + parameterType: STRING + pipelinechannel--project: + parameterType: STRING + comp-custom-training-job: + executorLabel: exec-custom-training-job + inputDefinitions: + parameters: + base_output_directory: + defaultValue: '' + description: The Cloud Storage location to store the output of this CustomJob + or HyperparameterTuningJob. See [more information ](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination). + isOptional: true + parameterType: STRING + display_name: + description: The name of the CustomJob. 
+ parameterType: STRING + enable_web_access: + defaultValue: false + description: Whether you want Vertex AI to enable [interactive shell access + ](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) + to training containers. If `True`, you can access interactive shells at + the URIs given by [CustomJob.web_access_uris][]. + isOptional: true + parameterType: BOOLEAN + encryption_spec_key_name: + defaultValue: '' + description: Customer-managed encryption key options for the CustomJob. + If this is set, then all resources created by the CustomJob will be encrypted + with the provided encryption key. + isOptional: true + parameterType: STRING + labels: + defaultValue: {} + description: The labels with user-defined metadata to organize the CustomJob. + See [more information](https://goo.gl/xmQnxf). + isOptional: true + parameterType: STRUCT + location: + defaultValue: us-central1 + description: Location for creating the custom training job. If not set, + default to us-central1. + isOptional: true + parameterType: STRING + network: + defaultValue: '' + description: The full name of the Compute Engine network to which the job + should be peered. For example, `projects/12345/global/networks/myVPC`. + Format is of the form `projects/{project}/global/networks/{network}`. + Where `{project}` is a project number, as in `12345`, and `{network}` + is a network name. Private services access must already be configured + for the network. If left unspecified, the job is not peered with any network. + isOptional: true + parameterType: STRING + project: + defaultValue: '{{$.pipeline_google_cloud_project_id}}' + description: Project to create the custom training job in. Defaults to the + project in which the PipelineJob is run. + isOptional: true + parameterType: STRING + reserved_ip_ranges: + defaultValue: [] + description: A list of names for the reserved IP ranges under the VPC network + that can be used for this job. 
If set, we will deploy the job within the + provided IP ranges. Otherwise, the job will be deployed to any IP ranges + under the provided VPC network. + isOptional: true + parameterType: LIST + restart_job_on_worker_restart: + defaultValue: false + description: Restarts the entire CustomJob if a worker gets restarted. This + feature can be used by distributed training jobs that are not resilient + to workers leaving and joining a job. + isOptional: true + parameterType: BOOLEAN + service_account: + defaultValue: '' + description: Sets the default service account for workload run-as account. + The [service account ](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account) + running the pipeline submitting jobs must have act-as permission on this + run-as account. If unspecified, the Vertex AI Custom Code [Service Agent + ](https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents) + for the CustomJob's project. + isOptional: true + parameterType: STRING + tensorboard: + defaultValue: '' + description: The name of a Vertex AI TensorBoard resource to which this + CustomJob will upload TensorBoard logs. + isOptional: true + parameterType: STRING + timeout: + defaultValue: 604800s + description: 'The maximum job running time. The default is 7 days. A duration + in seconds with up to nine fractional digits, terminated by ''s'', for + example: "3.5s".' + isOptional: true + parameterType: STRING + worker_pool_specs: + defaultValue: [] + description: Serialized json spec of the worker pools including machine + type and Docker image. All worker pools except the first one are optional + and can be skipped by providing an empty value. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#WorkerPoolSpec). 
+ isOptional: true + parameterType: LIST + outputDefinitions: + parameters: + gcp_resources: + description: Serialized JSON of `gcp_resources` [proto](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/proto) + which tracks the CustomJob. + parameterType: STRING + comp-flip-biased-coin-op: + executorLabel: exec-flip-biased-coin-op + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-machine-type: + executorLabel: exec-machine-type + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-accelerator-count: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_count + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_count() -> int:\n return 1\n\n" + image: python:3.8 + exec-accelerator-type: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_type + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_type() -> str:\n return 'NVIDIA_TESLA_P4'\n\n" + image: python:3.8 + exec-custom-training-job: + container: + args: + - --type + - CustomJob + - --payload + - '{"display_name": "{{$.inputs.parameters[''display_name'']}}", "job_spec": + {"worker_pool_specs": {{$.inputs.parameters[''worker_pool_specs'']}}, "scheduling": + {"timeout": "{{$.inputs.parameters[''timeout'']}}", "restart_job_on_worker_restart": + {{$.inputs.parameters[''restart_job_on_worker_restart'']}}}, "service_account": + "{{$.inputs.parameters[''service_account'']}}", "tensorboard": "{{$.inputs.parameters[''tensorboard'']}}", + "enable_web_access": {{$.inputs.parameters[''enable_web_access'']}}, "network": + "{{$.inputs.parameters[''network'']}}", "reserved_ip_ranges": {{$.inputs.parameters[''reserved_ip_ranges'']}}, + "base_output_directory": {"output_uri_prefix": "{{$.inputs.parameters[''base_output_directory'']}}"}}, + "labels": {{$.inputs.parameters[''labels'']}}, "encryption_spec": {"kms_key_name": + "{{$.inputs.parameters[''encryption_spec_key_name'']}}"}}' + - --project + - '{{$.inputs.parameters[''project'']}}' + - --location + - '{{$.inputs.parameters[''location'']}}' + - --gcp_resources + - '{{$.outputs.parameters[''gcp_resources''].output_file}}' + command: + - python3 + - -u + - -m + - 
google_cloud_pipeline_components.container.v1.custom_job.launcher + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.14.1 + exec-flip-biased-coin-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_biased_coin_op + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_biased_coin_op() -> str:\n \"\"\"Flip a coin and output\ + \ heads.\"\"\"\n return 'heads'\n\n" + image: python:3.8 + exec-machine-type: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - machine_type + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef machine_type() -> str:\n return 'n1-standard-4'\n\n" + image: python:3.8 +pipelineInfo: + name: pipeline +root: + dag: + tasks: + accelerator-count: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-count + taskInfo: + name: accelerator-count + accelerator-type: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-type + taskInfo: + name: accelerator-type + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - accelerator-count + - accelerator-type + - flip-biased-coin-op + - machine-type + inputs: + parameters: + pipelinechannel--accelerator-count-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: accelerator-count + pipelinechannel--accelerator-type-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: accelerator-type + pipelinechannel--encryption_spec_key_name: + componentInputParameter: encryption_spec_key_name + pipelinechannel--flip-biased-coin-op-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-biased-coin-op + pipelinechannel--location: + componentInputParameter: location + pipelinechannel--machine-type-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: machine-type + pipelinechannel--project: + componentInputParameter: 
project + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-biased-coin-op-Output'] + == 'heads' + flip-biased-coin-op: + cachingOptions: {} + componentRef: + name: comp-flip-biased-coin-op + taskInfo: + name: flip-biased-coin-op + machine-type: + cachingOptions: + enableCache: true + componentRef: + name: comp-machine-type + taskInfo: + name: machine-type + inputDefinitions: + parameters: + encryption_spec_key_name: + defaultValue: '' + isOptional: true + parameterType: STRING + location: + parameterType: STRING + project: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0 diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.py new file mode 100644 index 00000000000..b34bb8f6ed7 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.py @@ -0,0 +1,40 @@ +import google_cloud_pipeline_components.v1.custom_job as custom_job +from kfp import dsl + + +@dsl.pipeline +def pipeline( + project: str, + location: str, + machine_type: str, + accelerator_type: str, + accelerator_count: int, + encryption_spec_key_name: str = '', +): + custom_job.CustomTrainingJobOp( + display_name='add-numbers', + worker_pool_specs=[{ + 'container_spec': { + 'image_uri': + ('gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0' + ), + 'command': ['echo'], + 'args': ['foo'], + }, + 'machine_spec': { + 'machine_type': machine_type, + 'accelerator_type': accelerator_type, + 'accelerator_count': accelerator_count, + }, + 'replica_count': 1, + }], + project=project, + location=location, + encryption_spec_key_name=encryption_spec_key_name, + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git 
a/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.yaml new file mode 100644 index 00000000000..2b70925a3ab --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_pipeline_input_custom_training_job.yaml @@ -0,0 +1,208 @@ +# PIPELINE DEFINITION +# Name: pipeline +# Inputs: +# accelerator_count: int +# accelerator_type: str +# encryption_spec_key_name: str [Default: ''] +# location: str +# machine_type: str +# project: str +components: + comp-custom-training-job: + executorLabel: exec-custom-training-job + inputDefinitions: + parameters: + base_output_directory: + defaultValue: '' + description: The Cloud Storage location to store the output of this CustomJob + or HyperparameterTuningJob. See [more information ](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination). + isOptional: true + parameterType: STRING + display_name: + description: The name of the CustomJob. + parameterType: STRING + enable_web_access: + defaultValue: false + description: Whether you want Vertex AI to enable [interactive shell access + ](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) + to training containers. If `True`, you can access interactive shells at + the URIs given by [CustomJob.web_access_uris][]. + isOptional: true + parameterType: BOOLEAN + encryption_spec_key_name: + defaultValue: '' + description: Customer-managed encryption key options for the CustomJob. + If this is set, then all resources created by the CustomJob will be encrypted + with the provided encryption key. + isOptional: true + parameterType: STRING + labels: + defaultValue: {} + description: The labels with user-defined metadata to organize the CustomJob. + See [more information](https://goo.gl/xmQnxf). 
+ isOptional: true + parameterType: STRUCT + location: + defaultValue: us-central1 + description: Location for creating the custom training job. If not set, + default to us-central1. + isOptional: true + parameterType: STRING + network: + defaultValue: '' + description: The full name of the Compute Engine network to which the job + should be peered. For example, `projects/12345/global/networks/myVPC`. + Format is of the form `projects/{project}/global/networks/{network}`. + Where `{project}` is a project number, as in `12345`, and `{network}` + is a network name. Private services access must already be configured + for the network. If left unspecified, the job is not peered with any network. + isOptional: true + parameterType: STRING + project: + defaultValue: '{{$.pipeline_google_cloud_project_id}}' + description: Project to create the custom training job in. Defaults to the + project in which the PipelineJob is run. + isOptional: true + parameterType: STRING + reserved_ip_ranges: + defaultValue: [] + description: A list of names for the reserved IP ranges under the VPC network + that can be used for this job. If set, we will deploy the job within the + provided IP ranges. Otherwise, the job will be deployed to any IP ranges + under the provided VPC network. + isOptional: true + parameterType: LIST + restart_job_on_worker_restart: + defaultValue: false + description: Restarts the entire CustomJob if a worker gets restarted. This + feature can be used by distributed training jobs that are not resilient + to workers leaving and joining a job. + isOptional: true + parameterType: BOOLEAN + service_account: + defaultValue: '' + description: Sets the default service account for workload run-as account. + The [service account ](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account) + running the pipeline submitting jobs must have act-as permission on this + run-as account. 
If unspecified, the Vertex AI Custom Code [Service Agent + ](https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents) + for the CustomJob's project. + isOptional: true + parameterType: STRING + tensorboard: + defaultValue: '' + description: The name of a Vertex AI TensorBoard resource to which this + CustomJob will upload TensorBoard logs. + isOptional: true + parameterType: STRING + timeout: + defaultValue: 604800s + description: 'The maximum job running time. The default is 7 days. A duration + in seconds with up to nine fractional digits, terminated by ''s'', for + example: "3.5s".' + isOptional: true + parameterType: STRING + worker_pool_specs: + defaultValue: [] + description: Serialized json spec of the worker pools including machine + type and Docker image. All worker pools except the first one are optional + and can be skipped by providing an empty value. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#WorkerPoolSpec). + isOptional: true + parameterType: LIST + outputDefinitions: + parameters: + gcp_resources: + description: Serialized JSON of `gcp_resources` [proto](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/proto) + which tracks the CustomJob. 
+ parameterType: STRING +deploymentSpec: + executors: + exec-custom-training-job: + container: + args: + - --type + - CustomJob + - --payload + - '{"display_name": "{{$.inputs.parameters[''display_name'']}}", "job_spec": + {"worker_pool_specs": {{$.inputs.parameters[''worker_pool_specs'']}}, "scheduling": + {"timeout": "{{$.inputs.parameters[''timeout'']}}", "restart_job_on_worker_restart": + {{$.inputs.parameters[''restart_job_on_worker_restart'']}}}, "service_account": + "{{$.inputs.parameters[''service_account'']}}", "tensorboard": "{{$.inputs.parameters[''tensorboard'']}}", + "enable_web_access": {{$.inputs.parameters[''enable_web_access'']}}, "network": + "{{$.inputs.parameters[''network'']}}", "reserved_ip_ranges": {{$.inputs.parameters[''reserved_ip_ranges'']}}, + "base_output_directory": {"output_uri_prefix": "{{$.inputs.parameters[''base_output_directory'']}}"}}, + "labels": {{$.inputs.parameters[''labels'']}}, "encryption_spec": {"kms_key_name": + "{{$.inputs.parameters[''encryption_spec_key_name'']}}"}}' + - --project + - '{{$.inputs.parameters[''project'']}}' + - --location + - '{{$.inputs.parameters[''location'']}}' + - --gcp_resources + - '{{$.outputs.parameters[''gcp_resources''].output_file}}' + command: + - python3 + - -u + - -m + - google_cloud_pipeline_components.container.v1.custom_job.launcher + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.14.1 +pipelineInfo: + name: pipeline +root: + dag: + tasks: + custom-training-job: + cachingOptions: + enableCache: true + componentRef: + name: comp-custom-training-job + inputs: + parameters: + display_name: + runtimeValue: + constant: add-numbers + encryption_spec_key_name: + componentInputParameter: encryption_spec_key_name + location: + componentInputParameter: location + pipelinechannel--accelerator_count: + componentInputParameter: accelerator_count + pipelinechannel--accelerator_type: + componentInputParameter: accelerator_type + pipelinechannel--machine_type: + 
componentInputParameter: machine_type + project: + componentInputParameter: project + worker_pool_specs: + runtimeValue: + constant: + - container_spec: + args: + - foo + command: + - echo + image_uri: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0 + machine_spec: + accelerator_count: '{{$.inputs.parameters[''pipelinechannel--accelerator_count'']}}' + accelerator_type: '{{$.inputs.parameters[''pipelinechannel--accelerator_type'']}}' + machine_type: '{{$.inputs.parameters[''pipelinechannel--machine_type'']}}' + replica_count: 1.0 + taskInfo: + name: custom-training-job + inputDefinitions: + parameters: + accelerator_count: + parameterType: NUMBER_INTEGER + accelerator_type: + parameterType: STRING + encryption_spec_key_name: + defaultValue: '' + isOptional: true + parameterType: STRING + location: + parameterType: STRING + machine_type: + parameterType: STRING + project: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0 diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.py new file mode 100644 index 00000000000..6308d1b0b36 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.py @@ -0,0 +1,56 @@ +import google_cloud_pipeline_components.v1.custom_job as custom_job +from kfp import dsl + + +@dsl.component +def machine_type() -> str: + return 'n1-standard-4' + + +@dsl.component +def accelerator_type() -> str: + return 'NVIDIA_TESLA_P4' + + +@dsl.component +def accelerator_count() -> int: + return 1 + + +@dsl.pipeline +def pipeline( + project: str, + location: str, + encryption_spec_key_name: str = '', +): + machine_type_task = machine_type() + accelerator_type_task = accelerator_type() + accelerator_count_task = accelerator_count() + + custom_job.CustomTrainingJobOp( + display_name='add-numbers', + worker_pool_specs=[{ + 'container_spec': { + 
'image_uri': + ('gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0' + ), + 'command': ['echo'], + 'args': ['foo'], + }, + 'machine_spec': { + 'machine_type': machine_type_task.output, + 'accelerator_type': accelerator_type_task.output, + 'accelerator_count': accelerator_count_task.output, + }, + 'replica_count': 1, + }], + project=project, + location=location, + encryption_spec_key_name=encryption_spec_key_name, + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.yaml new file mode 100644 index 00000000000..52caab3e992 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_task_output_custom_training_job.yaml @@ -0,0 +1,332 @@ +# PIPELINE DEFINITION +# Name: pipeline +# Inputs: +# encryption_spec_key_name: str [Default: ''] +# location: str +# project: str +components: + comp-accelerator-count: + executorLabel: exec-accelerator-count + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER + comp-accelerator-type: + executorLabel: exec-accelerator-type + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-custom-training-job: + executorLabel: exec-custom-training-job + inputDefinitions: + parameters: + base_output_directory: + defaultValue: '' + description: The Cloud Storage location to store the output of this CustomJob + or HyperparameterTuningJob. See [more information ](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination). + isOptional: true + parameterType: STRING + display_name: + description: The name of the CustomJob. 
+ parameterType: STRING + enable_web_access: + defaultValue: false + description: Whether you want Vertex AI to enable [interactive shell access + ](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) + to training containers. If `True`, you can access interactive shells at + the URIs given by [CustomJob.web_access_uris][]. + isOptional: true + parameterType: BOOLEAN + encryption_spec_key_name: + defaultValue: '' + description: Customer-managed encryption key options for the CustomJob. + If this is set, then all resources created by the CustomJob will be encrypted + with the provided encryption key. + isOptional: true + parameterType: STRING + labels: + defaultValue: {} + description: The labels with user-defined metadata to organize the CustomJob. + See [more information](https://goo.gl/xmQnxf). + isOptional: true + parameterType: STRUCT + location: + defaultValue: us-central1 + description: Location for creating the custom training job. If not set, + default to us-central1. + isOptional: true + parameterType: STRING + network: + defaultValue: '' + description: The full name of the Compute Engine network to which the job + should be peered. For example, `projects/12345/global/networks/myVPC`. + Format is of the form `projects/{project}/global/networks/{network}`. + Where `{project}` is a project number, as in `12345`, and `{network}` + is a network name. Private services access must already be configured + for the network. If left unspecified, the job is not peered with any network. + isOptional: true + parameterType: STRING + project: + defaultValue: '{{$.pipeline_google_cloud_project_id}}' + description: Project to create the custom training job in. Defaults to the + project in which the PipelineJob is run. + isOptional: true + parameterType: STRING + reserved_ip_ranges: + defaultValue: [] + description: A list of names for the reserved IP ranges under the VPC network + that can be used for this job. 
If set, we will deploy the job within the + provided IP ranges. Otherwise, the job will be deployed to any IP ranges + under the provided VPC network. + isOptional: true + parameterType: LIST + restart_job_on_worker_restart: + defaultValue: false + description: Restarts the entire CustomJob if a worker gets restarted. This + feature can be used by distributed training jobs that are not resilient + to workers leaving and joining a job. + isOptional: true + parameterType: BOOLEAN + service_account: + defaultValue: '' + description: Sets the default service account for workload run-as account. + The [service account ](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account) + running the pipeline submitting jobs must have act-as permission on this + run-as account. If unspecified, the Vertex AI Custom Code [Service Agent + ](https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents) + for the CustomJob's project. + isOptional: true + parameterType: STRING + tensorboard: + defaultValue: '' + description: The name of a Vertex AI TensorBoard resource to which this + CustomJob will upload TensorBoard logs. + isOptional: true + parameterType: STRING + timeout: + defaultValue: 604800s + description: 'The maximum job running time. The default is 7 days. A duration + in seconds with up to nine fractional digits, terminated by ''s'', for + example: "3.5s".' + isOptional: true + parameterType: STRING + worker_pool_specs: + defaultValue: [] + description: Serialized json spec of the worker pools including machine + type and Docker image. All worker pools except the first one are optional + and can be skipped by providing an empty value. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#WorkerPoolSpec). 
+ isOptional: true + parameterType: LIST + outputDefinitions: + parameters: + gcp_resources: + description: Serialized JSON of `gcp_resources` [proto](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/proto) + which tracks the CustomJob. + parameterType: STRING + comp-machine-type: + executorLabel: exec-machine-type + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-accelerator-count: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_count + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_count() -> int:\n return 1\n\n" + image: python:3.8 + exec-accelerator-type: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - accelerator_type + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef accelerator_type() -> str:\n return 'NVIDIA_TESLA_P4'\n\n" + image: python:3.8 + exec-custom-training-job: + container: + args: + - --type + - CustomJob + - --payload + - '{"display_name": "{{$.inputs.parameters[''display_name'']}}", "job_spec": + {"worker_pool_specs": {{$.inputs.parameters[''worker_pool_specs'']}}, "scheduling": + {"timeout": "{{$.inputs.parameters[''timeout'']}}", "restart_job_on_worker_restart": + {{$.inputs.parameters[''restart_job_on_worker_restart'']}}}, "service_account": + "{{$.inputs.parameters[''service_account'']}}", "tensorboard": "{{$.inputs.parameters[''tensorboard'']}}", + "enable_web_access": {{$.inputs.parameters[''enable_web_access'']}}, "network": + "{{$.inputs.parameters[''network'']}}", "reserved_ip_ranges": {{$.inputs.parameters[''reserved_ip_ranges'']}}, + "base_output_directory": {"output_uri_prefix": "{{$.inputs.parameters[''base_output_directory'']}}"}}, + "labels": {{$.inputs.parameters[''labels'']}}, "encryption_spec": {"kms_key_name": + "{{$.inputs.parameters[''encryption_spec_key_name'']}}"}}' + - --project + - '{{$.inputs.parameters[''project'']}}' + - --location + - '{{$.inputs.parameters[''location'']}}' + - --gcp_resources + - '{{$.outputs.parameters[''gcp_resources''].output_file}}' + command: + - python3 + - -u + - -m + - 
google_cloud_pipeline_components.container.v1.custom_job.launcher + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.14.1 + exec-machine-type: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - machine_type + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef machine_type() -> str:\n return 'n1-standard-4'\n\n" + image: python:3.8 +pipelineInfo: + name: pipeline +root: + dag: + tasks: + accelerator-count: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-count + taskInfo: + name: accelerator-count + accelerator-type: + cachingOptions: + enableCache: true + componentRef: + name: comp-accelerator-type + taskInfo: + name: accelerator-type + custom-training-job: + cachingOptions: + enableCache: true + componentRef: + name: comp-custom-training-job + dependentTasks: + - accelerator-count + - accelerator-type + - machine-type + inputs: + parameters: + display_name: + runtimeValue: + constant: add-numbers + encryption_spec_key_name: + componentInputParameter: encryption_spec_key_name + location: + componentInputParameter: location + pipelinechannel--accelerator-count-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: accelerator-count + pipelinechannel--accelerator-type-Output: + taskOutputParameter: + outputParameterKey: Output + 
producerTask: accelerator-type + pipelinechannel--machine-type-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: machine-type + project: + componentInputParameter: project + worker_pool_specs: + runtimeValue: + constant: + - container_spec: + args: + - foo + command: + - echo + image_uri: gcr.io/ml-pipeline/google-cloud-pipeline-components:2.5.0 + machine_spec: + accelerator_count: '{{$.inputs.parameters[''pipelinechannel--accelerator-count-Output'']}}' + accelerator_type: '{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}' + machine_type: '{{$.inputs.parameters[''pipelinechannel--machine-type-Output'']}}' + replica_count: 1.0 + taskInfo: + name: custom-training-job + machine-type: + cachingOptions: + enableCache: true + componentRef: + name: comp-machine-type + taskInfo: + name: machine-type + inputDefinitions: + parameters: + encryption_spec_key_name: + defaultValue: '' + isOptional: true + parameterType: STRING + location: + parameterType: STRING + project: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0