From b2b7875964623dec3a2794d1547e5cae55676811 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 8 Dec 2021 15:49:22 -0500 Subject: [PATCH 1/4] CHNG ILI Signal --- .../templates/changehc-params-prod.json.j2 | 2 +- changehc/delphi_changehc/constants.py | 5 ++- changehc/delphi_changehc/load_data.py | 37 +++++++++++++++++++ changehc/delphi_changehc/run.py | 17 ++++++++- changehc/delphi_changehc/update_sensor.py | 4 +- changehc/tests/test_load_data.py | 17 +++++++++ 6 files changed, 78 insertions(+), 4 deletions(-) diff --git a/ansible/templates/changehc-params-prod.json.j2 b/ansible/templates/changehc-params-prod.json.j2 index 041107a7d..95eeaa757 100644 --- a/ansible/templates/changehc-params-prod.json.j2 +++ b/ansible/templates/changehc-params-prod.json.j2 @@ -22,7 +22,7 @@ "parallel": false, "geos": ["state", "msa", "hrr", "county", "hhs", "nation"], "weekday": [true, false], - "types": ["covid","cli","flu"], + "types": ["covid","cli","flu","ili"], "wip_signal": "", "ftp_conn": { "host": "{{ changehc_sftp_host }}", diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py index 38ee1e2c1..803962c84 100644 --- a/changehc/delphi_changehc/constants.py +++ b/changehc/delphi_changehc/constants.py @@ -5,7 +5,10 @@ SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli" SMOOTHED_FLU = "smoothed_outpatient_flu" SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu" -SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU] +SMOOTHED_ILI = "smoothed_outpatient_ili" +SMOOTHED_ADJ_ILI = "smoothed_adj_outpatient_ili" +SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\ + SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI] NA = "NA" HRR = "hrr" FIPS = "fips" diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py index 1f9a43117..7d7572a82 100644 --- a/changehc/delphi_changehc/load_data.py +++ b/changehc/delphi_changehc/load_data.py @@ -186,3 +186,40 @@ def load_flu_data(denom_filepath, flu_filepath, dropdate, base_geo): data = data[["num", "den"]] return data + + +def load_ili_data(denom_filepath, flu_filepath, flu_like_filepath, dropdate, base_geo): + """Load in denominator and ili data, and combine them. + + Args: + denom_filepath: path to the aggregated denominator data + flu_filepath: path to the aggregated flu data + flu_like_filepath: path to the aggregated flu_like data + dropdate: data drop date (datetime object) + base_geo: base geographic unit before aggregation ('fips') + + Returns: + combined multiindexed dataframe, index 0 is geo_base, index 1 is date + """ + assert base_geo == "fips", "base unit must be 'fips'" + + # load each data stream + denom_data = load_chng_data(denom_filepath, dropdate, base_geo, + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) + flu_data = load_chng_data(flu_filepath, dropdate, base_geo, + Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL) + flu_like_data = load_chng_data(flu_like_filepath, dropdate, base_geo, + Config.FLU_LIKE_COLS, Config.FLU_DTYPES, Config.FLU_LIKE_COL) + + # merge data + data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True) + data = data.merge(flu_like_data, how="outer", left_index=True, right_index=True) + assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge" + + # calculate combined numerator and denominator + data.fillna(0, inplace=True) + data["num"] = data[Config.FLU_COL] + data[Config.FLU_LIKE_COL] + data["den"] = data[Config.DENOM_COL] + data = data[["num", "den"]] + + return data diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 145f3267a..98022466a 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -15,7 +15,7 @@ # first party from .download_ftp_files import download_counts -from .load_data import load_combined_data, load_cli_data, load_flu_data +from .load_data import load_combined_data, load_cli_data, load_flu_data, load_ili_data from .update_sensor import CHCSensorUpdater @@ -52,6 +52,9 @@ def retrieve_files(params, filedate, logger): file_dict["covid_like"] = covid_like_file if "flu" in params["indicator"]["types"]: file_dict["flu"] = flu_file + if "ili" in params["indicator"]["types"]: + file_dict["flu"] = flu_file + file_dict["flu_like"] = flu_like_file return file_dict @@ -77,6 +80,15 @@ def make_asserts(params): if "flu" in params["indicator"]["types"]: assert (files["denom"] is None) == (files["flu"] is None), \ "exactly one of denom and flu files are provided" + if "ili" in params["indicator"]["types"]: + if files["denom"] is None: + assert files["flu"] is None and \ + files["flu_like"] is None,\ + "files must be all present or all absent" + else: + assert files["flu"] is not None and \ + files["flu_like"] is not None,\ + "files must be all present or all absent" def run_module(params: Dict[str, Dict[str, Any]]): @@ -187,6 +199,9 @@ def run_module(params: Dict[str, Dict[str, Any]]): elif numtype == "flu": data = load_flu_data(file_dict["denom"],file_dict["flu"], dropdate_dt,"fips") + elif numtype == "ili": + data = load_ili_data(file_dict["denom"],file_dict["flu"], + file_dict["flu_like"],dropdate_dt,"fips") more_stats = su_inst.update_sensor( data, params["common"]["export_dir"], diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 52a1af47f..705615639 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -16,7 +16,7 @@ # first party from .config import Config from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\ - SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA + SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI, NA from .sensor import CHCSensor @@ -117,6 +117,8 @@ def __init__(self, signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI elif self.numtype == "flu": signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU + elif self.numtype == "ili": + signal_name = SMOOTHED_ADJ_ILI if self.weekday else SMOOTHED_ILI else: raise ValueError(f'Unsupported numtype received "{numtype}",' f' must be one of ["covid", "cli", "flu"]') diff --git a/changehc/tests/test_load_data.py b/changehc/tests/test_load_data.py index f70366355..c025e4aff 100644 --- a/changehc/tests/test_load_data.py +++ b/changehc/tests/test_load_data.py @@ -15,11 +15,13 @@ "input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz", "input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz", "input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz", + "input_flu_like_file": "test_data/20200601_Counts_Products_Covid.dat.gz", "drop_date": "2020-06-01" } } COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"] FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"] +FLU_LIKE_FILEPATH = PARAMS["indicator"]["input_flu_like_file"] DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) @@ -33,6 +35,8 @@ class TestLoadData: "fips") flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, DROP_DATE, "fips") + ili_data = load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH, + DROP_DATE,"fips") gmpr = GeoMapper() def test_base_unit(self): @@ -50,6 +54,9 @@ def test_base_unit(self): with pytest.raises(AssertionError): load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, DROP_DATE, "foo") + with pytest.raises(AssertionError): + load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH, DROP_DATE, "foo") + def test_denom_columns(self): assert "fips" in self.denom_data.index.names assert "timestamp" in self.denom_data.index.names @@ -88,6 +95,16 @@ def test_flu_columns(self): assert len( set(self.flu_data.columns) - set(expected_flu_columns)) == 0 + def test_ili_columns(self): + assert "fips" in self.ili_data.index.names + assert "timestamp" in self.ili_data.index.names + + expected_ili_columns = ["num", "den"] + for col in expected_ili_columns: + assert col in self.ili_data.columns + assert len( + set(self.ili_data.columns) - set(expected_ili_columns)) == 0 + def test_edge_values(self): for data in [self.denom_data, self.covid_data, From 6217f7f102d000898d368d98cfe9943e0e4f5cc1 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 8 Dec 2021 19:33:09 -0500 Subject: [PATCH 2/4] Refactor for linting --- changehc/delphi_changehc/run.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 98022466a..adb383af1 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -91,6 +91,23 @@ def make_asserts(params): "files must be all present or all absent" +def get_data(file_dict, numtype, dropdate_dt): + """Call correct load_data function depending on numtype""" + if numtype == "covid": + data = load_combined_data(file_dict["denom"], + file_dict["covid"],dropdate_dt,"fips") + elif numtype == "cli": + data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"], + file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips") + elif numtype == "flu": + data = load_flu_data(file_dict["denom"],file_dict["flu"], + dropdate_dt,"fips") + elif numtype == "ili": + data = load_ili_data(file_dict["denom"],file_dict["flu"], + file_dict["flu_like"],dropdate_dt,"fips") + return(data) + + def run_module(params: Dict[str, Dict[str, Any]]): """ Run the delphi_changehc module. @@ -190,18 +207,7 @@ def run_module(params: Dict[str, Dict[str, Any]]): params["indicator"]["wip_signal"], logger ) - if numtype == "covid": - data = load_combined_data(file_dict["denom"], - file_dict["covid"],dropdate_dt,"fips") - elif numtype == "cli": - data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"], - file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips") - elif numtype == "flu": - data = load_flu_data(file_dict["denom"],file_dict["flu"], - dropdate_dt,"fips") - elif numtype == "ili": - data = load_ili_data(file_dict["denom"],file_dict["flu"], - file_dict["flu_like"],dropdate_dt,"fips") + data = get_data(file_dict, numtype, dropdate_dt) more_stats = su_inst.update_sensor( data, params["common"]["export_dir"], From 6212b4585537dd8a210804027376bdf2868e9f74 Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 8 Dec 2021 20:34:17 -0500 Subject: [PATCH 3/4] Fix linting --- changehc/delphi_changehc/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index adb383af1..b15c44871 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -105,7 +105,7 @@ def get_data(file_dict, numtype, dropdate_dt): elif numtype == "ili": data = load_ili_data(file_dict["denom"],file_dict["flu"], file_dict["flu_like"],dropdate_dt,"fips") - return(data) + return data def run_module(params: Dict[str, Dict[str, Any]]): From 25ea62bffb91e130c45e907dc03a07c0e84b179b Mon Sep 17 00:00:00 2001 From: rumackaaron Date: Wed, 8 Dec 2021 20:42:38 -0500 Subject: [PATCH 4/4] Fix pydoc --- changehc/delphi_changehc/load_data.py | 2 +- changehc/delphi_changehc/run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py index 7d7572a82..cfdd4032f 100644 --- a/changehc/delphi_changehc/load_data.py +++ b/changehc/delphi_changehc/load_data.py @@ -194,7 +194,7 @@ def load_ili_data(denom_filepath, flu_filepath, flu_like_filepath, dropdate, bas Args: denom_filepath: path to the aggregated denominator data flu_filepath: path to the aggregated flu data - flu_like_filepath: path to the aggregated flu_like data + flu_like_filepath: path to the aggregated flu_like data dropdate: data drop date (datetime object) base_geo: base geographic unit before aggregation ('fips') diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index b15c44871..4c7610919 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -92,7 +92,7 @@ def make_asserts(params): def get_data(file_dict, numtype, dropdate_dt): - """Call correct load_data function depending on numtype""" + """Call correct load_data function depending on numtype.""" if numtype == "covid": data = load_combined_data(file_dict["denom"], file_dict["covid"],dropdate_dt,"fips")