From 1d13f4b012e43a34a84d52f8a901593e4bfb0d1c Mon Sep 17 00:00:00 2001 From: green Date: Thu, 3 Oct 2024 23:03:29 -0400 Subject: [PATCH] inital changes into enable openai finetuning --- data/dataset_info.json | 15 ++ data/oai.json | 162 +++++++++++++ examples/train_lora/llama3_lora_sft.yaml | 6 +- .../train_lora/openai_finetuning/client.py | 22 ++ .../openai_finetuning/training_data.jsonl | 19 ++ src/llamafactory/cli.py | 4 + src/llamafactory/data/loader.py | 25 +- src/llamafactory/data/parser.py | 44 ++-- src/llamafactory/openai_train/__init__.py | 6 + src/llamafactory/openai_train/file_ops.py | 101 ++++++++ src/llamafactory/openai_train/oai_train.py | 218 ++++++++++++++++++ src/llamafactory/openai_train/server.py | 186 +++++++++++++++ src/llamafactory/openai_train/utils.py | 17 ++ 13 files changed, 799 insertions(+), 26 deletions(-) create mode 100644 data/oai.json create mode 100644 examples/train_lora/openai_finetuning/client.py create mode 100644 examples/train_lora/openai_finetuning/training_data.jsonl create mode 100644 src/llamafactory/openai_train/__init__.py create mode 100644 src/llamafactory/openai_train/file_ops.py create mode 100644 src/llamafactory/openai_train/oai_train.py create mode 100644 src/llamafactory/openai_train/server.py create mode 100644 src/llamafactory/openai_train/utils.py diff --git a/data/dataset_info.json b/data/dataset_info.json index f0ee130c94..2e22e22db6 100644 --- a/data/dataset_info.json +++ b/data/dataset_info.json @@ -622,5 +622,20 @@ "prompt": "content" }, "folder": "python" + }, + + "oai_fine": { + "file_name": "oai", + "formatting": "sharegpt", + "columns": { + "messages": "messages" + }, + "tags": { + "role_tag": "role", + "content_tag": "content", + "user_tag": "user", + "assistant_tag": "assistant", + "system_tag": "system" + } } } \ No newline at end of file diff --git a/data/oai.json b/data/oai.json new file mode 100644 index 0000000000..0d905f6f0e --- /dev/null +++ b/data/oai.json @@ -0,0 +1,162 @@ +[ + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "I want to start working out but I'm not sure where to begin." + }, + { + "role": "assistant", + "content": "That's great that you want to start your fitness journey! A good way to begin is by setting small, achievable goals. Maybe start with a 15-minute walk each day and some basic bodyweight exercises like squats and push-ups. Remember, consistency is key!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "I'm trying to lose weight. Should I focus more on diet or exercise?" + }, + { + "role": "assistant", + "content": "Both diet and exercise are important for weight loss, but diet typically plays a larger role. Focus on creating a slight calorie deficit through healthier food choices and portion control. Combine this with regular exercise for best results. Remember, sustainable weight loss is a gradual process!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "What's the best way to build muscle?" + }, + { + "role": "assistant", + "content": "Building muscle requires a combination of resistance training and proper nutrition. Focus on compound exercises like squats, deadlifts, and bench presses. 
Aim for progressive overload by gradually increasing weights or reps. Ensure you're eating enough protein and calories to support muscle growth. Don't forget rest days for recovery!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "How often should I work out?" + }, + { + "role": "assistant", + "content": "The frequency of your workouts depends on your goals and current fitness level. For general health, aim for at least 150 minutes of moderate-intensity exercise per week. This could be 30 minutes, 5 days a week. If you're strength training, 2-3 sessions per week with rest days in between is a good start. Listen to your body and adjust as needed!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "I get bored easily with workouts. Any tips?" + }, + { + "role": "assistant", + "content": "Keeping workouts interesting is key to staying motivated! Try varying your routine with different types of exercises like HIIT, yoga, or dance classes. You could also explore outdoor activities like hiking or cycling. Setting new challenges or goals can help too. Remember, fitness should be enjoyable!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "What should I eat before and after a workout?" + }, + { + "role": "assistant", + "content": "Great question! Before a workout, eat a meal with carbs and some protein about 2-3 hours prior, or a small snack 30-60 minutes before. After your workout, aim for a combination of protein and carbs within 30 minutes to an hour. This helps with recovery and muscle growth. Stay hydrated before, during, and after exercise too!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "I'm not seeing results from my workouts. What am I doing wrong?" + }, + { + "role": "assistant", + "content": "Don't get discouraged! Results take time. Make sure you're challenging yourself enough - progressive overload is key. Also, check your diet - are you eating in line with your goals? Sleep and stress management are important too. Keep a workout journal to track progress. Sometimes small changes are happening that you don't notice right away. Stay consistent and patient!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "How can I improve my flexibility?" + }, + { + "role": "assistant", + "content": "Improving flexibility is great for overall fitness! Try incorporating regular stretching into your routine, especially after workouts when muscles are warm. Yoga or Pilates classes can be helpful. Focus on major muscle groups and hold stretches for 15-30 seconds. Remember to breathe deeply and never stretch to the point of pain. Consistency is key for flexibility gains!" + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "I have bad knees. What exercises can I do?" + }, + { + "role": "assistant", + "content": "There are many low-impact exercises you can do with bad knees! 
Swimming and water aerobics are excellent options. Stationary biking, elliptical machines, and rowing can also be good. For strength training, focus on exercises that don't put direct stress on the knees, like seated leg extensions or hamstring curls. Always consult with a doctor or physical therapist for personalized advice." + } + ] + }, + { + "messages": [ + { + "role": "system", + "content": "You are a knowledgeable and encouraging fitness guide." + }, + { + "role": "user", + "content": "How do I stay motivated to exercise regularly?" + }, + { + "role": "assistant", + "content": "Staying motivated is key to a successful fitness journey! Set clear, achievable goals and track your progress. Find activities you enjoy - fitness should be fun! Consider working out with a friend or joining a class for accountability. Reward yourself for meeting milestones. Remember why you started and visualize your end goal. It's okay to have off days, just get back on track as soon as you can. You've got this!" + } + ] + } +] \ No newline at end of file diff --git a/examples/train_lora/llama3_lora_sft.yaml b/examples/train_lora/llama3_lora_sft.yaml index 55a8077ec7..787880b8fd 100644 --- a/examples/train_lora/llama3_lora_sft.yaml +++ b/examples/train_lora/llama3_lora_sft.yaml @@ -1,5 +1,5 @@ ### model -model_name_or_path: meta-llama/Meta-Llama-3-8B-Instruct +model_name_or_path: microsoft/Phi-3.5-mini-instruct ### method stage: sft @@ -8,8 +8,8 @@ finetuning_type: lora lora_target: all ### dataset -dataset: identity,alpaca_en_demo -template: llama3 +dataset: oai_fine +template: phi cutoff_len: 1024 max_samples: 1000 overwrite_cache: true diff --git a/examples/train_lora/openai_finetuning/client.py b/examples/train_lora/openai_finetuning/client.py new file mode 100644 index 0000000000..923f594da4 --- /dev/null +++ b/examples/train_lora/openai_finetuning/client.py @@ -0,0 +1,22 @@ +import os +from openai import OpenAI + +client = OpenAI(base_url='http://127.0.0.1:8000', + api_key='sk-6TWftpgBjwbF3nCnHIeYT3klbkFJhG1jHR0LFZ4RYLzPRBz1' + ) + + + +file = client.files.create( + file=open("training_data.jsonl", "rb"), + purpose="fine-tune" +) + +res =client.fine_tuning.jobs.create( + training_file=file.id, + model="microsoft/Phi-3.5-mini-instruct", +) + +progress = client.fine_tuning.jobs.list_events(fine_tuning_job_id=res.id, limit=10) +print(progress) + diff --git a/examples/train_lora/openai_finetuning/training_data.jsonl b/examples/train_lora/openai_finetuning/training_data.jsonl new file mode 100644 index 0000000000..0fb862e9ad --- /dev/null +++ b/examples/train_lora/openai_finetuning/training_data.jsonl @@ -0,0 +1,19 @@ +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "I want to start working out but I'm not sure where to begin."}, {"role": "assistant", "content": "That's great that you want to start your fitness journey! A good way to begin is by setting small, achievable goals. Maybe start with a 15-minute walk each day and some basic bodyweight exercises like squats and push-ups. Remember, consistency is key!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "I'm trying to lose weight. Should I focus more on diet or exercise?"}, {"role": "assistant", "content": "Both diet and exercise are important for weight loss, but diet typically plays a larger role. 
Focus on creating a slight calorie deficit through healthier food choices and portion control. Combine this with regular exercise for best results. Remember, sustainable weight loss is a gradual process!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "What's the best way to build muscle?"}, {"role": "assistant", "content": "Building muscle requires a combination of resistance training and proper nutrition. Focus on compound exercises like squats, deadlifts, and bench presses. Aim for progressive overload by gradually increasing weights or reps. Ensure you're eating enough protein and calories to support muscle growth. Don't forget rest days for recovery!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "How often should I work out?"}, {"role": "assistant", "content": "The frequency of your workouts depends on your goals and current fitness level. For general health, aim for at least 150 minutes of moderate-intensity exercise per week. This could be 30 minutes, 5 days a week. If you're strength training, 2-3 sessions per week with rest days in between is a good start. Listen to your body and adjust as needed!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "I get bored easily with workouts. Any tips?"}, {"role": "assistant", "content": "Keeping workouts interesting is key to staying motivated! Try varying your routine with different types of exercises like HIIT, yoga, or dance classes. You could also explore outdoor activities like hiking or cycling. Setting new challenges or goals can help too. Remember, fitness should be enjoyable!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "What should I eat before and after a workout?"}, {"role": "assistant", "content": "Great question! Before a workout, eat a meal with carbs and some protein about 2-3 hours prior, or a small snack 30-60 minutes before. After your workout, aim for a combination of protein and carbs within 30 minutes to an hour. This helps with recovery and muscle growth. Stay hydrated before, during, and after exercise too!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "I'm not seeing results from my workouts. What am I doing wrong?"}, {"role": "assistant", "content": "Don't get discouraged! Results take time. Make sure you're challenging yourself enough - progressive overload is key. Also, check your diet - are you eating in line with your goals? Sleep and stress management are important too. Keep a workout journal to track progress. Sometimes small changes are happening that you don't notice right away. Stay consistent and patient!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "How can I improve my flexibility?"}, {"role": "assistant", "content": "Improving flexibility is great for overall fitness! Try incorporating regular stretching into your routine, especially after workouts when muscles are warm. Yoga or Pilates classes can be helpful. Focus on major muscle groups and hold stretches for 15-30 seconds. Remember to breathe deeply and never stretch to the point of pain. 
Consistency is key for flexibility gains!"}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "I have bad knees. What exercises can I do?"}, {"role": "assistant", "content": "There are many low-impact exercises you can do with bad knees! Swimming and water aerobics are excellent options. Stationary biking, elliptical machines, and rowing can also be good. For strength training, focus on exercises that don't put direct stress on the knees, like seated leg extensions or hamstring curls. Always consult with a doctor or physical therapist for personalized advice."}]} + +{"messages": [{"role": "system", "content": "You are a knowledgeable and encouraging fitness guide."}, {"role": "user", "content": "How do I stay motivated to exercise regularly?"}, {"role": "assistant", "content": "Staying motivated is key to a successful fitness journey! Set clear, achievable goals and track your progress. Find activities you enjoy - fitness should be fun! Consider working out with a friend or joining a class for accountability. Reward yourself for meeting milestones. Remember why you started and visualize your end goal. It's okay to have off days, just get back on track as soon as you can. You've got this!"}]} \ No newline at end of file diff --git a/src/llamafactory/cli.py b/src/llamafactory/cli.py index 8012d85549..a43f80ca53 100644 --- a/src/llamafactory/cli.py +++ b/src/llamafactory/cli.py @@ -25,6 +25,7 @@ from .extras.env import VERSION, print_env from .extras.logging import get_logger from .extras.misc import get_device_count +from .openai_train import run_oai_train from .train.tuner import export_model, run_exp from .webui.interface import run_web_demo, run_web_ui @@ -71,6 +72,7 @@ class Command(str, Enum): WEBUI = "webui" VER = "version" HELP = "help" + OAI_TRAIN = "openai_train" def main(): @@ -81,6 +83,8 @@ def main(): run_chat() elif command == Command.ENV: print_env() + elif command == Command.OAI_TRAIN: + run_oai_train() elif command == Command.EVAL: run_eval() elif command == Command.EXPORT: diff --git a/src/llamafactory/data/loader.py b/src/llamafactory/data/loader.py index 362d57e9f8..fd120ddc6f 100644 --- a/src/llamafactory/data/loader.py +++ b/src/llamafactory/data/loader.py @@ -15,9 +15,9 @@ import os import sys from typing import TYPE_CHECKING, Dict, Literal, Optional, Sequence, Union - +import json import numpy as np -from datasets import DatasetDict, load_dataset, load_from_disk +from datasets import DatasetDict, load_dataset, load_from_disk, Dataset, features from transformers.utils.versions import require_version from ..extras.constants import FILEEXT2TYPE @@ -137,6 +137,8 @@ def _load_single_dataset( return align_dataset(dataset, dataset_attr, data_args, training_args) + + def _get_merged_dataset( dataset_names: Optional[Sequence[str]], model_args: "ModelArguments", @@ -144,21 +146,30 @@ def _get_merged_dataset( training_args: "Seq2SeqTrainingArguments", stage: Literal["pt", "sft", "rm", "ppo", "kto"], ) -> Optional[Union["Dataset", "IterableDataset"]]: - r""" + """ Gets the merged datasets in the standard format. 
""" if dataset_names is None: return None datasets = [] - for dataset_attr in get_dataset_list(dataset_names, data_args.dataset_dir): - if (stage == "rm" and dataset_attr.ranking is False) or (stage != "rm" and dataset_attr.ranking is True): - raise ValueError("The dataset is not applicable in the current training stage.") + if dataset_names[0] == 'oai_finetune': + if len(dataset_names) != 2: + raise ValueError("For OAI fine-tuning, dataset_names should contain exactly two elements: 'oai_finetune' and the data file path.") + + oai_dataset_path = dataset_names[1] + dataset_attr = get_dataset_list(["oai_dataset"], data_args.dataset_dir, isOai=True, oai_dataset=oai_dataset_path)[0] datasets.append(_load_single_dataset(dataset_attr, model_args, data_args, training_args)) + else: + for dataset_attr in get_dataset_list(dataset_names, data_args.dataset_dir): + if (stage == "rm" and dataset_attr.ranking is False) or (stage != "rm" and dataset_attr.ranking is True): + raise ValueError("The dataset is not applicable in the current training stage.") + datasets.append(_load_single_dataset(dataset_attr, model_args, data_args, training_args)) - return merge_dataset(datasets, data_args, seed=training_args.seed) + merged_dataset = merge_dataset(datasets, data_args, seed=training_args.seed) + return merged_dataset def _get_preprocessed_dataset( dataset: Optional[Union["Dataset", "IterableDataset"]], diff --git a/src/llamafactory/data/parser.py b/src/llamafactory/data/parser.py index 15a6eab8d3..f0e9dd8049 100644 --- a/src/llamafactory/data/parser.py +++ b/src/llamafactory/data/parser.py @@ -71,14 +71,34 @@ def set_attr(self, key: str, obj: Dict[str, Any], default: Optional[Any] = None) setattr(self, key, obj.get(key, default)) -def get_dataset_list(dataset_names: Optional[Sequence[str]], dataset_dir: str) -> List["DatasetAttr"]: - r""" + +def get_dataset_list(dataset_names: Optional[Sequence[str]], dataset_dir: str, isOai=False, oai_dataset=None) -> List["DatasetAttr"]: + """ Gets the attributes of the datasets. 
""" if dataset_names is None: dataset_names = [] - if dataset_dir == "ONLINE": + if isOai and oai_dataset is not None: + # Use the provided OAI dataset configuration + dataset_info = { + "oai_dataset": { + "file_name": oai_dataset, + "formatting": "sharegpt", + "columns": { + "messages": "messages" + }, + "tags": { + "role_tag": "role", + "content_tag": "content", + "user_tag": "user", + "assistant_tag": "assistant", + "system_tag": "system" + } + } + } + dataset_names = ["oai_dataset"] + elif dataset_dir == "ONLINE": dataset_info = None else: if dataset_dir.startswith("REMOTE:"): @@ -91,8 +111,7 @@ def get_dataset_list(dataset_names: Optional[Sequence[str]], dataset_dir: str) - dataset_info = json.load(f) except Exception as err: if len(dataset_names) != 0: - raise ValueError("Cannot open {} due to {}.".format(config_path, str(err))) - + raise ValueError(f"Cannot open {config_path} due to {str(err)}.") dataset_info = None dataset_list: List["DatasetAttr"] = [] @@ -100,17 +119,10 @@ def get_dataset_list(dataset_names: Optional[Sequence[str]], dataset_dir: str) - if dataset_info is None: # dataset_dir is ONLINE load_from = "ms_hub" if use_modelscope() else "hf_hub" dataset_attr = DatasetAttr(load_from, dataset_name=name) - dataset_list.append(dataset_attr) - continue - - if name not in dataset_info: - raise ValueError("Undefined dataset {} in {}.".format(name, DATA_CONFIG)) - - has_hf_url = "hf_hub_url" in dataset_info[name] - has_ms_url = "ms_hub_url" in dataset_info[name] - - if has_hf_url or has_ms_url: - if (use_modelscope() and has_ms_url) or (not has_hf_url): + elif isOai and name == "oai_dataset": + dataset_attr = DatasetAttr("file", dataset_name=dataset_info[name]["file_name"]) + elif "hf_hub_url" in dataset_info[name] or "ms_hub_url" in dataset_info[name]: + if (use_modelscope() and "ms_hub_url" in dataset_info[name]) or ("hf_hub_url" not in dataset_info[name]): dataset_attr = DatasetAttr("ms_hub", dataset_name=dataset_info[name]["ms_hub_url"]) else: dataset_attr = DatasetAttr("hf_hub", dataset_name=dataset_info[name]["hf_hub_url"]) diff --git a/src/llamafactory/openai_train/__init__.py b/src/llamafactory/openai_train/__init__.py new file mode 100644 index 0000000000..79b48b4543 --- /dev/null +++ b/src/llamafactory/openai_train/__init__.py @@ -0,0 +1,6 @@ +from .server import run_oai_train + + +__all__ = [ +"run_oai_train" +] diff --git a/src/llamafactory/openai_train/file_ops.py b/src/llamafactory/openai_train/file_ops.py new file mode 100644 index 0000000000..64c06e8862 --- /dev/null +++ b/src/llamafactory/openai_train/file_ops.py @@ -0,0 +1,101 @@ +# file_ops/file_operations.py + +import os +import uuid +import json +from openai.pagination import SyncPage +from openai.types import FileObject +from typing import List, Dict, Any +from werkzeug.datastructures import FileStorage + + +ROOT = os.path.expanduser(os.environ.get('ROOT_OAI_TRAIN', '~/.OAI_TRAIN')) +UPLOAD_FOLDER = os.path.expanduser(os.environ.get('UPLOAD_FOLDER', '~/.OAI_TRAIN/uploads')) +MASTER_CONFIG_FILE = os.path.expanduser(os.environ.get('MASTER_CONFIG', '~/.OAI_TRAIN/configs/master_config.json')) + +if not os.path.exists(UPLOAD_FOLDER): + os.makedirs(UPLOAD_FOLDER) + +def create_file(file: FileStorage, purpose: str="fine-tune") -> Dict[str, Any]: + if purpose in ["fine-tune", "batch"]: + if not file.filename.lower().endswith('.jsonl'): + raise ValueError(f"For {purpose} purpose, only .jsonl files are supported.") + + if purpose == "batch": + file_size = file.content_length + if file_size > 100 * 1024 * 1024: # 100 
MB + raise ValueError("For batch purpose, file size must not exceed 100 MB.") + + name = str(uuid.uuid4()) # Generate a unique filename + filename= name+".jsonl" + file_path = os.path.join(UPLOAD_FOLDER, filename) + file.save(file_path) + + file_info = { + "id": name, + "purpose": purpose, + "filename": file.filename, + "bytes": os.path.getsize(file_path), + "created_at": int(os.path.getctime(file_path)), + "status": "uploaded", + 'object': 'file' # Add this line + } + + # Save metadata + with open(f"{os.path.join(UPLOAD_FOLDER, name)}.json", 'w') as f: + json.dump(file_info, f) + + return file_info + + +def list_files(purpose: str = None) -> SyncPage[FileObject]: + files: List[FileObject] = [] + for filename in os.listdir(UPLOAD_FOLDER): + if filename.endswith('.json'): + try: + with open(os.path.join(UPLOAD_FOLDER, filename), 'r') as f: + file_info = json.load(f) + if purpose is None or file_info['purpose'] == purpose: + print(file_info) + file_object = FileObject(**file_info) + files.append(file_object) + except Exception as e: + print(f"Error processing file {filename}: {e}") + continue + + return SyncPage(data=files, object="list") + + +def retrieve_file(file_id: str) -> Dict[str, Any]: + metadata_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.json") + if not os.path.exists(metadata_path): + raise FileNotFoundError(f"'No such File object: {file_id}") + + with open(metadata_path, 'r') as f: + return json.load(f) + +def delete_file(file_id: str) -> Dict[str, Any]: + file_path = os.path.join(UPLOAD_FOLDER, file_id) + metadata_path = f"{file_path}.json" + + if not os.path.exists(metadata_path): + raise FileNotFoundError(f"'No such File object: {file_id}") + + with open(metadata_path, 'r') as f: + file_info = json.load(f) + + os.remove(file_path) + os.remove(metadata_path) + tmp = {} + tmp['id'] = file_id + tmp['deleted']=True + tmp['object']='file' + return tmp + +def get_file_content(file_id: str) -> bytes: + file_path = os.path.join(UPLOAD_FOLDER, file_id) + if not os.path.exists(file_path): + raise FileNotFoundError(f"'No such File object: {file_id}") + + with open(file_path, 'rb') as f: + return f.read() \ No newline at end of file diff --git a/src/llamafactory/openai_train/oai_train.py b/src/llamafactory/openai_train/oai_train.py new file mode 100644 index 0000000000..5a11d3ed02 --- /dev/null +++ b/src/llamafactory/openai_train/oai_train.py @@ -0,0 +1,218 @@ +import os +import json +import uuid +import time +import yaml +import threading +from typing import Dict, Any, List, Optional + +# In-memory storage +job_queue = [] +job_info_storage = {} +job_events_storage = {} +queue_lock = threading.Lock() +import os +from .utils import convert_jsonl_to_json +ROOT = os.path.expanduser(os.environ.get('ROOT_OAI_TRAIN', '~/.OAI_TRAIN')) +UPLOAD_FOLDER = os.path.expanduser(os.environ.get('UPLOAD_FOLDER', '~/.OAI_TRAIN/uploads')) + + +MASTER_CONFIG_FILE = os.path.expanduser(os.environ.get('MASTER_CONFIG', '~/.OAI_TRAIN/configs/master_config.json')) + +from ..train.tuner import export_model, run_exp + + +def create_fine_tuning_job( + model: str, + training_file: str, + hyperparameters: Dict[str, Any] = {}, + validation_file: Optional[str] = None, + suffix: Optional[str] = None +) -> Dict[str, Any]: + job_id = str(uuid.uuid4()) + job_info = { + "id": job_id, + "model": model, + "training_file": training_file, + "validation_file": validation_file, + "hyperparameters": hyperparameters, + "suffix": suffix, + "status": "queued", + "created_at": int(time.time()), + } + + # Store job info in memory + 
job_info_storage[job_id] = job_info + + # Add job to queue and start processing + with queue_lock: + job_queue.append(job_id) + + # Start a new thread to process the job + threading.Thread(target=process_job, args=(job_id,)).start() + + return job_info + +def retrieve_fine_tuning_job(job_id: str) -> Dict[str, Any]: + job_info = job_info_storage.get(job_id) + if job_info is None: + raise ValueError(f"Job with id {job_id} not found") + return job_info + +def list_fine_tuning_jobs(after: Optional[str] = None, limit: int = 20) -> Dict[str, Any]: + all_jobs = list(job_info_storage.values()) + jobs = [] + for job_info in all_jobs: + if after is None or job_info['created_at'] > int(after): + jobs.append(job_info) + + # Sort jobs by created_at in descending order + jobs.sort(key=lambda x: x['created_at'], reverse=True) + + # Limit the number of jobs + jobs = jobs[:limit] + + # Prepare the response + response = { + "object": "list", + "data": jobs, + "has_more": len(all_jobs) > len(jobs) + } + + # Add pagination info if there are more jobs + if response["has_more"] and jobs: + response["after"] = jobs[-1]["id"] + + return response + +def list_fine_tuning_job_events(job_id: str, after: Optional[str] = None, limit: int = 20) -> Dict[str, Any]: + all_events = job_events_storage.get(job_id, []) + events = [] + + for event_data in all_events: + if after is None or event_data['id'] > after: + events.append(event_data) + + # Sort events by created_at in descending order + events.sort(key=lambda x: x['created_at'], reverse=True) + + # Limit the number of events + events = events[:limit] + + # Prepare the response + response = { + "object": "list", + "data": events, + "has_more": len(all_events) > len(events) + } + + return response + +def cancel_fine_tuning_job(job_id: str) -> Dict[str, Any]: + job_info = retrieve_fine_tuning_job(job_id) + if job_info['status'] in ['succeeded', 'failed', 'cancelled']: + raise ValueError(f"Cannot cancel job with status {job_info['status']}") + + job_info['status'] = 'cancelled' + job_info_storage[job_id] = job_info + + # Remove job from queue if it's still there + with queue_lock: + if job_id in job_queue: + job_queue.remove(job_id) + + return job_info + +# Helper function to add an event to a job +def add_job_event(job_id: str, message: str, level: str = "info"): + event = { + "object": "fine_tuning.job.event", + "id": str(uuid.uuid4()), + "created_at": int(time.time()), + "level": level, + "message": message, + } + if job_id not in job_events_storage: + job_events_storage[job_id] = [] + job_events_storage[job_id].append(event) + + # Update job status if necessary + if level == "error": + job_info = retrieve_fine_tuning_job(job_id) + job_info['status'] = 'failed' + job_info_storage[job_id] = job_info + + + + + +def load_json(file_path): + with open(file_path, 'r') as file: + return json.load(file) + +def process_job(job_id: str) -> None: + job = retrieve_fine_tuning_job(job_id) + print(f"\nProcessing job: {job_id}") + print("=" * 50) + + # Print job information + print(f"Model: {job.get('model', 'N/A')}") + print(f"Status: {job.get('status', 'N/A')}") + print(f"Created at: {job.get('created_at', 'N/A')}") + print(f"Training file: {job.get('training_file', 'N/A')}") + + + print("\nHyperparameters:") + hyperparams = job.get('hyperparameters', {}) + for param, value in hyperparams.items(): + print(f" {param}: {value}") + + print(f"\nOrganization ID: {job.get('organization_id', 'N/A')}") + print(f"Seed: {job.get('seed', 'N/A')}") + print(f"User provided suffix: 
{job.get('user_provided_suffix', 'N/A')}")
+
+    print("=" * 50)
+    print("\nStarting model training...")
+
+    # Update job status to 'running'
+    job['status'] = 'running'
+    job_info_storage[job_id] = job
+
+    try:
+        config = json.load(open(os.path.expanduser("~/.OAI_TRAIN/configs/master.json"), "r"))
+
+        config['model_name_or_path'] = job.get('model', 'N/A')
+        config['output_dir'] = os.path.join(ROOT, "output_dir", str(job_id))
+        config['num_train_epochs'] = float(hyperparams.get('n_epochs', config['num_train_epochs']))
+        config['learning_rate'] = hyperparams.get('learning_rate_multiplier', config['learning_rate'])
+
+        # Convert the uploaded JSONL file into the JSON format expected by the data loader
+        training_file = os.path.join(ROOT, "uploads", f"data_{job_id}.json")
+        jsonl_file = os.path.join(UPLOAD_FOLDER, f"{job['training_file']}.jsonl")
+        convert_jsonl_to_json(jsonl_file, training_file)
+        config['dataset'] = ['oai_finetune', training_file]
+        config['template'] = 'phi'
+        print(config)
+        run_exp(config)
+
+        # Mark the job as finished so clients polling the job see a terminal status
+        job['status'] = 'succeeded'
+        job_info_storage[job_id] = job
+        print(f"\nJob {job_id} completed successfully.")
+
+    except Exception as e:
+        # Update job status to 'failed' and add error information
+        job['status'] = 'failed'
+        job['error'] = str(e)
+        job_info_storage[job_id] = job
+
+        print(f"\nJob {job_id} failed.")
+        print(f"Error: {str(e)}")
+
+    print("=" * 50)
+
+    # Remove the job from the queue after processing
+    with queue_lock:
+        if job_id in job_queue:
+            job_queue.remove(job_id)
\ No newline at end of file
diff --git a/src/llamafactory/openai_train/server.py b/src/llamafactory/openai_train/server.py
new file mode 100644
index 0000000000..9dd97b42b7
--- /dev/null
+++ b/src/llamafactory/openai_train/server.py
@@ -0,0 +1,186 @@
+from flask import Flask, request, jsonify, send_file
+from typing import List, Dict, Any
+import json
+import shutil
+import subprocess
+import os
+from io import BytesIO
+from openai.pagination import SyncPage
+from openai.types import FileObject, FileDeleted
+import threading
+from .utils import load_api_keys
+
+app = Flask(__name__)
+
+def pydantic_to_dict(obj: Any) -> Dict[str, Any]:
+    if isinstance(obj, (FileObject, SyncPage, FileDeleted)):
+        return obj.model_dump()
+    return obj
+
+# Import file operations
+from .file_ops import (
+    create_file,
+    retrieve_file,
+    list_files,
+    delete_file,
+    get_file_content
+)
+
+# Import fine-tuning operations
+from .oai_train import (
+    create_fine_tuning_job,
+    retrieve_fine_tuning_job,
+    list_fine_tuning_jobs,
+    cancel_fine_tuning_job,
+    list_fine_tuning_job_events
+)
+
+
+# stand-in for a real key store
+API_KEYS = load_api_keys()
+
+def validate_api_key():
+    auth_header = request.headers.get('Authorization')
+    if not auth_header or not auth_header.startswith('Bearer '):
+        return jsonify({"error": "Invalid or missing API key"}), 401
+    api_key = auth_header.split('Bearer ')[1]
+
+    if api_key not in API_KEYS:
+        return jsonify({"error": "Invalid or missing API key"}), 401
+    return None
+
+@app.before_request
+def before_request():
+    if request.endpoint != 'healthcheck':  # Skip API key check for healthcheck
+        error_response = validate_api_key()
+        if error_response:
+            return error_response
+
+
+@app.route('/fine_tuning/jobs', methods=['POST'])
+def create_job():
+    data = request.json
+    print(data)
+    try:
+        result = jsonify(create_fine_tuning_job(**data))
+        print(result)
+        return result, 201
+    except ValueError as e:
+        return jsonify({"error": str(e)}), 400
+
+@app.route('/fine_tuning/jobs/<job_id>', methods=['GET'])
+def retrieve_job(job_id: str):
+    try:
+        result = retrieve_fine_tuning_job(job_id)
+        return jsonify(result)
+    except ValueError as e:
+        return jsonify({"error": str(e)}), 404
+
+@app.route('/fine_tuning/jobs', methods=['GET'])
+def list_jobs():
+    after = request.args.get('after')
+    limit = request.args.get('limit', default=20, type=int)
+    result = list_fine_tuning_jobs(after=after, limit=limit)
+    return jsonify(result)
+
+@app.route('/fine_tuning/jobs/<job_id>/cancel', methods=['POST'])
+def cancel_job(job_id: str):
+    try:
+        result = cancel_fine_tuning_job(job_id)
+        return jsonify(result)
+    except ValueError as e:
+        return jsonify({"error": str(e)}), 400
+
+@app.route('/fine_tuning/jobs/<job_id>/events', methods=['GET'])
+def list_job_events(job_id: str):
+    after = request.args.get('after')
+    limit = request.args.get('limit', default=20, type=int)
+    result = list_fine_tuning_job_events(job_id, after=after, limit=limit)
+    return jsonify(result)
+
+
+@app.route('/files', methods=['POST'])
+def create_file_route():
+    if 'file' not in request.files:
+        return jsonify({"error": {"message": "No file part"}}), 400
+    file = request.files['file']
+    if file.filename == '':
+        return jsonify({"error": {"message": "No selected file"}}), 400
+    purpose = request.form.get('purpose')
+    if not purpose:
+        return jsonify({"error": {"message": "Purpose is required"}}), 400
+
+    try:
+        result = create_file(file, purpose)
+        return jsonify(pydantic_to_dict(result)), 201
+    except ValueError as e:
+        return jsonify({"error": {"message": str(e)}}), 400
+
+@app.route('/files/<file_id>', methods=['GET'])
+def retrieve_file_route(file_id: str):
+    try:
+        result = retrieve_file(file_id)
+        return jsonify(pydantic_to_dict(result))
+    except FileNotFoundError as e:
+        return jsonify({"error": {"message": str(e), 'type': 'invalid_request_error', 'param': 'id', 'code': None}}), 404
+
+@app.route('/files', methods=['GET'])
+def list_files_route():
+    purpose = request.args.get('purpose')
+    result = list_files(purpose=purpose)
+    return jsonify(pydantic_to_dict(result))
+
+@app.route('/files/<file_id>', methods=['DELETE'])
+def delete_file_route(file_id: str):
+    try:
+        result = delete_file(file_id)
+        return jsonify(pydantic_to_dict(result))
+    except FileNotFoundError as e:
+        return jsonify({"error": {"message": str(e), 'type': 'invalid_request_error', 'param': 'id', 'code': None}}), 404
+
+@app.route('/files/<file_id>/content', methods=['GET'])
+def get_file_content_route(file_id: str):
+    try:
+        content = get_file_content(file_id)
+        return send_file(
+            BytesIO(content),
+            mimetype='application/octet-stream',
+            as_attachment=True,
+            download_name=file_id
+        )
+    except FileNotFoundError as e:
+        return jsonify({"error": {"message": str(e), 'type': 'invalid_request_error', 'param': 'id', 'code': None}}), 404
+
+
+def run_oai_train() -> None:
+    oai_train_dir = os.path.expanduser("~/.OAI_TRAIN")
+
+    # Remove existing directory if it exists
+    if os.path.exists(oai_train_dir):
+        shutil.rmtree(oai_train_dir)
+
+    # Create necessary directories
+    os.makedirs(oai_train_dir, exist_ok=True)
+    os.makedirs(os.path.join(oai_train_dir, "uploads"), exist_ok=True)
+    os.makedirs(os.path.join(oai_train_dir, "configs"), exist_ok=True)
+    os.makedirs(os.path.join(oai_train_dir, "output_dir"), exist_ok=True)
+
+    # Prepare the default training configuration
+    data = {
+        "model_name_or_path": "meta-llama/Llama-3.2-1B",
+        "num_train_epochs": 3.0,
+        "learning_rate": 5.0e-6,
+        "output_dir": os.path.join(oai_train_dir, "output_dir")
+    }
+
+    # Write configuration to file
+    config_path = os.path.join(oai_train_dir, "configs", "master.json")
+    with open(config_path, "w") as f:
+        json.dump(data, f)
+
+    # Set up and run the API
+    api_host = os.environ.get("API_HOST", "0.0.0.0")
+    api_port = int(os.environ.get("API_PORT", "8000"))
+    print(f"API is running on http://{api_host}:{api_port}")
+    app.run(host=api_host, port=api_port, debug=False)
\ No newline at end of file
diff --git a/src/llamafactory/openai_train/utils.py b/src/llamafactory/openai_train/utils.py
new file mode 100644
index 0000000000..d4fb511c25
--- /dev/null
+++ b/src/llamafactory/openai_train/utils.py
@@ -0,0 +1,17 @@
+import json
+
+
+def load_api_keys():
+    # dummy api keys
+    return {"sk-6TWftpgBjwbF3nCnHIeYT3klbkFJhG1jHR0LFZ4RYLzPRBz1": "user1", "key2": "user2"}
+
+
+def convert_jsonl_to_json(jsonl_file_path, json_file_path):
+    # Read non-empty lines from the JSONL file and write them back out as a single JSON array
+    with open(jsonl_file_path, 'r') as jsonl_file:
+        json_lines = [line.strip() for line in jsonl_file if line.strip()]
+
+    json_objects = [json.loads(line) for line in json_lines]
+    with open(json_file_path, 'w') as json_file:
+        json.dump(json_objects, json_file, indent=4)
\ No newline at end of file
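
Usage note (editor addition, not part of the patch): a minimal end-to-end sketch of how the new command is meant to be exercised. It assumes the server has been started with `llamafactory-cli openai_train` on the default host and port, and reuses the dummy API key from utils.py and the upload/create calls from examples/train_lora/openai_finetuning/client.py; the BASE_URL/API_KEY constants and the polling loop via `requests` are illustrative additions, not code from this patch.

    import time
    import requests
    from openai import OpenAI

    BASE_URL = "http://127.0.0.1:8000"
    API_KEY = "sk-6TWftpgBjwbF3nCnHIeYT3klbkFJhG1jHR0LFZ4RYLzPRBz1"  # dummy key from utils.py

    client = OpenAI(base_url=BASE_URL, api_key=API_KEY)

    # Upload the training file and create a fine-tuning job, as in client.py
    file = client.files.create(file=open("training_data.jsonl", "rb"), purpose="fine-tune")
    job = client.fine_tuning.jobs.create(
        training_file=file.id,
        model="microsoft/Phi-3.5-mini-instruct",
    )

    # Poll the raw job endpoint until the job reaches a terminal status
    headers = {"Authorization": f"Bearer {API_KEY}"}
    while True:
        status = requests.get(f"{BASE_URL}/fine_tuning/jobs/{job.id}", headers=headers).json()["status"]
        print("status:", status)
        if status in ("succeeded", "failed", "cancelled"):
            break
        time.sleep(30)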
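For reference, a small sketch of the data path the server follows before training, using hypothetical file paths: convert_jsonl_to_json and the 'oai_finetune' dataset routing come from this patch, while the example paths below are placeholders. Each uploaded .jsonl file (one {"messages": [...]} object per line) is rewritten as a JSON array so the loader can treat it as a sharegpt-formatted file dataset.

    from llamafactory.openai_train.utils import convert_jsonl_to_json

    # Rewrite the uploaded JSONL as a JSON array (paths are illustrative)
    convert_jsonl_to_json("/path/to/upload.jsonl", "/path/to/data_job123.json")

    # The server then hands the converted file to the trainer via
    #   config['dataset'] = ['oai_finetune', '/path/to/data_job123.json']
    # which _get_merged_dataset special-cases and loads with the inline sharegpt
    # attributes built in get_dataset_list (role/content/user/assistant/system tags).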