From 7120dd843c18da304977e2c6b5d9081e228db38c Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Tue, 11 Jun 2024 19:32:15 +0800 Subject: [PATCH 01/10] add format check --- .github/workflows/format.yml | 41 ++++++++ .pre-commit-config.yaml | 28 ++++++ flagscale/auto_tuner/generate.py | 14 +-- flagscale/auto_tuner/prune/history.py | 76 +++++++++------ flagscale/auto_tuner/prune/pruner.py | 1 + flagscale/auto_tuner/record/recorder.py | 119 ++++++++++++++--------- flagscale/auto_tuner/search/algorithm.py | 12 ++- flagscale/auto_tuner/search/searcher.py | 10 +- flagscale/auto_tuner/tuner.py | 87 +++++++++-------- flagscale/auto_tuner/utils.py | 3 +- flagscale/datasets/sft_dataset.py | 56 +++++++---- flagscale/launcher/runner.py | 16 +-- flagscale/logger.py | 13 ++- flagscale/patches_utils.py | 9 +- megatron/tests/unit_tests/__init__.py | 3 +- 15 files changed, 324 insertions(+), 164 deletions(-) create mode 100644 .github/workflows/format.yml create mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml new file mode 100644 index 000000000..510db4a39 --- /dev/null +++ b/.github/workflows/format.yml @@ -0,0 +1,41 @@ +name: format + +on: + pull_request: + branches: [ "main" ] + types: [opened, synchronize, reopened] + +env: + CHECK_FILES: >- + flagscale/auto_tuner/*.py + flagscale/auto_tuner/prune/*.py + flagscale/auto_tuner/record/*.py + flagscale/auto_tuner/search/*.py + flagscale/launcher/*.py + flagscale/logger.py + flagscale/patches_utils.py + flagscale/datasets/sft_dataset.py + +jobs: + format: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.10 + uses: actions/setup-python@v2 + with: + python-version: "3.10" + - name: Install dependencies + run: | + pip install black isort + + - name: Run Black + run: >- + black --check --diff --include $CHECK_FILES ./ + + - name: Run Isort + run: | + isort --profile black --check --diff $CHECK_FILES + + diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..d63dccf7f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +check_files: &check_files | + (?x)^( + flagscale/auto_tuner/.*\.py| + flagscale/auto_tuner/prune/\..*\.py| + flagscale/auto_tuner/record/\..*\.py| + flagscale/auto_tuner/search/\..*\.py| + flagscale/launcher/\..*\.py| + flagscale/logger\.py| + flagscale/patches_utils\.py| + flagscale/datasets/sft_dataset\.py + )$ + +repos: + - repo: local + hooks: + - id: black + name: black + entry: black + language: system + types: [python] + files: *check_files + - id: isort + name: isort + entry: isort + language: system + types: [python] + files: *check_files + args: ["--profile", "black"] diff --git a/flagscale/auto_tuner/generate.py b/flagscale/auto_tuner/generate.py index 18c0cd6b2..f5e4493dd 100644 --- a/flagscale/auto_tuner/generate.py +++ b/flagscale/auto_tuner/generate.py @@ -1,5 +1,5 @@ -import os import copy +import os class Generator: @@ -16,8 +16,7 @@ def __init__(self, config): "tensor_model_parallel_size": "tensor_model_parallel_size", "sequence_parallel": "sequence_parallel", "pipeline_model_parallel_size": "pipeline_model_parallel_size", - "num_layers_per_virtual_pipeline_stage": - "num_layers_per_virtual_pipeline_stage", + "num_layers_per_virtual_pipeline_stage": "num_layers_per_virtual_pipeline_stage", "recompute_method": "recompute_method", "recompute_granularity": "recompute_granularity", "recompute_num_layers": "recompute_num_layers", @@ -81,14 +80,15 @@ def gen(self, strategy): # Set 
train_iters of each task if "control" in config.experiment.auto_tuner: config.train.model.train_iters = config.experiment.auto_tuner.control.get( - "train_iters", 5) + "train_iters", 5 + ) else: config.train.model.train_iters = 5 # log dir - config.experiment.exp_dir = os.path.join(config.experiment.exp_dir, - "auto_tuner", - f"task_{strategy['idx']}") + config.experiment.exp_dir = os.path.join( + config.experiment.exp_dir, "auto_tuner", f"task_{strategy['idx']}" + ) return config diff --git a/flagscale/auto_tuner/prune/history.py b/flagscale/auto_tuner/prune/history.py index 5359d7a5a..0604d5f0c 100644 --- a/flagscale/auto_tuner/prune/history.py +++ b/flagscale/auto_tuner/prune/history.py @@ -1,4 +1,5 @@ import logging + from ..utils import beside _HISTORY_BASED_PRUNE_FUNC = [] @@ -26,8 +27,7 @@ def prune_by_micro_batch_size(config, strategy, history=[]): if retrieval: for item in retrieval: # performance prune - if item["micro_batch_size"] > micro_batch_size and item[ - "performance"]: + if item["micro_batch_size"] > micro_batch_size and item["performance"]: logger.info( f"The strategy {strategy} has been pruned by micro_batch_size performance." ) @@ -36,8 +36,7 @@ def prune_by_micro_batch_size(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if item["micro_batch_size"] < micro_batch_size and item[ - "max_mem"] == "OOM": + if item["micro_batch_size"] < micro_batch_size and item["max_mem"] == "OOM": logger.info( f"The strategy {strategy} has been pruned by micro_batch_size memory." ) @@ -91,10 +90,13 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "block" - and recompute_method == item["recompute_method"] - and item["performance"]): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "block" + and recompute_method == item["recompute_method"] + and item["performance"] + ): if recompute_num_layers > item["recompute_num_layers"]: logger.info( f"The strategy {strategy} has been pruned by block recompute_num_layers performance." @@ -104,10 +106,13 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "uniform" - and recompute_method == item["recompute_method"] - and item["performance"]): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "uniform" + and recompute_method == item["recompute_method"] + and item["performance"] + ): if recompute_num_layers > item["recompute_num_layers"]: logger.info( f"The strategy {strategy} has been pruned by uniform recompute_num_layers performance." @@ -117,8 +122,7 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if not use_recompute and item["use_recompute"] and item[ - "max_mem"] == "OOM": + if not use_recompute and item["use_recompute"] and item["max_mem"] == "OOM": logger.info( f"The strategy {strategy} has been pruned by use_recompute memory." 
) @@ -127,11 +131,16 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "uniform" - and recompute_method == item["recompute_method"]): - if (recompute_num_layers > item["recompute_num_layers"] - and item["max_mem"] == "OOM"): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "uniform" + and recompute_method == item["recompute_method"] + ): + if ( + recompute_num_layers > item["recompute_num_layers"] + and item["max_mem"] == "OOM" + ): logger.info( f"The strategy {strategy} has been pruned by uniform recompute_num_layers memory." ) @@ -140,11 +149,16 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "block" - and recompute_method == item["recompute_method"]): - if (recompute_num_layers < item["recompute_num_layers"] - and item["max_mem"] == "OOM"): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "block" + and recompute_method == item["recompute_method"] + ): + if ( + recompute_num_layers < item["recompute_num_layers"] + and item["max_mem"] == "OOM" + ): logger.info( f"The strategy {strategy} has been pruned by block recompute_num_layers memory." ) @@ -163,8 +177,11 @@ def prune_by_sequence_parallel(config, strategy, history=[]): if retrieval: for item in retrieval: # performance prune - if item["sequence_parallel"] and item[ - "performance"] and not sequence_parallel: + if ( + item["sequence_parallel"] + and item["performance"] + and not sequence_parallel + ): logger.info( f"The strategy {strategy} has been pruned by sequence_parallel performance." ) @@ -173,8 +190,11 @@ def prune_by_sequence_parallel(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if item["sequence_parallel"] and item[ - "max_mem"] == "OOM" and not sequence_parallel: + if ( + item["sequence_parallel"] + and item["max_mem"] == "OOM" + and not sequence_parallel + ): logger.info( f"The strategy {strategy} has been pruned by sequence_parallel memory." 
) diff --git a/flagscale/auto_tuner/prune/pruner.py b/flagscale/auto_tuner/prune/pruner.py index 7c332da7f..9f521283c 100644 --- a/flagscale/auto_tuner/prune/pruner.py +++ b/flagscale/auto_tuner/prune/pruner.py @@ -14,6 +14,7 @@ def prune(self, strategy, history=[]): if func(self.config, strategy, history): not_run = True break + history.append(strategy) if not_run: self.pruned_count += 1 diff --git a/flagscale/auto_tuner/record/recorder.py b/flagscale/auto_tuner/record/recorder.py index 3b9f216dd..ee819f137 100644 --- a/flagscale/auto_tuner/record/recorder.py +++ b/flagscale/auto_tuner/record/recorder.py @@ -1,7 +1,8 @@ +import logging import os import re -import logging import subprocess + import pandas as pd @@ -15,16 +16,24 @@ def __init__(self, config): "history.csv", ) # Metric to grep in the last rank of last node log file - if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner: + if ( + "auto_tuner" in self.config + and "performance" in self.config.experiment.auto_tuner + ): self.metric = self.config.experiment.auto_tuner.performance.get( - "name", "elapsed time per iteration \(ms\):") + "name", "elapsed time per iteration \(ms\):" + ) else: self.metric = "elapsed time per iteration \(ms\):" # Sort order of performance, order just in [ascend, and descend], default ascend - if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner: + if ( + "auto_tuner" in self.config + and "performance" in self.config.experiment.auto_tuner + ): self.sorted_order = self.config.experiment.auto_tuner.performance.get( - "order", "ascend") + "order", "ascend" + ) else: self.sorted_order = "ascend" @@ -46,8 +55,7 @@ def record(self, task, strategy): # If task is stopped by autotuner, task may not be failed,just hang or too slow. 
elif self.cur_strategy.get("stopped_by_tuner", False): - performace = self.grep_performance(peformance_path, - self.metric) + performace = self.grep_performance(peformance_path, self.metric) strategy["performance"] = performace strategy["max_mem"] = self.grep_max_memory(host_path) strategy["error"] = None @@ -66,9 +74,11 @@ def record(self, task, strategy): strategy["error"] = None # Pass back to platform if need - if ("airs_switch" in self.config.experiment.auto_tuner.platform - and self.config.experiment.auto_tuner.platform.airs_switch - and strategy["performance"]): + if ( + "airs_switch" in self.config.experiment.auto_tuner.platform + and self.config.experiment.auto_tuner.platform.airs_switch + and strategy["performance"] + ): self.pass_back_to_platform(strategy) def pass_back_to_platform(self, strategy): @@ -76,8 +86,9 @@ def pass_back_to_platform(self, strategy): seq_len = int(self.config.train.model.seq_length) throughput = gbs * seq_len / (strategy["performance"] / 1000) day = round( - self.config.train.model.train_samples * seq_len / - (throughput * 60 * 60 * 24), + self.config.train.model.train_samples + * seq_len + / (throughput * 60 * 60 * 24), 2, ) command = [ @@ -85,8 +96,11 @@ def pass_back_to_platform(self, strategy): "-D", f"{strategy['data_parallel_size']}", "--distributed_optimizer", - (f"{strategy['use_distributed_optimizer']}" if - strategy["use_distributed_optimizer"] is not None else "False"), + ( + f"{strategy['use_distributed_optimizer']}" + if strategy["use_distributed_optimizer"] is not None + else "False" + ), "-E", f"{strategy['expert_model_parallel_size']}", "-C", @@ -96,22 +110,37 @@ def pass_back_to_platform(self, strategy): "-L", f"{strategy['pipeline_model_parallel_size']}", "-G", - (f"{strategy['recompute_granularity']}" - if strategy["recompute_granularity"] else "None"), + ( + f"{strategy['recompute_granularity']}" + if strategy["recompute_granularity"] + else "None" + ), "-R", - (f"{strategy['recompute_method']}" - if strategy["recompute_granularity"] else "None"), + ( + f"{strategy['recompute_method']}" + if strategy["recompute_granularity"] + else "None" + ), "-N", - (f"{strategy['recompute_num_layers']}" - if strategy["recompute_num_layers"] else "0"), + ( + f"{strategy['recompute_num_layers']}" + if strategy["recompute_num_layers"] + else "0" + ), "-S", - (f"{strategy['sequence_parallel']}" - if strategy["sequence_parallel"] is not None else "False"), + ( + f"{strategy['sequence_parallel']}" + if strategy["sequence_parallel"] is not None + else "False" + ), "-T", f"{strategy['tensor_model_parallel_size']}", "-V", - (f"{strategy['num_layers_per_virtual_pipeline_stage']}" - if strategy["num_layers_per_virtual_pipeline_stage"] else "0"), + ( + f"{strategy['num_layers_per_virtual_pipeline_stage']}" + if strategy["num_layers_per_virtual_pipeline_stage"] + else "0" + ), "--throughput", f"{int(throughput)}", "--day", @@ -156,8 +185,7 @@ def grep_max_memory(self, path, pattern="max reserved"): except: continue assert value is not None, "Can't grep the max memory" - self.logger.info( - f"task_{self.cur_strategy['idx']} max_memory: {max_memory}") + self.logger.info(f"task_{self.cur_strategy['idx']} max_memory: {max_memory}") return max_memory def get_performance_and_host_path(self, task): @@ -183,10 +211,10 @@ def get_performance_and_host_path(self, task): outputs = os.listdir(os.path.join(details, max_host)) assert len(outputs) == 1, f"the sub dir of {outputs} must be just one." 
new_outputs = os.listdir(os.path.join(details, max_host, outputs[0])) - assert len(new_outputs - ) == 1, f"the sub dir of {new_outputs} must be just one." - last_path = os.path.join(details, max_host, outputs[0], new_outputs[0], - "attempt_0") + assert len(new_outputs) == 1, f"the sub dir of {new_outputs} must be just one." + last_path = os.path.join( + details, max_host, outputs[0], new_outputs[0], "attempt_0" + ) last_dir = None last_dir_rank = 0 for item in os.listdir(last_path): @@ -202,9 +230,7 @@ def get_performance_and_host_path(self, task): raise ValueError("The log file does not exist.") return log_path, logs - def grep_performance(self, - path, - pattern="elapsed time per iteration \(ms\):"): + def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"): """Read the log file and return the performance.""" metric_pattern = pattern + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + pattern if not path or not os.path.exists(path): @@ -228,8 +254,7 @@ def grep_performance(self, continue assert value is not None, "Can't grep the performance" if not performance: - self.logger.info( - f"task_{self.cur_strategy['idx']} performance: {None}") + self.logger.info(f"task_{self.cur_strategy['idx']} performance: {None}") return None if len(performance) == 1: self.logger.info( @@ -238,8 +263,7 @@ def grep_performance(self, return round(performance[0], 3) else: average = sum(performance[1:]) / (len(performance) - 1) - self.logger.info( - f"task_{self.cur_strategy['idx']} performance: {average}") + self.logger.info(f"task_{self.cur_strategy['idx']} performance: {average}") return round(average, 3) def grep_error(self, path, pattern="Error"): @@ -267,8 +291,7 @@ def grep_error(self, path, pattern="Error"): else: errors_info.add(line) - self.logger.info( - f"task_{self.cur_strategy['idx']} error: {errors_info}") + self.logger.info(f"task_{self.cur_strategy['idx']} error: {errors_info}") return errors_info def sort(self, history): @@ -281,21 +304,20 @@ def sort(self, history): if self.sorted_order == "ascend": sorted_history = sorted( no_pruned_history, - key=lambda x: - (x["performance"] - if x["performance"] is not None else float("inf")), + key=lambda x: ( + x["performance"] if x["performance"] is not None else float("inf") + ), ) elif self.sorted_order == "descend": sorted_history = sorted( no_pruned_history, - key=lambda x: - (x["performance"] - if x["performance"] is not None else float("-inf")), + key=lambda x: ( + x["performance"] if x["performance"] is not None else float("-inf") + ), reverse=True, ) else: - raise ValueError( - f"The sorted order {self.sorted_order} is not supported.") + raise ValueError(f"The sorted order {self.sorted_order} is not supported.") assert sorted_history is not None return sorted_history @@ -309,4 +331,5 @@ def save(self, history): df = df.reindex(columns=cols) if "stopped_by_tuner" in df.columns: df = df.drop(columns=["stopped_by_tuner"]) - df.to_csv(self.path, index=False, escapechar='\\') + + df.to_csv(self.path, index=False, escapechar="\\") diff --git a/flagscale/auto_tuner/search/algorithm.py b/flagscale/auto_tuner/search/algorithm.py index b36df980f..a9c03dc3a 100644 --- a/flagscale/auto_tuner/search/algorithm.py +++ b/flagscale/auto_tuner/search/algorithm.py @@ -24,15 +24,19 @@ def __init__(self, strategies, config): def checkout(self, mode): if mode == "memory": from ..utils import sort_by_memory + if self.idx > 0 and self.idx < len(self.strategies): - self.strategies = self.strategies[:self.idx] + sorted( - self.strategies[self.idx:], 
key=sort_by_memory) + self.strategies = self.strategies[: self.idx] + sorted( + self.strategies[self.idx :], key=sort_by_memory + ) elif mode == "performance": from ..utils import sort_by_performance + if self.idx > 0 and self.idx < len(self.strategies): - self.strategies = self.strategies[:self.idx] + sorted( - self.strategies[self.idx:], key=sort_by_performance) + self.strategies = self.strategies[: self.idx] + sorted( + self.strategies[self.idx :], key=sort_by_performance + ) def search(self): """Return a task iteratively.""" diff --git a/flagscale/auto_tuner/search/searcher.py b/flagscale/auto_tuner/search/searcher.py index 0a4835643..70257344f 100644 --- a/flagscale/auto_tuner/search/searcher.py +++ b/flagscale/auto_tuner/search/searcher.py @@ -1,8 +1,8 @@ -import time import copy import logging -from ..utils import divisible +import time +from ..utils import divisible __BUILT_IN_STRATEGY_DIMS__ = [ "data_parallel_size", @@ -151,8 +151,10 @@ def build_space(self, config): # Set virtual pipeline parallel degree space["num_layers_per_virtual_pipeline_stage"] = ( [i for i in range(1, num_layers + 1)] - if "num_layers_per_virtual_pipeline_stage" not in config.experiment.auto_tuner.space - or config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage == "auto" + if "num_layers_per_virtual_pipeline_stage" + not in config.experiment.auto_tuner.space + or config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage + == "auto" else config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage ) self._sort( diff --git a/flagscale/auto_tuner/tuner.py b/flagscale/auto_tuner/tuner.py index abe20f53e..9c6a73c10 100644 --- a/flagscale/auto_tuner/tuner.py +++ b/flagscale/auto_tuner/tuner.py @@ -1,18 +1,18 @@ -import os import copy -import time import datetime import logging +import os +import time from omegaconf import DictConfig, OmegaConf -from flagscale.launcher.runner import SSHRunner from flagscale.launcher.job_status import JobStatus +from flagscale.launcher.runner import SSHRunner -from .search import Searcher -from .prune import Pruner from .generate import Generator +from .prune import Pruner from .record import Recorder +from .search import Searcher class AutoTuner: @@ -33,7 +33,8 @@ def __init__(self, config: DictConfig): handler = logging.FileHandler(log_path, mode="w") handler.setLevel(logging.INFO) formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s") + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) handler.setFormatter(formatter) logger.addHandler(handler) self.logger = logger @@ -59,8 +60,7 @@ def __init__(self, config: DictConfig): # The interval of task monitoring if "control" not in self.config.experiment.auto_tuner: self.config.experiment.auto_tuner.control = {} - self.interval = self.config.experiment.auto_tuner.control.get( - "interval", 10) + self.interval = self.config.experiment.auto_tuner.control.get("interval", 10) # Set platform envs if "platform" not in self.config.experiment.auto_tuner: @@ -71,38 +71,42 @@ def __init__(self, config: DictConfig): self.config.experiment.auto_tuner.platform.airs_switch = True if os.environ.get("AIRS_SIZE", None): - self.config.experiment.auto_tuner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.config.experiment.auto_tuner.nnodes = int(os.environ["AIRS_SIZE"]) # Set original config - self.orig_config.experiment.runner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.orig_config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"]) # Set config - 
self.config.experiment.runner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"]) if os.environ.get("AIRS_ACCELERATOR_COUNT", None): self.config.experiment.auto_tuner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) # Set original config self.orig_config.experiment.runner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) # Set config self.config.experiment.runner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) if os.environ.get("AIRS_FBMEM", None): - self.config.experiment.auto_tuner.memory = int( - os.environ["AIRS_FBMEM"]) + self.config.experiment.auto_tuner.memory = int(os.environ["AIRS_FBMEM"]) if os.environ.get("AIRS_HOSTFILE_PATH", None): # Set original config self.orig_config.experiment.runner.hostfile = os.environ[ - "AIRS_HOSTFILE_PATH"] + "AIRS_HOSTFILE_PATH" + ] # Set config self.config.experiment.runner.hostfile = os.environ[ - "AIRS_HOSTFILE_PATH"] + "AIRS_HOSTFILE_PATH" + ] - self.config.experiment.auto_tuner.cards = self.config.experiment.auto_tuner.nnodes * self.config.experiment.auto_tuner.nproc_per_node + self.config.experiment.auto_tuner.cards = ( + self.config.experiment.auto_tuner.nnodes + * self.config.experiment.auto_tuner.nproc_per_node + ) # Build core sub modules, such as Searcher, Pruner, Generator and Recorder self.searcher = Searcher(self.config) @@ -116,11 +120,11 @@ def __init__(self, config: DictConfig): # The max time per task, unit: second # NOTE: The task will be stopped if the time is reached or done. self.max_time_per_task = self.config.experiment.auto_tuner.control.get( - "max_time_per_task", 300) + "max_time_per_task", 300 + ) # The max time of auto tuner, if None, no limit. - self.max_time = self.config.experiment.auto_tuner.control.get( - "max_time", None) + self.max_time = self.config.experiment.auto_tuner.control.get("max_time", None) # The start time of each task, used to control each task when stop self.start_task_time = None @@ -158,9 +162,11 @@ def tune(self): self.logger.info(f"Record task_{self.idx}:") self.record() - if (self.cur_strategy["performance"] - and self.config.experiment.auto_tuner.platform.get( - "airs_switch", False) and not self.has_checkout): + if ( + self.cur_strategy["performance"] + and self.config.experiment.auto_tuner.platform.get("airs_switch", False) + and not self.has_checkout + ): self.checkout() # get best strategy @@ -173,7 +179,8 @@ def tune(self): self.logger.info(f"No strategy can run so far.") tuner_end_time = time.time() self.logger.info( - f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds.") + f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds." 
+ ) # Run the best task if self.config.experiment.auto_tuner.control.get("run_best", True): @@ -182,8 +189,7 @@ def tune(self): self.logger.info(f"Run best Strategy: {best_strategy}") else: raise ValueError(f"No strategy can run.") - best_task = self.generator.gen_best_task(best_strategy, - self.orig_config) + best_task = self.generator.gen_best_task(best_strategy, self.orig_config) best_task.action = "run" runner = SSHRunner(best_task) runner.run(monitor=True, interval=60) @@ -259,7 +265,8 @@ def monitor(self): try: status = self.runner._query_status() self.logger.info( - f"task_{self.cur_strategy['idx']} status: {status.name}") + f"task_{self.cur_strategy['idx']} status: {status.name}" + ) if status == JobStatus.COMPLETED_OR_IDLE: break if status == JobStatus.RUNNING: @@ -276,15 +283,18 @@ def monitor(self): end_time = time.time() # Add elapsed time - self.cur_strategy["elapsed_time"] = round( - end_time - self.task_start_time, 2) + self.cur_strategy["elapsed_time"] = round(end_time - self.task_start_time, 2) # Add start time readable_task_start_time = datetime.datetime.fromtimestamp( - self.task_start_time).strftime("%Y-%m-%d %H:%M:%S") + self.task_start_time + ).strftime("%Y-%m-%d %H:%M:%S") self.cur_strategy["start_time"] = readable_task_start_time - self.logger.info("task_{} monitor time: {:.2f}s".format( - self.cur_strategy["idx"], self.cur_strategy["elapsed_time"])) + self.logger.info( + "task_{} monitor time: {:.2f}s".format( + self.cur_strategy["idx"], self.cur_strategy["elapsed_time"] + ) + ) def record(self): """Record the task result to csv""" @@ -293,7 +303,6 @@ def record(self): def get_best(self): sorted_history = self.recorder.sort(self.history) - if sorted_history and sorted_history[0] and sorted_history[0][ - "performance"]: + if sorted_history and sorted_history[0] and sorted_history[0]["performance"]: return sorted_history[0] return None diff --git a/flagscale/auto_tuner/utils.py b/flagscale/auto_tuner/utils.py index 9614fb341..ea3b55bb4 100644 --- a/flagscale/auto_tuner/utils.py +++ b/flagscale/auto_tuner/utils.py @@ -1,7 +1,8 @@ -import re import os +import re import socket import subprocess + from flagscale.launcher.runner import parse_hostfile diff --git a/flagscale/datasets/sft_dataset.py b/flagscale/datasets/sft_dataset.py index 874d9209b..0aa99a8e1 100644 --- a/flagscale/datasets/sft_dataset.py +++ b/flagscale/datasets/sft_dataset.py @@ -9,12 +9,21 @@ import numpy import torch -from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig, _get_ltor_masks_and_position_ids -from megatron.core.datasets.indexed_dataset import IndexedDataset, get_bin_path, get_idx_path +from megatron.core.datasets.gpt_dataset import ( + GPTDataset, + GPTDatasetConfig, + _get_ltor_masks_and_position_ids, +) +from megatron.core.datasets.indexed_dataset import ( + IndexedDataset, + get_bin_path, + get_idx_path, +) from megatron.core.datasets.utils import Split logger = logging.getLogger(__name__) + @dataclass class SFTDatasetConfig(GPTDatasetConfig): """Configuration object for Megatron Core SFT datasets""" @@ -50,35 +59,44 @@ def __init__( config: SFTDatasetConfig, ) -> None: self.config = config - self.apply_sft_dataset_separated_loss_mask_if_existed = config.apply_sft_dataset_separated_loss_mask_if_existed + self.apply_sft_dataset_separated_loss_mask_if_existed = ( + config.apply_sft_dataset_separated_loss_mask_if_existed + ) self.loss_mask_dataset = None super().__init__( - indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config + 
indexed_dataset, + dataset_path, + indexed_indices, + num_samples, + index_split, + config, ) self._build_loss_mask_dataset() def _build_loss_mask_dataset(self) -> None: """ - Load Loss Mask IndexedDataset + Load Loss Mask IndexedDataset """ path_prefix = None - base_prefix = '_text_document' - loss_mask_prefix = '_loss_mask_document' + base_prefix = "_text_document" + loss_mask_prefix = "_loss_mask_document" if self.dataset_path.endswith(base_prefix): - path_prefix = self.dataset_path[:-len(base_prefix)] + loss_mask_prefix + path_prefix = self.dataset_path[: -len(base_prefix)] + loss_mask_prefix if self.apply_sft_dataset_separated_loss_mask_if_existed and path_prefix: idx_path = get_idx_path(path_prefix) bin_path = get_bin_path(path_prefix) if os.path.exists(idx_path) and os.path.exists(bin_path): self.loss_mask_dataset = IndexedDataset( - path_prefix, multimodal=False, mmap=self.config.mmap_bin_files) + path_prefix, multimodal=False, mmap=self.config.mmap_bin_files + ) - print(f'> Used Dataset: aux_loss_mask ...') + print(f"> Used Dataset: aux_loss_mask ...") if self.loss_mask_dataset is not None: - assert len(self.dataset) == len(self.loss_mask_dataset), \ - f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})" + assert len(self.dataset) == len( + self.loss_mask_dataset + ), f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})" def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]: """Abstract method implementation @@ -139,11 +157,12 @@ def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]: # aux dataset aux_loss_mask, _ = self._query_document_sample_shuffle_indices_aux_dataset( - self.loss_mask_dataset, idx) + self.loss_mask_dataset, idx + ) if aux_loss_mask is not None: - if idx % 100 == 0: - print(f'> Used aux_loss_mask at current sample={idx} ...') - loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous() + if idx % 100 == 0: + print(f"> Used aux_loss_mask at current sample={idx} ...") + loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous() if self.config.create_attention_mask: return { @@ -210,11 +229,12 @@ def _query_document_sample_shuffle_indices_aux_dataset( offset = 0 if i > doc_index_beg else doc_index_beg_offset length = None if i < doc_index_end else doc_index_end_offset + 1 sample_parts.append( - aux_dataset.get(self.document_index[i], offset=offset, length=length) + aux_dataset.get( + self.document_index[i], offset=offset, length=length + ) ) return ( numpy.array(numpy.concatenate(sample_parts), dtype=numpy.int64), numpy.array(document_ids, dtype=numpy.int64), ) - diff --git a/flagscale/launcher/runner.py b/flagscale/launcher/runner.py index 2807efbe5..492dffefa 100644 --- a/flagscale/launcher/runner.py +++ b/flagscale/launcher/runner.py @@ -1,17 +1,19 @@ -import os -import re -import sys import collections import copy -import socket +import json +import os +import re import shlex +import socket import subprocess -import json -import uuid +import sys import time -from datetime import datetime +import uuid from abc import ABC, abstractmethod +from datetime import datetime + from omegaconf import DictConfig, OmegaConf + from ..logger import logger from .job_status import JobStatus diff --git a/flagscale/logger.py b/flagscale/logger.py index d25781f1a..136879873 100644 --- a/flagscale/logger.py +++ b/flagscale/logger.py @@ -1,5 +1,5 @@ -import sys import logging +import sys class Logger: @@ -12,7 +12,9 @@ def __init__(self, name, level=logging.INFO): for handler in 
self.logger.handlers[:]: self.logger.removeHandler(handler) - formatter = logging.Formatter('[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s') + formatter = logging.Formatter( + "[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s" + ) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -34,12 +36,15 @@ def critical(self, message): def debug(self, message): self.logger.debug(message) + GLOBAL_LOGGER = None + def get_logger(): global GLOBAL_LOGGER if GLOBAL_LOGGER is None: - GLOBAL_LOGGER = Logger('FlagScale') + GLOBAL_LOGGER = Logger("FlagScale") return GLOBAL_LOGGER -logger = get_logger() \ No newline at end of file + +logger = get_logger() diff --git a/flagscale/patches_utils.py b/flagscale/patches_utils.py index 8e86ce1fd..68d619dd5 100644 --- a/flagscale/patches_utils.py +++ b/flagscale/patches_utils.py @@ -1,16 +1,19 @@ -# this file is used for adding tools func to processing patches +# this file is used for adding tools func to processing patches + def add_patches_module(path: str, module_dict: dict): if len(module_dict) == 0: raise Exception(f"module dict is None") import sys + print(f"{path} is being instead, using module {module_dict}") for k in sys.modules: if k.startswith(path): for module_name, module_ in module_dict.items(): import re + class_pattern = re.compile("\w*\.w*") - if not re.match(class_pattern, module_name): + if not re.match(class_pattern, module_name): try: if getattr(sys.modules[k], module_name, None): setattr(sys.modules[k], module_name, module_) @@ -19,5 +22,5 @@ def add_patches_module(path: str, module_dict: dict): else: class_name, fuc_name = module_name.split(".") class_obj = getattr(sys.modules[k], class_name, None) - if class_obj and getattr(class_obj, fuc_name , None): + if class_obj and getattr(class_obj, fuc_name, None): setattr(class_obj, fuc_name, module_) diff --git a/megatron/tests/unit_tests/__init__.py b/megatron/tests/unit_tests/__init__.py index 1d3c586a5..88828cda5 100644 --- a/megatron/tests/unit_tests/__init__.py +++ b/megatron/tests/unit_tests/__init__.py @@ -1,2 +1,3 @@ import torch._dynamo -torch._dynamo.config.suppress_errors = True \ No newline at end of file +torch._dynamo.config.suppress_errors = True + From e5106f620ae3b6b68acdd4f2d9cac728e260eafc Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 09:55:11 +0800 Subject: [PATCH 02/10] update workflows --- .github/workflows/format.yml | 2 ++ .github/workflows/test.yml | 25 +++++----------------- tests/scripts/functional_test_flagscale.sh | 15 ++----------- tests/scripts/unit_test_megatron.sh | 18 +++++++--------- 4 files changed, 17 insertions(+), 43 deletions(-) diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 510db4a39..4eb8f490f 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -22,10 +22,12 @@ jobs: steps: - uses: actions/checkout@v2 + - name: Set up Python 3.10 uses: actions/setup-python@v2 with: python-version: "3.10" + - name: Install dependencies run: | pip install black isort diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e5a4909b0..6a1931d47 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,31 +24,16 @@ jobs: uses: actions/checkout@v2 - name: Megatron Unit Test - run: | - cd megatron - export PYTHONPATH=..:$PYTHONPATH - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron 
--cov=megatron/core -q -x tests/unit_tests/data - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py + run: tests/scripts/unit_test_megatron.sh - name: Megatron Unit Test Coverage Online Report - run: | - echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html" + run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html" - name: Flagscale Unit Test - run: | - torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher + run: tests/scripts/unit_test_flagscale.sh - name: Flagscale Unit Test Coverage Online Report - run: | - echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html" + run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html" - name: Flagscale Functional Test - run: | - python run.py --config-path tests/functional_tests/aquila/conf --config-name config action=test - pytest -s tests/functional_tests/test_result.py --test_reaults_path=./tests/functional_tests/aquila/test_result \ No newline at end of file + run: tests/scripts/functional_test_flagscale.sh diff --git a/tests/scripts/functional_test_flagscale.sh b/tests/scripts/functional_test_flagscale.sh index 2edf8f913..9df71e8b4 100755 --- a/tests/scripts/functional_test_flagscale.sh +++ b/tests/scripts/functional_test_flagscale.sh @@ -1,13 +1,2 @@ -# if [ "$2" = "stop" ]; then -# python run.py --config-path tests/functional_tests/$1/conf --config-name config action=stop -# exit 0 -# fi - -OUT_DIR=./tests/functional_tests/$1/test_result -if [ -d "$OUT_DIR" ]; then - echo "$OUT_DIR exist." 
- rm -r $OUT_DIR - sleep 3s -fi - -python run.py --config-path tests/functional_tests/$1/conf --config-name config action=test +python run.py --config-path tests/functional_tests/aquila/conf --config-name config action=test +pytest -s tests/functional_tests/test_result.py --test_reaults_path=./tests/functional_tests/aquila/test_result \ No newline at end of file diff --git a/tests/scripts/unit_test_megatron.sh b/tests/scripts/unit_test_megatron.sh index eef8e585a..0a68b63ae 100755 --- a/tests/scripts/unit_test_megatron.sh +++ b/tests/scripts/unit_test_megatron.sh @@ -1,12 +1,10 @@ cd megatron - export PYTHONPATH=..:$PYTHONPATH - -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py \ No newline at end of file +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append 
--cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py \ No newline at end of file From 16c5b30c31d1720f5b3b0fdc9807d3ca4b887dbd Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 11:09:08 +0800 Subject: [PATCH 03/10] update workflows --- .github/workflows/test.yml | 46 ++++++++++++++-------------- tests/scripts/unit_test_flagscale.sh | 10 +++++- tests/scripts/unit_test_megatron.sh | 25 ++++++++++----- 3 files changed, 49 insertions(+), 32 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a1931d47..0f6df4512 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,29 +11,29 @@ on: jobs: test: - runs-on: self-hosted - container: - image: localhost:5000/flagscale_cicd:v1.3 - ports: - - 80 - volumes: - - /home/flagscale_cicd/flask/static:/workspace/report - options: --gpus all --hostname flagscale_cicd - steps: - - name: Checkout Code - uses: actions/checkout@v2 + runs-on: self-hosted + container: + image: localhost:5000/flagscale_cicd:v1.3 + ports: + - 80 + volumes: + - /home/flagscale_cicd/flask/static:/workspace/report + options: --gpus all --hostname flagscale_cicd + steps: + - name: Checkout Code + uses: actions/checkout@v2 - - name: Megatron Unit Test - run: tests/scripts/unit_test_megatron.sh - - - name: Megatron Unit Test Coverage Online Report - run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html" + - name: Megatron Unit Test + run: tests/scripts/unit_test_megatron.sh ${{github.sha}} + + - name: Megatron Unit Test Coverage Online Report + run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html" - - name: Flagscale Unit Test - run: tests/scripts/unit_test_flagscale.sh - - - name: Flagscale Unit Test Coverage Online Report - run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html" + - name: Flagscale Unit Test + run: tests/scripts/unit_test_flagscale.sh ${{github.sha}} + + - name: Flagscale Unit Test Coverage Online Report + run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html" - - name: Flagscale Functional Test - run: tests/scripts/functional_test_flagscale.sh + - name: Flagscale Functional Test + run: tests/scripts/functional_test_flagscale.sh diff --git a/tests/scripts/unit_test_flagscale.sh b/tests/scripts/unit_test_flagscale.sh index 46861add6..19179e62b 100755 --- a/tests/scripts/unit_test_flagscale.sh +++ b/tests/scripts/unit_test_flagscale.sh @@ -1 +1,9 @@ -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:./cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher \ No newline at end of file +if [ -z "$1" ] +then + code_id=0 +else + code_id=$1 +fi + + +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher \ No newline at end of file diff --git a/tests/scripts/unit_test_megatron.sh b/tests/scripts/unit_test_megatron.sh index 0a68b63ae..4967e7cc1 100755 --- a/tests/scripts/unit_test_megatron.sh +++ b/tests/scripts/unit_test_megatron.sh @@ -1,10 +1,19 @@ +if [ -z "$1" ] +then + code_id=0 +else + code_id=$1 +fi + cd megatron + export 
PYTHONPATH=..:$PYTHONPATH -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer -torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py \ No newline at end of file + +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer +torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py \ No newline at end of file From 630d48b8e0f0515f7c5b1f404b40619370b14413 Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 11:20:43 +0800 Subject: [PATCH 04/10] update format --- flagscale/launcher/runner.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git 
a/flagscale/launcher/runner.py b/flagscale/launcher/runner.py index 7943ef58b..a2a50996a 100644 --- a/flagscale/launcher/runner.py +++ b/flagscale/launcher/runner.py @@ -471,13 +471,13 @@ def _generate_run_script_train( return host_run_script_file -def _generate_run_script_inference(config, host, node_rank, cmd, background=True, with_test=False): +def _generate_run_script_inference( + config, host, node_rank, cmd, background=True, with_test=False +): logging_config = config.inference.system.logging no_shared_fs = config.experiment.runner.get("no_shared_fs", False) if no_shared_fs: - host_output_file = os.path.join( - logging_config.log_dir, f"host.output" - ) + host_output_file = os.path.join(logging_config.log_dir, f"host.output") else: host_output_file = os.path.join( logging_config.log_dir, f"host_{node_rank}_{host}.output" @@ -594,9 +594,7 @@ def _prepare(self): _update_config_inference(self.config) self.user_args = get_args_vllm(self.config) else: - raise ValueError( - f"Unsupported task type in SSHRunner: {self.mode}" - ) + raise ValueError(f"Unsupported task type in SSHRunner: {self.mode}") self.rdzv_id = datetime.now().strftime("%Y%m%d_%H%M%S.%f") self.user_envs = self.config.experiment.get("envs", {}) @@ -904,9 +902,7 @@ def _prepare(self): _update_config_train(self.config) self.user_args = get_args_megatron(self.config) else: - raise ValueError( - f"Unsupported task type in CloudRunner: {self.mode}" - ) + raise ValueError(f"Unsupported task type in CloudRunner: {self.mode}") logger.info("\n************** configuration ***********") logger.info(f"\n{OmegaConf.to_yaml(self.config)}") From 1aebaf9caf8a63b6ace45634568ba9f894a4d0ca Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 11:26:35 +0800 Subject: [PATCH 05/10] update workflow --- .github/workflows/test.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0f6df4512..7f4a9813c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,6 +9,9 @@ on: pull_request: branches: [ "main" ] +env: + REPORT_ADDR: http://120.92.110.224:8081 + jobs: test: runs-on: self-hosted @@ -27,13 +30,13 @@ jobs: run: tests/scripts/unit_test_megatron.sh ${{github.sha}} - name: Megatron Unit Test Coverage Online Report - run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html" + run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-megatron/index.html" - name: Flagscale Unit Test run: tests/scripts/unit_test_flagscale.sh ${{github.sha}} - name: Flagscale Unit Test Coverage Online Report - run: echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html" + run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-flagscale/index.html" - name: Flagscale Functional Test run: tests/scripts/functional_test_flagscale.sh From dc7bffe1c153964a0e7f983853972c4e59f06a92 Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 12:33:52 +0800 Subject: [PATCH 06/10] update workflow --- .github/workflows/test.yml | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7f4a9813c..7d35fd7fb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ env: REPORT_ADDR: 
http://120.92.110.224:8081 jobs: - test: + megatron-unit-test: runs-on: self-hosted container: image: localhost:5000/flagscale_cicd:v1.3 @@ -32,11 +32,39 @@ jobs: - name: Megatron Unit Test Coverage Online Report run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-megatron/index.html" + + flagscale-unit-test: + runs-on: self-hosted + container: + image: localhost:5000/flagscale_cicd:v1.3 + ports: + - 80 + volumes: + - /home/flagscale_cicd/flask/static:/workspace/report + options: --gpus all --hostname flagscale_cicd + steps: + - name: Checkout Code + uses: actions/checkout@v2 + - name: Flagscale Unit Test run: tests/scripts/unit_test_flagscale.sh ${{github.sha}} - name: Flagscale Unit Test Coverage Online Report run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-flagscale/index.html" + + flagscale-functional-test: + runs-on: self-hosted + container: + image: localhost:5000/flagscale_cicd:v1.3 + ports: + - 80 + volumes: + - /home/flagscale_cicd/flask/static:/workspace/report + options: --gpus all --hostname flagscale_cicd + steps: + - name: Checkout Code + uses: actions/checkout@v2 + - name: Flagscale Functional Test - run: tests/scripts/functional_test_flagscale.sh + run: tests/scripts/functional_test_flagscale.sh \ No newline at end of file From db1422f79aadf6374e2a63ce401e0824991ca803 Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 12:48:17 +0800 Subject: [PATCH 07/10] update workflow --- .github/workflows/test.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7d35fd7fb..b2a4bbb07 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,6 +15,7 @@ env: jobs: megatron-unit-test: runs-on: self-hosted + concurrency: job-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -35,6 +36,7 @@ jobs: flagscale-unit-test: runs-on: self-hosted + concurrency: job-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -55,6 +57,7 @@ jobs: flagscale-functional-test: runs-on: self-hosted + concurrency: job-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: From 0538459ff9feb44f8fed5068d353929531aeae2d Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 12:55:04 +0800 Subject: [PATCH 08/10] update workflow --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b2a4bbb07..530a48e60 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ env: jobs: megatron-unit-test: runs-on: self-hosted - concurrency: job-group + concurrency: test-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -36,7 +36,7 @@ jobs: flagscale-unit-test: runs-on: self-hosted - concurrency: job-group + concurrency: test-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -57,7 +57,7 @@ jobs: flagscale-functional-test: runs-on: self-hosted - concurrency: job-group + concurrency: test-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: From c9b0b32c2d5d47d4afb76a7d05ef32bbee559ec7 Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 13:15:18 +0800 Subject: [PATCH 09/10] update workflow --- .github/workflows/test.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 530a48e60..c418356a1 100644 --- 
a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,6 @@ env: jobs: megatron-unit-test: runs-on: self-hosted - concurrency: test-group container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -36,7 +35,7 @@ jobs: flagscale-unit-test: runs-on: self-hosted - concurrency: test-group + needs: megatron-unit-test container: image: localhost:5000/flagscale_cicd:v1.3 ports: @@ -57,7 +56,7 @@ jobs: flagscale-functional-test: runs-on: self-hosted - concurrency: test-group + needs: flagscale-unit-test container: image: localhost:5000/flagscale_cicd:v1.3 ports: From a94f067806e1669155f022fe513abfe08919919f Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Wed, 12 Jun 2024 14:55:22 +0800 Subject: [PATCH 10/10] update format for merge --- .github/workflows/format.yml | 3 ++- flagscale/auto_tuner/prune/history.py | 3 +-- flagscale/auto_tuner/tuner.py | 2 +- flagscale/launcher/runner.py | 8 ++------ 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 4eb8f490f..b131c6ae4 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -21,7 +21,8 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - name: Checkout Code + uses: actions/checkout@v2 - name: Set up Python 3.10 uses: actions/setup-python@v2 diff --git a/flagscale/auto_tuner/prune/history.py b/flagscale/auto_tuner/prune/history.py index 438c36bd7..c40793d90 100644 --- a/flagscale/auto_tuner/prune/history.py +++ b/flagscale/auto_tuner/prune/history.py @@ -1,7 +1,6 @@ import logging -from ..utils import beside -from ..utils import compare_by_recompute +from ..utils import beside, compare_by_recompute _HISTORY_BASED_PRUNE_FUNC = [] logger = logging.getLogger("FlagScale-AutoTuner") diff --git a/flagscale/auto_tuner/tuner.py b/flagscale/auto_tuner/tuner.py index 7e87e0bb4..7f4cbaef3 100644 --- a/flagscale/auto_tuner/tuner.py +++ b/flagscale/auto_tuner/tuner.py @@ -276,7 +276,7 @@ def monitor(self): if running: self.runner.stop() break - + # Add sub process monitor sub_process = self.runner._query_sub_process_status() if sub_process: diff --git a/flagscale/launcher/runner.py b/flagscale/launcher/runner.py index 6c3c163bb..752e234f7 100644 --- a/flagscale/launcher/runner.py +++ b/flagscale/launcher/runner.py @@ -822,17 +822,13 @@ def _generate_query_sub_process_script(self, host, node_rank): f.write("#!/bin/bash\n\n") f.write("if [ -f " + host_pid_file + " ]; then\n") f.write(" pid=$(cat " + host_pid_file + ")\n") - f.write( - " ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n" - ) + f.write(" ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n") f.write("else\n") # TODO: This is a temporary fix. We need to find a better way to query the job. f.write( " pid=$(ps aux | grep 'torchrun' | grep -v grep | head -n 1 | awk '{print $2}')\n" ) - f.write( - " ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n" - ) + f.write(" ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n") f.write("fi\n") f.flush() os.fsync(f.fileno())
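
---

For reference, the format gate introduced by these patches can be reproduced locally before pushing. The sketch below is an assumption based on the tools and options configured in `.github/workflows/format.yml` and `.pre-commit-config.yaml` (black, isort with the black profile, and local pre-commit hooks); the explicit path list is a simplified stand-in for the workflow's `CHECK_FILES` globs rather than a verbatim copy of them.

```bash
# Sketch: reproduce the format CI gate locally (assumes Python 3.10+ and pip are available).
pip install black isort pre-commit

# Option 1: invoke the same tools the workflow uses.
# NOTE: this path list is an illustrative simplification of CHECK_FILES, not the exact globs.
CHECK_PATHS="flagscale/auto_tuner flagscale/launcher \
             flagscale/logger.py flagscale/patches_utils.py \
             flagscale/datasets/sft_dataset.py"
black --check --diff $CHECK_PATHS
isort --profile black --check --diff $CHECK_PATHS

# Option 2: let pre-commit drive the local black/isort hooks defined in .pre-commit-config.yaml.
pre-commit run --all-files
```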