diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml
new file mode 100644
index 000000000..b131c6ae4
--- /dev/null
+++ b/.github/workflows/format.yml
@@ -0,0 +1,44 @@
+name: format
+
+on:
+  pull_request:
+    branches: [ "main" ]
+    types: [opened, synchronize, reopened]
+
+env:
+  CHECK_FILES: >-
+    flagscale/auto_tuner/*.py
+    flagscale/auto_tuner/prune/*.py
+    flagscale/auto_tuner/record/*.py
+    flagscale/auto_tuner/search/*.py
+    flagscale/launcher/*.py
+    flagscale/logger.py
+    flagscale/patches_utils.py
+    flagscale/datasets/sft_dataset.py
+
+jobs:
+  format:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v2
+
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v2
+        with:
+          python-version: "3.10"
+
+      - name: Install dependencies
+        run: |
+          pip install black isort
+
+      - name: Run Black
+        run: >-
+          black --check --diff --include $CHECK_FILES ./
+
+      - name: Run Isort
+        run: |
+          isort --profile black --check --diff $CHECK_FILES
+
+
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e5a4909b0..c418356a1 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -9,46 +9,64 @@ on:
   pull_request:
     branches: [ "main" ]
 
+env:
+  REPORT_ADDR: http://120.92.110.224:8081
+
 jobs:
-  test:
-    runs-on: self-hosted
-    container:
-      image: localhost:5000/flagscale_cicd:v1.3
-      ports:
-        - 80
-      volumes:
-        - /home/flagscale_cicd/flask/static:/workspace/report
-      options: --gpus all --hostname flagscale_cicd
-    steps:
-      - name: Checkout Code
-        uses: actions/checkout@v2
-
-      - name: Megatron Unit Test
-        run: |
-          cd megatron
-          export PYTHONPATH=..:$PYTHONPATH
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py
-
-      - name: Megatron Unit Test Coverage Online Report
-        run: |
-          echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-megatron/index.html"
-
-      - name: Flagscale Unit Test
-        run: |
-          torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/${{github.sha}}/cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher
-
-      - name: Flagscale Unit Test Coverage Online Report
-        run: |
-          echo "You can access the test coverage report at the http://120.92.110.224:8081/${{github.sha}}/cov-report-flagscale/index.html"
-
-      - name: Flagscale Functional Test
-        run: |
-          python run.py --config-path tests/functional_tests/aquila/conf --config-name config action=test
-          pytest -s tests/functional_tests/test_result.py --test_reaults_path=./tests/functional_tests/aquila/test_result
\ No newline at end of file
+  megatron-unit-test:
+    runs-on: self-hosted
+    container:
+      image: localhost:5000/flagscale_cicd:v1.3
+      ports:
+        - 80
+      volumes:
+        - /home/flagscale_cicd/flask/static:/workspace/report
+      options: --gpus all --hostname flagscale_cicd
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v2
+
+      - name: Megatron Unit Test
+        run: tests/scripts/unit_test_megatron.sh ${{github.sha}}
+
+      - name: Megatron Unit Test Coverage Online Report
+        run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-megatron/index.html"
+
+
+  flagscale-unit-test:
+    runs-on: self-hosted
+    needs: megatron-unit-test
+    container:
+      image: localhost:5000/flagscale_cicd:v1.3
+      ports:
+        - 80
+      volumes:
+        - /home/flagscale_cicd/flask/static:/workspace/report
+      options: --gpus all --hostname flagscale_cicd
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v2
+
+      - name: Flagscale Unit Test
+        run: tests/scripts/unit_test_flagscale.sh ${{github.sha}}
+
+      - name: Flagscale Unit Test Coverage Online Report
+        run: echo "You can access the test coverage report at the $REPORT_ADDR/${{github.sha}}/cov-report-flagscale/index.html"
+
+
+  flagscale-functional-test:
+    runs-on: self-hosted
+    needs: flagscale-unit-test
+    container:
+      image: localhost:5000/flagscale_cicd:v1.3
+      ports:
+        - 80
+      volumes:
+        - /home/flagscale_cicd/flask/static:/workspace/report
+      options: --gpus all --hostname flagscale_cicd
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v2
+
+      - name: Flagscale Functional Test
+        run: tests/scripts/functional_test_flagscale.sh
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 000000000..d63dccf7f
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,28 @@
+check_files: &check_files |
+  (?x)^(
+      flagscale/auto_tuner/.*\.py|
+      flagscale/auto_tuner/prune/\..*\.py|
+      flagscale/auto_tuner/record/\..*\.py|
+      flagscale/auto_tuner/search/\..*\.py|
+      flagscale/launcher/\..*\.py|
+      flagscale/logger\.py|
+      flagscale/patches_utils\.py|
+      flagscale/datasets/sft_dataset\.py
+  )$
+
+repos:
+  - repo: local
+    hooks:
+      - id: black
+        name: black
+        entry: black
+        language: system
+        types: [python]
+        files: *check_files
+      - id: isort
+        name: isort
+        entry: isort
+        language: system
+        types: [python]
+        files: *check_files
+        args: ["--profile", "black"]
diff --git a/flagscale/auto_tuner/generate.py b/flagscale/auto_tuner/generate.py
index 18c0cd6b2..f5e4493dd 100644
--- a/flagscale/auto_tuner/generate.py
+++ b/flagscale/auto_tuner/generate.py
@@ -1,5 +1,5 @@
-import os
 import copy
+import os
 
 
 class Generator:
@@ -16,8 +16,7 @@ def __init__(self, config):
             "tensor_model_parallel_size": "tensor_model_parallel_size",
             "sequence_parallel": "sequence_parallel",
             "pipeline_model_parallel_size": "pipeline_model_parallel_size",
-            "num_layers_per_virtual_pipeline_stage":
-            "num_layers_per_virtual_pipeline_stage",
+            "num_layers_per_virtual_pipeline_stage": "num_layers_per_virtual_pipeline_stage",
             "recompute_method": "recompute_method",
             "recompute_granularity": "recompute_granularity",
             "recompute_num_layers": "recompute_num_layers",
@@ -81,14 +80,15 @@ def gen(self, strategy):
         # Set train_iters of each task
         if "control" in config.experiment.auto_tuner:
             config.train.model.train_iters = config.experiment.auto_tuner.control.get(
-                "train_iters", 5)
+                "train_iters", 5
+            )
         else:
             config.train.model.train_iters = 5
 
         # log dir
-        config.experiment.exp_dir = os.path.join(config.experiment.exp_dir,
-                                                 "auto_tuner",
-                                                 f"task_{strategy['idx']}")
+        config.experiment.exp_dir = os.path.join(
+            config.experiment.exp_dir, "auto_tuner", f"task_{strategy['idx']}"
+        )
 
         return config
diff --git a/flagscale/auto_tuner/prune/history.py b/flagscale/auto_tuner/prune/history.py
index f3597be26..c40793d90 100644
--- a/flagscale/auto_tuner/prune/history.py
+++ b/flagscale/auto_tuner/prune/history.py
@@ -1,6 +1,6 @@
 import logging
-from ..utils import beside
-from ..utils import compare_by_recompute
+
+from ..utils import beside, compare_by_recompute
 
 _HISTORY_BASED_PRUNE_FUNC = []
 logger = logging.getLogger("FlagScale-AutoTuner")
diff --git a/flagscale/auto_tuner/prune/pruner.py b/flagscale/auto_tuner/prune/pruner.py
index 7c332da7f..9f521283c 100644
--- a/flagscale/auto_tuner/prune/pruner.py
+++ b/flagscale/auto_tuner/prune/pruner.py
@@ -14,6 +14,7 @@ def prune(self, strategy, history=[]):
             if func(self.config, strategy, history):
                 not_run = True
                 break
+        history.append(strategy)
         if not_run:
             self.pruned_count += 1
diff --git a/flagscale/auto_tuner/record/recorder.py b/flagscale/auto_tuner/record/recorder.py
index 61b9e4868..38c755953 100644
--- a/flagscale/auto_tuner/record/recorder.py
+++ b/flagscale/auto_tuner/record/recorder.py
@@ -1,7 +1,8 @@
+import logging
 import os
 import re
-import logging
 import subprocess
+
 import pandas as pd
diff --git a/flagscale/auto_tuner/search/algorithm.py b/flagscale/auto_tuner/search/algorithm.py
index b36df980f..a9c03dc3a 100644
--- a/flagscale/auto_tuner/search/algorithm.py
+++ b/flagscale/auto_tuner/search/algorithm.py
@@ -24,15 +24,19 @@ def __init__(self, strategies, config):
     def checkout(self, mode):
         if mode == "memory":
             from ..utils import sort_by_memory
+
             if self.idx > 0 and self.idx < len(self.strategies):
-                self.strategies = self.strategies[:self.idx] + sorted(
-                    self.strategies[self.idx:], key=sort_by_memory)
+                self.strategies = self.strategies[: self.idx] + sorted(
+                    self.strategies[self.idx :], key=sort_by_memory
+                )
         elif mode == "performance":
             from ..utils import sort_by_performance
+
             if self.idx > 0 and self.idx < len(self.strategies):
-                self.strategies = self.strategies[:self.idx] + sorted(
-                    self.strategies[self.idx:], key=sort_by_performance)
+                self.strategies = self.strategies[: self.idx] + sorted(
+                    self.strategies[self.idx :], key=sort_by_performance
+                )
 
     def search(self):
         """Return a task iteratively."""
diff --git a/flagscale/auto_tuner/search/searcher.py b/flagscale/auto_tuner/search/searcher.py
index 46970e860..075031c48 100644
--- a/flagscale/auto_tuner/search/searcher.py
+++ b/flagscale/auto_tuner/search/searcher.py
@@ -1,8 +1,8 @@
-import time
 import copy
 import logging
-from ..utils import divisible
+import time
 
+from ..utils import divisible
 
 __BUILT_IN_STRATEGY_DIMS__ = [
     "data_parallel_size",
diff --git a/flagscale/auto_tuner/tuner.py b/flagscale/auto_tuner/tuner.py
index c0967b172..7f4cbaef3 100644
--- a/flagscale/auto_tuner/tuner.py
+++ b/flagscale/auto_tuner/tuner.py
@@ -1,18 +1,18 @@
-import os
 import copy
-import time
 import datetime
 import logging
+import os
+import time
 
 from omegaconf import DictConfig, OmegaConf
 
-from flagscale.launcher.runner import SSHRunner
 from flagscale.launcher.job_status import JobStatus
+from flagscale.launcher.runner import SSHRunner
 
-from .search import Searcher
-from .prune import Pruner
 from .generate import Generator
+from .prune import Pruner
 from .record import Recorder
+from .search import Searcher
 
 
 class AutoTuner:
@@ -33,7 +33,8 @@ def __init__(self, config: DictConfig):
         handler = logging.FileHandler(log_path, mode="w")
         handler.setLevel(logging.INFO)
         formatter = logging.Formatter(
-            "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        )
         handler.setFormatter(formatter)
         logger.addHandler(handler)
         self.logger = logger
@@ -59,8 +60,7 @@ def __init__(self, config: DictConfig):
         # The interval of task monitoring
         if "control" not in self.config.experiment.auto_tuner:
             self.config.experiment.auto_tuner.control = {}
-        self.interval = self.config.experiment.auto_tuner.control.get(
-            "interval", 10)
+        self.interval = self.config.experiment.auto_tuner.control.get("interval", 10)
 
         # Set platform envs
         if "platform" not in self.config.experiment.auto_tuner:
@@ -71,38 +71,42 @@ def __init__(self, config: DictConfig):
             self.config.experiment.auto_tuner.platform.airs_switch = True
 
         if os.environ.get("AIRS_SIZE", None):
-            self.config.experiment.auto_tuner.nnodes = int(
-                os.environ["AIRS_SIZE"])
+            self.config.experiment.auto_tuner.nnodes = int(os.environ["AIRS_SIZE"])
             # Set original config
-            self.orig_config.experiment.runner.nnodes = int(
-                os.environ["AIRS_SIZE"])
+            self.orig_config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"])
             # Set config
-            self.config.experiment.runner.nnodes = int(
-                os.environ["AIRS_SIZE"])
+            self.config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"])
 
         if os.environ.get("AIRS_ACCELERATOR_COUNT", None):
             self.config.experiment.auto_tuner.nproc_per_node = int(
-                os.environ["AIRS_ACCELERATOR_COUNT"])
+                os.environ["AIRS_ACCELERATOR_COUNT"]
+            )
             # Set original config
             self.orig_config.experiment.runner.nproc_per_node = int(
-                os.environ["AIRS_ACCELERATOR_COUNT"])
+                os.environ["AIRS_ACCELERATOR_COUNT"]
+            )
             # Set config
             self.config.experiment.runner.nproc_per_node = int(
-                os.environ["AIRS_ACCELERATOR_COUNT"])
+                os.environ["AIRS_ACCELERATOR_COUNT"]
+            )
 
         if os.environ.get("AIRS_FBMEM", None):
-            self.config.experiment.auto_tuner.memory = int(
-                os.environ["AIRS_FBMEM"])
+            self.config.experiment.auto_tuner.memory = int(os.environ["AIRS_FBMEM"])
 
         if os.environ.get("AIRS_HOSTFILE_PATH", None):
             # Set original config
             self.orig_config.experiment.runner.hostfile = os.environ[
-                "AIRS_HOSTFILE_PATH"]
+                "AIRS_HOSTFILE_PATH"
+            ]
             # Set config
             self.config.experiment.runner.hostfile = os.environ[
-                "AIRS_HOSTFILE_PATH"]
+                "AIRS_HOSTFILE_PATH"
+            ]
 
-        self.config.experiment.auto_tuner.cards = self.config.experiment.auto_tuner.nnodes * self.config.experiment.auto_tuner.nproc_per_node
+        self.config.experiment.auto_tuner.cards = (
+            self.config.experiment.auto_tuner.nnodes
+            * self.config.experiment.auto_tuner.nproc_per_node
+        )
 
         # Build core sub modules, such as Searcher, Pruner, Generator and Recorder
         self.searcher = Searcher(self.config)
@@ -116,11 +120,11 @@ def __init__(self, config: DictConfig):
         # The max time per task, unit: second
         # NOTE: The task will be stopped if the time is reached or done.
         self.max_time_per_task = self.config.experiment.auto_tuner.control.get(
-            "max_time_per_task", 300)
+            "max_time_per_task", 300
+        )
 
         # The max time of auto tuner, if None, no limit.
-        self.max_time = self.config.experiment.auto_tuner.control.get(
-            "max_time", None)
+        self.max_time = self.config.experiment.auto_tuner.control.get("max_time", None)
 
         # The start time of each task, used to control each task when stop
         self.start_task_time = None
@@ -158,9 +162,11 @@ def tune(self):
             self.logger.info(f"Record task_{self.idx}:")
             self.record()
 
-            if (self.cur_strategy["performance"]
-                    and self.config.experiment.auto_tuner.platform.get(
-                        "airs_switch", False) and not self.has_checkout):
+            if (
+                self.cur_strategy["performance"]
+                and self.config.experiment.auto_tuner.platform.get("airs_switch", False)
+                and not self.has_checkout
+            ):
                 self.checkout()
 
             # get best strategy
@@ -173,7 +179,8 @@ def tune(self):
                 self.logger.info(f"No strategy can run so far.")
         tuner_end_time = time.time()
         self.logger.info(
-            f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds.")
+            f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds."
+        )
 
         # Run the best task
         if self.config.experiment.auto_tuner.control.get("run_best", True):
@@ -182,8 +189,7 @@ def tune(self):
                 self.logger.info(f"Run best Strategy: {best_strategy}")
             else:
                 raise ValueError(f"No strategy can run.")
-            best_task = self.generator.gen_best_task(best_strategy,
-                                                     self.orig_config)
+            best_task = self.generator.gen_best_task(best_strategy, self.orig_config)
             best_task.action = "run"
             runner = SSHRunner(best_task)
             runner.run(monitor=True, interval=60)
@@ -260,7 +266,8 @@ def monitor(self):
             try:
                 status = self.runner._query_status()
                 self.logger.info(
-                    f"task_{self.cur_strategy['idx']} status: {status.name}")
+                    f"task_{self.cur_strategy['idx']} status: {status.name}"
+                )
                 if status == JobStatus.COMPLETED_OR_IDLE:
                     break
                 if status == JobStatus.RUNNING:
@@ -269,7 +276,7 @@ def monitor(self):
                     if running:
                         self.runner.stop()
                         break
-                
+
                 # Add sub process monitor
                 sub_process = self.runner._query_sub_process_status()
                 if sub_process:
@@ -290,15 +297,18 @@ def monitor(self):
         end_time = time.time()
         # Add elapsed time
-        self.cur_strategy["elapsed_time"] = round(
-            end_time - self.task_start_time, 2)
+        self.cur_strategy["elapsed_time"] = round(end_time - self.task_start_time, 2)
 
         # Add start time
         readable_task_start_time = datetime.datetime.fromtimestamp(
-            self.task_start_time).strftime("%Y-%m-%d %H:%M:%S")
+            self.task_start_time
+        ).strftime("%Y-%m-%d %H:%M:%S")
         self.cur_strategy["start_time"] = readable_task_start_time
 
-        self.logger.info("task_{} monitor time: {:.2f}s".format(
-            self.cur_strategy["idx"], self.cur_strategy["elapsed_time"]))
+        self.logger.info(
+            "task_{} monitor time: {:.2f}s".format(
+                self.cur_strategy["idx"], self.cur_strategy["elapsed_time"]
+            )
+        )
 
     def record(self):
         """Record the task result to csv"""
@@ -307,7 +317,6 @@ def record(self):
 
     def get_best(self):
         sorted_history = self.recorder.sort(self.history)
-        if sorted_history and sorted_history[0] and sorted_history[0][
-                "performance"]:
+        if sorted_history and sorted_history[0] and sorted_history[0]["performance"]:
             return sorted_history[0]
         return None
diff --git a/flagscale/auto_tuner/utils.py b/flagscale/auto_tuner/utils.py
index 9a6d0267e..b833f66ed 100644
--- a/flagscale/auto_tuner/utils.py
+++ b/flagscale/auto_tuner/utils.py
@@ -1,7 +1,8 @@
-import re
 import os
+import re
 import socket
 import subprocess
+
 from flagscale.launcher.runner import parse_hostfile
diff --git a/flagscale/datasets/sft_dataset.py b/flagscale/datasets/sft_dataset.py
index 874d9209b..0aa99a8e1 100644
--- a/flagscale/datasets/sft_dataset.py
+++ b/flagscale/datasets/sft_dataset.py
@@ -9,12 +9,21 @@
 import numpy
 import torch
 
-from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig, _get_ltor_masks_and_position_ids
-from megatron.core.datasets.indexed_dataset import IndexedDataset, get_bin_path, get_idx_path
+from megatron.core.datasets.gpt_dataset import (
+    GPTDataset,
+    GPTDatasetConfig,
+    _get_ltor_masks_and_position_ids,
+)
+from megatron.core.datasets.indexed_dataset import (
+    IndexedDataset,
+    get_bin_path,
+    get_idx_path,
+)
 from megatron.core.datasets.utils import Split
 
 logger = logging.getLogger(__name__)
 
+
 @dataclass
 class SFTDatasetConfig(GPTDatasetConfig):
     """Configuration object for Megatron Core SFT datasets"""
@@ -50,35 +59,44 @@ def __init__(
         config: SFTDatasetConfig,
     ) -> None:
         self.config = config
-        self.apply_sft_dataset_separated_loss_mask_if_existed = config.apply_sft_dataset_separated_loss_mask_if_existed
+        self.apply_sft_dataset_separated_loss_mask_if_existed = (
+            config.apply_sft_dataset_separated_loss_mask_if_existed
+        )
         self.loss_mask_dataset = None
 
         super().__init__(
-            indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config
+            indexed_dataset,
+            dataset_path,
+            indexed_indices,
+            num_samples,
+            index_split,
+            config,
         )
 
         self._build_loss_mask_dataset()
 
     def _build_loss_mask_dataset(self) -> None:
         """
-            Load Loss Mask IndexedDataset
+        Load Loss Mask IndexedDataset
         """
         path_prefix = None
-        base_prefix = '_text_document'
-        loss_mask_prefix = '_loss_mask_document'
+        base_prefix = "_text_document"
+        loss_mask_prefix = "_loss_mask_document"
         if self.dataset_path.endswith(base_prefix):
-            path_prefix = self.dataset_path[:-len(base_prefix)] + loss_mask_prefix
+            path_prefix = self.dataset_path[: -len(base_prefix)] + loss_mask_prefix
         if self.apply_sft_dataset_separated_loss_mask_if_existed and path_prefix:
             idx_path = get_idx_path(path_prefix)
             bin_path = get_bin_path(path_prefix)
             if os.path.exists(idx_path) and os.path.exists(bin_path):
                 self.loss_mask_dataset = IndexedDataset(
-                    path_prefix, multimodal=False, mmap=self.config.mmap_bin_files)
+                    path_prefix, multimodal=False, mmap=self.config.mmap_bin_files
+                )
 
-                print(f'> Used Dataset: aux_loss_mask ...')
+                print(f"> Used Dataset: aux_loss_mask ...")
 
         if self.loss_mask_dataset is not None:
-            assert len(self.dataset) == len(self.loss_mask_dataset), \
-                f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})"
+            assert len(self.dataset) == len(
+                self.loss_mask_dataset
+            ), f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})"
 
     def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]:
         """Abstract method implementation
@@ -139,11 +157,12 @@ def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]:
 
         # aux dataset
         aux_loss_mask, _ = self._query_document_sample_shuffle_indices_aux_dataset(
-            self.loss_mask_dataset, idx)
+            self.loss_mask_dataset, idx
+        )
         if aux_loss_mask is not None:
-           if idx % 100 == 0:
-               print(f'> Used aux_loss_mask at current sample={idx} ...')
-           loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous()
+            if idx % 100 == 0:
+                print(f"> Used aux_loss_mask at current sample={idx} ...")
+            loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous()
 
         if self.config.create_attention_mask:
             return {
@@ -210,11 +229,12 @@ def _query_document_sample_shuffle_indices_aux_dataset(
             offset = 0 if i > doc_index_beg else doc_index_beg_offset
             length = None if i < doc_index_end else doc_index_end_offset + 1
             sample_parts.append(
-                aux_dataset.get(self.document_index[i], offset=offset, length=length)
+                aux_dataset.get(
+                    self.document_index[i], offset=offset, length=length
+                )
             )
 
         return (
             numpy.array(numpy.concatenate(sample_parts), dtype=numpy.int64),
             numpy.array(document_ids, dtype=numpy.int64),
         )
-
diff --git a/flagscale/launcher/runner.py b/flagscale/launcher/runner.py
index a69ac1d9c..752e234f7 100644
--- a/flagscale/launcher/runner.py
+++ b/flagscale/launcher/runner.py
@@ -1,17 +1,19 @@
-import os
-import re
-import sys
 import collections
 import copy
-import socket
+import json
+import os
+import re
 import shlex
+import socket
 import subprocess
-import json
-import uuid
+import sys
 import time
-from datetime import datetime
+import uuid
 from abc import ABC, abstractmethod
+from datetime import datetime
+
 from omegaconf import DictConfig, OmegaConf
+
 from ..logger import logger
 from .job_status import JobStatus
 
@@ -469,13 +471,13 @@ def _generate_run_script_train(
     return host_run_script_file
 
 
-def _generate_run_script_inference(config, host, node_rank, cmd, background=True, with_test=False):
+def _generate_run_script_inference(
+    config, host, node_rank, cmd, background=True, with_test=False
+):
     logging_config = config.inference.system.logging
 
     no_shared_fs = config.experiment.runner.get("no_shared_fs", False)
     if no_shared_fs:
-        host_output_file = os.path.join(
-            logging_config.log_dir, f"host.output"
-        )
+        host_output_file = os.path.join(logging_config.log_dir, f"host.output")
     else:
         host_output_file = os.path.join(
             logging_config.log_dir, f"host_{node_rank}_{host}.output"
@@ -592,9 +594,7 @@ def _prepare(self):
             _update_config_inference(self.config)
             self.user_args = get_args_vllm(self.config)
         else:
-            raise ValueError(
-                f"Unsupported task type in SSHRunner: {self.mode}"
-            )
+            raise ValueError(f"Unsupported task type in SSHRunner: {self.mode}")
         self.rdzv_id = datetime.now().strftime("%Y%m%d_%H%M%S.%f")
         self.user_envs = self.config.experiment.get("envs", {})
@@ -822,17 +822,13 @@ def _generate_query_sub_process_script(self, host, node_rank):
             f.write("#!/bin/bash\n\n")
             f.write("if [ -f " + host_pid_file + " ]; then\n")
             f.write("    pid=$(cat " + host_pid_file + ")\n")
-            f.write(
-                "    ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n"
-            )
+            f.write("    ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n")
            f.write("else\n")
             # TODO: This is a temporary fix. We need to find a better way to query the job.
             f.write(
                 "    pid=$(ps aux | grep 'torchrun' | grep -v grep | head -n 1 | awk '{print $2}')\n"
             )
-            f.write(
-                "    ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n"
-            )
+            f.write("    ps -eo pid,ppid | awk -v ppid=$pid '$2 == ppid {print $1}'\n")
             f.write("fi\n")
             f.flush()
             os.fsync(f.fileno())
@@ -991,9 +987,7 @@ def _prepare(self):
             _update_config_train(self.config)
             self.user_args = get_args_megatron(self.config)
         else:
-            raise ValueError(
-                f"Unsupported task type in CloudRunner: {self.mode}"
-            )
+            raise ValueError(f"Unsupported task type in CloudRunner: {self.mode}")
 
         logger.info("\n************** configuration ***********")
         logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
diff --git a/flagscale/logger.py b/flagscale/logger.py
index d25781f1a..136879873 100644
--- a/flagscale/logger.py
+++ b/flagscale/logger.py
@@ -1,5 +1,5 @@
-import sys
 import logging
+import sys
 
 
 class Logger:
@@ -12,7 +12,9 @@ def __init__(self, name, level=logging.INFO):
         for handler in self.logger.handlers[:]:
             self.logger.removeHandler(handler)
 
-        formatter = logging.Formatter('[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s')
+        formatter = logging.Formatter(
+            "[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s"
+        )
 
         stream_handler = logging.StreamHandler(sys.stdout)
         stream_handler.setFormatter(formatter)
@@ -34,12 +36,15 @@ def critical(self, message):
     def debug(self, message):
         self.logger.debug(message)
 
+
 GLOBAL_LOGGER = None
 
+
 def get_logger():
     global GLOBAL_LOGGER
     if GLOBAL_LOGGER is None:
-        GLOBAL_LOGGER = Logger('FlagScale')
+        GLOBAL_LOGGER = Logger("FlagScale")
     return GLOBAL_LOGGER
 
-logger = get_logger()
\ No newline at end of file
+
+logger = get_logger()
diff --git a/flagscale/patches_utils.py b/flagscale/patches_utils.py
index 8e86ce1fd..68d619dd5 100644
--- a/flagscale/patches_utils.py
+++ b/flagscale/patches_utils.py
@@ -1,16 +1,19 @@
-# this file is used for adding tools func to processing patches 
+# this file is used for adding tools func to processing patches
+
 
 def add_patches_module(path: str, module_dict: dict):
     if len(module_dict) == 0:
         raise Exception(f"module dict is None")
     import sys
+
     print(f"{path} is being instead, using module {module_dict}")
     for k in sys.modules:
         if k.startswith(path):
             for module_name, module_ in module_dict.items():
                 import re
+
                 class_pattern = re.compile("\w*\.w*")
-                if not re.match(class_pattern, module_name): 
+                if not re.match(class_pattern, module_name):
                     try:
                         if getattr(sys.modules[k], module_name, None):
                             setattr(sys.modules[k], module_name, module_)
@@ -19,5 +22,5 @@ def add_patches_module(path: str, module_dict: dict):
                 else:
                     class_name, fuc_name = module_name.split(".")
                     class_obj = getattr(sys.modules[k], class_name, None)
-                    if class_obj and getattr(class_obj, fuc_name , None):
+                    if class_obj and getattr(class_obj, fuc_name, None):
                         setattr(class_obj, fuc_name, module_)
diff --git a/megatron/tests/unit_tests/__init__.py b/megatron/tests/unit_tests/__init__.py
index 1d3c586a5..88828cda5 100644
--- a/megatron/tests/unit_tests/__init__.py
+++ b/megatron/tests/unit_tests/__init__.py
@@ -1,2 +1,3 @@
 import torch._dynamo
-torch._dynamo.config.suppress_errors = True
\ No newline at end of file
+torch._dynamo.config.suppress_errors = True
+
diff --git a/tests/scripts/functional_test_flagscale.sh b/tests/scripts/functional_test_flagscale.sh
index 2edf8f913..9df71e8b4 100755
--- a/tests/scripts/functional_test_flagscale.sh
+++ b/tests/scripts/functional_test_flagscale.sh
@@ -1,13 +1,2 @@
-# if [ "$2" = "stop" ]; then
-#     python run.py --config-path tests/functional_tests/$1/conf --config-name config action=stop
-#     exit 0
-# fi
-
-OUT_DIR=./tests/functional_tests/$1/test_result
-if [ -d "$OUT_DIR" ]; then
-    echo "$OUT_DIR exist."
-    rm -r $OUT_DIR
-    sleep 3s
-fi
-
-python run.py --config-path tests/functional_tests/$1/conf --config-name config action=test
+python run.py --config-path tests/functional_tests/aquila/conf --config-name config action=test
+pytest -s tests/functional_tests/test_result.py --test_reaults_path=./tests/functional_tests/aquila/test_result
\ No newline at end of file
diff --git a/tests/scripts/unit_test_flagscale.sh b/tests/scripts/unit_test_flagscale.sh
index 46861add6..19179e62b 100755
--- a/tests/scripts/unit_test_flagscale.sh
+++ b/tests/scripts/unit_test_flagscale.sh
@@ -1 +1,9 @@
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:./cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher
\ No newline at end of file
+if [ -z "$1" ]
+then
+    code_id=0
+else
+    code_id=$1
+fi
+
+
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-flagscale --cov=flagscale -q -x tests/unit_tests/launcher
\ No newline at end of file
diff --git a/tests/scripts/unit_test_megatron.sh b/tests/scripts/unit_test_megatron.sh
index eef8e585a..4967e7cc1 100755
--- a/tests/scripts/unit_test_megatron.sh
+++ b/tests/scripts/unit_test_megatron.sh
@@ -1,12 +1,19 @@
+if [ -z "$1" ]
+then
+    code_id=0
+else
+    code_id=$1
+fi
+
 cd megatron
 export PYTHONPATH=..:$PYTHONPATH
 
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer
-torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:../cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py
\ No newline at end of file
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/data
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/dist_checkpointing
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/fusions
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/models
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/pipeline_parallel
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/tensor_parallel
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/transformer
+torchrun --nproc_per_node=8 -m pytest --import-mode=importlib --cov-append --cov-report=html:/workspace/report/$code_id/cov-report-megatron --cov=megatron/core -q -x tests/unit_tests/*.py
\ No newline at end of file