Skip to content

Commit

Permalink
NANS for HHS:
Browse files Browse the repository at this point in the history
* add missing columns
  • Loading branch information
dshemetov committed Nov 12, 2021
1 parent 0b7103a commit 29ba7b9
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 81 deletions.
145 changes: 80 additions & 65 deletions hhs_hosp/delphi_hhs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@

import time
from delphi_epidata import Epidata
from delphi_utils.export import create_export_csv
from delphi_utils.geomap import GeoMapper
from delphi_utils import get_structured_logger
from delphi_utils import create_export_csv, get_structured_logger, Nans, GeoMapper
import numpy as np
import pandas as pd

from .constants import SIGNALS, GEOS, SMOOTHERS, CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU


def _date_to_int(d):
"""Return a date object as a yyyymmdd int."""
return int(d.strftime("%Y%m%d"))
Expand Down Expand Up @@ -64,6 +63,19 @@ def generate_date_ranges(start, end):
return output


def add_nancodes(df):
    """Add nancodes to a signal dataframe.

    Sets the default missingness codes for every row (value present,
    se/sample_size never reported for this source), then flags any row
    whose ``val`` is still nan as missing for an unknown reason.
    """
    df["missing_val"] = Nans.NOT_MISSING
    df["missing_se"] = Nans.NOT_APPLICABLE
    df["missing_sample_size"] = Nans.NOT_APPLICABLE
    # Any value that is still nan at this point has no known cause.
    df.loc[df["val"].isnull(), "missing_val"] = Nans.OTHER
    return df


def run_module(params):
"""
Generate ground truth HHS hospitalization data.
Expand All @@ -79,16 +91,16 @@ def run_module(params):
"""
start_time = time.time()
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)
mapper = GeoMapper()
request_all_states = ",".join(mapper.get_geo_values("state_id"))
end_day = date.today()
if "epidata" in params["common"] and \
"as_of" in params["common"]["epidata"]:
if "epidata" in params["common"] and "as_of" in params["common"]["epidata"]:
end_day = min(
end_day,
datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
end_day, datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
)
past_reference_day = date(year=2020, month=1, day=1) # first available date in DB
date_range = generate_date_ranges(past_reference_day, end_day)
Expand All @@ -100,33 +112,32 @@ def run_module(params):
raise Exception(f"Bad result from Epidata for {r}: {response['message']}")
if response["result"] == -2 and r == date_range[-1]: # -2 code means no results
continue
dfs.append(pd.DataFrame(response['epidata']))
dfs.append(pd.DataFrame(response["epidata"]))
all_columns = pd.concat(dfs)
geo_mapper = GeoMapper()
stats = []
for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS):
logger.info("Generating signal and exporting to CSV",
geo_res = geo,
sensor = sensor,
smoother = smoother)
df = geo_mapper.add_geocode(make_signal(all_columns, sensor),
"state_id",
"state_code",
from_col="state")
logger.info(
"Generating signal and exporting to CSV", geo_res=geo, sensor=sensor, smoother=smoother
)
df = geo_mapper.add_geocode(
make_signal(all_columns, sensor), "state_id", "state_code", from_col="state"
)
if sensor.endswith("_prop"):
df=pop_proportion(df, geo_mapper)
df = pop_proportion(df, geo_mapper)
df = make_geo(df, geo, geo_mapper)
df["se"] = np.nan
df["sample_size"] = np.nan
df = smooth_values(df, smoother[0])
df = add_nancodes(df)
if df.empty:
continue
sensor_name = sensor + smoother[1]
# don't export first 6 days for smoothed signals since they'll be nan.
start_date = min(df.timestamp) + timedelta(6) if smoother[1] else min(df.timestamp)
dates = create_export_csv(df,
params["common"]["export_dir"],
geo,
sensor_name,
start_date=start_date)
dates = create_export_csv(
df, params["common"]["export_dir"], geo, sensor_name, start_date=start_date
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

Expand All @@ -135,71 +146,75 @@ def run_module(params):
csv_export_count = sum(s[-1] for s in stats)
max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds = elapsed_time_in_seconds,
csv_export_count = csv_export_count,
max_lag_in_days = max_lag_in_days,
oldest_final_export_date = formatted_min_max_date)
logger.info(
"Completed indicator run",
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_min_max_date,
)


def smooth_values(df, smoother):
    """Smooth the value column in the dataframe, independently per geo_id."""
    as_float = df["val"].astype(float)
    df["val"] = as_float
    # Apply the smoother group-wise so values never bleed across geos.
    grouped = df[["geo_id", "val"]].groupby("geo_id")["val"]
    df["val"] = grouped.transform(smoother.smooth)
    return df

def pop_proportion(df, geo_mapper):
    """Get the population-proportionate variants as the dataframe val.

    Rescales ``val`` to a rate per 100k population of the state_code,
    rounded to 7 decimal places.
    """
    with_pop = geo_mapper.add_population_column(df, "state_code")
    rate_per_100k = df["val"] / with_pop["population"] * 100000
    df["val"] = round(rate_per_100k, 7)
    with_pop.drop("population", axis=1, inplace=True)
    return df


