Skip to content

Commit

Permalink
Merge pull request #254 from ImperialCollegeLondon/maccor-poarser-fil…
Browse files Browse the repository at this point in the history
…e-formats

Maccor parser file formats
  • Loading branch information
dandavies99 authored Jan 26, 2023
2 parents 645cb8e + d1fc53f commit 4a605c4
Show file tree
Hide file tree
Showing 5 changed files with 11,938 additions and 591 deletions.
119 changes: 102 additions & 17 deletions parsing_engines/maccor_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class MaccorParsingEngine(ParsingEngineBase):
("application/CDFV2", ".xls"),
("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ".xlsx"),
("application/zip", ".xlsx"),
("text/plain", ".csv"),
("text/plain", ".txt"),
]
mandatory_columns: Dict[str, Dict[str, Union[str, Tuple[str, str]]]] = {
"Rec#": dict(symbol="Rec", unit=("Unitless", "1")),
Expand All @@ -50,28 +52,33 @@ class MaccorParsingEngine(ParsingEngineBase):
@classmethod
def factory(cls, file_obj: TextIO) -> ParsingEngineBase:
"""Factory method for creating a parsing engine.
Different functions/methods are used depending on the file extension.
Args:
file_obj (TextIO): File to parse.
"""
column_name_mapping = MACCOR_COLUMN_MAPPING
ext = os.path.splitext(file_obj.name)[1]
if ext == ".xls":
if ext in [".csv", ".txt"]:
skip_rows = get_header_size_csv(file_obj, set(cls.mandatory_columns.keys()))
data = load_maccor_data_csv(file_obj, skip_rows)
file_metadata = get_file_header_csv(file_obj, skip_rows)
return cls(file_obj, skip_rows, data, file_metadata, column_name_mapping)
elif ext not in [".xls", ".xlsx"]:
raise UnsupportedFileTypeError(
f"Unknown file extension for Maccor parsing engine '{ext}'."
)
elif ext == ".xls":
sheet, datemode = factory_xls(file_obj)
if sheet.ncols < 1 or sheet.nrows < 2:
raise EmptyFileError()
elif ext == ".xlsx":
sheet, datemode = factory_xlsx(file_obj)
if sheet.max_column < 1 or sheet.max_row < 2:
raise EmptyFileError()
else:
raise UnsupportedFileTypeError(
f"Unknown file extension for Maccor parsing engine '{ext}'."
)

skip_rows = get_header_size(sheet, set(cls.mandatory_columns.keys()))
data = load_maccor_data(file_obj, skip_rows)
file_metadata = get_file_header(sheet, skip_rows, datemode)
skip_rows = get_header_size_excel(sheet, set(cls.mandatory_columns.keys()))
data = load_maccor_data_excel(file_obj, skip_rows)
file_metadata = get_file_header_excel(sheet, skip_rows, datemode)
return cls(file_obj, skip_rows, data, file_metadata, column_name_mapping)


Expand Down Expand Up @@ -103,7 +110,7 @@ def factory_xlsx(file_obj: TextIO) -> Tuple[Union[Sheet, Worksheet], Optional[in
return book.active, None


def get_file_header(
def get_file_header_excel(
sheet: Union[Sheet, Worksheet], skip_rows: int, datemode: Optional[int]
) -> Dict[str, Any]:
"""Extracts the header from the Maccor file.
Expand Down Expand Up @@ -134,7 +141,28 @@ def get_file_header(
return header


def get_header_size(sheet: Union[Sheet, Worksheet], columns: Set) -> int:
def get_file_header_csv(file_obj: TextIO, skip_rows: int) -> List[str]:
"""Extracts the header from the Maccor csv or txt file.
Args:
file_obj (TextIO): File to load the data from.
skip_rows (int): Location of the header, assumed equal to the number of rows to
skip.
Returns:
A list of rows for the header, without the termination characters and
empty lines.
"""
if skip_rows == 0:
return []
with file_obj.open("r") as f:
header = list(filter(len, (f.readline().rstrip() for _ in range(skip_rows))))
if type(header[0]) == bytes:
header = [i.decode("iso-8859-1") for i in header]
return header


def get_header_size_excel(sheet: Union[Sheet, Worksheet], columns: Set) -> int:
"""Reads the file and determines the size of the header.
Args:
Expand All @@ -146,12 +174,31 @@ def get_header_size(sheet: Union[Sheet, Worksheet], columns: Set) -> int:
int: The size of the header.
"""
for i, row in enumerate(sheet):
if not is_metadata_row(row, columns):
if not is_metadata_row(row, columns, "excel"):
return i
return 0


def load_maccor_data(file_obj: TextIO, skip_rows: int) -> pd.DataFrame:
def get_header_size_csv(file_obj: TextIO, columns: Set) -> int:
"""Reads the file and determines the size of the header.
Args:
file_obj (TextIO): File to load the data from.
columns (Set): Iterable with the elements to check that would identify
this as NOT a metadata row.
Returns:
int: The size of the header.
"""
file_obj.seek(0)
with file_obj.open("r") as f:
lines = iter([i.decode("iso-8859-1") for i in f.readlines()])
for i, line in enumerate(lines):
if not is_metadata_row(line, columns, "csv"):
return i


def load_maccor_data_excel(file_obj: TextIO, skip_rows: int) -> pd.DataFrame:
"""Loads the data as a Pandas data frame.
Args:
Expand All @@ -166,6 +213,38 @@ def load_maccor_data(file_obj: TextIO, skip_rows: int) -> pd.DataFrame:
data = pd.read_excel(file_obj, sheet_name=None, header=skip_rows, index_col=None)
if isinstance(data, dict):
data = pd.concat(data.values(), ignore_index=True)
return data


def load_maccor_data_csv(file_obj: TextIO, skip_rows: int) -> pd.DataFrame:
"""Loads the data as a Pandas data frame.
Args:
file_obj (TextIO): File to load.
skip_rows (int): Location of the header, assumed equal to the number of rows to
skip.
Returns:
pd.DataFrame: A pandas dataframe with all the data.
"""
kwargs = dict(skiprows=skip_rows, encoding="iso-8859-1", encoding_errors="replace")
file_obj.open("r")
try:
file_obj.seek(0)
data = pd.read_csv(file_obj, delimiter=",", **kwargs)
except pd.errors.ParserError:
try:
file_obj.seek(0)
data = pd.read_csv(file_obj, delimiter="\t", **kwargs)
except pd.errors.ParserError as err:
raise UnsupportedFileTypeError(err)

if len(data.columns) == 1:
file_obj.seek(0)
data = pd.read_csv(file_obj, delimiter="\t", **kwargs)

if len(data.columns) == 1:
raise UnsupportedFileTypeError()

return data

Expand All @@ -182,19 +261,25 @@ def clean_value(value: str) -> str:
return value.replace("''", "'").strip().rstrip("\0").strip()


def is_metadata_row(row: Iterable, withnesses: Iterable) -> bool:
def is_metadata_row(row: Iterable, indicators: Iterable, filetype: str) -> bool:
"""Checks if a row is a metadata row.
Args:
row (Iterable): Iterable with the row data to check
withnesses (Iterable): Iterable with the elements to check that would identify
indicators (Iterable): Iterable with the elements to check that would identify
this as NOT a metadata row.
filetype (Str): Either "excel" or "csv".
Returns:
bool: True if it is identified as a metadata row
"""
row_values = [y.value if hasattr(y, "value") else y for y in row]
return not any(x in withnesses for x in row_values)
if filetype == "excel":
row_values = [y.value if hasattr(y, "value") else y for y in row]
return not any(x in indicators for x in row_values)
elif filetype == "csv":
return not any(x in row for x in indicators)
else:
raise ValueError(f"Unrecognized filetype: {filetype}")


def get_metadata_value(
Expand Down
Loading

0 comments on commit 4a605c4

Please sign in to comment.