Skip to content

Commit

Permalink
Revert Revert PROD-XXX - Use server porvided training code
Browse files Browse the repository at this point in the history
  • Loading branch information
YYL-Nimble committed May 27, 2024
1 parent 4bf7df1 commit e82fe1d
Showing 1 changed file with 14 additions and 93 deletions.
107 changes: 14 additions & 93 deletions execute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""This module contains the code to execute the task."""

import json
import sys
import git
Expand All @@ -9,6 +9,7 @@
import subprocess
import numpy as np
import requests
import hashlib
import torch
from datetime import datetime
now = datetime.now()
Expand Down Expand Up @@ -51,58 +52,6 @@ def check_disk_space():
print_in_color(f"Total: {total / (2**30):.2f} GB", "\033[31m")
print_in_color(f"Used: {used / (2**30):.2f} GB", "\033[31m")
print_in_color(f"Free: {free / (2**30):.2f} GB", "\033[31m")


def execute(task_args):
"""This function executes the task."""
print_in_color("Starting training...", "\033[34m") # Blue for start

tokenizer = AutoTokenizer.from_pretrained(task_args["model_name"])

def tokenize_function(examples):
return tokenizer(
examples["text"], padding="max_length", truncation=True
)

def model_init():
return AutoModelForSequenceClassification.from_pretrained(task_args["model_name"], num_labels=task_args["num_labels"])

model = AutoModelForSequenceClassification.from_pretrained(
task_args["model_name"], num_labels=task_args["num_labels"]
)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

dataset = load_dataset(task_args["dataset_name"])
tokenized_datasets = dataset.map(tokenize_function, batched=True)

small_train_dataset = (
tokenized_datasets["train"].shuffle(seed=task_args["seed"]).select(range(task_args["num_rows"]))
)
small_eval_dataset = (
tokenized_datasets["train"].shuffle(seed=task_args["seed"]).select(range(task_args["num_rows"]))
)

training_args = TrainingArguments(
output_dir="my_model", evaluation_strategy="epoch", save_strategy='epoch', seed=task_args['seed']
)

trainer = Trainer(
model_init=model_init,
args=training_args,
train_dataset=small_train_dataset,
eval_dataset=small_eval_dataset,
compute_metrics=compute_metrics,
)
# Start the timer
start_time = time.time()
trainer.train()
trainer.save_model("my_model")
# End timer
end_time = time.time()
# Calculate total training time
training_duration = end_time - start_time
return training_duration

def print_in_color(text, color_code):
"""This function prints the text in the specified color."""
Expand Down Expand Up @@ -137,38 +86,8 @@ def get_gpu_name():
print(f"An error occurred while running nvidia-smi: {e}")
return []


def complete_task(wallet_address,training_duration=0, max_retries=5, retry_delay=10):
retries = 0
while retries < max_retries:
try:
url = f"{node_url}/complete_task"
files = {
"file1": open("my_model/config.json", "rb"),
"file2": open("my_model/training_args.bin", "rb"),
"file3": open("my_model/model.safetensors", "rb"),
}
json_data = json.dumps({"address": wallet_address})
files["r"] = (None, json_data, "application/json")
response = requests.post(url, files=files, timeout=600)
if response.status_code == 200:
log_task(wallet_address,training_duration,"Success")
return response.json()
else:
log_task(wallet_address,training_duration,"Failed")
raise Exception(f"Failed to complete task: {response.text}")
except Exception as e:
retries += 1
if retries == max_retries:
log_task(wallet_address,"","Failed")
raise e
else:
print(f"Retrying in {retry_delay} seconds... ({retries}/{max_retries})")
time.sleep(retry_delay)


def perform():
addr = sys.argv[1]
addr = sys.argv[1]
if addr is not None:
print_in_color(f"Address {addr} started to work.", "\033[33m")
while True:
Expand All @@ -179,18 +98,20 @@ def perform():
time.sleep(30)
gpu_names = get_gpu_name()
task_args = register_particle(addr, gpu_names)
print_in_color(f"Address {addr} received the task.", "\033[33m")
training_duration = execute(task_args)
print_in_color(f"Address {addr} executed the task.", "\033[32m")
complete_task(addr, training_duration)
print_in_color(f"Address {addr} completed the task. Waiting for next", "\033[32m")
shutil.rmtree("my_model")
print_in_color("### Deleted the model.", "\033[31m")
print_in_color("### Disk space:", "\033[31m")
check_disk_space()
globals()['hash'] = hashlib.md5(task_args["exec"].encode('utf-8')).hexdigest()
print(task_args)
print(f"Calculated hash: {globals()['hash']}")
exec(task_args["exec"])
time.sleep(30)
except Exception as e:
print_in_color(f"Error: {e}", "\033[31m")
finally:
if "model_dir" in globals():
shutil.rmtree(globals()["model_dir"])
print_in_color("### Deleted the model.", "\033[31m")
print_in_color("### Disk space:", "\033[31m")
check_disk_space()
return
else:
print_in_color("Address not provided.", "\033[31m")

Expand Down

0 comments on commit e82fe1d

Please sign in to comment.