Skip to content

Commit

Permalink
feat(components): Add dynamic machine parameter support in preview.cu…
Browse files Browse the repository at this point in the history
…stom_job.utils.create_custom_training_job_from_component

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 645295962
  • Loading branch information
Googler committed Jul 16, 2024
1 parent 9cb5913 commit cf8cc14
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Fix bug in Starry Net's upload decomposition plot step due to protobuf upgrade, by pinning protobuf library to 3.20.*.
* Add support for running tasks on a `PersistentResource` (see [CustomJobSpec](https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/CustomJobSpec)) via `persistent_resource_id` parameter on `v1.custom_job.CustomTrainingJobOp` and `v1.custom_job.create_custom_training_job_from_component`
* Bump image for Structured Data pipelines.
* Add dynamic machine spec support for `preview.custom_job.utils.create_custom_training_job_from_component`.

## Release 2.15.0
* Add Gemini batch prediction support to `v1.model_evaluation.autosxs_pipeline`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ def insert_system_labels_into_payload(payload):
return json.dumps(job_spec)


def cast_accelerator_count_to_int(payload):
"""Casts accelerator_count from string to an int."""

job_spec = json.loads(payload)
# TODO(b/353577594): accelerator_count placeholder is not resolved to int.
if (
'accelerator_count'
in job_spec['job_spec']['worker_pool_specs'][0]['machine_spec']
):
job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][
'accelerator_count'
] = int(
job_spec['job_spec']['worker_pool_specs'][0]['machine_spec'][
'accelerator_count'
]
)
return json.dumps(job_spec)


def create_custom_job_with_client(job_client, parent, job_spec):
create_custom_job_fn = None
try:
Expand Down Expand Up @@ -86,6 +105,7 @@ def create_custom_job(
# Create custom job if it does not exist
job_name = remote_runner.check_if_job_exists()
if job_name is None:
payload = cast_accelerator_count_to_int(payload)
job_name = remote_runner.create_job(
create_custom_job_with_client,
insert_system_labels_into_payload(payload),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def create_custom_training_job_from_component(
display_name: str = '',
replica_count: int = 1,
machine_type: str = 'n1-standard-4',
accelerator_type: str = '',
accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
accelerator_count: int = 1,
boot_disk_type: str = 'pd-ssd',
boot_disk_size_gb: int = 100,
Expand Down Expand Up @@ -83,7 +83,7 @@ def create_custom_training_job_from_component(
replica_count: The count of instances in the cluster. One replica always counts towards the master in worker_pool_spec[0] and the remaining replicas will be allocated in worker_pool_spec[1]. See [more information.](https://cloud.google.com/vertex-ai/docs/training/distributed-training#configure_a_distributed_training_job)
machine_type: The type of the machine to run the CustomJob. The default value is "n1-standard-4". See [more information](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types).
accelerator_type: The type of accelerator(s) that may be attached to the machine per `accelerator_count`. See [more information](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype).
accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set.
accelerator_count: The number of accelerators to attach to the machine. Defaults to 1 if `accelerator_type` is set statically.
boot_disk_type: Type of the boot disk (default is "pd-ssd"). Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard" (Persistent Disk Hard Disk Drive). boot_disk_type is set as a static value and cannot be changed as a pipeline parameter.
boot_disk_size_gb: Size in GB of the boot disk (default is 100GB). `boot_disk_size_gb` is set as a static value and cannot be changed as a pipeline parameter.
timeout: The maximum job running time. The default is 7 days. A duration in seconds with up to nine fractional digits, terminated by 's', for example: "3.5s".
Expand Down Expand Up @@ -148,7 +148,11 @@ def create_custom_training_job_from_component(
)[0]['container']

worker_pool_spec = {
'machine_spec': {'machine_type': machine_type},
'machine_spec': {
'machine_type': "{{$.inputs.parameters['machine_type']}}",
'accelerator_type': "{{$.inputs.parameters['accelerator_type']}}",
'accelerator_count': "{{$.inputs.parameters['accelerator_count']}}",
},
'replica_count': 1,
'container_spec': {
'image_uri': user_component_container['image'],
Expand All @@ -161,9 +165,6 @@ def create_custom_training_job_from_component(
'env': env or [],
},
}
if accelerator_type:
worker_pool_spec['machine_spec']['accelerator_type'] = accelerator_type
worker_pool_spec['machine_spec']['accelerator_count'] = accelerator_count
if boot_disk_type:
worker_pool_spec['disk_spec'] = {
'boot_disk_type': boot_disk_type,
Expand Down Expand Up @@ -217,6 +218,27 @@ def create_custom_training_job_from_component(
cj_component_spec['outputDefinitions']['parameters'].update(
user_component_spec.get('outputDefinitions', {}).get('parameters', {})
)

# add machine parameters into the customjob component
if accelerator_type == 'ACCELERATOR_TYPE_UNSPECIFIED':
accelerator_count = 0

cj_component_spec['inputDefinitions']['parameters']['machine_type'] = {
'parameterType': 'STRING',
'defaultValue': machine_type,
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['accelerator_type'] = {
'parameterType': 'STRING',
'defaultValue': accelerator_type,
'isOptional': True,
}
cj_component_spec['inputDefinitions']['parameters']['accelerator_count'] = {
'parameterType': 'NUMBER_INTEGER',
'defaultValue': accelerator_count,
'isOptional': True,
}

# use artifacts from user component
## assign artifacts, not update, since customjob has no artifact outputs
cj_component_spec['inputDefinitions']['artifacts'] = user_component_spec.get(
Expand Down

0 comments on commit cf8cc14

Please sign in to comment.