diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md index cb51fb6e857b..56ff2c9b55dc 100644 --- a/components/google-cloud/RELEASE.md +++ b/components/google-cloud/RELEASE.md @@ -6,6 +6,7 @@ * Add support for running tasks on a `PersistentResource` (see [CustomJobSpec](https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/CustomJobSpec)) via `persistent_resource_id` parameter on `v1.custom_job.CustomTrainingJobOp` and `v1.custom_job.create_custom_training_job_from_component` * Bump image for Structured Data pipelines. * Add check that component in preview.custom_job.utils.create_custom_training_job_from_component doesn't have any parameters that share names with any custom job fields +* Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`. ## Release 2.15.0 * Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`. diff --git a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py index d72f35833a1b..e56548c002fe 100644 --- a/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py +++ b/components/google-cloud/google_cloud_pipeline_components/container/preview/custom_job/remote_runner.py @@ -32,6 +32,27 @@ def insert_system_labels_into_payload(payload): return json.dumps(job_spec) +def cast_accelerator_count_to_int(payload): + """Casts accelerator_count from string to an int.""" + + job_spec = json.loads(payload) + # TODO(b/353577594): accelerator_count placeholder is not resolved to int. + # Need to typecast to int to avoid type mismatch error. Can be removed once + # placeholder resolution is fixed. 
+ if ( + 'accelerator_count' + in job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'] + ): + job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][ + 'accelerator_count' + ] = int( + job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][ + 'accelerator_count' + ] + ) + return json.dumps(job_spec) + + def create_custom_job_with_client(job_client, parent, job_spec): create_custom_job_fn = None try: @@ -86,6 +107,7 @@ def create_custom_job( # Create custom job if it does not exist job_name = remote_runner.check_if_job_exists() if job_name is None: + payload = cast_accelerator_count_to_int(payload) job_name = remote_runner.create_job( create_custom_job_with_client, insert_system_labels_into_payload(payload), diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py index dc78cf8dee79..ed35a559f557 100644 --- a/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py +++ b/components/google-cloud/google_cloud_pipeline_components/preview/custom_job/utils.py @@ -54,7 +54,7 @@ def create_custom_training_job_from_component( display_name: str = '', replica_count: int = 1, machine_type: str = 'n1-standard-4', - accelerator_type: str = '', + accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED', accelerator_count: int = 1, boot_disk_type: str = 'pd-ssd', boot_disk_size_gb: int = 100, @@ -83,7 +83,7 @@ def create_custom_training_job_from_component( replica_count: The count of instances in the cluster. One replica always counts towards the master in worker_pool_spec[0] and the remaining replicas will be allocated in worker_pool_spec[1]. See [more information.](https://cloud.google.com/vertex-ai/docs/training/distributed-training#configure_a_distributed_training_job) machine_type: The type of the machine to run the CustomJob. The default value is "n1-standard-4". 
See [more information](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types). accelerator_type: The type of accelerator(s) that may be attached to the machine per `accelerator_count`. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype). - accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set. + accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set statically; the default is forced to 0 when `accelerator_type` is left as `ACCELERATOR_TYPE_UNSPECIFIED`. boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter. boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). `boot_disk_size_gb` is set as a static value and cannot be changed as a pipeline parameter. timeout: The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by 's', for example: "3.5s". 
@@ -148,7 +148,11 @@ def create_custom_training_job_from_component( )[0]['container'] worker_pool_spec = { - 'machine_spec': {'machine_type': machine_type}, + 'machine_spec': { + 'machine_type': "{{$.inputs.parameters['machine_type']}}", + 'accelerator_type': "{{$.inputs.parameters['accelerator_type']}}", + 'accelerator_count': "{{$.inputs.parameters['accelerator_count']}}", + }, 'replica_count': 1, 'container_spec': { 'image_uri': user_component_container['image'], @@ -161,9 +165,6 @@ def create_custom_training_job_from_component( 'env': env or [], }, } - if accelerator_type: - worker_pool_spec['machine_spec']['accelerator_type'] = accelerator_type - worker_pool_spec['machine_spec']['accelerator_count'] = accelerator_count if boot_disk_type: worker_pool_spec['disk_spec'] = { 'boot_disk_type': boot_disk_type, @@ -210,6 +211,26 @@ def create_custom_training_job_from_component( 'defaultValue' ] = default_value + # add machine parameters into the customjob component + if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED': + accelerator_count = 0 + + cj_component_spec['inputDefinitions']['parameters']['machine_type'] = { + 'parameterType': 'STRING', + 'defaultValue': machine_type, + 'isOptional': True, + } + cj_component_spec['inputDefinitions']['parameters']['accelerator_type'] = { + 'parameterType': 'STRING', + 'defaultValue': accelerator_type, + 'isOptional': True, + } + cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = { + 'parameterType': 'NUMBER_INTEGER', + 'defaultValue': accelerator_count, + 'isOptional': True, + } + # check if user component has any input parameters that already exist in the # custom job component for param_name in user_component_spec.get('inputDefinitions', {}).get( @@ -234,6 +255,7 @@ def create_custom_training_job_from_component( cj_component_spec['outputDefinitions']['parameters'].update( user_component_spec.get('outputDefinitions', {}).get('parameters', {}) ) + # use artifacts from user component ## assign 
artifacts, not update, since customjob has no artifact outputs cj_component_spec['inputDefinitions']['artifacts'] = user_component_spec.get(