diff --git a/ansible/templates/changehc-params-prod.json.j2 b/ansible/templates/changehc-params-prod.json.j2 index c365fa605..d343b9d71 100644 --- a/ansible/templates/changehc-params-prod.json.j2 +++ b/ansible/templates/changehc-params-prod.json.j2 @@ -25,7 +25,7 @@ "parallel": false, "geos": ["state", "msa", "hrr", "county", "hhs", "nation"], "weekday": [true, false], - "types": ["covid","cli","flu"], + "types": ["covid","cli","flu","ili"], "wip_signal": "", "ftp_conn": { "host": "{{ changehc_sftp_host }}", diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py index a458f8819..1d5f6b974 100644 --- a/changehc/delphi_changehc/constants.py +++ b/changehc/delphi_changehc/constants.py @@ -5,7 +5,10 @@ SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli" SMOOTHED_FLU = "smoothed_outpatient_flu" SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu" -SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU] +SMOOTHED_ILI = "smoothed_outpatient_ili" +SMOOTHED_ADJ_ILI = "smoothed_adj_outpatient_ili" +SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\ + SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI] NA = "NA" HRR = "hrr" FIPS = "fips" diff --git a/changehc/delphi_changehc/load_data.py b/changehc/delphi_changehc/load_data.py index c4a5f1e9c..23ee522fe 100644 --- a/changehc/delphi_changehc/load_data.py +++ b/changehc/delphi_changehc/load_data.py @@ -225,3 +225,40 @@ def load_flu_data(denom_filepath, flu_filepath, base_geo, issue_date, test_mode=False, check_nd=25) store_backfill_file(data, issue_date, backfill_dir, numtype, geo, weekday) return data + + +def load_ili_data(denom_filepath, flu_filepath, flu_like_filepath, dropdate, base_geo): + """Load in denominator, flu, and flu-like data, and combine them.
+ + Args: + denom_filepath: path to the aggregated denominator data + flu_filepath: path to the aggregated flu data + flu_like_filepath: path to the aggregated flu_like data + dropdate: data drop date (datetime object) + base_geo: base geographic unit before aggregation ('fips') + + Returns: + combined multiindexed dataframe, index 0 is geo_base, index 1 is date + """ + assert base_geo == "fips", "base unit must be 'fips'" + + # load each data stream + denom_data = load_chng_data(denom_filepath, dropdate, base_geo, + Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL) + flu_data = load_chng_data(flu_filepath, dropdate, base_geo, + Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL) + flu_like_data = load_chng_data(flu_like_filepath, dropdate, base_geo, + Config.FLU_LIKE_COLS, Config.FLU_DTYPES, Config.FLU_LIKE_COL) + + # merge data + data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True) + data = data.merge(flu_like_data, how="outer", left_index=True, right_index=True) + assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge" + + # calculate combined numerator and denominator + data.fillna(0, inplace=True) + data["num"] = data[Config.FLU_COL] + data[Config.FLU_LIKE_COL] + data["den"] = data[Config.DENOM_COL] + data = data[["num", "den"]] + + return data diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 8d4d25261..35ad5c69a 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -15,7 +15,7 @@ # first party from .download_ftp_files import download_counts -from .load_data import (load_combined_data, load_cli_data, load_flu_data) +from .load_data import load_combined_data, load_cli_data, load_flu_data, load_ili_data from .update_sensor import CHCSensorUpdater @@ -52,6 +52,9 @@ def retrieve_files(params, filedate, logger): file_dict["covid_like"] = covid_like_file if "flu" in params["indicator"]["types"]: file_dict["flu"] = flu_file + if "ili" in 
params["indicator"]["types"]: + file_dict["flu"] = flu_file + file_dict["flu_like"] = flu_like_file return file_dict @@ -77,6 +80,15 @@ def make_asserts(params): if "flu" in params["indicator"]["types"]: assert (files["denom"] is None) == (files["flu"] is None), \ "exactly one of denom and flu files are provided" + if "ili" in params["indicator"]["types"]: + if files["denom"] is None: + assert files["flu"] is None and \ + files["flu_like"] is None,\ + "files must be all present or all absent" + else: + assert files["flu"] is not None and \ + files["flu_like"] is not None,\ + "files must be all present or all absent" def run_module(params: Dict[str, Dict[str, Any]]): @@ -195,6 +207,10 @@ def run_module(params: Dict[str, Dict[str, Any]]): data = load_flu_data(file_dict["denom"],file_dict["flu"], "fips",backfill_dir, geo, weekday, numtype, generate_backfill_files, backfill_merge_day) + elif numtype == "ili": + data = load_ili_data(file_dict["denom"],file_dict["flu"], + file_dict["flu_like"],dropdate_dt,"fips") + more_stats = su_inst.update_sensor( data, params["common"]["export_dir"], diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 52a1af47f..705615639 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -16,7 +16,7 @@ # first party from .config import Config from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\ - SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA + SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI, NA from .sensor import CHCSensor @@ -117,6 +117,8 @@ def __init__(self, signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI elif self.numtype == "flu": signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU + elif self.numtype == "ili": + signal_name = SMOOTHED_ADJ_ILI if self.weekday else SMOOTHED_ILI else: raise ValueError(f'Unsupported numtype received "{numtype}",' f' must be one of ["covid", "cli", 
"flu"]') diff --git a/changehc/tests/test_load_data.py b/changehc/tests/test_load_data.py index 9ce6f94f8..ae83ee8f5 100644 --- a/changehc/tests/test_load_data.py +++ b/changehc/tests/test_load_data.py @@ -18,12 +18,14 @@ "input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz", "input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz", "input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz", + "input_flu_like_file": "test_data/20200601_Counts_Products_Covid.dat.gz", "backfill_dir": "./backfill", "drop_date": "2020-06-01" } } COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"] FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"] +FLU_LIKE_FILEPATH = PARAMS["indicator"]["input_flu_like_file"] DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) backfill_dir = PARAMS["indicator"]["backfill_dir"] @@ -42,6 +44,8 @@ class TestLoadData: True, backfill_merge_day) flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "fips", backfill_dir, geo, weekday, "flu", True, backfill_merge_day) + ili_data = load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH, + DROP_DATE,"fips") gmpr = GeoMapper() def test_base_unit(self): @@ -61,6 +65,9 @@ def test_base_unit(self): load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "foo", backfill_dir, geo, weekday, "covid", True, backfill_merge_day) + with pytest.raises(AssertionError): + load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH, DROP_DATE, "foo") + def test_denom_columns(self): assert "fips" in self.denom_data.index.names assert "timestamp" in self.denom_data.index.names @@ -99,6 +106,16 @@ def test_flu_columns(self): assert len( set(self.flu_data.columns) - set(expected_flu_columns)) == 0 + def test_ili_columns(self): + assert "fips" in self.ili_data.index.names + assert "timestamp" in self.ili_data.index.names + + expected_ili_columns = ["num", "den"] + for col in expected_ili_columns: + assert col in 
self.ili_data.columns + assert len( + set(self.ili_data.columns) - set(expected_ili_columns)) == 0 + def test_edge_values(self): for data in [self.denom_data, self.covid_data,