From 2c4d290e11f3ef904479d1ccca125a792bfa3534 Mon Sep 17 00:00:00 2001
From: Alexander Dokuchaev
Date: Mon, 11 Sep 2023 12:51:54 +0300
Subject: [PATCH] Add marks nightly and weekly for torch tests (#2092)

### Changes

- Add marks `nightly` and `weekly` for tests.
- Mark sanity tests as `nightly`.
- Split `test_functions.TestParametrized` into a fast part for precommit and a long part for nightly.
- Torch precommit time is reduced from 60 to 40 minutes.
- Set `xfail` for sanity tests with `--mode train` in case of a segmentation fault.
  A sporadic segmentation fault is reproduced on torch>=2.0.0 when calling the `backward` function.

### Related tickets

119128

---
 Makefile | 12 +-
 tests/torch/nas/test_sanity_sample.py | 1 +
 tests/torch/pytest.ini | 2 +
 tests/torch/quantization/test_functions.py | 36 +-
 .../torch/quantization/test_sanity_sample.py | 2 +
 tests/torch/test_sanity_sample.py | 858 +++++++++---------
 6 files changed, 475 insertions(+), 436 deletions(-)

diff --git a/Makefile b/Makefile
index 173d7d57738..5bd8cf8f4dd 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,10 @@ ifdef DATA
   DATA_ARG := --data $(DATA)
 endif
 
+ifdef WEEKLY_MODELS
+  WEEKLY_MODELS_ARG := --weekly-models $(WEEKLY_MODELS)
+endif
+
 install-pre-commit:
 	pip install pre-commit==3.2.2
 
@@ -124,7 +128,13 @@ install-torch-dev: install-torch-test install-pre-commit install-pylint
 	pip install -r examples/post_training_quantization/torch/ssd300_vgg16/requirements.txt
 
 test-torch:
-	pytest ${COVERAGE_ARGS} tests/common tests/torch --junitxml ${JUNITXML_PATH} $(DATA_ARG)
+	pytest ${COVERAGE_ARGS} tests/common tests/torch -m "not weekly and not nightly" --junitxml ${JUNITXML_PATH} $(DATA_ARG)
+
+test-torch-nightly:
+	pytest ${COVERAGE_ARGS} tests/torch -m nightly --junitxml ${JUNITXML_PATH} $(DATA_ARG)
+
+test-torch-weekly:
+	pytest ${COVERAGE_ARGS} tests/torch -m weekly --junitxml ${JUNITXML_PATH} $(DATA_ARG) ${WEEKLY_MODELS_ARG}
 
 COMMON_PYFILES := $(shell python3 tools/collect_pylint_input_files_for_backend.py common)
 
 pylint-torch:
diff --git a/tests/torch/nas/test_sanity_sample.py b/tests/torch/nas/test_sanity_sample.py
index ee9f5399265..c2575e14428 100644
--- a/tests/torch/nas/test_sanity_sample.py
+++ b/tests/torch/nas/test_sanity_sample.py
@@ -96,6 +96,7 @@ def fixture_nas_desc(request, dataset_dir):
     return desc.finalize(dataset_dir)
 
 
+@pytest.mark.nightly
 def test_e2e_supernet_training(nas_desc: NASSampleTestDescriptor, tmp_path, mocker):
     validator = nas_desc.get_validator()
     args = validator.get_default_args(tmp_path)
diff --git a/tests/torch/pytest.ini b/tests/torch/pytest.ini
index c016408adde..b35c6a2b6b7 100644
--- a/tests/torch/pytest.ini
+++ b/tests/torch/pytest.ini
@@ -5,5 +5,7 @@ markers =
   convert
   train
   install
+  nightly
+  weekly
 python_files = test_*
 xfail_strict = true
diff --git a/tests/torch/quantization/test_functions.py b/tests/torch/quantization/test_functions.py
index d150fb265d5..fa17bbe69fc 100644
--- a/tests/torch/quantization/test_functions.py
+++ b/tests/torch/quantization/test_functions.py
@@ -171,16 +171,16 @@ def skip_if_half_on_cpu(is_fp16, use_cuda):
 def check_quant_moved(test_input, test_val, ref_val, quant_len, input_low, input_range, is_fp16, rtol, atol=1e-10):
     """
     Checks values in `test_val` are inside of closest quant and
-    values in `test_val` and `ref_val` elementwise eather equal with given rtol/atol or
+    values in `test_val` and `ref_val` elementwise either equal with given rtol/atol or
     values differ by correspondent `quant_len` +- rtol.
 
     :param test_input: Input of a quantizer.
     :param test_val: Given test value.
:param ref_val: Given reference value. - :param quant_len: Lenghts of quants in quantizers + :param quant_len: Length of quants in quantizers (for each channel in case per channel quantization). - :param atol: Absolute tollerance. - :param rtol: Relative tollerance. + :param atol: Absolute tolerance. + :param rtol: Relative tolerance. """ def to_tensor(a): @@ -214,15 +214,10 @@ def check_outputs_for_quantization_functions(test_val: torch.Tensor, ref_val: np PTTensorListComparator.check_equal(test_val, ref_val, rtol, atol) -@pytest.mark.parametrize( - "input_size", - [[1, 48, 112, 112], [1, 96, 28, 28], [1, 288, 14, 14], [16, 96, 112, 112], [16, 192, 28, 28], [16, 576, 14, 14]], - ids=idfn, -) @pytest.mark.parametrize("bits", (8, 4), ids=("8bit", "4bit")) @pytest.mark.parametrize("scale_mode", ["single_scale", "per_channel_scale"]) @pytest.mark.parametrize("is_fp16", (True, False), ids=("fp16", "fp32")) -class TestParametrized: +class BaseParametrized: class TestSymmetric: @staticmethod def generate_scale(input_size, scale_mode, is_weights, is_fp16, fixed=None): @@ -523,12 +518,12 @@ def test_quantize_asymmetric_backward(self, _seed, input_size, bits, use_cuda, i if is_fp16: # This is needed to make scale == 1 to prevent # quant movement on forward pass in FP16 precision. - # In case scale != 1., not precice scale multiplication in FP16 + # In case scale != 1., not precise scale multiplication in FP16 # could lead to big deviations, so even if an input point # lies in safe range (far from middles of quants) after a scaling # it could end up in the middle of a quant. It happens mostly - # when target quant > 150 because in real life scenarious quantization range - # usualy less than 2 ** quantization bits, + # when target quant > 150 because in real life scenarios quantization range + # usually less than 2 ** quantization bits, # so input is small and scale is big, small FP16 input multiplies big fp16 scale, # deviation is significant. 
fixed = {} @@ -589,6 +584,21 @@ def test_quantize_asymmetric_backward(self, _seed, input_size, bits, use_cuda, i check_outputs_for_quantization_functions(test_grads, ref_grads, rtol=1e-2 if is_fp16 else 1e-3) +@pytest.mark.parametrize("input_size", [[1, 16, 64, 64], [4, 16, 16, 16]], ids=idfn) +class TestParametrizedFast(BaseParametrized): + pass + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "input_size", + [[1, 48, 112, 112], [1, 96, 28, 28], [1, 288, 14, 14], [16, 96, 112, 112], [16, 192, 28, 28], [16, 576, 14, 14]], + ids=idfn, +) +class TestParametrizedLong(BaseParametrized): + pass + + @pytest.mark.parametrize("device", ["cuda", "cpu"]) def test_mapping_to_zero(quantization_mode, device): torch.manual_seed(42) diff --git a/tests/torch/quantization/test_sanity_sample.py b/tests/torch/quantization/test_sanity_sample.py index 10909d57c59..e42832f1186 100644 --- a/tests/torch/quantization/test_sanity_sample.py +++ b/tests/torch/quantization/test_sanity_sample.py @@ -272,6 +272,7 @@ def fixture_precision_desc(request, dataset_dir): return desc.finalize(dataset_dir) +@pytest.mark.nightly def test_precision_init(precision_desc: PrecisionTestCaseDescriptor, tmp_path, mocker): validator = precision_desc.get_validator() args = validator.get_default_args(tmp_path) @@ -351,6 +352,7 @@ def fixture_export_desc(request): return desc.finalize() +@pytest.mark.nightly @pytest.mark.parametrize( ("extra_args", "is_export_called"), (({}, False), ({"-m": ["export", "train"]}, True)), diff --git a/tests/torch/test_sanity_sample.py b/tests/torch/test_sanity_sample.py index e7df2c2f1ab..42da1e08774 100644 --- a/tests/torch/test_sanity_sample.py +++ b/tests/torch/test_sanity_sample.py @@ -107,6 +107,10 @@ CONFIG_PARAMS.append((sample_type_,) + tpl) +def _get_test_case_id(p) -> str: + return "-".join([p[0], p[1].name, p[2], str(p[3])]) + + def update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict): if algo_dict["algorithm"] == "rb_sparsity": return @@ -139,469 +143,479 @@ def update_compression_algo_dict_with_legr_save_load_params(nncf_config, tmp_pat return nncf_config -def _get_test_case_id(p) -> str: - return "-".join([p[0], p[1].name, p[2], str(p[3])]) - +def extract_compression_stage_from_checkpoint(last_checkpoint_path: str) -> CompressionStage: + compression_state = torch.load(last_checkpoint_path)[COMPRESSION_STATE_ATTR] + ctrl_state = compression_state[BaseController.CONTROLLER_STATE] + compression_stage = next(iter(ctrl_state.values()))[BaseControllerStateNames.COMPRESSION_STAGE] + return compression_stage -@pytest.fixture(params=CONFIG_PARAMS, name="config", ids=[_get_test_case_id(p) for p in CONFIG_PARAMS]) -def fixture_config(request, dataset_dir): - sample_type, config_path, dataset_name, batch_size = request.param - dataset_path = DATASET_PATHS[sample_type][dataset_name](dataset_dir) - with config_path.open() as f: - jconfig = json.load(f) +def depends_on_pretrained_train(request, test_case_id: str, current_multiprocessing_distributed: bool): + full_test_case_id = test_case_id + ("-distributed" if current_multiprocessing_distributed else "-dataparallel") + primary_test_case_name = f"TestSanitySample::test_pretrained_model_train[{full_test_case_id}]" + depends(request, [primary_test_case_name]) - if "checkpoint_save_dir" in jconfig.keys(): - del jconfig["checkpoint_save_dir"] - # Use a reduced number of BN adaptation samples for speed - if "compression" in jconfig: - if isinstance(jconfig["compression"], list): - algos_list = jconfig["compression"] - for algo_dict in 
algos_list: - update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict) - else: - algo_dict = jconfig["compression"] - update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict) - jconfig["dataset"] = dataset_name - - return { - "sample_type": sample_type, - "sample_config": jconfig, - "model_name": jconfig["model"], - "dataset_path": dataset_path, - "batch_size": batch_size, - "test_case_id": _get_test_case_id(request.param), - } - - -@pytest.fixture(scope="module", name="case_common_dirs") -def fixture_case_common_dirs(tmp_path_factory): - return { - "checkpoint_save_dir": str(tmp_path_factory.mktemp("models")), - "save_coeffs_path": str(tmp_path_factory.mktemp("ranking_coeffs")), - } - - -@pytest.mark.parametrize(" multiprocessing_distributed", (True, False), ids=["distributed", "dataparallel"]) -def test_pretrained_model_eval(config, tmp_path, multiprocessing_distributed, case_common_dirs): - if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]): - pytest.skip( - f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest " - f"torchvision.\nThe signature of the function is not compatible with the corresponding signature " - f"from the current torchvision version : {torchvision.__version__}" - ) - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - config_factory.config = update_compression_algo_dict_with_legr_save_load_params( - config_factory.config, case_common_dirs["save_coeffs_path"] - ) - args = { - "--mode": "test", - "--data": config["dataset_path"], - "--config": config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--dist-url": "tcp://127.0.0.1:8987", - } - - if not torch.cuda.is_available(): - args["--cpu-only"] = True - elif multiprocessing_distributed: - args["--multiprocessing-distributed"] = True - - runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() - - -@pytest.mark.dependency() -@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) -def test_pretrained_model_train(config, tmp_path, multiprocessing_distributed, case_common_dirs): - checkpoint_save_dir = os.path.join( - case_common_dirs["checkpoint_save_dir"], "distributed" if multiprocessing_distributed else "data_parallel" - ) - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - config_factory.config = update_compression_algo_dict_with_legr_save_load_params( - config_factory.config, case_common_dirs["save_coeffs_path"] +def get_resuming_checkpoint_path(config_factory, multiprocessing_distributed, checkpoint_save_dir): + return os.path.join( + checkpoint_save_dir, + "distributed" if multiprocessing_distributed else "data_parallel", + get_run_name(config_factory.config) + "_last.pth", ) - args = { - "--mode": "train", - "--data": config["dataset_path"], - "--config": config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--epochs": 2, - "--checkpoint-save-dir": checkpoint_save_dir, - "--dist-url": "tcp://127.0.0.1:8989", - } - - if not torch.cuda.is_available(): - args["--cpu-only"] = True - elif multiprocessing_distributed: - args["--multiprocessing-distributed"] = True - elif 
config["sample_config"]["model"] == "inception_v3": - pytest.skip( - "InceptionV3 may not be trained in DataParallel " - "because it outputs namedtuple, which DP seems to be unable " - "to support even still." - ) - runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() - last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") - assert os.path.exists(last_checkpoint_path) - if "compression" in config["sample_config"]: - allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) - else: - allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) - compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) - assert compression_stage in allowed_compression_stages +@contextmanager +def set_num_threads_locally(n=1): + old_n = torch.get_num_threads() + try: + torch.set_num_threads(n) + yield + finally: + torch.set_num_threads(old_n) -def depends_on_pretrained_train(request, test_case_id: str, current_multiprocessing_distributed: bool): - full_test_case_id = test_case_id + ("-distributed" if current_multiprocessing_distributed else "-dataparallel") - primary_test_case_name = f"test_pretrained_model_train[{full_test_case_id}]" - depends(request, [primary_test_case_name]) +def _run_with_xfail_119128(runner: Command): + returncode = runner.run(assert_returncode_zero=False) + if returncode == 139: + pytest.xfail("Bug 119128: sporadic segment fault on backward") -@pytest.mark.dependency() -@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) -def test_trained_model_eval(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): - if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]): - pytest.skip( - f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest " - f"torchvision.\nThe signature of the function is not compatible with the corresponding signature " - f"from the current torchvision version : {torchvision.__version__}" - ) - depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - config_factory.config = update_compression_algo_dict_with_legr_save_load_params( - config_factory.config, case_common_dirs["save_coeffs_path"] - ) +@pytest.mark.nightly +class TestSanitySample: + @staticmethod + @pytest.fixture(params=CONFIG_PARAMS, name="config", ids=[_get_test_case_id(p) for p in CONFIG_PARAMS]) + def fixture_config(request, dataset_dir): + sample_type, config_path, dataset_name, batch_size = request.param + dataset_path = DATASET_PATHS[sample_type][dataset_name](dataset_dir) - ckpt_path = os.path.join( - case_common_dirs["checkpoint_save_dir"], - "distributed" if multiprocessing_distributed else "data_parallel", - get_run_name(config_factory.config) + "_last.pth", - ) - args = { - "--mode": "test", - "--data": config["dataset_path"], - "--config": config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--weights": ckpt_path, - "--dist-url": "tcp://127.0.0.1:8987", - } - - if not torch.cuda.is_available(): - args["--cpu-only"] = True - elif multiprocessing_distributed: - args["--multiprocessing-distributed"] = True - - 
runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() + with config_path.open() as f: + jconfig = json.load(f) + if "checkpoint_save_dir" in jconfig.keys(): + del jconfig["checkpoint_save_dir"] -def get_resuming_checkpoint_path(config_factory, multiprocessing_distributed, checkpoint_save_dir): - return os.path.join( - checkpoint_save_dir, - "distributed" if multiprocessing_distributed else "data_parallel", - get_run_name(config_factory.config) + "_last.pth", - ) + # Use a reduced number of BN adaptation samples for speed + if "compression" in jconfig: + if isinstance(jconfig["compression"], list): + algos_list = jconfig["compression"] + for algo_dict in algos_list: + update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict) + else: + algo_dict = jconfig["compression"] + update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict) + jconfig["dataset"] = dataset_name + + return { + "sample_type": sample_type, + "sample_config": jconfig, + "model_name": jconfig["model"], + "dataset_path": dataset_path, + "batch_size": batch_size, + "test_case_id": _get_test_case_id(request.param), + } + @staticmethod + @pytest.fixture(scope="module", name="case_common_dirs") + def fixture_case_common_dirs(tmp_path_factory): + return { + "checkpoint_save_dir": str(tmp_path_factory.mktemp("models")), + "save_coeffs_path": str(tmp_path_factory.mktemp("ranking_coeffs")), + } -@pytest.mark.dependency() -@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) -def test_resume(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): - depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) - checkpoint_save_dir = os.path.join(str(tmp_path), "models") - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - config_factory.config = update_compression_algo_dict_with_legr_save_load_params( - config_factory.config, case_common_dirs["save_coeffs_path"], False - ) + @staticmethod + @pytest.mark.parametrize(" multiprocessing_distributed", (True, False), ids=["distributed", "dataparallel"]) + def test_pretrained_model_eval(config, tmp_path, multiprocessing_distributed, case_common_dirs): + if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]): + pytest.skip( + f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest " + f"torchvision.\nThe signature of the function is not compatible with the corresponding signature " + f"from the current torchvision version : {torchvision.__version__}" + ) + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"] + ) + args = { + "--mode": "test", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--dist-url": "tcp://127.0.0.1:8987", + } - ckpt_path = get_resuming_checkpoint_path( - config_factory, multiprocessing_distributed, case_common_dirs["checkpoint_save_dir"] - ) - if "max_iter" in config_factory.config: - config_factory.config["max_iter"] += 2 - args = { - "--mode": "train", - "--data": config["dataset_path"], - "--config": 
config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--epochs": 3, - "--checkpoint-save-dir": checkpoint_save_dir, - "--resume": ckpt_path, - "--dist-url": "tcp://127.0.0.1:8986", - } - - if not torch.cuda.is_available(): - args["--cpu-only"] = True - elif multiprocessing_distributed: - args["--multiprocessing-distributed"] = True - - runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() - last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") - assert os.path.exists(last_checkpoint_path) - if "compression" in config["sample_config"]: - allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) - else: - allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) - compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) - assert compression_stage in allowed_compression_stages + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run() -def extract_compression_stage_from_checkpoint(last_checkpoint_path: str) -> CompressionStage: - compression_state = torch.load(last_checkpoint_path)[COMPRESSION_STATE_ATTR] - ctrl_state = compression_state[BaseController.CONTROLLER_STATE] - compression_stage = next(iter(ctrl_state.values()))[BaseControllerStateNames.COMPRESSION_STAGE] - return compression_stage + @staticmethod + @pytest.mark.dependency() + @pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) + def test_pretrained_model_train(config, tmp_path, multiprocessing_distributed, case_common_dirs): + checkpoint_save_dir = os.path.join( + case_common_dirs["checkpoint_save_dir"], "distributed" if multiprocessing_distributed else "data_parallel" + ) + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"] + ) + args = { + "--mode": "train", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 2, + "--checkpoint-save-dir": checkpoint_save_dir, + "--dist-url": "tcp://127.0.0.1:8989", + } -@pytest.mark.dependency() -@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) -def test_export_with_resume(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): - depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - config_factory.config = update_compression_algo_dict_with_legr_save_load_params( - config_factory.config, case_common_dirs["save_coeffs_path"], False - ) + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + elif config["sample_config"]["model"] == "inception_v3": + pytest.skip( + "InceptionV3 may 
not be trained in DataParallel " + "because it outputs namedtuple, which DP seems to be unable " + "to support even still." + ) + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + _run_with_xfail_119128(runner) + last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") + assert os.path.exists(last_checkpoint_path) + if "compression" in config["sample_config"]: + allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) + else: + allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) + compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) + assert compression_stage in allowed_compression_stages + + @staticmethod + @pytest.mark.dependency() + @pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) + def test_trained_model_eval(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): + if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]): + pytest.skip( + f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest " + f"torchvision.\nThe signature of the function is not compatible with the corresponding signature " + f"from the current torchvision version : {torchvision.__version__}" + ) + depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"] + ) - ckpt_path = get_resuming_checkpoint_path( - config_factory, multiprocessing_distributed, case_common_dirs["checkpoint_save_dir"] - ) + ckpt_path = os.path.join( + case_common_dirs["checkpoint_save_dir"], + "distributed" if multiprocessing_distributed else "data_parallel", + get_run_name(config_factory.config) + "_last.pth", + ) + args = { + "--mode": "test", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--weights": ckpt_path, + "--dist-url": "tcp://127.0.0.1:8987", + } - onnx_path = os.path.join(str(tmp_path), "model.onnx") - args = {"--mode": "export", "--config": config_factory.serialize(), "--resume": ckpt_path, "--to-onnx": onnx_path} + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run() + + @staticmethod + @pytest.mark.dependency() + @pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) + def test_resume(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): + depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) + checkpoint_save_dir = os.path.join(str(tmp_path), "models") + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"], False + ) - if not torch.cuda.is_available(): - 
args["--cpu-only"] = True + ckpt_path = get_resuming_checkpoint_path( + config_factory, multiprocessing_distributed, case_common_dirs["checkpoint_save_dir"] + ) + if "max_iter" in config_factory.config: + config_factory.config["max_iter"] += 2 + args = { + "--mode": "train", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 3, + "--checkpoint-save-dir": checkpoint_save_dir, + "--resume": ckpt_path, + "--dist-url": "tcp://127.0.0.1:8986", + } - runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() - assert os.path.exists(onnx_path) + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + _run_with_xfail_119128(runner) + last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") + assert os.path.exists(last_checkpoint_path) + if "compression" in config["sample_config"]: + allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) + else: + allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) + compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) + assert compression_stage in allowed_compression_stages + + @staticmethod + @pytest.mark.dependency() + @pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) + def test_export_with_resume(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): + depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"], False + ) + ckpt_path = get_resuming_checkpoint_path( + config_factory, multiprocessing_distributed, case_common_dirs["checkpoint_save_dir"] + ) -def test_export_with_pretrained(tmp_path): - config = SampleConfig() - config.update( - { - "model": "resnet18", - "dataset": "imagenet", - "input_info": {"sample_size": [2, 3, 299, 299]}, - "num_classes": 1000, - "compression": {"algorithm": "magnitude_sparsity"}, + onnx_path = os.path.join(str(tmp_path), "model.onnx") + args = { + "--mode": "export", + "--config": config_factory.serialize(), + "--resume": ckpt_path, + "--to-onnx": onnx_path, } - ) - config_factory = ConfigFactory(config, tmp_path / "config.json") - onnx_path = os.path.join(str(tmp_path), "model.onnx") - args = {"--mode": "export", "--config": config_factory.serialize(), "--pretrained": "", "--to-onnx": onnx_path} + if not torch.cuda.is_available(): + args["--cpu-only"] = True + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run() + assert os.path.exists(onnx_path) + + @staticmethod + def test_export_with_pretrained(tmp_path): + config = SampleConfig() + config.update( + { + "model": "resnet18", + "dataset": "imagenet", + "input_info": {"sample_size": [2, 3, 299, 299]}, + "num_classes": 1000, + "compression": {"algorithm": "magnitude_sparsity"}, + } + ) + config_factory = ConfigFactory(config, 
tmp_path / "config.json") - if not torch.cuda.is_available(): - args["--cpu-only"] = True + onnx_path = os.path.join(str(tmp_path), "model.onnx") + args = {"--mode": "export", "--config": config_factory.serialize(), "--pretrained": "", "--to-onnx": onnx_path} - runner = Command(create_command_line(args, "classification"), env=ROOT_PYTHONPATH_ENV) - runner.run() - assert os.path.exists(onnx_path) + if not torch.cuda.is_available(): + args["--cpu-only"] = True + runner = Command(create_command_line(args, "classification"), env=ROOT_PYTHONPATH_ENV) + runner.run() + assert os.path.exists(onnx_path) -@pytest.mark.parametrize( - ("algo", "ref_weight_decay"), - (("rb_sparsity", 0), ("const_sparsity", 1e-4), ("magnitude_sparsity", 1e-4), ("quantization", 1e-4)), -) -def test_get_default_weight_decay(algo, ref_weight_decay): - config = NNCFConfig() - config.update({"compression": {"algorithm": algo}}) - assert ref_weight_decay == get_default_weight_decay(config) + @staticmethod + @pytest.mark.parametrize( + ("algo", "ref_weight_decay"), + (("rb_sparsity", 0), ("const_sparsity", 1e-4), ("magnitude_sparsity", 1e-4), ("quantization", 1e-4)), + ) + def test_get_default_weight_decay(algo, ref_weight_decay): + config = NNCFConfig() + config.update({"compression": {"algorithm": algo}}) + assert ref_weight_decay == get_default_weight_decay(config) + + @staticmethod + def test_cpu_only_mode_produces_cpu_only_model(config, tmp_path, mocker): + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + args = { + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 1, + "--cpu-only": True, + } + # to prevent starting a not closed mlflow session due to memory leak of config and SafeMLFLow happens with a + # mocked train function + mocker.patch("examples.torch.common.utils.SafeMLFLow") + arg_list = arg_list_from_arg_dict(args) + if config["sample_type"] == "classification": + import examples.torch.classification.main as sample -@contextmanager -def set_num_threads_locally(n=1): - old_n = torch.get_num_threads() - try: - torch.set_num_threads(n) - yield - finally: - torch.set_num_threads(old_n) + if is_staged_quantization(config["sample_config"]): + mocker.patch("examples.torch.classification.staged_quantization_worker.train_epoch_staged") + mocker.patch("examples.torch.classification.staged_quantization_worker.validate") + import examples.torch.classification.staged_quantization_worker as staged_worker + staged_worker.validate.return_value = (0, 0, 0) + else: + mocker.patch("examples.torch.classification.main.train_epoch") + mocker.patch("examples.torch.classification.main.validate") + sample.validate.return_value = (0, 0, 0) + elif config["sample_type"] == "semantic_segmentation": + import examples.torch.semantic_segmentation.main as sample + import examples.torch.semantic_segmentation.train + + mocker.spy(examples.torch.semantic_segmentation.train.Train, "__init__") + elif config["sample_type"] == "object_detection": + import examples.torch.object_detection.main as sample + + mocker.spy(sample, "train") + + # Set number of threads = 1 to avoid hang for UNet (ticket 100106). + # Potentially it might happen when OpenMP is used before fork. 
+ # The relevant thread: https://github.com/pytorch/pytorch/issues/91547 + with set_num_threads_locally(1) if config["sample_type"] == "semantic_segmentation" else nullcontext(): + sample.main(arg_list) + + # pylint: disable=no-member + if config["sample_type"] == "classification": + if is_staged_quantization(config["sample_config"]): + import examples.torch.classification.staged_quantization_worker as staged_worker + + model_to_be_trained = staged_worker.train_epoch_staged.call_args[0][2] # model + else: + model_to_be_trained = sample.train_epoch.call_args[0][1] # model + elif config["sample_type"] == "semantic_segmentation": + model_to_be_trained = examples.torch.semantic_segmentation.train.Train.__init__.call_args[0][1] # model + elif config["sample_type"] == "object_detection": + model_to_be_trained = sample.train.call_args[0][0] # net + + for p in model_to_be_trained.parameters(): + assert not p.is_cuda + + @staticmethod + @pytest.mark.parametrize("target_device", [x.value for x in HWConfigType]) + def test_sample_propagates_target_device_cl_param_to_nncf_config(mocker, tmp_path, target_device): + config_dict = { + "input_info": { + "sample_size": [1, 1, 32, 32], + }, + "compression": {"algorithm": "quantization"}, + } + config_factory = ConfigFactory(config_dict, tmp_path / "config.json") + args = { + "--data": str(tmp_path), + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": 1, + "--target-device": target_device, + } + if not torch.cuda.is_available(): + args["--cpu-only"] = True -def test_cpu_only_mode_produces_cpu_only_model(config, tmp_path, mocker): - config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") - args = { - "--data": config["dataset_path"], - "--config": config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--epochs": 1, - "--cpu-only": True, - } - - # to prevent starting a not closed mlflow session due to memory leak of config and SafeMLFLow happens with a - # mocked train function - mocker.patch("examples.torch.common.utils.SafeMLFLow") - arg_list = arg_list_from_arg_dict(args) - if config["sample_type"] == "classification": + arg_list = arg_list_from_arg_dict(args) import examples.torch.classification.main as sample - if is_staged_quantization(config["sample_config"]): - mocker.patch("examples.torch.classification.staged_quantization_worker.train_epoch_staged") - mocker.patch("examples.torch.classification.staged_quantization_worker.validate") - import examples.torch.classification.staged_quantization_worker as staged_worker - - staged_worker.validate.return_value = (0, 0, 0) - else: - mocker.patch("examples.torch.classification.main.train_epoch") - mocker.patch("examples.torch.classification.main.validate") - sample.validate.return_value = (0, 0, 0) - elif config["sample_type"] == "semantic_segmentation": - import examples.torch.semantic_segmentation.main as sample - import examples.torch.semantic_segmentation.train - - mocker.spy(examples.torch.semantic_segmentation.train.Train, "__init__") - elif config["sample_type"] == "object_detection": - import examples.torch.object_detection.main as sample - - mocker.spy(sample, "train") - - # Set number of threads = 1 to avoid hang for UNet (ticket 100106). - # Potentially it might happen when OpenMP is used before fork. 
- # The relevant thread: https://github.com/pytorch/pytorch/issues/91547 - with set_num_threads_locally(1) if config["sample_type"] == "semantic_segmentation" else nullcontext(): + start_worker_mock = mocker.patch("examples.torch.classification.main.start_worker") sample.main(arg_list) - # pylint: disable=no-member - if config["sample_type"] == "classification": - if is_staged_quantization(config["sample_config"]): - import examples.torch.classification.staged_quantization_worker as staged_worker + config = start_worker_mock.call_args[0][1].nncf_config + assert config["target_device"] == target_device - model_to_be_trained = staged_worker.train_epoch_staged.call_args[0][2] # model - else: - model_to_be_trained = sample.train_epoch.call_args[0][1] # model - elif config["sample_type"] == "semantic_segmentation": - model_to_be_trained = examples.torch.semantic_segmentation.train.Train.__init__.call_args[0][1] # model - elif config["sample_type"] == "object_detection": - model_to_be_trained = sample.train.call_args[0][0] # net - - for p in model_to_be_trained.parameters(): - assert not p.is_cuda - - -@pytest.mark.parametrize("target_device", [x.value for x in HWConfigType]) -def test_sample_propagates_target_device_cl_param_to_nncf_config(mocker, tmp_path, target_device): - config_dict = { - "input_info": { - "sample_size": [1, 1, 32, 32], - }, - "compression": {"algorithm": "quantization"}, - } - config_factory = ConfigFactory(config_dict, tmp_path / "config.json") - args = { - "--data": str(tmp_path), - "--config": config_factory.serialize(), - "--log-dir": tmp_path, - "--batch-size": 1, - "--target-device": target_device, - } - if not torch.cuda.is_available(): - args["--cpu-only"] = True - - arg_list = arg_list_from_arg_dict(args) - import examples.torch.classification.main as sample - - start_worker_mock = mocker.patch("examples.torch.classification.main.start_worker") - sample.main(arg_list) - - config = start_worker_mock.call_args[0][1].nncf_config - assert config["target_device"] == target_device - - -@pytest.fixture( - name="accuracy_aware_config", - params=[ - TEST_ROOT / "torch" / "data" / "configs" / "resnet18_pruning_accuracy_aware.json", - TEST_ROOT / "torch" / "data" / "configs" / "resnet18_int8_accuracy_aware.json", - ], -) -def fixture_accuracy_aware_config(request): - config_path = request.param - with config_path.open() as f: - jconfig = json.load(f) - - dataset_name = "mock_32x32" - dataset_path = os.path.join("/tmp", "mock_32x32") - sample_type = "classification" - - jconfig["dataset"] = dataset_name - - return { - "sample_type": sample_type, - "sample_config": jconfig, - "model_name": jconfig["model"], - "dataset_path": dataset_path, - "batch_size": 12, - } - - -@pytest.mark.dependency() -@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) -def test_accuracy_aware_training_pipeline(accuracy_aware_config, tmp_path, multiprocessing_distributed): - config_factory = ConfigFactory(accuracy_aware_config["sample_config"], tmp_path / "config.json") - log_dir = tmp_path / "accuracy_aware" - log_dir = log_dir / "distributed" if multiprocessing_distributed else log_dir / "dataparallel" - - args = { - "--mode": "train", - "--data": accuracy_aware_config["dataset_path"], - "--config": config_factory.serialize(), - "--log-dir": log_dir, - "--batch-size": accuracy_aware_config["batch_size"] * NUM_DEVICES, - "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue - "--epochs": 2, - "--dist-url": 
"tcp://127.0.0.1:8989", - } - - if not torch.cuda.is_available(): - args["--cpu-only"] = True - elif multiprocessing_distributed: - args["--multiprocessing-distributed"] = True - - runner = Command(create_command_line(args, accuracy_aware_config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() - - checkpoint_save_dir = log_dir / get_run_name(config_factory.config) - aa_checkpoint_path = get_accuracy_aware_checkpoint_dir_path(checkpoint_save_dir) - last_checkpoint_path = aa_checkpoint_path / "acc_aware_checkpoint_last.pth" - - assert last_checkpoint_path.exists() - if "compression" in accuracy_aware_config["sample_config"]: - allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) - else: - allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) - compression_stage = extract_compression_stage_from_checkpoint(str(last_checkpoint_path)) - assert compression_stage in allowed_compression_stages + @staticmethod + @pytest.fixture( + name="accuracy_aware_config", + params=[ + TEST_ROOT / "torch" / "data" / "configs" / "resnet18_pruning_accuracy_aware.json", + TEST_ROOT / "torch" / "data" / "configs" / "resnet18_int8_accuracy_aware.json", + ], + ) + def fixture_accuracy_aware_config(request): + config_path = request.param + with config_path.open() as f: + jconfig = json.load(f) + + dataset_name = "mock_32x32" + dataset_path = os.path.join("/tmp", "mock_32x32") + sample_type = "classification" + + jconfig["dataset"] = dataset_name + + return { + "sample_type": sample_type, + "sample_config": jconfig, + "model_name": jconfig["model"], + "dataset_path": dataset_path, + "batch_size": 12, + } + @staticmethod + @pytest.mark.dependency() + @pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) + def test_accuracy_aware_training_pipeline(accuracy_aware_config, tmp_path, multiprocessing_distributed): + config_factory = ConfigFactory(accuracy_aware_config["sample_config"], tmp_path / "config.json") + log_dir = tmp_path / "accuracy_aware" + log_dir = log_dir / "distributed" if multiprocessing_distributed else log_dir / "dataparallel" + + args = { + "--mode": "train", + "--data": accuracy_aware_config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": log_dir, + "--batch-size": accuracy_aware_config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 2, + "--dist-url": "tcp://127.0.0.1:8989", + } -@pytest.mark.parametrize("sample_type", SAMPLE_TYPES) -def test_eval_only_config_fails_to_train(tmp_path, sample_type): - config_factory = ConfigFactory( - {"model": "mock", "input_infos": {"sample_size": [1, 1, 1, 1]}, "eval_only": True}, tmp_path / "config.json" - ) - args = { - "--mode": "train", - "--config": config_factory.serialize(), - } - - runner = Command(create_command_line(args, sample_type), env=ROOT_PYTHONPATH_ENV) - return_code = runner.run(assert_returncode_zero=False) - assert return_code != 0 - assert remove_line_breaks(EVAL_ONLY_ERROR_TEXT) in remove_line_breaks("".join(runner.output)) + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + + runner = Command(create_command_line(args, accuracy_aware_config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + _run_with_xfail_119128(runner) + + checkpoint_save_dir = log_dir / get_run_name(config_factory.config) + aa_checkpoint_path = 
get_accuracy_aware_checkpoint_dir_path(checkpoint_save_dir) + last_checkpoint_path = aa_checkpoint_path / "acc_aware_checkpoint_last.pth" + + assert last_checkpoint_path.exists() + if "compression" in accuracy_aware_config["sample_config"]: + allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) + else: + allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) + compression_stage = extract_compression_stage_from_checkpoint(str(last_checkpoint_path)) + assert compression_stage in allowed_compression_stages + + @staticmethod + @pytest.mark.parametrize("sample_type", SAMPLE_TYPES) + def test_eval_only_config_fails_to_train(tmp_path, sample_type): + config_factory = ConfigFactory( + {"model": "mock", "input_infos": {"sample_size": [1, 1, 1, 1]}, "eval_only": True}, tmp_path / "config.json" + ) + args = { + "--mode": "train", + "--config": config_factory.serialize(), + } + + runner = Command(create_command_line(args, sample_type), env=ROOT_PYTHONPATH_ENV) + return_code = runner.run(assert_returncode_zero=False) + assert return_code != 0 + assert remove_line_breaks(EVAL_ONLY_ERROR_TEXT) in remove_line_breaks("".join(runner.output))
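
A minimal sketch of how the new markers drive test selection (illustration only, not part of the patch; the test names below are hypothetical, while the markers and the `-m` expressions are the ones introduced above in `tests/torch/pytest.ini` and the Makefile):

```python
# hypothetical_example_test.py -- illustration only, not part of this patch
import pytest


@pytest.mark.nightly
def test_long_running_case():
    # Collected by `pytest -m nightly` (the new test-torch-nightly target) and
    # excluded from precommit by `-m "not weekly and not nightly"`.
    assert True


@pytest.mark.weekly
def test_weekly_case():
    # Collected only by `pytest -m weekly` (the new test-torch-weekly target).
    assert True


def test_fast_case():
    # Unmarked tests stay in the precommit scope (make test-torch).
    assert True
```

With this layout, `make test-torch` runs only unmarked tests, while `make test-torch-nightly` and `make test-torch-weekly` pick up the marked scopes; setting `WEEKLY_MODELS=<path>` additionally forwards `--weekly-models` to the weekly run.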