Skip to content

Commit

Permalink
feat(components): Add location validation to preview.llm.rlhf_pipeline
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 612888991
  • Loading branch information
Googler committed Mar 6, 2024
1 parent 731cb81 commit 52d499f
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 45 deletions.
1 change: 1 addition & 0 deletions components/google-cloud/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Fix issue where AutoSxS was not propagating location to all sub-components.
* Add CMEK support to `preview.llm.infer_pipeline`.
* Use `eval_dataset` for train-time evaluation when training a reward model. Requires `eval_dataset` to contain the same fields as the [preference dataset](https://cloud.google.com/vertex-ai/docs/generative-ai/models/tune-text-models-rlhf#human-preference-dataset).
* Add location validation to `preview.llm.rlhf_pipeline`.

## Release 2.10.0
* Fix the missing output of pipeline remote runner. `AutoMLImageTrainingJobRunOp` now passes the model artifacts correctly to downstream components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def pipeline(
model_display_name: Optional[str] = None,
deploy_model: bool = True,
encryption_spec_key_name: str = '',
location: str = _placeholders.LOCATION_PLACEHOLDER,
) -> PipelineOutput:
# fmt: off
"""Uploads a tuned language model and (optionally) deploys it to an endpoint.
Expand All @@ -53,7 +54,7 @@ def pipeline(
endpoint_resource_name: Path to the Online Prediction Endpoint. This will be an empty string if the model was not deployed.
"""
# fmt: on
upload_location = 'us-central1'
upload_location = location
adapter_artifact = kfp.dsl.importer(
artifact_uri=output_adapter_path,
artifact_class=kfp.dsl.Artifact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,59 +22,62 @@

@dsl.component(base_image=_image.GCPC_IMAGE_TAG, install_kfp_package=False)
def resolve_machine_spec(
location: str,
accelerator_type: str = '',
use_test_spec: bool = False,
) -> NamedTuple(
'MachineSpec', machine_type=str, accelerator_type=str, accelerator_count=int
'MachineSpec',
machine_type=str,
tuning_location=str,
accelerator_count=int,
):
"""Returns machine spec to use for a given location.
"""Returns machine spec to use for a given accelerator_type.
Args:
location: Where the machine will run.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs
in europe-west4, else in us-central1. Default is 'GPU'.
use_test_spec: Whether to use a lower resource machine for testing.
Returns:
Machine spec.
tuning_location: Where the machine will run.
Raises:
ValueError: If accelerators are requested in an unsupported location.
"""
outputs = NamedTuple(
'MachineSpec',
machine_type=str,
accelerator_type=str,
accelerator_count=int,
tuning_location=str,
)
tpu_regions = {'europe-west4'}
gpu_regions = {'us-central1'}
if use_test_spec:
if location in tpu_regions:
if accelerator_type == 'TPU':
return outputs(
machine_type='cloud-tpu',
accelerator_type='TPU_V3',
accelerator_count=32,
tuning_location='europe-west4',
)
else:
return outputs(
machine_type='a2-highgpu-1g',
accelerator_type='NVIDIA_TESLA_A100',
accelerator_count=1,
tuning_location='us-central1',
)
elif location in tpu_regions:
elif accelerator_type == 'TPU':
return outputs(
machine_type='cloud-tpu',
accelerator_type='TPU_V3',
accelerator_count=64,
tuning_location='europe-west4',
)
elif location in gpu_regions:
elif accelerator_type == 'GPU':
return outputs(
machine_type='a2-ultragpu-8g',
accelerator_type='NVIDIA_A100_80GB',
accelerator_count=8,
tuning_location='us-central1',
)
raise ValueError(
f'Unsupported accelerator location {location}. Must be one of'
f' {tpu_regions | gpu_regions}.'
f'Unsupported accelerator type {accelerator_type}. Must be one of'
'TPU or GPU.'
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def pipeline(
kl_coeff: float = 0.1,
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
tensorboard_resource_id: Optional[str] = None,
encryption_spec_key_name: str = '',
Expand All @@ -73,6 +74,7 @@ def pipeline(
kl_coeff: Coefficient for KL penalty. This regularizes the policy model and penalizes if it diverges from its initial distribution. If set to 0, the reference language model is not loaded into memory. Default value is 0.1.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Expand All @@ -84,7 +86,8 @@ def pipeline(
# fmt: on
prompt_column = 'input_text'
machine_spec = function_based.resolve_machine_spec(
location=location, use_test_spec=env.get_use_test_machine_spec()
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')

reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -116,7 +119,7 @@ def pipeline(
.set_caching_options(False)
)
rl_image_uri = function_based.resolve_private_refined_image_uri(
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
).set_display_name('Resolve Reinforcer Image URI')
num_microbatches = function_based.resolve_num_microbatches(
large_model_reference=reference_model_metadata.outputs[
Expand All @@ -126,7 +129,7 @@ def pipeline(
rl_model = (
reinforcer.reinforcer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_reference_model_path=reference_model_metadata.outputs[
'reference_model_path'
],
Expand All @@ -137,7 +140,7 @@ def pipeline(
],
input_preference_dataset_path=input_preference_dataset_path,
train_steps=reinforcement_learning_train_steps,
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
accelerator_count=machine_spec.outputs['accelerator_count'],
large_model_reference=reference_model_metadata.outputs[
'large_model_reference'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def pipeline(
eval_dataset: Optional[str] = None,
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
tensorboard_resource_id: Optional[str] = None,
encryption_spec_key_name: str = '',
Expand All @@ -66,6 +67,7 @@ def pipeline(
reward_model_train_steps: Number of steps to use when training a reward model. Default value is 1000.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Expand All @@ -80,7 +82,8 @@ def pipeline(
candidate_columns = ['candidate_0', 'candidate_1']
choice_column = 'choice'
machine_spec = function_based.resolve_machine_spec(
location=location, use_test_spec=env.get_use_test_machine_spec()
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')

reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -140,7 +143,7 @@ def pipeline(
)

reward_model_image_uri = function_based.resolve_private_refined_image_uri(
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
).set_display_name('Resolve Reward Model Image URI')
num_microbatches = function_based.resolve_num_microbatches(
large_model_reference=reference_model_metadata.outputs[
Expand All @@ -150,7 +153,7 @@ def pipeline(
reward_model = (
reward_model_trainer.reward_model_trainer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_model_path=reference_model_metadata.outputs[
'reward_model_path'
],
Expand All @@ -161,7 +164,7 @@ def pipeline(
'output_dataset_path'
],
train_steps=reward_model_train_steps,
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
accelerator_count=machine_spec.outputs['accelerator_count'],
large_model_reference=reference_model_metadata.outputs[
'reward_model_reference'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""KFP Component for validate_pipeline."""

from typing import Optional
from typing import NamedTuple, Optional

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
Expand All @@ -25,10 +25,10 @@ def validate_pipeline(
large_model_reference: str,
location: str,
encryption_spec_key_name: str = '',
machine_type: str = '',
accelerator_type: str = '',
pipeline_region: str = '{{$.pipeline_google_cloud_location}}',
eval_dataset: Optional[str] = None,
):
) -> NamedTuple('outputs', model_location=str):
# fmt: off
"""Validate and preprocess pipeline parameters.
Expand All @@ -40,8 +40,8 @@ def validate_pipeline(
location: Region in which all the components except for tuning job should
run.
encryption_spec_key_name: If set, CMEK support will be validated.
machine_type: If 'tpu' is specified, tuning runs in
europe-west4, else in us-central1.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning
runs in europe-west4, else in us-central1.
pipeline_region: The region the pipeline runs in.
eval_dataset: Optional Cloud storage path to an evaluation dataset. Note,
eval dataset can only be provided for third-party models. If provided,
Expand Down Expand Up @@ -75,13 +75,6 @@ def validate_pipeline(
'Prediction.'
)

if 'gpu' in machine_type:
accelerator_type = 'GPU'
elif 'tpu' in machine_type:
accelerator_type = 'TPU'
else:
accelerator_type = None

supported_pipeline_regions = {
'europe-west4',
'us-central1',
Expand All @@ -91,7 +84,6 @@ def validate_pipeline(
f'Unsupported pipeline region: {pipeline_region}. Must be one of'
f' {supported_pipeline_regions}.'
)

location = pipeline_region if not location else location

valid_cmek_config = location == 'us-central1' and accelerator_type == 'GPU'
Expand All @@ -101,6 +93,11 @@ def validate_pipeline(
' in us-central1. Please either unset encryption_spec_key_name or'
' create your pipeline in us-central1 to use GPU instead.'
)
outputs = NamedTuple('outputs', model_location=str)
ret = outputs(
model_location=location,
)
return ret
except Exception as e: # pylint: disable=broad-exception-caught
if isinstance(e, ValueError):
raise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def infer_pipeline(
sampling_strategy: str = 'greedy',
instruction: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
encryption_spec_key_name: str = '',
) -> PipelineOutput:
Expand All @@ -56,6 +57,7 @@ def infer_pipeline(
sampling_strategy: This field specifies the sampling strategy. The valid options are 'greedy' and 'temperature_sampling'.
instruction: This field lets the model know what task it needs to perform. Base models have been trained over a large set of varied instructions. You can give a simple and intuitive description of the task and the model will follow it, e.g. "Classify this movie review as positive or negative" or "Translate this sentence to Danish". Do not specify this if your dataset already prepends the instruction to the inputs field.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
Expand All @@ -65,7 +67,7 @@ def infer_pipeline(
# fmt: on
prompt_column = 'input_text'
machine_spec = function_based.resolve_machine_spec(
location=location,
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec')
reference_model_metadata = function_based.resolve_reference_model_metadata(
Expand Down Expand Up @@ -103,11 +105,11 @@ def infer_pipeline(
)

bulk_inferrer_image_uri = function_based.resolve_private_refined_image_uri(
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
).set_display_name('Resolve Bulk Inferrer Image URI')
bulk_inference = bulk_inferrer.bulk_inferrer(
project=project,
location=location,
location=machine_spec.outputs['tuning_location'],
input_model=reference_model_metadata.outputs['reference_model_path'],
input_dataset_path=prompt_dataset_importer.outputs['imported_data_path'],
dataset_split=env.TRAIN_SPLIT,
Expand All @@ -117,7 +119,7 @@ def infer_pipeline(
'large_model_reference'
],
sampling_strategy=sampling_strategy,
accelerator_type=machine_spec.outputs['accelerator_type'],
accelerator_type=accelerator_type,
accelerator_count=machine_spec.outputs['accelerator_count'],
machine_type=machine_spec.outputs['machine_type'],
image_uri=bulk_inferrer_image_uri.output,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def rlhf_pipeline(
deploy_model: bool = True,
eval_dataset: Optional[str] = None,
project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
accelerator_type: str = 'GPU',
location: str = _placeholders.LOCATION_PLACEHOLDER,
encryption_spec_key_name: str = '',
tensorboard_resource_id: Optional[str] = None,
Expand All @@ -73,6 +74,7 @@ def rlhf_pipeline(
deploy_model: Whether to deploy the model to an endpoint in `us-central1`. Default is True.
eval_dataset: Optional Cloud storage path to an evaluation dataset. The dataset format is jsonl. The evaluation dataset can be used to compute train-time metrics (when training a reward model) or perform bulk inference for third-party models. To compute train-time metrics this dataset must contain the same fields as the preference dataset. For bulk inference with third-party models only `input_text` is needed. Note, train-time metrics are only computed for the first 5000 samples in the dataset for efficient evaluation during training.
project: Project used to run custom jobs. If not specified the project used to run the pipeline will be used.
accelerator_type: One of 'TPU' or 'GPU'. If 'TPU' is specified, tuning runs in europe-west4, else in us-central1. Default is 'GPU'.
location: Location used to run custom jobs. If not specified the location used to run the pipeline will be used.
encryption_spec_key_name: Customer-managed encryption key. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. Note that this is not supported for TPU at the moment.
tensorboard_resource_id: Optional tensorboard resource id in format `projects/{project_number}/locations/{location}/tensorboards/{tensorboard_id}`. If provided, tensorboard metrics will be uploaded to this location.
Expand All @@ -91,11 +93,12 @@ def rlhf_pipeline(
reward_lora_dim = 4

machine_spec = function_based.resolve_machine_spec(
location=location, use_test_spec=env.get_use_test_machine_spec()
).set_display_name('Resolve Machine Spec')
accelerator_type=accelerator_type,
use_test_spec=env.get_use_test_machine_spec(),
).set_display_name('Resolve Machine Spec and Tuning Location')

validate_pipeline_task = validate_pipeline.validate_pipeline(
machine_type=machine_spec.outputs['machine_type'],
accelerator_type=accelerator_type,
location=location,
encryption_spec_key_name=encryption_spec_key_name,
large_model_reference=large_model_reference,
Expand All @@ -116,6 +119,7 @@ def rlhf_pipeline(
lora_dim=reward_lora_dim,
project=project,
location=location,
accelerator_type=accelerator_type,
tensorboard_resource_id=tensorboard_resource_id,
encryption_spec_key_name=encryption_spec_key_name,
)
Expand Down Expand Up @@ -180,6 +184,7 @@ def rlhf_pipeline(
model_display_name=model_display_name,
deploy_model=deploy_model,
encryption_spec_key_name=encryption_spec_key_name,
location=validate_pipeline_task.outputs['model_location'],
).set_display_name('Upload and Deploy Tuned Model')

return PipelineOutput(
Expand Down

0 comments on commit 52d499f

Please sign in to comment.