refactor hospital admission to use delphi_utils create_export_csv #2032

Open · wants to merge 25 commits into base: main
Changes from 1 commit
20 changes: 16 additions & 4 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,22 +5,25 @@
when the module is run with `python -m delphi_claims_hosp`.
"""

import os

# standard packages
import time
import os
from datetime import datetime, timedelta
from pathlib import Path

# third party
from delphi_utils import get_structured_logger
from delphi_utils.export import create_export_csv

from .backfill import merge_backfill_file, store_backfill_file

# first party
from .config import Config
from .download_claims_ftp_files import download
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .modify_claims_drops import modify_and_write
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
@@ -137,11 +140,20 @@ def run_module(params):
params["indicator"]["write_se"],
signal_name
)
updater.update_indicator(
output = updater.update_indicator_to_df(
claims_file,
params["common"]["export_dir"],
logger,
)
filtered_output_df = updater.filter_output(output)
create_export_csv(
filtered_output_df,
export_dir=params["common"]["export_dir"],
start_date=startdate,
geo_res=geo,
sensor=signal_name,
)

max_dates.append(updater.output_dates[-1])
n_csv_export.append(len(updater.output_dates))
logger.info("finished updating", geo = geo)
96 changes: 90 additions & 6 deletions claims_hosp/delphi_claims_hosp/update_indicator.py
@@ -133,14 +133,76 @@ def geo_reindex(self, data):
data_frame.fillna(0, inplace=True)
return data_frame

def update_indicator(self, input_filepath, outpath, logger):
def update_indicator_to_df(self, input_filepath, logger):
"""
Generate and output indicator values.

Args:
input_filepath: path to the aggregated claims data
outpath: output path for the csv results
"""
self.shift_dates()
final_output_inds = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)

# load data
base_geo = Config.HRR_COL if self.geo == Config.HRR_COL else Config.FIPS_COL
data = load_data(input_filepath, self.dropdate, base_geo)
data_frame = self.geo_reindex(data)

# handle if we need to adjust by weekday
wd_params = (
Weekday.get_params_legacy(
data_frame,
"den",
["num"],
Config.DATE_COL,
[1, 1e5],
logger,
)
if self.weekday
else None
)
output_df = pd.DataFrame()
if not self.parallel:
for geo_id, sub_data in data_frame.groupby(level=0):
sub_data.reset_index(inplace=True)
if self.weekday:
sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data.set_index(Config.DATE_COL, inplace=True)
res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id)
output_df = output_df.append(pd.DataFrame(res))
else:

n_cpu = min(Config.MAX_CPU_POOL, cpu_count())
logging.debug("starting pool with %d workers", n_cpu)
with Pool(n_cpu) as pool:
pool_results = []
for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
sub_data.reset_index(inplace=True)
if self.weekday:
sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
sub_data.set_index(Config.DATE_COL, inplace=True)
pool_results.append(
pool.apply_async(
ClaimsHospIndicator.fit,
args=(
sub_data,
self.burnindate,
geo_id,
),
)
)
pool_results = [proc.get() for proc in pool_results]
for res in pool_results:
output_df = output_df.append(pd.DataFrame(res))

return output_df

def update_indicator(self, input_filepath, logger):
"""
Generate and output indicator values.

Args:
input_filepath: path to the aggregated claims data
"""
self.shift_dates()
final_output_inds = \
@@ -215,10 +277,30 @@ def update_indicator(self, input_filepath, outpath, logger):
"geo_level": self.geo,
"include": valid_inds,
}

self.write_to_csv(output_dict, outpath)
logging.debug("wrote files to %s", outpath)

return output_dict

def filter_output(self, df):
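"""Filter the indicator frame to rows flagged for export, rename rate to val, and sanity-check values before create_export_csv."""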
filtered_df = df[df["incl"]]
filtered_df = filtered_df.reset_index()
filtered_df.rename(columns={"rate": "val"}, inplace=True)
filtered_df["timestamp"] = filtered_df["timestamp"].astype(str)
output_df = pd.DataFrame()
for geo_id, group in filtered_df.groupby("geo_id"):
assert not group.val.isnull().any()
assert not group.se.isnull().any()
assert np.all(group.se < 5), f"se suspicious, {geo_id}: {np.where(group.se >= 5)[0]}"
if np.any(group.val > 90):
for sus_val in np.where(group.val > 90):
logging.warning("value suspicious, %s: %d", geo_id, sus_val)
if self.write_se:
assert np.all(group.val > 0) and np.all(group.se > 0), "p=0, std_err=0 invalid"
else:
group["se"] = np.NaN
group.drop("incl", inplace=True, axis="columns")
dshemetov (Contributor) commented on Sep 6, 2024:

question: is this necessary here? create_export_csv will drop it anyway.

broader question: tracing the code above, i actually don't know what columns are in output_df at this step. in the previous code, we at least knew that we were dealing with

        output_dict = {
            "rates": rates,
            "se": std_errs,
            "dates": self.output_dates,
            "geo_ids": unique_geo_ids,
            "geo_level": self.geo,
            "include": valid_inds,
        }

suggestion: i suppose that depends on what res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id) outputs in update_indicator, but i haven't tracked that down. what do you think about adding an assert to update_indicator at the end that makes sure that output_df has the all the right columns that we expect?
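
For illustration, a minimal version of that check could sit right after filter_output, where the column names are known; the expected set below is pieced together from this thread (geo_id, timestamp, val, se, sample_size) and should be verified against what ClaimsHospIndicator.fit actually returns before relying on it.

    # hypothetical guard after filter_output; column names assumed from this thread, not verified
    expected_cols = {"geo_id", "timestamp", "val", "se", "sample_size"}
    missing = expected_cols - set(filtered_output_df.columns)
    assert not missing, f"filtered output is missing expected columns: {missing}"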

Contributor:

could you take a look at the comment above again, i updated it

aysim319 (Contributor, Author) commented on Sep 9, 2024:

That's a good point! I would have missed sample_size if it wasn't for your comment. Hopefully this should fix the issue.

> question: is this necessary here? create_export_csv will drop it anyway.

Yes, we do need the incl column, at least until preprocess_output filters down to the rows where incl is true.

dshemetov (Contributor) commented on Sep 10, 2024:

> That's a good point! I would have missed sample_size if it wasn't for your comment. Hopefully this should fix the issue.

Glad that helped!

> Yes, we do need the incl column, at least until preprocess_output filters down to the rows where incl is true.

I meant: is it even necessary to drop it on line 230, since create_export_csv will ignore it when writing the csv? But it's a minor thing, not a big deal.
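
To make that concrete, a throwaway check along these lines would confirm that leftover bookkeeping columns never reach the exported CSVs even if filter_output keeps them; the export directory and the standard geo_id/val/se/sample_size column set are assumptions taken from this discussion, not verified against delphi_utils here.

    import glob
    import os

    import pandas as pd

    export_dir = "./receiving"  # wherever create_export_csv wrote its files
    for path in glob.glob(os.path.join(export_dir, "*.csv")):
        exported = pd.read_csv(path)
        assert "incl" not in exported.columns and "direction" not in exported.columns
        assert {"geo_id", "val", "se", "sample_size"} <= set(exported.columns)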

group["direction"] = np.NaN
output_df = output_df.append(group)

return output_df
def write_to_csv(self, output_dict, output_path="./receiving"):
"""
Write values to csv.
@@ -228,6 +310,7 @@ def write_to_csv(self, output_dict, output_path="./receiving"):
output_path: outfile path to write the csv

"""

if self.write_se:
logging.info("========= WARNING: WRITING SEs TO %s =========",
self.signal_name)
@@ -268,3 +351,4 @@ def write_to_csv(self, output_dict, output_path="./receiving"):
out_n += 1

logging.debug("wrote %d rows for %d %s", out_n, len(geo_ids), geo_level)
logging.debug("wrote files to %s", output_path)
Binary file not shown.
4 changes: 2 additions & 2 deletions claims_hosp/tests/test_modify_claims_drops.py
@@ -13,7 +13,7 @@ def test_modify_and_write(self):
logger = Mock()
files, dfs_list = modify_and_write(data_path, logger, test_mode=True)
expected_colnames = ['PatCountyFIPS', 'Pat HRR Name', 'Pat HRR ID', 'PatAgeGroup']
assert len(files) == 1
assert len(dfs_list) == 1
assert len(files) == 2
assert len(dfs_list) == 2
assert files[0] == Path('./test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz')
assert set(expected_colnames).issubset(set(dfs_list[0].columns))
89 changes: 69 additions & 20 deletions claims_hosp/tests/test_update_indicator.py
@@ -2,6 +2,7 @@
import os
from copy import deepcopy
from os.path import join, exists
import json
from tempfile import TemporaryDirectory

# third party
@@ -13,6 +14,7 @@
# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.update_indicator import ClaimsHospIndicatorUpdater
from delphi_utils.export import create_export_csv

CONFIG = Config()
CONSTANTS = GeoConstants()
@@ -35,6 +37,9 @@ class TestClaimsHospIndicatorUpdater:
weekday = False
write_se = False
prefix = "foo"
start_date = "02-01-2020"
end_date = "06-01-2020"
drop_date = "2020-06-12"
small_test_data = pd.DataFrame({
"num": [0, 100, 200, 300, 400, 500, 600, 100, 200, 300, 400, 500, 600],
"hrr": [1.0] * 7 + [2.0] * 6,
@@ -44,9 +49,9 @@

def test_shift_dates(self):
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
self.geo,
self.parallel,
self.weekday,
@@ -65,9 +70,9 @@

def test_geo_reindex(self):
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
self.geo,
self.parallel,
self.weekday,
@@ -83,31 +88,32 @@ def test_update_indicator(self):
for geo in ["state", "hrr", "hhs", "nation"]:
td = TemporaryDirectory()
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
geo,
self.parallel,
self.weekday,
self.write_se,
Config.signal_name
)

updater.update_indicator(
output = updater.update_indicator(
DATA_FILEPATH,
td.name,
TEST_LOGGER
)

updater.write_to_csv(output, td.name)

assert len(os.listdir(td.name)) == len(
updater.output_dates), f"failed {geo} update_indicator test"
td.cleanup()

def test_write_to_csv_results(self):
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
self.geo,
self.parallel,
self.weekday,
@@ -185,9 +191,9 @@ def test_write_to_csv_with_se_results(self):
obfuscated_name = PARAMS["indicator"]["obfuscated_prefix"]
signal_name = obfuscated_name + "_" + Config.signal_weekday_name
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
self.geo,
self.parallel,
True,
@@ -236,9 +242,9 @@

def test_write_to_csv_wrong_results(self):
updater = ClaimsHospIndicatorUpdater(
"02-01-2020",
"06-01-2020",
"06-12-2020",
self.start_date,
self.end_date,
self.drop_date,
self.geo,
self.parallel,
self.weekday,
@@ -289,3 +295,46 @@
updater.write_to_csv(res3, td.name)

td.cleanup()

def test_prefilter_results(self):
Contributor:

praise: nice test, thanks!

question: how big is the dataset we're comparing here? do you think it's representative and gets a lot of code coverage?

suggestion: this seems like another migration test we can remove once this PR is ready for merge.

suggestion: if we want to be especially careful, we could run this same kind of test but compare staging and prod output CSVs.

Contributor (author):

A1: I need to double check, but I believe I got an actual file from a one-off run, and it should be about a gig. Do you think I should add another file that's more recent?

Response to S1: that's the idea

Response to S2: that seems like a good idea; I would need to poke around how staging is and see what happens

dshemetov (Contributor) commented on Aug 29, 2024:

> A1: I need to double check, but I believe I got an actual file from a one-off run, and it should be about a gig. Do you think I should add another file that's more recent?

I'm not familiar with the source files for hospital admission, but the answer here really depends on whether the source file is one of many signals, one of many geos, etc. If this single drop contains every signal as a column and it's the source geo that we aggregate up, then that's good coverage. But if it's not, then doing a prod/staging comparison will get that coverage instead.

side-note: very important that we squash merge this PR, so the gig-sized file doesn't make it into the commit history.

> Response to S2: that seems like a good idea; I would need to poke around how staging is and see what happens

I think it would be worthwhile, so let's do that at some point. I also think that your experience with doing prod/staging comparisons will help us streamline this process in the future and make something that does branch comparisons with the press of a button.

aysim319 (Contributor, Author) commented on Sep 4, 2024:

In staging I ran the older version and saved the output in /common/text_hosptial_admission_test_export_20240903, then scp-ed it to local and compared it against the new version with the sample script below:

    def test_compare_run(self):
        expected_path = "../from_staging/test_export"
        actual_path = "../receiving"
        expected_files = sorted(glob.glob(f"{expected_path}/*.csv"))
        actual_files = sorted(glob.glob(f"{actual_path}/*.csv"))
        for expected, actual in zip(expected_files, actual_files):
            with open(expected, "rb") as expected_f, \
                 open(actual, "rb") as actual_f:
                expected_df = pd.read_csv(expected_f)
                actual_df = pd.read_csv(actual_f)
                pd.testing.assert_frame_equal(expected_df, actual_df)

passed.

Contributor:

how many export csvs are produced by the staging run? /common/text_hosptial_admission_test_export_20240903?

Contributor (author):

20076 or so. hospital-admission creates all geos starting from 2020-02-01 till 2024-08-31 (there's some lag)

td = TemporaryDirectory()
td2 = TemporaryDirectory()

updater = ClaimsHospIndicatorUpdater(
self.start_date,
self.end_date,
self.drop_date,
"state",
self.parallel,
self.weekday,
self.write_se,
Config.signal_name
)

output = updater.update_indicator(
"test_data/EDI_AGG_INPATIENT_1_06092020_1451CDT.csv.gz",
TEST_LOGGER
)

updater.write_to_csv(output, td.name)

output_df = updater.update_indicator_to_df(
"test_data/EDI_AGG_INPATIENT_1_06092020_1451CDT.csv.gz",
TEST_LOGGER
)

filtered_output_df = updater.filter_output(output_df)
create_export_csv(filtered_output_df, td2.name,
start_date=self.start_date,
end_date=self.end_date,
geo_res="state",
sensor=Config.signal_name)
expected_files = sorted(os.listdir(td.name))
actual_files = sorted(os.listdir(td2.name))
for expected, actual in zip(expected_files, actual_files):
with open(join(td.name, expected), "rb") as expected_f, \
open(join(td2.name, actual), "rb") as actual_f:
expected_df = pd.read_csv(expected_f)
actual_df = pd.read_csv(actual_f)
pd.testing.assert_frame_equal(expected_df, actual_df)
td.cleanup()