refactor(collate-outputs): use -i daily, hourly, bouts, ... to specify types of files to collate

Also other cleanups, like code deduplication.
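
With this change, the types of files to collate are selected with a single `-i`/`--include` option instead of separate `--include-*` flags (Info summary files are always collated). A hypothetical invocation, assuming the module is exposed as a console script named `stepcount-collate-outputs` (the actual script name depends on the package's entry-point configuration):

    stepcount-collate-outputs /path/to/results/ -o collated-outputs/ -i daily bouts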
chanshing committed Oct 18, 2024
1 parent fa6551e commit b92e97c
Showing 1 changed file with 75 additions and 104 deletions.
179 changes: 75 additions & 104 deletions src/stepcount/cli_utils/collate_outputs.py
@@ -1,6 +1,5 @@
import argparse
import json
-import os
from collections import OrderedDict

import pandas as pd
@@ -9,120 +8,96 @@


def collate_outputs(
-    results,
-    include_hourly=False,
-    include_minutely=False,
-    include_bouts=False,
-    outdir="collated_outputs/"
+    results_dir,
+    collated_results_dir="collated_outputs/",
+    included=["daily", "hourly", "minutely", "bouts"],
):
-    """Collate all output files under <outdir> into one CSV file.
-    :param str outdir: Root directory from which to search for output files.
-    :param str outfile: Output CSV filename.
-    :return: New file written to <outfile>
+    """Collate all results files in <results_dir>.
+    :param str results_dir: Root directory in which to search for result files.
+    :param str collated_results_dir: Directory to write the collated files to.
+    :param list included: Type of result files to collate ('daily', 'hourly', 'minutely', 'bouts').
+    :return: Collated files written to <collated_results_dir>
    :rtype: void
    """

-    os.makedirs(outdir, exist_ok=True)
+    print("Searching files...")

-    # Find all relevant files under <outputs>/
+    # Find all relevant files under <results_dir>/
    # - *-Info.json files contain the summary information
    # - *-Daily.json files contain daily summaries
    # - *-Hourly.json files contain hourly summaries
    # - *-Minutely.json files contain minute-level summaries
    # - *-Bouts.json files contain bout information
-    info_files = []
-    daily_files = []
-    hourly_files = []
-    minutes_files = []
-    dailyadj_files = []
-    hourlyadj_files = []
-    minutesadj_files = []
-    bouts_files = []
-
-    results = Path(results)
-
-    print("Searching files...")
-
-    for file in results.rglob('*'):
+    info_files = []
+    csv_files = {}
+
+    # lowercase the include list
+    included = [x.lower() for x in included]
+    if "daily" in included:
+        csv_files["Daily"] = []
+        csv_files["DailyAdjusted"] = []
+    if "hourly" in included:
+        csv_files["Hourly"] = []
+        csv_files["HourlyAdjusted"] = []
+    if "minutely" in included:
+        csv_files["Minutely"] = []
+        csv_files["MinutelyAdjusted"] = []
+    if "bouts" in included:
+        csv_files["Bouts"] = []
+
+    # Iterate through the files and append to the appropriate list based on the suffix
+    for file in Path(results_dir).rglob('*'):
        if file.is_file():
            if file.name.endswith("-Info.json"):
                info_files.append(file)
-            if file.name.endswith("-Daily.csv.gz"):
-                daily_files.append(file)
-            if file.name.endswith("-Hourly.csv.gz"):
-                hourly_files.append(file)
-            if file.name.endswith("-Minutely.csv.gz"):
-                minutes_files.append(file)
-            if file.name.endswith("-DailyAdjusted.csv.gz"):
-                dailyadj_files.append(file)
-            if file.name.endswith("-HourlyAdjusted.csv.gz"):
-                hourlyadj_files.append(file)
-            if file.name.endswith("-MinutelyAdjusted.csv.gz"):
-                minutesadj_files.append(file)
-            if file.name.endswith("-Bouts.csv.gz"):
-                bouts_files.append(file)
-
-    outdir = Path(outdir)
-
-    print(f"Collating {len(info_files)} summary files...")
-    info = []
-    for file in tqdm(info_files):
+            for key, file_list in csv_files.items():
+                if file.name.endswith(f"-{key}.csv.gz"):
+                    file_list.append(file)
+                    break
+
+    collated_results_dir = Path(collated_results_dir)
+    collated_results_dir.mkdir(parents=True, exist_ok=True)
+
+    # Collate Info.json files
+    print(f"Collating {len(info_files)} Info files...")
+    outfile = collated_results_dir / "Info.csv.gz"
+    collate_jsons(info_files, outfile)
+    print('Collated info CSV written to', outfile)
+
+    # Collate the remaining files (Daily, Hourly, Minutely, Bouts, etc.)
+    for key, file_list in csv_files.items():
+        print(f"Collating {len(file_list)} {key} files...")
+        outfile = collated_results_dir / f"{key}.csv.gz"
+        collate_csvs(file_list, outfile)
+        print(f'Collated {key} CSV written to', outfile)
+
+    return
+
+
+def collate_jsons(file_list, outfile, overwrite=True):
+    """ Collate a list of JSON files into a single CSV file."""
+
+    if overwrite and outfile.exists():
+        print(f"Overwriting existing file: {outfile}")
+        outfile.unlink()  # remove existing file
+
+    df = []
+    for file in tqdm(file_list):
        with open(file, 'r') as f:
-            info.append(json.load(f, object_pairs_hook=OrderedDict))
-    info = pd.DataFrame.from_dict(info)  # merge to a dataframe
-    info = info.applymap(convert_ordereddict)  # convert any OrderedDict cell values to regular dict
-    info_file = outdir / "Info.csv.gz"
-    info.to_csv(info_file, index=False)
-    print('Collated info CSV written to', info_file)
-
-    print(f"Collating {len(daily_files)} daily files...")
-    daily_csv = outdir / "Daily.csv.gz"
-    collate_to_csv(daily_files, daily_csv)
-    print('Collated daily CSV written to', daily_csv)
-
-    print(f"Collating {len(dailyadj_files)} adjusted daily files...")
-    dailyadj_csv = outdir / "DailyAdjusted.csv.gz"
-    collate_to_csv(dailyadj_files, dailyadj_csv)
-    print('Collated adjusted daily CSV written to', dailyadj_csv)
-
-    if include_hourly:
-
-        print(f"Collating {len(hourly_files)} hourly files...")
-        hourly_csv = outdir / "Hourly.csv.gz"
-        collate_to_csv(hourly_files, hourly_csv)
-        print('Collated hourly CSV written to', hourly_csv)
-
-        print(f"Collating {len(hourlyadj_files)} adjusted hourly files...")
-        hourlyadj_csv = outdir / "HourlyAdjusted.csv.gz"
-        collate_to_csv(hourlyadj_files, hourlyadj_csv)
-        print('Collated adjusted hourly CSV written to', hourlyadj_csv)
-
-    if include_minutely:
-
-        print(f"Collating {len(minutes_files)} minutes files...")
-        minutes_csv = outdir / "Minutely.csv.gz"
-        collate_to_csv(minutes_files, minutes_csv)
-        print('Collated minutes CSV written to', minutes_csv)
-
-        print(f"Collating {len(minutesadj_files)} adjusted minutes files...")
-        minutesadj_csv = outdir / "MinutelyAdjusted.csv.gz"
-        collate_to_csv(minutesadj_files, minutesadj_csv)
-        print('Collated adjusted minutes CSV written to', minutesadj_csv)
-
-    if include_bouts:
-
-        print(f"Collating {len(bouts_files)} bouts files...")
-        bouts_csv = outdir / "Bouts.csv.gz"
-        collate_to_csv(bouts_files, bouts_csv)
-        print('Collated bouts CSV written to', bouts_csv)
+            df.append(json.load(f, object_pairs_hook=OrderedDict))
+    df = pd.DataFrame.from_dict(df)  # merge to a dataframe
+    df = df.applymap(convert_ordereddict)  # convert any OrderedDict cell values to regular dict
+    df.to_csv(outfile, index=False)

    return


-def collate_to_csv(file_list, outfile, overwrite=True):
-    """ Collate a list of files into a single CSV file."""
+def collate_csvs(file_list, outfile, overwrite=True):
+    """ Collate a list of CSV files into a single CSV file."""

    if overwrite and outfile.exists():
        print(f"Overwriting existing file: {outfile}")
        outfile.unlink()  # remove existing file

    header_written = False
@@ -143,19 +118,15 @@ def convert_ordereddict(value):

def main():
    parser = argparse.ArgumentParser()
-    parser.add_argument('results', help="Directory containing the result files")
-    parser.add_argument('--include-hourly', action='store_true', help="Collate hourly files")
-    parser.add_argument('--include-minutely', action='store_true', help="Collate minutely files")
-    parser.add_argument('--include-bouts', action='store_true', help="Collate bouts files")
-    parser.add_argument('--outdir', '-o', default="collated-outputs/", help="Output directory")
+    parser.add_argument('results_dir', help="Root directory in which to search for result files")
+    parser.add_argument('--output', '-o', default="collated-outputs/", help="Directory to write the collated files to")
+    parser.add_argument('--include', '-i', nargs='+', default=["daily", "hourly", "minutely", "bouts"], help="Type of result files to collate ('daily', 'hourly', 'minutely', 'bouts')")
    args = parser.parse_args()

    return collate_outputs(
-        results=args.results,
-        include_hourly=args.include_hourly,
-        include_minutely=args.include_minutely,
-        include_bouts=args.include_bouts,
-        outdir=args.outdir
+        results_dir=args.results_dir,
+        collated_results_dir=args.output,
+        included=args.include,
    )
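For reference, a minimal sketch of calling the refactored function directly from Python. The import path mirrors this file's location in the repository (src/stepcount/cli_utils/collate_outputs.py); whether it is importable this way depends on how the package is installed:

    from stepcount.cli_utils.collate_outputs import collate_outputs

    # Collate only the daily and bouts results found under a results folder.
    # Entries in `included` are matched case-insensitively, since the function
    # lowercases the list before building its file-type map.
    collate_outputs(
        results_dir="/path/to/results/",
        collated_results_dir="collated-outputs/",
        included=["daily", "bouts"],
    )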