Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CHNG ILI Signal #1401

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible/templates/changehc-params-prod.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"parallel": false,
"geos": ["state", "msa", "hrr", "county", "hhs", "nation"],
"weekday": [true, false],
"types": ["covid","cli","flu"],
"types": ["covid","cli","flu","ili"],
"wip_signal": "",
"ftp_conn": {
"host": "{{ changehc_sftp_host }}",
Expand Down
5 changes: 4 additions & 1 deletion changehc/delphi_changehc/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
SMOOTHED_FLU = "smoothed_outpatient_flu"
SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU]
SMOOTHED_ILI = "smoothed_outpatient_ili"
SMOOTHED_ADJ_ILI = "smoothed_adj_outpatient_ili"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI]
NA = "NA"
HRR = "hrr"
FIPS = "fips"
Expand Down
37 changes: 37 additions & 0 deletions changehc/delphi_changehc/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,40 @@ def load_flu_data(denom_filepath, flu_filepath, base_geo,
issue_date, test_mode=False, check_nd=25)
store_backfill_file(data, issue_date, backfill_dir, numtype, geo, weekday)
return data


def load_ili_data(denom_filepath, flu_filepath, flu_like_filepath, dropdate, base_geo):
"""Load in denominator and ili data, and combine them.

Args:
denom_filepath: path to the aggregated denominator data
flu_filepath: path to the aggregated flu data
flu_like_filepath: path to the aggregated flu_like data
dropdate: data drop date (datetime object)
base_geo: base geographic unit before aggregation ('fips')

Returns:
combined multiindexed dataframe, index 0 is geo_base, index 1 is date
"""
assert base_geo == "fips", "base unit must be 'fips'"

# load each data stream
denom_data = load_chng_data(denom_filepath, dropdate, base_geo,
Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL)
flu_data = load_chng_data(flu_filepath, dropdate, base_geo,
Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL)
flu_like_data = load_chng_data(flu_like_filepath, dropdate, base_geo,
Config.FLU_LIKE_COLS, Config.FLU_DTYPES, Config.FLU_LIKE_COL)

# merge data
data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True)
data = data.merge(flu_like_data, how="outer", left_index=True, right_index=True)
assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"

# calculate combined numerator and denominator
data.fillna(0, inplace=True)
data["num"] = data[Config.FLU_COL] + data[Config.FLU_LIKE_COL]
data["den"] = data[Config.DENOM_COL]
data = data[["num", "den"]]

return data
18 changes: 17 additions & 1 deletion changehc/delphi_changehc/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

# first party
from .download_ftp_files import download_counts
from .load_data import (load_combined_data, load_cli_data, load_flu_data)
from .load_data import load_combined_data, load_cli_data, load_flu_data, load_ili_data
from .update_sensor import CHCSensorUpdater


Expand Down Expand Up @@ -52,6 +52,9 @@ def retrieve_files(params, filedate, logger):
file_dict["covid_like"] = covid_like_file
if "flu" in params["indicator"]["types"]:
file_dict["flu"] = flu_file
if "ili" in params["indicator"]["types"]:
file_dict["flu"] = flu_file
file_dict["flu_like"] = flu_like_file
return file_dict


Expand All @@ -77,6 +80,15 @@ def make_asserts(params):
if "flu" in params["indicator"]["types"]:
assert (files["denom"] is None) == (files["flu"] is None), \
"exactly one of denom and flu files are provided"
if "ili" in params["indicator"]["types"]:
if files["denom"] is None:
assert files["flu"] is None and \
files["flu_like"] is None,\
"files must be all present or all absent"
else:
assert files["flu"] is not None and \
files["flu_like"] is not None,\
"files must be all present or all absent"


def run_module(params: Dict[str, Dict[str, Any]]):
Expand Down Expand Up @@ -195,6 +207,10 @@ def run_module(params: Dict[str, Dict[str, Any]]):
data = load_flu_data(file_dict["denom"],file_dict["flu"],
"fips",backfill_dir, geo, weekday,
numtype, generate_backfill_files, backfill_merge_day)
elif numtype == "ili":
data = load_ili_data(file_dict["denom"],file_dict["flu"],
file_dict["flu_like"],dropdate_dt,"fips")

more_stats = su_inst.update_sensor(
data,
params["common"]["export_dir"],
Expand Down
4 changes: 3 additions & 1 deletion changehc/delphi_changehc/update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# first party
from .config import Config
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, SMOOTHED_ILI, SMOOTHED_ADJ_ILI, NA
from .sensor import CHCSensor


Expand Down Expand Up @@ -117,6 +117,8 @@ def __init__(self,
signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI
elif self.numtype == "flu":
signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU
elif self.numtype == "ili":
signal_name = SMOOTHED_ADJ_ILI if self.weekday else SMOOTHED_ILI
else:
raise ValueError(f'Unsupported numtype received "{numtype}",'
f' must be one of ["covid", "cli", "flu"]')
Expand Down
17 changes: 17 additions & 0 deletions changehc/tests/test_load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
"input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz",
"input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
"input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
"input_flu_like_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
"backfill_dir": "./backfill",
"drop_date": "2020-06-01"
}
}
COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"]
FLU_LIKE_FILEPATH = PARAMS["indicator"]["input_flu_like_file"]
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]
Expand All @@ -42,6 +44,8 @@ class TestLoadData:
True, backfill_merge_day)
flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "fips",
backfill_dir, geo, weekday, "flu", True, backfill_merge_day)
ili_data = load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH,
DROP_DATE,"fips")
gmpr = GeoMapper()

def test_base_unit(self):
Expand All @@ -61,6 +65,9 @@ def test_base_unit(self):
load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, "foo",
backfill_dir, geo, weekday, "covid", True, backfill_merge_day)

with pytest.raises(AssertionError):
load_ili_data(DENOM_FILEPATH, FLU_FILEPATH, FLU_LIKE_FILEPATH, DROP_DATE, "foo")

def test_denom_columns(self):
assert "fips" in self.denom_data.index.names
assert "timestamp" in self.denom_data.index.names
Expand Down Expand Up @@ -99,6 +106,16 @@ def test_flu_columns(self):
assert len(
set(self.flu_data.columns) - set(expected_flu_columns)) == 0

def test_ili_columns(self):
assert "fips" in self.ili_data.index.names
assert "timestamp" in self.ili_data.index.names

expected_ili_columns = ["num", "den"]
for col in expected_ili_columns:
assert col in self.ili_data.columns
assert len(
set(self.ili_data.columns) - set(expected_ili_columns)) == 0

def test_edge_values(self):
for data in [self.denom_data,
self.covid_data,
Expand Down