diff --git a/disdrodb/data_transfer/download_data.py b/disdrodb/data_transfer/download_data.py index 94a0f6a4..5fb5e566 100644 --- a/disdrodb/data_transfer/download_data.py +++ b/disdrodb/data_transfer/download_data.py @@ -29,7 +29,7 @@ from disdrodb.api.path import define_metadata_filepath from disdrodb.configs import get_base_dir from disdrodb.metadata import get_list_metadata -from disdrodb.utils.compression import _unzip_file +from disdrodb.utils.compression import unzip_file from disdrodb.utils.directories import is_empty_directory from disdrodb.utils.yaml import read_yaml @@ -229,7 +229,7 @@ def _select_metadata_with_remote_data_url(metadata_filepaths: List[str]) -> List def _extract_station_files(zip_filepath, station_dir): """Extract files from the station.zip file and remove the station.zip file.""" - _unzip_file(filepath=zip_filepath, dest_path=station_dir) + unzip_file(filepath=zip_filepath, dest_path=station_dir) if os.path.exists(zip_filepath): os.remove(zip_filepath) diff --git a/disdrodb/tests/test_utils/test_utils_compression.py b/disdrodb/tests/test_utils/test_utils_compression.py index df458f15..b77379a7 100644 --- a/disdrodb/tests/test_utils/test_utils_compression.py +++ b/disdrodb/tests/test_utils/test_utils_compression.py @@ -23,22 +23,23 @@ import pytest -from disdrodb.utils.compression import _unzip_file, _zip_dir, compress_station_files +from disdrodb.tests.conftest import create_fake_raw_data_file +from disdrodb.utils.compression import _zip_dir, compress_station_files, unzip_file def create_fake_data_dir(base_dir, data_source, campaign_name, station_name): """Create a station data directory with files inside it. station_name - |-- dir1 + |-- 2020 |-- file1.txt - |-- dir1 + |-- Jan |-- file2.txt """ data_dir = base_dir / "Raw" / data_source / campaign_name / "data" / station_name - dir1 = data_dir / "dir1" - dir2 = dir1 / "dir2" + dir1 = data_dir / "2020" + dir2 = dir1 / "Jan" if not dir2.exists(): dir2.mkdir(parents=True) @@ -48,46 +49,81 @@ def create_fake_data_dir(base_dir, data_source, campaign_name, station_name): file2_txt.touch() -def test_files_compression(tmp_path): +@pytest.mark.parametrize("method", ["zip", "gzip", "bzip2"]) +def test_files_compression(tmp_path, method): """Test compression of files in a directory.""" base_dir = tmp_path / "DISDRODB" data_source = "test_data_source" campaign_name = "test_campaign_name" + station_name = "station_name" - # Directory that does not exist yet - compress_station_files(base_dir, data_source, campaign_name, "station1", "zip") - - methods = ["zip", "gzip", "bzip2"] - for i, method in enumerate(methods): - station_name = f"test_station_name_{i}" - create_fake_data_dir( + # Check raise an error if the directory does not yet exist + with pytest.raises(ValueError): + compress_station_files( base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, + method=method, ) + + # Create fake data + create_fake_data_dir( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + ) + + # Compress files + compress_station_files( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + method=method, + ) + + # Try to compress directory with already compressed files (skip=True) + compress_station_files( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + method=method, + skip=True, + ) + + # Try to compress directory with already compressed files (skip=False) + with pytest.raises(ValueError): compress_station_files( base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, method=method, + skip=False, ) - # Directory with already compressed files - station_name = "test_station_name_0" - compress_station_files( + # Try to compress with invalid method + with pytest.raises(ValueError): + compress_station_files( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + method="unknown_compression_method", + ) + + # Try to compress a netCDF file + create_fake_raw_data_file( base_dir=base_dir, + product="RAW", data_source=data_source, campaign_name=campaign_name, station_name=station_name, - method="zip", - ) - - station_name = "test_station_name" - create_fake_data_dir( - base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name + filename="test_data.nc", ) with pytest.raises(ValueError): compress_station_files( @@ -95,7 +131,7 @@ def test_files_compression(tmp_path): data_source=data_source, campaign_name=campaign_name, station_name=station_name, - method="unknown_compression_method", + method=method, ) @@ -109,5 +145,5 @@ def test_zip_unzip_directory(tmp_path): assert os.path.isfile(zip_path) unzip_path = tmp_path / "test_dir_unzipped" - _unzip_file(zip_path, unzip_path) + unzip_file(zip_path, unzip_path) assert os.path.isdir(unzip_path) diff --git a/disdrodb/utils/compression.py b/disdrodb/utils/compression.py index 9c49a9c0..aef7ed9e 100644 --- a/disdrodb/utils/compression.py +++ b/disdrodb/utils/compression.py @@ -30,8 +30,14 @@ from disdrodb.api.path import define_station_dir from disdrodb.utils.directories import list_files +COMPRESSION_OPTIONS = { + "zip": ".zip", + "gzip": ".gz", + "bzip2": ".bz2", +} -def _unzip_file(filepath: str, dest_path: str) -> None: + +def unzip_file(filepath: str, dest_path: str) -> None: """Unzip a file into a directory Parameters @@ -67,8 +73,31 @@ def _zip_dir(dir_path: str) -> str: return output_path -def compress_station_files(base_dir: str, data_source: str, campaign_name: str, station_name: str, method: str) -> None: - """Compress all files of a station. +def archive_station_data(metadata_filepath: str) -> str: + """Archive station data into a zip file for subsequent data upload. + + It create a zip file into a temporary directory ! + + Parameters + ---------- + metadata_filepath: str + Metadata file path. + + Returns + ------- + station_zip_filepath + Filepath of the zip file containing the station's data. + """ + station_data_path = metadata_filepath.replace("metadata", "data") + station_data_path = os.path.splitext(station_data_path)[0] # remove trailing ".yml" + station_zip_filepath = _zip_dir(station_data_path) + return station_zip_filepath + + +def compress_station_files( + base_dir: str, data_source: str, campaign_name: str, station_name: str, method: str = "gzip", skip: bool = True +) -> None: + """Compress each raw file of a station. Parameters ---------- @@ -82,8 +111,14 @@ def compress_station_files(base_dir: str, data_source: str, campaign_name: str, Station name of interest. method : str Compression method. "zip", "gzip" or "bzip2". - + skip : bool + Whether to raise an error if a file is already compressed. + If True, it does not raise an error and try to compress the other files. + If False, it raise an error and stop the compression routine. + THe default is True. """ + if method not in COMPRESSION_OPTIONS: + raise ValueError(f"Invalid compression method {method}. Valid methods are {list(COMPRESSION_OPTIONS.keys())}") base_dir = check_base_dir(base_dir) station_dir = define_station_dir( @@ -95,18 +130,18 @@ def compress_station_files(base_dir: str, data_source: str, campaign_name: str, check_exists=False, ) if not os.path.isdir(station_dir): - print(f"Station data directory {station_dir} does not exist. Skipping.") - return + raise ValueError(f"Station data directory {station_dir} does not exist.") - # use glob to get list of files recursively + # Get list of files inside the station directory (in all nested directories) filepaths = list_files(station_dir, glob_pattern="*", recursive=True) - for filepath in filepaths: - if os.path.isfile(filepath): - _compress_file(filepath, method) + _ = _compress_file(filepath, method, skip=skip) + + print("All files of {data_source} {campaign_name} {station_name} have been compressed.") + print("Please now remember to update the glob_pattern of the reader ยจ!") -def _compress_file(filepath: str, method: str) -> str: +def _compress_file(filepath: str, method: str, skip: bool) -> str: """Compress a file and delete the original. If the file is already compressed, it is not compressed again. @@ -115,31 +150,29 @@ def _compress_file(filepath: str, method: str) -> str: ---------- filepath : str Path of the file to compress. - method : str Compression method. None, "zip", "gzip" or "bzip2". - + skip : bool + Whether to raise an error if a file is already compressed. + If True, it does not raise an error return the input filepath. + If False, it raise an error. Returns ------- str Path of the compressed file. Same as input if no compression. """ + if filepath.endswith(".nc") or filepath.endswith(".netcdf4"): + raise ValueError("netCDF files must be not compressed !") if _check_file_compression(filepath) is not None: - print(f"File {filepath} is already compressed. Skipping.") - return filepath + if skip: + print(f"File {filepath} is already compressed. Skipping.") + return filepath + else: + raise ValueError(f"File {filepath} is already compressed !") - valid_extensions = { - "zip": ".zip", - "gzip": ".gz", - "bzip2": ".bz2", - } - - if method not in valid_extensions: - raise ValueError(f"Invalid compression method {method}. Valid methods are {list(valid_extensions.keys())}") - - extension = valid_extensions[method] + extension = COMPRESSION_OPTIONS[method] archive_name = os.path.basename(filepath) + extension compressed_filepath = os.path.join(os.path.dirname(filepath), archive_name) compress_file_function = { @@ -155,7 +188,7 @@ def _compress_file(filepath: str, method: str) -> str: def _check_file_compression(filepath: str) -> Optional[str]: - """Check the method used to compress a file. + """Check the method used to compress a raw text file. From https://stackoverflow.com/questions/13044562/python-mechanism-to-identify-compressed-file-type-and-uncompress @@ -164,20 +197,17 @@ def _check_file_compression(filepath: str) -> Optional[str]: filepath : str Path of the file to check. - Returns ------- Optional[str] Compression method. None, "zip", "gzip" or "bzip2". """ - magic_dict = { b"\x1f\x8b\x08": "gzip", b"\x42\x5a\x68": "bzip2", b"\x50\x4b\x03\x04": "zip", } - with open(filepath, "rb") as f: file_start = f.read(4) for magic, filetype in magic_dict.items(): @@ -238,25 +268,3 @@ def _compress_file_bzip2(filepath: str, compressed_filepath: str) -> None: with open(filepath, "rb") as f_in: with bz2.open(compressed_filepath, "wb") as f_out: f_out.writelines(f_in) - - -def archive_station_data(metadata_filepath: str) -> str: - """Archive station data into a zip file (based on metadata filepath). - - It create a zip file into a temporary directory ! - - Parameters - ---------- - metadata_filepath: str - Metadata file path. - - Returns - ------- - station_zip_filepath - Filepath of the zip file containing the station's data. - """ - - station_data_path = metadata_filepath.replace("metadata", "data") - station_data_path = os.path.splitext(station_data_path)[0] # remove trailing ".yml" - station_zip_filepath = _zip_dir(station_data_path) - return station_zip_filepath