LLM-VM does not support multiple GPUs currently #411

Open · wants to merge 2 commits into main
74 changes: 74 additions & 0 deletions src/llm_vm/completion/optimize.py
@@ -15,6 +15,80 @@
# we need to package-ify so this works
import llm_vm.completion.data_synthesis as data_synthesis
import inspect
from typing import Optional, Union


from google.auth import default
from google.cloud import aiplatform
import pandas as pd
import vertexai
from vertexai.language_models import TextGenerationModel
from vertexai.preview.language_models import TuningEvaluationSpec


credentials, _ = default(scopes=["https://www.googleapis.com/auth/cloud-platform"])



def tuning(
    project_id: str,
    location: str,
    model_display_name: str,
    training_data: Union[pd.DataFrame, str],
    train_steps: int = 10,
    evaluation_dataset: Optional[str] = None,
    tensorboard_instance_name: Optional[str] = None,
) -> TextGenerationModel:
    """Tune a new model based on prompt-response data.

    "training_data" can be either the GCS URI of a file in JSONL format
    (for example: training_data=f'gs://{bucket}/{filename}.jsonl') or a pandas
    DataFrame. Each training example should be a JSONL record with two keys,
    for example:
        {
            "input_text": <input prompt>,
            "output_text": <associated output>
        },
    or the pandas DataFrame should contain two columns:
        ['input_text', 'output_text']
    with one row per training example.

    Args:
        project_id: GCP Project ID, used to initialize vertexai.
        location: GCP Region, used to initialize vertexai.
        model_display_name: Customized tuned LLM model name.
        training_data: GCS URI of a JSONL file or pandas DataFrame of training data.
        train_steps: Number of training steps to use when tuning the model.
        evaluation_dataset: GCS URI of a JSONL file of evaluation data.
        tensorboard_instance_name: The full name of an existing Vertex AI TensorBoard instance:
            projects/PROJECT_ID/locations/LOCATION_ID/tensorboards/TENSORBOARD_INSTANCE_ID
            Note that this instance must be in the same region as your tuning job.
    """
    vertexai.init(project=project_id, location=location, credentials=credentials)

    # Only build an evaluation spec when evaluation data was actually provided.
    eval_spec = None
    if evaluation_dataset is not None:
        eval_spec = TuningEvaluationSpec(evaluation_data=evaluation_dataset)
        if tensorboard_instance_name is not None:
            eval_spec.tensorboard = aiplatform.Tensorboard(
                tensorboard_name=tensorboard_instance_name
            )

    model = TextGenerationModel.from_pretrained("text-bison@001")

    model.tune_model(
        training_data=training_data,
        # Optional:
        model_display_name=model_display_name,
        train_steps=train_steps,
        tuning_job_location="europe-west4",
        tuned_model_location=location,
        tuning_evaluation_spec=eval_spec,
    )

    print(model._job.status)

    return model


if __name__ == "__main__":
    # Example invocation; the argument values below are placeholders.
    tuning(
        project_id="your-gcp-project",
        location="us-central1",
        model_display_name="tuned-text-bison",
        training_data="gs://your-bucket/train.jsonl",
    )


job_id = None # we want to be able to cancel a fine_tune if you kill the program
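
For reference, `tuning` accepts either a GCS JSONL URI or a pandas DataFrame with `input_text`/`output_text` columns, as described in its docstring. A minimal sketch of the DataFrame path (the project, region, model name, and example rows below are placeholders, not values from this PR):

import pandas as pd

# Placeholder prompt/response pairs illustrating the expected columns.
examples = pd.DataFrame(
    {
        "input_text": ["<input prompt 1>", "<input prompt 2>"],
        "output_text": ["<associated output 1>", "<associated output 2>"],
    }
)

tuned_model = tuning(
    project_id="your-gcp-project",          # placeholder
    location="us-central1",                 # placeholder region
    model_display_name="tuned-text-bison",  # placeholder display name
    training_data=examples,                 # or a GCS URI such as "gs://<bucket>/train.jsonl"
)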
134 changes: 134 additions & 0 deletions src/llm_vm/onsite_llm.py
@@ -28,6 +28,140 @@
from sentence_transformers import SentenceTransformer


# Dependencies for the Ray Train + Accelerate example below; some of these may
# already be imported elsewhere in onsite_llm.py.
from tempfile import TemporaryDirectory

import evaluate
import ray
import ray.data
import ray.train
import torch
from accelerate import Accelerator
from datasets import load_dataset
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer
from torch.optim import AdamW
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)


def train_func(config):
    """Your training function that will be launched on each worker."""

    # Unpack training configs
    lr = config["lr"]
    seed = config["seed"]
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]
    train_ds_size = config["train_dataset_size"]

    set_seed(seed)

    # Initialize accelerator
    accelerator = Accelerator()

    # Load datasets and metrics
    metric = evaluate.load("glue", "mrpc")

    # Prepare Ray Data loaders
    # ====================================================
    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("validation")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            list(batch["sentence1"]),
            list(batch["sentence2"]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor(batch["label"])
        outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
        return outputs

    train_dataloader = train_ds.iter_torch_batches(
        batch_size=train_batch_size, collate_fn=collate_fn
    )
    eval_dataloader = eval_ds.iter_torch_batches(
        batch_size=eval_batch_size, collate_fn=collate_fn
    )
    # ====================================================

    # Instantiate the model, optimizer, lr_scheduler
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    optimizer = AdamW(params=model.parameters(), lr=lr)

    steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=100,
        num_training_steps=(steps_per_epoch * num_epochs),
    )

    # Prepare everything with accelerator
    model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            predictions, references = accelerator.gather_for_metrics(
                (predictions, batch["labels"])
            )
            metric.add_batch(
                predictions=predictions,
                references=references,
            )

        eval_metric = metric.compute()
        accelerator.print(f"epoch {epoch}:", eval_metric)

        # Report Checkpoint and metrics to Ray Train
        # ==========================================
        with TemporaryDirectory() as tmpdir:
            if accelerator.is_main_process:
                unwrapped_model = accelerator.unwrap_model(model)
                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
                checkpoint = Checkpoint.from_directory(tmpdir)
            else:
                checkpoint = None
            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)


if __name__ == "__main__":
    config = {
        "lr": 2e-5,
        "num_epochs": 3,
        "seed": 42,
        "train_batch_size": 16,
        "eval_batch_size": 32,
    }

    # Prepare Ray Datasets
    hf_datasets = load_dataset("glue", "mrpc")
    ray_datasets = {
        "train": ray.data.from_huggingface(hf_datasets["train"]),
        "validation": ray.data.from_huggingface(hf_datasets["validation"]),
    }
    config["train_dataset_size"] = ray_datasets["train"].count()

    trainer = TorchTrainer(
        train_func,
        train_loop_config=config,
        datasets=ray_datasets,
        dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    )

    result = trainer.fit()




__private_key_value_models_map = {}
# [] {
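
The `ScalingConfig(num_workers=4, use_gpu=True)` in the example above hard-codes four GPU workers. A minimal sketch, assuming Ray 2.x, of sizing the worker count to the GPUs the cluster actually reports before constructing the `TorchTrainer`:

import ray
from ray.train import ScalingConfig

# Inspect the cluster and request one worker per visible GPU instead of a fixed 4.
ray.init(ignore_reinit_error=True)
available_gpus = int(ray.cluster_resources().get("GPU", 0))
scaling_config = ScalingConfig(
    num_workers=max(1, available_gpus),
    use_gpu=available_gpus > 0,
)

The resulting `scaling_config` would then be passed to `TorchTrainer` in place of the hard-coded one.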