
Commit

Improve test coverage of compression utilities
ghiggi committed Dec 2, 2023
1 parent 5db1e7a commit 00133c7
Showing 3 changed files with 121 additions and 77 deletions.
4 changes: 2 additions & 2 deletions disdrodb/data_transfer/download_data.py
@@ -29,7 +29,7 @@
from disdrodb.api.path import define_metadata_filepath
from disdrodb.configs import get_base_dir
from disdrodb.metadata import get_list_metadata
from disdrodb.utils.compression import _unzip_file
from disdrodb.utils.compression import unzip_file
from disdrodb.utils.directories import is_empty_directory
from disdrodb.utils.yaml import read_yaml

@@ -229,7 +229,7 @@ def _select_metadata_with_remote_data_url(metadata_filepaths: List[str]) -> List

def _extract_station_files(zip_filepath, station_dir):
"""Extract files from the station.zip file and remove the station.zip file."""
_unzip_file(filepath=zip_filepath, dest_path=station_dir)
unzip_file(filepath=zip_filepath, dest_path=station_dir)
if os.path.exists(zip_filepath):
os.remove(zip_filepath)

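For context, the helper previously named _unzip_file is now exposed publicly as unzip_file in disdrodb.utils.compression. A minimal usage sketch; the archive and destination paths below are hypothetical:

    # Extract a previously downloaded station archive (paths are hypothetical).
    from disdrodb.utils.compression import unzip_file

    unzip_file(filepath="/tmp/station.zip", dest_path="/tmp/station_data")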
84 changes: 60 additions & 24 deletions disdrodb/tests/test_utils/test_utils_compression.py
@@ -23,22 +23,23 @@

import pytest

from disdrodb.utils.compression import _unzip_file, _zip_dir, compress_station_files
from disdrodb.tests.conftest import create_fake_raw_data_file
from disdrodb.utils.compression import _zip_dir, compress_station_files, unzip_file


def create_fake_data_dir(base_dir, data_source, campaign_name, station_name):
"""Create a station data directory with files inside it.
station_name
|-- dir1
|-- 2020
|-- file1.txt
|-- dir1
|-- Jan
|-- file2.txt
"""

data_dir = base_dir / "Raw" / data_source / campaign_name / "data" / station_name
dir1 = data_dir / "dir1"
dir2 = dir1 / "dir2"
dir1 = data_dir / "2020"
dir2 = dir1 / "Jan"
if not dir2.exists():
dir2.mkdir(parents=True)

@@ -48,54 +49,89 @@ def create_fake_data_dir(base_dir, data_source, campaign_name, station_name):
file2_txt.touch()


def test_files_compression(tmp_path):
@pytest.mark.parametrize("method", ["zip", "gzip", "bzip2"])
def test_files_compression(tmp_path, method):
"""Test compression of files in a directory."""

base_dir = tmp_path / "DISDRODB"
data_source = "test_data_source"
campaign_name = "test_campaign_name"
station_name = "station_name"

# Directory that does not exist yet
compress_station_files(base_dir, data_source, campaign_name, "station1", "zip")

methods = ["zip", "gzip", "bzip2"]
for i, method in enumerate(methods):
station_name = f"test_station_name_{i}"
create_fake_data_dir(
# Check that an error is raised if the directory does not exist yet
with pytest.raises(ValueError):
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method=method,
)

# Create fake data
create_fake_data_dir(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
)

# Compress files
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method=method,
)

# Try to compress directory with already compressed files (skip=True)
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method=method,
skip=True,
)

# Try to compress directory with already compressed files (skip=False)
with pytest.raises(ValueError):
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method=method,
skip=False,
)

# Directory with already compressed files
station_name = "test_station_name_0"
compress_station_files(
# Try to compress with invalid method
with pytest.raises(ValueError):
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method="unknown_compression_method",
)

# Try to compress a netCDF file
create_fake_raw_data_file(
base_dir=base_dir,
product="RAW",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method="zip",
)

station_name = "test_station_name"
create_fake_data_dir(
base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name
filename="test_data.nc",
)
with pytest.raises(ValueError):
compress_station_files(
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
method="unknown_compression_method",
method=method,
)


@@ -109,5 +145,5 @@ def test_zip_unzip_directory(tmp_path):
assert os.path.isfile(zip_path)

unzip_path = tmp_path / "test_dir_unzipped"
_unzip_file(zip_path, unzip_path)
unzip_file(zip_path, unzip_path)
assert os.path.isdir(unzip_path)
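For reference, a standalone sketch of what one parametrized case exercises outside pytest. It mirrors the fake directory layout built by create_fake_data_dir above; the temporary base directory, data source, campaign and file names are illustrative, and it assumes disdrodb is importable:

    import tempfile
    from pathlib import Path

    from disdrodb.utils.compression import compress_station_files

    # Build a fake station tree: <base_dir>/Raw/<data_source>/<campaign_name>/data/<station_name>/2020/Jan
    base_dir = Path(tempfile.mkdtemp()) / "DISDRODB"
    data_dir = base_dir / "Raw" / "test_data_source" / "test_campaign_name" / "data" / "station_name" / "2020" / "Jan"
    data_dir.mkdir(parents=True)
    (data_dir / "file1.txt").touch()

    # Compress each raw text file with gzip; the original files are replaced by *.gz files.
    compress_station_files(
        base_dir=str(base_dir),
        data_source="test_data_source",
        campaign_name="test_campaign_name",
        station_name="station_name",
        method="gzip",
    )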
110 changes: 59 additions & 51 deletions disdrodb/utils/compression.py
@@ -30,8 +30,14 @@
from disdrodb.api.path import define_station_dir
from disdrodb.utils.directories import list_files

COMPRESSION_OPTIONS = {
"zip": ".zip",
"gzip": ".gz",
"bzip2": ".bz2",
}

def _unzip_file(filepath: str, dest_path: str) -> None:

def unzip_file(filepath: str, dest_path: str) -> None:
"""Unzip a file into a directory
Parameters
@@ -67,8 +73,31 @@ def _zip_dir(dir_path: str) -> str:
return output_path


def compress_station_files(base_dir: str, data_source: str, campaign_name: str, station_name: str, method: str) -> None:
"""Compress all files of a station.
def archive_station_data(metadata_filepath: str) -> str:
"""Archive station data into a zip file for subsequent data upload.
It creates a zip file in a temporary directory!
Parameters
----------
metadata_filepath: str
Metadata file path.
Returns
-------
station_zip_filepath
Filepath of the zip file containing the station's data.
"""
station_data_path = metadata_filepath.replace("metadata", "data")
station_data_path = os.path.splitext(station_data_path)[0] # remove trailing ".yml"
station_zip_filepath = _zip_dir(station_data_path)
return station_zip_filepath
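A hedged usage sketch of archive_station_data; the metadata filepath below is hypothetical, and the resulting zip is written to a temporary directory as described in the docstring:

    from disdrodb.utils.compression import archive_station_data

    # The sibling .../data/station_name directory of this (hypothetical) metadata file gets zipped.
    station_zip_filepath = archive_station_data(
        "/tmp/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name.yml"
    )
    print(station_zip_filepath)  # path of the temporary zip file to upload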


def compress_station_files(
base_dir: str, data_source: str, campaign_name: str, station_name: str, method: str = "gzip", skip: bool = True
) -> None:
"""Compress each raw file of a station.
Parameters
----------
@@ -82,8 +111,14 @@ def compress_station_files(base_dir: str, data_source: str, campaign_name: str,
Station name of interest.
method : str
Compression method. "zip", "gzip" or "bzip2".
skip : bool
Whether to skip already compressed files.
If True, already compressed files are skipped and the remaining files are compressed.
If False, an error is raised and the compression routine stops.
The default is True.
"""
if method not in COMPRESSION_OPTIONS:
raise ValueError(f"Invalid compression method {method}. Valid methods are {list(COMPRESSION_OPTIONS.keys())}")

base_dir = check_base_dir(base_dir)
station_dir = define_station_dir(
@@ -95,18 +130,18 @@
check_exists=False,
)
if not os.path.isdir(station_dir):
print(f"Station data directory {station_dir} does not exist. Skipping.")
return
raise ValueError(f"Station data directory {station_dir} does not exist.")

# use glob to get list of files recursively
# Get list of files inside the station directory (in all nested directories)
filepaths = list_files(station_dir, glob_pattern="*", recursive=True)

for filepath in filepaths:
if os.path.isfile(filepath):
_compress_file(filepath, method)
_ = _compress_file(filepath, method, skip=skip)

print("All files of {data_source} {campaign_name} {station_name} have been compressed.")
print("Please now remember to update the glob_pattern of the reader ¨!")


def _compress_file(filepath: str, method: str) -> str:
def _compress_file(filepath: str, method: str, skip: bool) -> str:
"""Compress a file and delete the original.
If the file is already compressed, it is not compressed again.
@@ -115,31 +150,29 @@ def _compress_file(filepath: str, method: str) -> str:
----------
filepath : str
Path of the file to compress.
method : str
Compression method. None, "zip", "gzip" or "bzip2".
skip : bool
Whether to skip already compressed files.
If True, no error is raised and the input filepath is returned.
If False, an error is raised.
Returns
-------
str
Path of the compressed file. Same as input if no compression.
"""
if filepath.endswith(".nc") or filepath.endswith(".netcdf4"):
raise ValueError("netCDF files must be not compressed !")

if _check_file_compression(filepath) is not None:
print(f"File {filepath} is already compressed. Skipping.")
return filepath
if skip:
print(f"File {filepath} is already compressed. Skipping.")
return filepath
else:
raise ValueError(f"File {filepath} is already compressed !")

valid_extensions = {
"zip": ".zip",
"gzip": ".gz",
"bzip2": ".bz2",
}

if method not in valid_extensions:
raise ValueError(f"Invalid compression method {method}. Valid methods are {list(valid_extensions.keys())}")

extension = valid_extensions[method]
extension = COMPRESSION_OPTIONS[method]
archive_name = os.path.basename(filepath) + extension
compressed_filepath = os.path.join(os.path.dirname(filepath), archive_name)
compress_file_function = {
@@ -155,7 +188,7 @@ def _compress_file(filepath: str, method: str) -> str:


def _check_file_compression(filepath: str) -> Optional[str]:
"""Check the method used to compress a file.
"""Check the method used to compress a raw text file.
From https://stackoverflow.com/questions/13044562/python-mechanism-to-identify-compressed-file-type-and-uncompress
@@ -164,20 +197,17 @@ def _check_file_compression(filepath: str) -> Optional[str]:
filepath : str
Path of the file to check.
Returns
-------
Optional[str]
Compression method. None, "zip", "gzip" or "bzip2".
"""

magic_dict = {
b"\x1f\x8b\x08": "gzip",
b"\x42\x5a\x68": "bzip2",
b"\x50\x4b\x03\x04": "zip",
}

with open(filepath, "rb") as f:
file_start = f.read(4)
for magic, filetype in magic_dict.items():
@@ -238,25 +268,3 @@ def _compress_file_bzip2(filepath: str, compressed_filepath: str) -> None:
with open(filepath, "rb") as f_in:
with bz2.open(compressed_filepath, "wb") as f_out:
f_out.writelines(f_in)


def archive_station_data(metadata_filepath: str) -> str:
"""Archive station data into a zip file (based on metadata filepath).
It creates a zip file in a temporary directory!
Parameters
----------
metadata_filepath: str
Metadata file path.
Returns
-------
station_zip_filepath
Filepath of the zip file containing the station's data.
"""

station_data_path = metadata_filepath.replace("metadata", "data")
station_data_path = os.path.splitext(station_data_path)[0] # remove trailing ".yml"
station_zip_filepath = _zip_dir(station_data_path)
return station_zip_filepath
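To illustrate the magic-byte detection performed by _check_file_compression above, a small self-contained sketch; it writes throwaway gzip and bzip2 files into a temporary directory and checks their leading bytes:

    import bz2
    import gzip
    import os
    import tempfile

    tmp_dir = tempfile.mkdtemp()

    # gzip members start with 0x1f 0x8b followed by the deflate method byte 0x08.
    gz_path = os.path.join(tmp_dir, "example.txt.gz")
    with gzip.open(gz_path, "wb") as f:
        f.write(b"hello")
    with open(gz_path, "rb") as f:
        assert f.read(3) == b"\x1f\x8b\x08"

    # bzip2 files start with the ASCII signature "BZh".
    bz2_path = os.path.join(tmp_dir, "example.txt.bz2")
    with bz2.open(bz2_path, "wb") as f:
        f.write(b"hello")
    with open(bz2_path, "rb") as f:
        assert f.read(3) == b"\x42\x5a\x68"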
