From 52d499f687d53c49f58b2a7e0a294f956c82e2b6 Mon Sep 17 00:00:00 2001 From: Googler Date: Tue, 5 Mar 2024 10:26:46 -0800 Subject: [PATCH] feat(components): Add location validation to `preview.llm.rlhf_pipeline` PiperOrigin-RevId: 612888991 --- components/google-cloud/RELEASE.md | 1 + .../_implementation/llm/deployment_graph.py | 3 +- .../_implementation/llm/function_based.py | 35 ++++++++++--------- .../llm/reinforcement_learning_graph.py | 11 +++--- .../_implementation/llm/reward_model_graph.py | 11 +++--- .../_implementation/llm/validate_pipeline.py | 23 ++++++------ .../preview/llm/infer/component.py | 10 +++--- .../preview/llm/rlhf/component.py | 11 ++++-- 8 files changed, 60 insertions(+), 45 deletions(-) diff --git a/components/google-cloud/RELEASE.md b/components/google-cloud/RELEASE.md index 35fc80e9d38b..5d4ba1d32b92 100644 --- a/components/google-cloud/RELEASE.md +++ b/components/google-cloud/RELEASE.md @@ -4,6 +4,7 @@ * Fix issue where AutoSxS was not propagating location to all sub-components. * Add CMEK support to `preview.llm.infer_pipeline`. * Use `eval_dataset` for train-time evalutation when training a reward model. Requires `eval_dataset` to contain the same fields as the [preference dataset](https://cloud.google.com/vertex-ai/docs/generative-ai/models/tune-text-models-rlhf#human-preference-dataset). +* Add location validation `preview.llm.rlhf_pipeline`. ## Release 2.10.0 * Fix the missing output of pipeline remote runner. `AutoMLImageTrainingJobRunOp` now passes the model artifacts correctly to downstream components. diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/deployment_graph.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/deployment_graph.py index 9cff44a55a4c..f525f272865a 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/deployment_graph.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/deployment_graph.py @@ -37,6 +37,7 @@ def pipeline( model_display_name: Optional[str] = None, deploy_model: bool = True, encryption_spec_key_name: str = '', + location: str = _placeholders.LOCATION_PLACEHOLDER, ) -> PipelineOutput: # fmt: off """Uploads a tuned language model and (optionally) deploys it to an endpoint. @@ -53,7 +54,7 @@ def pipeline( endpoint_resource_name: Path the Online Prediction Endpoint. This will be an empty string if the model was not deployed. """ # fmt: on - upload_location = 'us-central1' + upload_location = location adapter_artifact = kfp.dsl.importer( artifact_uri=output_adapter_path, artifact_class=kfp.dsl.Artifact, diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/function_based.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/function_based.py index a7f5c7bd4fce..d7a4d3f71f77 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/function_based.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/function_based.py @@ -22,19 +22,24 @@ @dsl.component(base_image=_image.GCPC_IMAGE_TAG, install_kfp_package=False) def resolve_machine_spec( - location: str, + accelerator_type: str = '', use_test_spec: bool = False, ) -> NamedTuple( - 'MachineSpec', machine_type=str, accelerator_type=str, accelerator_count=int + 'MachineSpec', + machine_type=str, + tuning_location=str, + accelerator_count=int, ): - """Returns machine spec to use for a given location. + """Returns machine spec to use for a given accelerator_type. Args: - location: Where the machine will run. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs + in europe-west4, else in us-central1. Default is 'GPU'. use_test_spec: Whether to use a lower resource machine for testing. Returns: Machine spec. + tuning_location: Where the machine will run. Raises: ValueError: If accelerators are requested in an unsupported location. @@ -42,39 +47,37 @@ def resolve_machine_spec( outputs = NamedTuple( 'MachineSpec', machine_type=str, - accelerator_type=str, accelerator_count=int, + tuning_location=str, ) - tpu_regions = {'europe-west4'} - gpu_regions = {'us-central1'} if use_test_spec: - if location in tpu_regions: + if accelerator_type == 'TPU': return outputs( machine_type='cloud-tpu', - accelerator_type='TPU_V3', accelerator_count=32, + tuning_location='europe-west4', ) else: return outputs( machine_type='a2-highgpu-1g', - accelerator_type='NVIDIA_TESLA_A100', accelerator_count=1, + tuning_location='us-central1', ) - elif location in tpu_regions: + elif accelerator_type == 'TPU': return outputs( machine_type='cloud-tpu', - accelerator_type='TPU_V3', accelerator_count=64, + tuning_location='europe-west4', ) - elif location in gpu_regions: + elif accelerator_type == 'GPU': return outputs( machine_type='a2-ultragpu-8g', - accelerator_type='NVIDIA_A100_80GB', accelerator_count=8, + tuning_location='us-central1', ) raise ValueError( - f'Unsupported accelerator location {location}. Must be one of' - f' {tpu_regions | gpu_regions}.' + f'Unsupported accelerator type {accelerator_type}. Must be one of' + 'TPU or GPU.' ) diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reinforcement_learning_graph.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reinforcement_learning_graph.py index bd83baf0325e..76504632897b 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reinforcement_learning_graph.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reinforcement_learning_graph.py @@ -51,6 +51,7 @@ def pipeline( kl_coeff: float = 0.1, instruction: Optional[str] = None, project: str = _placeholders.PROJECT_ID_PLACEHOLDER, + accelerator_type: str = 'GPU', location: str = _placeholders.LOCATION_PLACEHOLDER, tensorboard_resource_id: Optional[str] = None, encryption_spec_key_name: str = '', @@ -73,6 +74,7 @@ def pipeline( kl_coeff: Coefficient for KL penalty. This regularizes the policy model and penalizes if it diverges from its initial distribution. If set to 0, the reference language model is not loaded into memory. Default value is 0.1. instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field. project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'. location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used. tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location. encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment. @@ -84,7 +86,8 @@ def pipeline( # fmt: on prompt_column = 'input_text' machine_spec = function_based.resolve_machine_spec( - location=location, use_test_spec=env.get_use_test_machine_spec() + accelerator_type=accelerator_type, + use_test_spec=env.get_use_test_machine_spec(), ).set_display_name('Resolve Machine Spec') reference_model_metadata = function_based.resolve_reference_model_metadata( @@ -116,7 +119,7 @@ def pipeline( .set_caching_options(False) ) rl_image_uri = function_based.resolve_private_refined_image_uri( - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, ).set_display_name('Resolve Reinforcer Image URI') num_microbatches = function_based.resolve_num_microbatches( large_model_reference=reference_model_metadata.outputs[ @@ -126,7 +129,7 @@ def pipeline( rl_model = ( reinforcer.reinforcer( project=project, - location=location, + location=machine_spec.outputs['tuning_location'], input_reference_model_path=reference_model_metadata.outputs[ 'reference_model_path' ], @@ -137,7 +140,7 @@ def pipeline( ], input_preference_dataset_path=input_preference_dataset_path, train_steps=reinforcement_learning_train_steps, - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, accelerator_count=machine_spec.outputs['accelerator_count'], large_model_reference=reference_model_metadata.outputs[ 'large_model_reference' diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reward_model_graph.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reward_model_graph.py index 52e822616721..c873ff80245e 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reward_model_graph.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/reward_model_graph.py @@ -48,6 +48,7 @@ def pipeline( eval_dataset: Optional[str] = None, instruction: Optional[str] = None, project: str = _placeholders.PROJECT_ID_PLACEHOLDER, + accelerator_type: str = 'GPU', location: str = _placeholders.LOCATION_PLACEHOLDER, tensorboard_resource_id: Optional[str] = None, encryption_spec_key_name: str = '', @@ -66,6 +67,7 @@ def pipeline( reward_model_train_steps: Number of steps to use when training a reward model. Default value is 1000. instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field. project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'. location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used. tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location. encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment. @@ -80,7 +82,8 @@ def pipeline( candidate_columns = ['candidate_0', 'candidate_1'] choice_column = 'choice' machine_spec = function_based.resolve_machine_spec( - location=location, use_test_spec=env.get_use_test_machine_spec() + accelerator_type=accelerator_type, + use_test_spec=env.get_use_test_machine_spec(), ).set_display_name('Resolve Machine Spec') reference_model_metadata = function_based.resolve_reference_model_metadata( @@ -140,7 +143,7 @@ def pipeline( ) reward_model_image_uri = function_based.resolve_private_refined_image_uri( - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, ).set_display_name('Resolve Reward Model Image URI') num_microbatches = function_based.resolve_num_microbatches( large_model_reference=reference_model_metadata.outputs[ @@ -150,7 +153,7 @@ def pipeline( reward_model = ( reward_model_trainer.reward_model_trainer( project=project, - location=location, + location=machine_spec.outputs['tuning_location'], input_model_path=reference_model_metadata.outputs[ 'reward_model_path' ], @@ -161,7 +164,7 @@ def pipeline( 'output_dataset_path' ], train_steps=reward_model_train_steps, - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, accelerator_count=machine_spec.outputs['accelerator_count'], large_model_reference=reference_model_metadata.outputs[ 'reward_model_reference' diff --git a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/validate_pipeline.py b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/validate_pipeline.py index f884c2919e35..a44371849baa 100644 --- a/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/validate_pipeline.py +++ b/components/google-cloud/google_cloud_pipeline_components/_implementation/llm/validate_pipeline.py @@ -13,7 +13,7 @@ # limitations under the License. """KFP Component for validate_pipeline.""" -from typing import Optional +from typing import NamedTuple, Optional from google_cloud_pipeline_components import _image from google_cloud_pipeline_components import _placeholders @@ -25,10 +25,10 @@ def validate_pipeline( large_model_reference: str, location: str, encryption_spec_key_name: str = '', - machine_type: str = '', + accelerator_type: str = '', pipeline_region: str = '{{$.pipeline_google_cloud_location}}', eval_dataset: Optional[str] = None, -): +) -> NamedTuple('outputs', model_location=str): # fmt: off """Validate and preprocess pipeline parameters. @@ -40,8 +40,8 @@ def validate_pipeline( location: Region in which all the components except for tuning job should run. encryption_spec_key_name: If set, CMEK support will be validated. - machine_type: If 'tpu' is specified, tuning runs in - europe-west4, else in us-central1. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning + runs in europe-west4, else in us-central1 pipeline_region: The region the pipeline runs in. eval_dataset: Optional Cloud storage path to an evaluation dataset. Note, eval dataset can only be provided for third-party models. If provided, @@ -75,13 +75,6 @@ def validate_pipeline( 'Prediction.' ) - if 'gpu' in machine_type: - accelerator_type = 'GPU' - elif 'tpu' in machine_type: - accelerator_type = 'TPU' - else: - accelerator_type = None - supported_pipeline_regions = { 'europe-west4', 'us-central1', @@ -91,7 +84,6 @@ def validate_pipeline( f'Unsupported pipeline region: {pipeline_region}. Must be one of' f' {supported_pipeline_regions}.' ) - location = pipeline_region if not location else location valid_cmek_config = location == 'us-central1' and accelerator_type == 'GPU' @@ -101,6 +93,11 @@ def validate_pipeline( ' in us-central1. Please either unset encryption_spec_key_name or' ' create your pipeline in us-central1 to use GPU instead.' ) + outputs = NamedTuple('outputs', model_location=str) + ret = outputs( + model_location=location, + ) + return ret except Exception as e: # pylint: disable=broad-exception-caught if isinstance(e, ValueError): raise diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/llm/infer/component.py b/components/google-cloud/google_cloud_pipeline_components/preview/llm/infer/component.py index 9f3d254800ec..af83616b44c3 100644 --- a/components/google-cloud/google_cloud_pipeline_components/preview/llm/infer/component.py +++ b/components/google-cloud/google_cloud_pipeline_components/preview/llm/infer/component.py @@ -41,6 +41,7 @@ def infer_pipeline( sampling_strategy: str = 'greedy', instruction: Optional[str] = None, project: str = _placeholders.PROJECT_ID_PLACEHOLDER, + accelerator_type: str = 'GPU', location: str = _placeholders.LOCATION_PLACEHOLDER, encryption_spec_key_name: str = '', ) -> PipelineOutput: @@ -56,6 +57,7 @@ def infer_pipeline( sampling_strategy: This field specifies the sampling strategy. The valid options are 'greedy' and 'temperature_sampling'. instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field. project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'. location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used. encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment. @@ -65,7 +67,7 @@ def infer_pipeline( # fmt: on prompt_column = 'input_text' machine_spec = function_based.resolve_machine_spec( - location=location, + accelerator_type=accelerator_type, use_test_spec=env.get_use_test_machine_spec(), ).set_display_name('Resolve Machine Spec') reference_model_metadata = function_based.resolve_reference_model_metadata( @@ -103,11 +105,11 @@ def infer_pipeline( ) bulk_inferrer_image_uri = function_based.resolve_private_refined_image_uri( - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, ).set_display_name('Resolve Bulk Inferrer Image URI') bulk_inference = bulk_inferrer.bulk_inferrer( project=project, - location=location, + location=machine_spec.outputs['tuning_location'], input_model=reference_model_metadata.outputs['reference_model_path'], input_dataset_path=prompt_dataset_importer.outputs['imported_data_path'], dataset_split=env.TRAIN_SPLIT, @@ -117,7 +119,7 @@ def infer_pipeline( 'large_model_reference' ], sampling_strategy=sampling_strategy, - accelerator_type=machine_spec.outputs['accelerator_type'], + accelerator_type=accelerator_type, accelerator_count=machine_spec.outputs['accelerator_count'], machine_type=machine_spec.outputs['machine_type'], image_uri=bulk_inferrer_image_uri.output, diff --git a/components/google-cloud/google_cloud_pipeline_components/preview/llm/rlhf/component.py b/components/google-cloud/google_cloud_pipeline_components/preview/llm/rlhf/component.py index d13e47f663f9..7cfc17627992 100644 --- a/components/google-cloud/google_cloud_pipeline_components/preview/llm/rlhf/component.py +++ b/components/google-cloud/google_cloud_pipeline_components/preview/llm/rlhf/component.py @@ -50,6 +50,7 @@ def rlhf_pipeline( deploy_model: bool = True, eval_dataset: Optional[str] = None, project: str = _placeholders.PROJECT_ID_PLACEHOLDER, + accelerator_type: str = 'GPU', location: str = _placeholders.LOCATION_PLACEHOLDER, encryption_spec_key_name: str = '', tensorboard_resource_id: Optional[str] = None, @@ -73,6 +74,7 @@ def rlhf_pipeline( deploy_model: Whether to deploy the model to an endpoint in `us-central1`. Default is True. eval_dataset: Optional Cloud storage path to an evaluation dataset. The dataset format is jsonl. The evaluation dataset can be used to compute train-time metrics (when training a reward model) or perform bulk inference for third-party models. To compute train-time metrics this dataset must contain the same fields as the peference dataset. For bulk inference with third-party models only `input_text` is needed. Note, train-time metrics are only computed for the first 5000 samples in the dataset for efficient evaluation during training. project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used. + accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'. location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used. encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment. tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location. @@ -91,11 +93,12 @@ def rlhf_pipeline( reward_lora_dim = 4 machine_spec = function_based.resolve_machine_spec( - location=location, use_test_spec=env.get_use_test_machine_spec() - ).set_display_name('Resolve Machine Spec') + accelerator_type=accelerator_type, + use_test_spec=env.get_use_test_machine_spec(), + ).set_display_name('Resolve Machine Spec and Tuning Location') validate_pipeline_task = validate_pipeline.validate_pipeline( - machine_type=machine_spec.outputs['machine_type'], + accelerator_type=accelerator_type, location=location, encryption_spec_key_name=encryption_spec_key_name, large_model_reference=large_model_reference, @@ -116,6 +119,7 @@ def rlhf_pipeline( lora_dim=reward_lora_dim, project=project, location=location, + accelerator_type=accelerator_type, tensorboard_resource_id=tensorboard_resource_id, encryption_spec_key_name=encryption_spec_key_name, ) @@ -180,6 +184,7 @@ def rlhf_pipeline( model_display_name=model_display_name, deploy_model=deploy_model, encryption_spec_key_name=encryption_spec_key_name, + location=validate_pipeline_task.outputs['model_location'], ).set_display_name('Upload and Deploy Tuned Model') return PipelineOutput(