def make_geo(state, geo, geo_mapper):
    """Transform incoming geo (state) to another geo."""
    if geo == "state":
        # Already at state granularity; just adopt the standard column name.
        return state.rename(columns={"state": "geo_id"})
    # Aggregate state rows up to the requested geo via the crosswalk.
    return geo_mapper.replace_geocode(
        state, "state_code", geo, new_col="geo_id", date_col="timestamp"
    )


def make_signal(all_columns, sig):
    """Generate column sums according to signal name.

    Parameters
    ----------
    all_columns: pd.DataFrame
        Raw Epidata rows; must provide ``state``, ``date``, and the
        previous_day_admission_* columns used below.
    sig: str
        One of SIGNALS; selects which admission columns are summed.

    Returns
    -------
    pd.DataFrame with columns ``state`` (lowercased), ``timestamp``
    (previous-day datetime), and float ``val``.
    """
    # BUG FIX: both messages below were plain strings, so '{sig}' and
    # '{", ".join(SIGNALS)}' appeared literally; they are now f-strings.
    assert sig in SIGNALS, (
        f"Unexpected signal name '{sig}'; familiar names are {', '.join(SIGNALS)}"
    )
    if sig.startswith(CONFIRMED):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_adult_covid_confirmed
                + all_columns.previous_day_admission_pediatric_covid_confirmed,
            }
        )
    elif sig.startswith(SUM_CONF_SUSP):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_adult_covid_confirmed
                + all_columns.previous_day_admission_adult_covid_suspected
                + all_columns.previous_day_admission_pediatric_covid_confirmed
                + all_columns.previous_day_admission_pediatric_covid_suspected,
            }
        )
    elif sig.startswith(CONFIRMED_FLU):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_influenza_confirmed,
            }
        )
    else:
        raise Exception(f"Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal")
    df["val"] = df.val.astype(float)
    return df
47 changes: 31 additions & 16 deletions hhs_hosp/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import tempfile
import os

from delphi_hhs.run import _date_to_int, int_date_to_previous_day_datetime, generate_date_ranges, \
from delphi_hhs.run import _date_to_int, add_nancodes, int_date_to_previous_day_datetime, generate_date_ranges, \
make_signal, make_geo, run_module, pop_proportion
from delphi_hhs.constants import SMOOTHERS, GEOS, SIGNALS, \
CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU, CONFIRMED_PROP, SUM_CONF_SUSP_PROP, CONFIRMED_FLU_PROP
from delphi_utils.geomap import GeoMapper
from delphi_utils import GeoMapper, Nans
from freezegun import freeze_time
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -85,15 +85,15 @@ def test_make_signal():
})
pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU))
pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU_PROP))

with pytest.raises(Exception):
make_signal(data, "zig")

def test_pop_proportion():
geo_mapper = GeoMapper()
state_pop = geo_mapper.get_crosswalk("state_code", "pop")

test_df = pd.DataFrame({
test_df = pd.DataFrame({
'state': ['PA'],
'state_code': [42],
'timestamp': [datetime(year=2020, month=1, day=1)],
Expand All @@ -109,7 +109,7 @@ def test_pop_proportion():
'val': [15/pa_pop*100000],})
)

test_df= pd.DataFrame({
test_df= pd.DataFrame({
'state': ['WV'],
'state_code': [54],
'timestamp': [datetime(year=2020, month=1, day=1)],
Expand Down Expand Up @@ -137,30 +137,23 @@ def test_make_geo():
'val': [1., 2., 4.],
})

template = {
'se': np.nan,
'sample_size': np.nan,
}
expecteds = {
"state": pd.DataFrame(
dict(template,
geo_id=data.state,
dict(geo_id=data.state,
timestamp=data.timestamp,
val=data.val)),
"hhs": pd.DataFrame(
dict(template,
geo_id=['3', '5'],
dict(geo_id=['3', '5'],
timestamp=[test_timestamp] * 2,
val=[3., 4.])),
"nation": pd.DataFrame(
dict(template,
geo_id=['us'],
dict(geo_id=['us'],
timestamp=[test_timestamp],
val=[7.]))
}
for geo, expected in expecteds.items():
result = make_geo(data, geo, geo_mapper)
for series in ["geo_id", "timestamp", "val", "se", "sample_size"]:
for series in ["geo_id", "timestamp", "val"]:
pd.testing.assert_series_equal(expected[series], result[series], obj=f"{geo}:{series}")


Expand Down Expand Up @@ -207,3 +200,25 @@ def test_ignore_last_range_no_results(mock_covid_hosp, mock_export):
}
}
assert not run_module(params) # function should not raise value error and has no return value

def test_add_nancode():
    """add_nancodes marks nan vals as OTHER and se/sample_size as NOT_APPLICABLE."""
    data = pd.DataFrame({
        'state': ['PA', 'WV', 'OH'],
        'state_code': [42, 54, 39],
        'timestamp': [pd.to_datetime("20200601")] * 3,
        'val': [1, 2, np.nan],
        'se': [np.nan] * 3,
        'sample_size': [np.nan] * 3,
    })
    expected = data.copy()
    expected['missing_val'] = [Nans.NOT_MISSING] * 2 + [Nans.OTHER]
    expected['missing_se'] = [Nans.NOT_APPLICABLE] * 3
    expected['missing_sample_size'] = [Nans.NOT_APPLICABLE] * 3
    pd.testing.assert_frame_equal(expected, add_nancodes(data))

0 comments on commit 29ba7b9

Please sign in to comment.