refactor hospital admission to use delphi_utils create_export_csv #2032

Open · wants to merge 25 commits into base: main
Changes from 8 commits
3 changes: 2 additions & 1 deletion _delphi_utils_python/delphi_utils/export.py
@@ -113,10 +113,11 @@ def create_export_csv(
"geo_id",
"val",
"se",
"direction",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
"missing_sample_size",
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
22 changes: 14 additions & 8 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,22 +5,20 @@
when the module is run with `python -m delphi_claims_hosp`.
"""

# standard packages
import time
import os
import time
from datetime import datetime, timedelta
from pathlib import Path

# third party
from delphi_utils import get_structured_logger
from delphi_utils.export import create_export_csv

# first party
from .backfill import merge_backfill_file, store_backfill_file
from .config import Config
from .download_claims_ftp_files import download
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .modify_claims_drops import modify_and_write
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
@@ -137,11 +135,19 @@ def run_module(params):
params["indicator"]["write_se"],
signal_name
)
updater.update_indicator(
output = updater.update_indicator(
claims_file,
params["common"]["export_dir"],
logger,
)
filtered_output_df = updater.preprocess_output(output)
create_export_csv(
filtered_output_df,
export_dir=params["common"]["export_dir"],
start_date=startdate,
geo_res=geo,
sensor=signal_name,
)

max_dates.append(updater.output_dates[-1])
n_csv_export.append(len(updater.output_dates))
logger.info("finished updating", geo = geo)
134 changes: 49 additions & 85 deletions claims_hosp/delphi_claims_hosp/update_indicator.py
@@ -133,18 +133,15 @@ def geo_reindex(self, data):
data_frame.fillna(0, inplace=True)
return data_frame

def update_indicator(self, input_filepath, outpath, logger):
def update_indicator(self, input_filepath, logger):
"""
Generate and output indicator values.

Args:
input_filepath: path to the aggregated claims data
outpath: output path for the csv results

"""
self.shift_dates()
final_output_inds = \
(self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)
final_output_inds = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)

# load data
base_geo = Config.HRR_COL if self.geo == Config.HRR_COL else Config.FIPS_COL
@@ -164,22 +161,19 @@ def update_indicator(self, input_filepath, outpath, logger):
if self.weekday
else None
)
# run fitting code (maybe in parallel)
rates = {}
std_errs = {}
valid_inds = {}
df_lst = []
output_df = pd.DataFrame()
if not self.parallel:
for geo_id, sub_data in data_frame.groupby(level=0):
sub_data.reset_index(inplace=True)
if self.weekday:
sub_data = Weekday.calc_adjustment(
wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data.set_index(Config.DATE_COL, inplace=True)
res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id)
res = pd.DataFrame(res)
rates[geo_id] = np.array(res.loc[final_output_inds, "rate"])
std_errs[geo_id] = np.array(res.loc[final_output_inds, "se"])
valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"])
temp_df = pd.DataFrame(res)
temp_df = temp_df.loc[final_output_inds]
df_lst.append(pd.DataFrame(temp_df))
output_df = pd.concat(df_lst)
else:
n_cpu = min(Config.MAX_CPU_POOL, cpu_count())
logging.debug("starting pool with %d workers", n_cpu)
@@ -188,83 +182,53 @@ def update_indicator(self, input_filepath, outpath, logger):
for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
sub_data.reset_index(inplace=True)
if self.weekday:
sub_data = Weekday.calc_adjustment(
wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data.set_index(Config.DATE_COL, inplace=True)
pool_results.append(
pool.apply_async(
ClaimsHospIndicator.fit,
args=(sub_data, self.burnindate, geo_id,),
args=(
sub_data,
self.burnindate,
geo_id,
),
)
)
pool_results = [proc.get() for proc in pool_results]
for res in pool_results:
geo_id = res["geo_id"]
res = pd.DataFrame(res)
rates[geo_id] = np.array(res.loc[final_output_inds, "rate"])
std_errs[geo_id] = np.array(res.loc[final_output_inds, "se"])
valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"])

# write out results
unique_geo_ids = list(rates.keys())
output_dict = {
"rates": rates,
"se": std_errs,
"dates": self.output_dates,
"geo_ids": unique_geo_ids,
"geo_level": self.geo,
"include": valid_inds,
}

self.write_to_csv(output_dict, outpath)
logging.debug("wrote files to %s", outpath)

def write_to_csv(self, output_dict, output_path="./receiving"):
df_lst = [pd.DataFrame(proc.get()).loc[final_output_inds] for proc in pool_results]
output_df = pd.concat(df_lst)

return output_df

def preprocess_output(self, df) -> pd.DataFrame:
"""
Write values to csv.
Check for any anomalies and format the output for export.

Args:
output_dict: dictionary containing values, se, unique dates, and unique geo_id
output_path: outfile path to write the csv
Parameters
----------
df: pd.DataFrame
    Indicator output from update_indicator.

Returns
-------
pd.DataFrame
    Filtered and formatted output, ready for create_export_csv.
"""
if self.write_se:
logging.info("========= WARNING: WRITING SEs TO %s =========",
self.signal_name)

geo_level = output_dict["geo_level"]
dates = output_dict["dates"]
geo_ids = output_dict["geo_ids"]
all_rates = output_dict["rates"]
all_se = output_dict["se"]
all_include = output_dict["include"]
out_n = 0
for i, date in enumerate(dates):
filename = "%s/%s_%s_%s.csv" % (
output_path,
(date + Config.DAY_SHIFT).strftime("%Y%m%d"),
geo_level,
self.signal_name,
)
with open(filename, "w") as outfile:
outfile.write("geo_id,val,se,direction,sample_size\n")
for geo_id in geo_ids:
val = all_rates[geo_id][i]
se = all_se[geo_id][i]
if all_include[geo_id][i]:
assert not np.isnan(val), "value for included value is nan"
assert not np.isnan(se), "se for included rate is nan"
if val > 90:
logging.warning("value suspicious, %s: %d", geo_id, val)
assert se < 5, f"se suspicious, {geo_id}: {se}"
if self.write_se:
assert val > 0 and se > 0, "p=0, std_err=0 invalid"
outfile.write(
"%s,%f,%s,%s,%s\n" % (geo_id, val, se, "NA", "NA"))
else:
# for privacy reasons we will not report the standard error
outfile.write(
"%s,%f,%s,%s,%s\n" % (geo_id, val, "NA", "NA", "NA"))
out_n += 1

logging.debug("wrote %d rows for %d %s", out_n, len(geo_ids), geo_level)
filtered_df = df[df["incl"]]
filtered_df = filtered_df.reset_index()
filtered_df.rename(columns={"rate": "val"}, inplace=True)
filtered_df["timestamp"] = filtered_df["timestamp"].astype(str)
output_df = pd.DataFrame()
for geo_id, group in filtered_df.groupby("geo_id"):
assert not group.val.isnull().any()
assert not group.se.isnull().any()
assert np.all(group.se < 5), f"se suspicious, {geo_id}: {np.where(group.se >= 5)[0]}"
if np.any(group.val > 90):
for sus_val in group.val[group.val > 90]:
logging.warning("value suspicious, %s: %d", geo_id, sus_val)
if self.write_se:
assert np.all(group.val > 0) and np.all(group.se > 0), "p=0, std_err=0 invalid"
else:
group["se"] = np.NaN
group.drop("incl", inplace=True, axis="columns")
@dshemetov (Contributor) commented on Sep 6, 2024:

question: is this necessary here? create_export_csv will drop it anyway.

broader question: tracing the code above, i actually don't know what columns are in output_df at this step. in the previous code, we at least knew that we were dealing with

        output_dict = {
            "rates": rates,
            "se": std_errs,
            "dates": self.output_dates,
            "geo_ids": unique_geo_ids,
            "geo_level": self.geo,
            "include": valid_inds,
        }

suggestion: i suppose that depends on what res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id) outputs in update_indicator, but i haven't tracked that down. what do you think about adding an assert to update_indicator at the end that makes sure that output_df has all the right columns that we expect?
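For illustration, a minimal sketch of the kind of column check suggested above (not code from this PR); the column names are assumptions based on what preprocess_output uses downstream, not a confirmed list:

    # Hypothetical end-of-update_indicator check; column names assumed from
    # what preprocess_output reads ("incl", "rate", "se") plus the geo identifier.
    required_columns = {"geo_id", "rate", "se", "incl"}
    missing = required_columns - set(output_df.columns)
    assert not missing, f"update_indicator output is missing columns: {missing}"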

A contributor commented:

could you take a look at the comment above again, i updated it

@aysim319 (Contributor, Author) commented on Sep 9, 2024:

That's a good point! I would have missed sample_size if it wasn't for your comment. Hopefully this should fix the issue.

> question: is this necessary here? create_export_csv will drop it anyway.

Yes, we do need incl at least until preprocess_output, which filters rows down to those where the incl column is true.

@dshemetov (Contributor) commented on Sep 10, 2024:

> That's a good point! I would have missed sample_size if it wasn't for your comment. Hopefully this should fix the issue.

Glad that helped!

> Yes, we do need incl at least until preprocess_output, which filters rows down to those where the incl column is true.

I meant: is it necessary to even drop it in line 230, since create_export_csv will ignore it when writing the csv? But it's a minor thing, not a big deal.
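As a side note, a minimal self-contained sketch of the filtering behavior referenced here: create_export_csv keeps only the columns in expected_columns, so an extra column such as incl is ignored at write time even if the caller never drops it. The frame below is hypothetical, not real indicator data, and expected_columns is copied from the export.py hunk in this PR:

    import pandas as pd

    # Hypothetical frame standing in for preprocess_output's result; "incl" is
    # an extra column that is not in create_export_csv's expected_columns.
    df = pd.DataFrame({
        "timestamp": ["2024-09-01"],
        "geo_id": ["01001"],
        "val": [1.2],
        "se": [0.3],
        "sample_size": [float("nan")],
        "incl": [True],
    })
    expected_columns = [
        "geo_id", "val", "se", "direction", "sample_size",
        "missing_val", "missing_se", "missing_sample_size",
    ]
    # filter(items=...) silently drops any column not listed, so "incl"
    # never reaches the CSV.
    export_df = df[df["timestamp"] == "2024-09-01"].filter(items=expected_columns)
    print(export_df.columns.tolist())  # ['geo_id', 'val', 'se', 'sample_size']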

group["direction"] = np.NaN
output_df = output_df.append(group)

return output_df