diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 8ac5de48e..8d0a97d3b 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -11,7 +11,8 @@
 from .nancodes import Nans
 
-def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
+
+def filter_contradicting_missing_codes(df, sensor, metric, logger=None):
     """Find values with contradictory missingness codes, filter them, and log."""
     columns = ["val", "se", "sample_size"]
     # Get indicies where the XNOR is true (i.e. both are true or both are false).
@@ -21,12 +22,8 @@
     ]
     for mask in masks:
         if not logger is None and df.loc[mask].size > 0:
-            logger.info(
-                "Filtering contradictory missing code",
-                sensor=sensor,
-                metric=metric,
-                date=date.strftime(format="%Y-%m-%d"),
-            )
+            date = df.loc[mask]["timestamp"][0].strftime("%Y%m%d")
+            logger.info("Filtering contradictory missing code", sensor=sensor, metric=metric, date=date)
             df = df.loc[~mask]
         elif logger is None and df.loc[mask].size > 0:
             df = df.loc[~mask]
@@ -100,6 +97,26 @@ def create_export_csv(
     else:
         dates = pd.date_range(start_date, end_date)
 
+    if remove_null_samples:
+        df = df[df["sample_size"].notnull()]
+    if sort_geos:
+        df = df.sort_values(by="geo_id")
+    if "missing_val" in df.columns:
+        df = filter_contradicting_missing_codes(df, sensor, metric, logger=logger)
+
+    expected_columns = [
+        "geo_id",
+        "val",
+        "se",
+        "sample_size",
+        "timestamp",
+        "missing_val",
+        "missing_se",
+        "missing_sample_size",
+    ]
+    df = df.filter(items=expected_columns)
+    df = df.round({"val": 7, "se": 7})
+
     for date in dates:
         if weekly_dates:
             t = Week.fromdate(pd.to_datetime(str(date)))
@@ -111,24 +128,14 @@ def create_export_csv(
         else:
             export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
         export_file = join(export_dir, export_filename)
-        expected_columns = [
-            "geo_id",
-            "val",
-            "se",
-            "sample_size",
-            "missing_val",
-            "missing_se",
-            "missing_sample_size"
-        ]
-        export_df = df[df["timestamp"] == date].filter(items=expected_columns)
-        if "missing_val" in export_df.columns:
-            export_df = filter_contradicting_missing_codes(
-                export_df, sensor, metric, date, logger=logger
-            )
-        if remove_null_samples:
-            export_df = export_df[export_df["sample_size"].notnull()]
-        export_df = export_df.round({"val": 7, "se": 7})
-        if sort_geos:
-            export_df = export_df.sort_values(by="geo_id")
+        export_df = df[df["timestamp"] == date]
+        export_df = export_df.drop("timestamp", axis=1)
         export_df.to_csv(export_file, index=False, na_rep="NA")
+
+        logger.debug(
+            "Wrote rows",
+            num_rows=df.size,
+            geo_type=geo_res,
+            num_geo_ids=export_df["geo_id"].unique().size,
+        )
     return dates
diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py
index c9c1f8483..abe5f0347 100644
--- a/_delphi_utils_python/tests/test_export.py
+++ b/_delphi_utils_python/tests/test_export.py
@@ -1,4 +1,5 @@
 """Tests for exporting CSV files."""
+import logging
 from datetime import datetime
 from os import listdir
 from os.path import join
@@ -11,6 +12,7 @@
 from delphi_utils import create_export_csv, Nans
 
+TEST_LOGGER = logging.getLogger()
 
 def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
     assert all(isinstance(e, type) or isinstance(e, str) for e in dtypes.values()), (
@@ -102,6 +104,7 @@ def test_export_with_metric(self, tmp_path):
             metric="deaths",
             geo_res="county",
sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -122,6 +125,7 @@ def test_export_rounding(self, tmp_path): metric="deaths", geo_res="county", sensor="test", + logger=TEST_LOGGER ) assert_frame_equal( pd.read_csv(join(tmp_path, "20200215_county_deaths_test.csv")), @@ -144,6 +148,7 @@ def test_export_without_metric(self, tmp_path): export_dir=tmp_path, geo_res="county", sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -163,6 +168,7 @@ def test_export_with_limiting_start_date(self, tmp_path): export_dir=tmp_path, geo_res="county", sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -182,6 +188,7 @@ def test_export_with_limiting_end_date(self, tmp_path): export_dir=tmp_path, geo_res="county", sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -199,6 +206,7 @@ def test_export_with_no_dates(self, tmp_path): export_dir=tmp_path, geo_res="state", sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -228,7 +236,8 @@ def test_export_with_null_removal(self, tmp_path): export_dir=tmp_path, geo_res="state", sensor="test", - remove_null_samples=True + remove_null_samples=True, + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -259,7 +268,8 @@ def test_export_without_null_removal(self, tmp_path): export_dir=tmp_path, geo_res="state", sensor="test", - remove_null_samples=False + remove_null_samples=False, + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( @@ -275,7 +285,7 @@ def test_export_without_null_removal(self, tmp_path): def test_export_df_without_missingness(self, tmp_path): create_export_csv( - df=self.DF.copy(), export_dir=tmp_path, geo_res="county", sensor="test" + df=self.DF.copy(), export_dir=tmp_path, geo_res="county", sensor="test", logger=TEST_LOGGER ) df = pd.read_csv(join(tmp_path, "20200215_county_test.csv")).astype( {"geo_id": str, "sample_size": int} @@ -297,6 +307,7 @@ def test_export_df_with_missingness(self, tmp_path): export_dir=tmp_path, geo_res="county", sensor="test", + logger=TEST_LOGGER ) assert set(listdir(tmp_path)) == set( [ @@ -324,12 +335,12 @@ def test_export_df_with_missingness(self, tmp_path): @mock.patch("delphi_utils.logger") def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path): sensor = "test" - geo_res = "state" + geo = "state" create_export_csv( df=self.DF3.copy(), export_dir=tmp_path, + geo_res=geo, sensor=sensor, - geo_res=geo_res, logger=mock_logger ) assert set(listdir(tmp_path)) == set( @@ -360,7 +371,8 @@ def test_export_sort(self, tmp_path): unsorted_df, export_dir=tmp_path, geo_res="county", - sensor="test" + sensor="test", + logger=TEST_LOGGER ) expected_df = pd.DataFrame({ "geo_id": ["51175", "51093"], @@ -376,7 +388,8 @@ def test_export_sort(self, tmp_path): export_dir=tmp_path, geo_res="county", sensor="test", - sort_geos=True + sort_geos=True, + logger=TEST_LOGGER ) expected_df = pd.DataFrame({ "geo_id": ["51093", "51175"], diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 7c78dc020..180aa0df8 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -58,7 +58,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa start_date=start_date, end_date=end_date, sensor=out_name, - write_empty_days=True + write_empty_days=True, + logger=logger, ) logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, 
num_geo_ids=df["geo_id"].unique().size) logger.debug("Wrote files", export_dir=output_path) diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index a9752072c..ed9b2296b 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -5,22 +5,20 @@ when the module is run with `python -m delphi_claims_hosp`. """ -# standard packages -import time import os +import time from datetime import datetime, timedelta from pathlib import Path -# third party from delphi_utils import get_structured_logger +from delphi_utils.export import create_export_csv -# first party +from .backfill import merge_backfill_file, store_backfill_file from .config import Config from .download_claims_ftp_files import download -from .modify_claims_drops import modify_and_write from .get_latest_claims_name import get_latest_filename +from .modify_claims_drops import modify_and_write from .update_indicator import ClaimsHospIndicatorUpdater -from .backfill import (store_backfill_file, merge_backfill_file) def run_module(params): @@ -138,10 +136,19 @@ def run_module(params): signal_name, logger, ) - updater.update_indicator( + output = updater.update_indicator( claims_file, - params["common"]["export_dir"], ) + filtered_output_df = updater.preprocess_output(output) + create_export_csv( + filtered_output_df, + export_dir=params["common"]["export_dir"], + start_date=startdate, + geo_res=geo, + sensor=signal_name, + logger=logger, + ) + max_dates.append(updater.output_dates[-1]) n_csv_export.append(len(updater.output_dates)) logger.info("Finished updating", geo_type=geo) diff --git a/claims_hosp/delphi_claims_hosp/update_indicator.py b/claims_hosp/delphi_claims_hosp/update_indicator.py index 5ba8ddd22..56dd2fe54 100644 --- a/claims_hosp/delphi_claims_hosp/update_indicator.py +++ b/claims_hosp/delphi_claims_hosp/update_indicator.py @@ -132,18 +132,15 @@ def geo_reindex(self, data): data_frame.fillna(0, inplace=True) return data_frame - def update_indicator(self, input_filepath, outpath): + def update_indicator(self, input_filepath): """ Generate and output indicator values. 
 
         Args:
             input_filepath: path to the aggregated claims data
-            outpath: output path for the csv results
-
         """
         self.shift_dates()
-        final_output_inds = \
-            (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)
+        final_output_inds = (self.burn_in_dates >= self.startdate) & (self.burn_in_dates <= self.enddate)
 
         # load data
         base_geo = Config.HRR_COL if self.geo == Config.HRR_COL else Config.FIPS_COL
@@ -163,22 +160,18 @@ def update_indicator(self, input_filepath, outpath):
             if self.weekday
             else None
         )
-        # run fitting code (maybe in parallel)
-        rates = {}
-        std_errs = {}
-        valid_inds = {}
+        df_lst = []
         if not self.parallel:
             for geo_id, sub_data in data_frame.groupby(level=0):
                 sub_data.reset_index(inplace=True)
                 if self.weekday:
-                    sub_data = Weekday.calc_adjustment(
-                        wd_params, sub_data, ["num"], Config.DATE_COL)
+                    sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
                 sub_data.set_index(Config.DATE_COL, inplace=True)
                 res = ClaimsHospIndicator.fit(sub_data, self.burnindate, geo_id)
-                res = pd.DataFrame(res)
-                rates[geo_id] = np.array(res.loc[final_output_inds, "rate"])
-                std_errs[geo_id] = np.array(res.loc[final_output_inds, "se"])
-                valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"])
+                temp_df = pd.DataFrame(res)
+                temp_df = temp_df.loc[final_output_inds]
+                df_lst.append(pd.DataFrame(temp_df))
+            output_df = pd.concat(df_lst)
         else:
             n_cpu = min(Config.MAX_CPU_POOL, cpu_count())
             self.logger.debug("Starting pool", n_workers=n_cpu)
@@ -187,82 +180,57 @@
                 for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
                     sub_data.reset_index(inplace=True)
                     if self.weekday:
-                        sub_data = Weekday.calc_adjustment(
-                            wd_params, sub_data, ["num"], Config.DATE_COL)
+                        sub_data = Weekday.calc_adjustment(wd_params, sub_data, ["num"], Config.DATE_COL)
                     sub_data.set_index(Config.DATE_COL, inplace=True)
                     pool_results.append(
                         pool.apply_async(
                             ClaimsHospIndicator.fit,
-                            args=(sub_data, self.burnindate, geo_id,),
+                            args=(
+                                sub_data,
+                                self.burnindate,
+                                geo_id,
+                            ),
                         )
                     )
-                pool_results = [proc.get() for proc in pool_results]
-                for res in pool_results:
-                    geo_id = res["geo_id"]
-                    res = pd.DataFrame(res)
-                    rates[geo_id] = np.array(res.loc[final_output_inds, "rate"])
-                    std_errs[geo_id] = np.array(res.loc[final_output_inds, "se"])
-                    valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"])
-
-        # write out results
-        unique_geo_ids = list(rates.keys())
-        output_dict = {
-            "rates": rates,
-            "se": std_errs,
-            "dates": self.output_dates,
-            "geo_ids": unique_geo_ids,
-            "geo_level": self.geo,
-            "include": valid_inds,
-        }
-
-        self.write_to_csv(output_dict, outpath)
-        self.logger.debug("Wrote files", export_dir=outpath)
-
-    def write_to_csv(self, output_dict, output_path="./receiving"):
+                df_lst = [pd.DataFrame(proc.get()).loc[final_output_inds] for proc in pool_results]
+                output_df = pd.concat(df_lst)
+
+        return output_df
+
+    def preprocess_output(self, df) -> pd.DataFrame:
         """
-        Write values to csv.
+        Check for anomalies and format the output for export.
 
-        Args:
-            output_dict: dictionary containing values, se, unique dates, and unique geo_id
-            output_path: outfile path to write the csv
+        Parameters
+        ----------
+        df
+        Returns
+        -------
+        df
 
         """
+        filtered_df = df[df["incl"]]
+        filtered_df = filtered_df.reset_index()
+        filtered_df.rename(columns={"rate": "val"}, inplace=True)
+        filtered_df["timestamp"] = filtered_df["timestamp"].astype(str)
+        filtered_df["sample_size"] = np.NaN
+        filtered_df.drop(columns=["incl"], inplace=True)
+
+        # sanity check for data
+        for geo_id, group in filtered_df.groupby("geo_id"):
+            assert not group.val.isnull().any()
+            assert not group.se.isnull().any()
+            assert np.all(group.se < 5), f"se suspicious, {geo_id}: {np.where(group.se >= 5)[0]}"
+            if np.any(group.val > 90):
+                for sus_val in np.where(group.val > 90)[0]:
+                    self.logger.warning("value suspicious, %s: %d", geo_id, sus_val)
+            if self.write_se:
+                assert np.all(group.val > 0) and np.all(group.se > 0), "p=0, std_err=0 invalid"
+
         if self.write_se:
-            self.logger.info("WARNING: WRITING SEs", signal=self.signal_name)
-
-        geo_level = output_dict["geo_level"]
-        dates = output_dict["dates"]
-        geo_ids = output_dict["geo_ids"]
-        all_rates = output_dict["rates"]
-        all_se = output_dict["se"]
-        all_include = output_dict["include"]
-        out_n = 0
-        for i, date in enumerate(dates):
-            filename = "%s/%s_%s_%s.csv" % (
-                output_path,
-                (date + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                geo_level,
-                self.signal_name,
-            )
-            with open(filename, "w") as outfile:
-                outfile.write("geo_id,val,se,direction,sample_size\n")
-                for geo_id in geo_ids:
-                    val = all_rates[geo_id][i]
-                    se = all_se[geo_id][i]
-                    if all_include[geo_id][i]:
-                        assert not np.isnan(val), "value for included value is nan"
-                        assert not np.isnan(se), "se for included rate is nan"
-                        if val > 90:
-                            self.logger.warning("Value suspicious", geo_type=geo_level, geo_value=geo_id, value=val)
-                        assert se < 5, f"se suspicious, {geo_id}: {se}"
-                        if self.write_se:
-                            assert val > 0 and se > 0, "p=0, std_err=0 invalid"
-                            outfile.write(
-                                "%s,%f,%s,%s,%s\n" % (geo_id, val, se, "NA", "NA"))
-                        else:
-                            # for privacy reasons we will not report the standard error
-                            outfile.write(
-                                "%s,%f,%s,%s,%s\n" % (geo_id, val, "NA", "NA", "NA"))
-                        out_n += 1
-
-        self.logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level, num_geo_ids=len(geo_ids))
+            self.logger.info("WARNING: WRITING SEs")
+        else:
+            filtered_df["se"] = np.NaN
+
+        assert sorted(list(filtered_df.columns)) == ["geo_id", "sample_size", "se", "timestamp", "val"]
+        return filtered_df
diff --git a/claims_hosp/tests/test_update_indicator.py b/claims_hosp/tests/test_update_indicator.py
index 5ca527287..d8c46e717 100644
--- a/claims_hosp/tests/test_update_indicator.py
+++ b/claims_hosp/tests/test_update_indicator.py
@@ -13,6 +13,7 @@
 # first party
 from delphi_claims_hosp.config import Config, GeoConstants
 from delphi_claims_hosp.update_indicator import ClaimsHospIndicatorUpdater
+from delphi_utils.export import create_export_csv
 
 CONFIG = Config()
 CONSTANTS = GeoConstants()
@@ -35,6 +36,9 @@ class TestClaimsHospIndicatorUpdater:
     weekday = False
     write_se = False
     prefix = "foo"
+    start_date = "02-01-2020"
+    end_date = "06-01-2020"
+    drop_date = "06-12-2020"
     small_test_data = pd.DataFrame({
         "num": [0, 100, 200, 300, 400, 500, 600, 100, 200, 300, 400, 500, 600],
         "hrr": [1.0] * 7 + [2.0] * 6,
@@ -44,9 +48,9 @@
 
     def test_shift_dates(self):
         updater = ClaimsHospIndicatorUpdater(
-            "02-01-2020",
-            "06-01-2020",
-            "06-12-2020",
+            self.start_date,
+            self.end_date,
+            self.drop_date,
             self.geo,
             self.parallel,
             self.weekday,
@@ -66,9 +70,9 @@ def test_shift_dates(self):
 
     def test_geo_reindex(self):
         updater = ClaimsHospIndicatorUpdater(
-            "02-01-2020",
-            "06-01-2020",
-            "06-12-2020",
+            self.start_date,
+            self.end_date,
+            self.drop_date,
             self.geo,
             self.parallel,
             self.weekday,
@@ -85,9 +89,9 @@ def test_update_indicator(self):
         for geo in ["state", "hrr", "hhs", "nation"]:
             td = TemporaryDirectory()
             updater = ClaimsHospIndicatorUpdater(
-                "02-01-2020",
-                "06-01-2020",
-                "06-12-2020",
+                self.start_date,
+                self.end_date,
+                self.drop_date,
                 geo,
                 self.parallel,
                 self.weekday,
@@ -96,20 +100,43 @@ def test_update_indicator(self):
             )
 
-            updater.update_indicator(
+            output = updater.update_indicator(
                 DATA_FILEPATH,
-                td.name
             )
+            filtered_output = updater.preprocess_output(output)
+            create_export_csv(filtered_output,
+                              td.name,
+                              start_date=self.start_date,
+                              end_date=self.end_date,
+                              geo_res=geo,
+                              write_empty_days=True,
+                              sensor=Config.signal_name,
+                              logger=TEST_LOGGER
+            )
+            # output date range is half exclusive [2020-02-01 to 2020-06-01)
+            # while the export function considers it fully inclusive [2020-02-01, 2020-06-01]
 
             assert len(os.listdir(td.name)) == len(
-                updater.output_dates), f"failed {geo} update_indicator test"
+                updater.output_dates) + 1, f"failed {geo} update_indicator test"
 
             td.cleanup()
 
+    def prepare_df(self, d):
+        df_list = []
+        for geo in d.get("geo_ids"):
+            df_list.append(pd.DataFrame({"geo_id": geo, "rate": d["rates"][geo], "se": d["se"][geo],
+                                         "incl": d["include"][geo], "timestamp": d["dates"],
+                                         "sample_size": [np.nan, np.nan, np.nan]
+                                         }))
+
+        output_df = pd.concat(df_list)
+        output_df.index = output_df.timestamp
+        output_df.drop(columns=["timestamp"], inplace=True)
+        return output_df
 
     def test_write_to_csv_results(self):
         updater = ClaimsHospIndicatorUpdater(
-            "02-01-2020",
-            "06-01-2020",
-            "06-12-2020",
+            self.start_date,
+            self.end_date,
+            self.drop_date,
             self.geo,
             self.parallel,
             self.weekday,
@@ -140,46 +168,51 @@ def test_write_to_csv_results(self):
             "geo_level": "geography",
         }
 
+        output_df = self.prepare_df(res0)
+        filtered_output_df = updater.preprocess_output(output_df)
+
         td = TemporaryDirectory()
-        updater.write_to_csv(res0, td.name)
+        create_export_csv(filtered_output_df, td.name,
+                          start_date=self.start_date,
+                          end_date=self.end_date,
+                          geo_res=res0["geo_level"],
+                          sensor=Config.signal_name,
+                          logger=TEST_LOGGER)
 
         # check outputs
         expected_name = f"20200501_geography_{Config.signal_name}.csv"
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
         assert (
-            output_data.columns == ["geo_id", "val", "se", "direction", "sample_size"]
+            output_data.columns == ["geo_id", "val", "se", "sample_size"]
         ).all()
         assert (output_data.geo_id == ["a", "b"]).all()
         assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
         # for privacy we do not usually report SEs
         assert np.isnan(output_data.se.values).all()
-        assert np.isnan(output_data.direction.values).all()
         assert np.isnan(output_data.sample_size.values).all()
 
         expected_name = f"20200502_geography_{Config.signal_name}.csv"
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
         assert (
-            output_data.columns == ["geo_id", "val", "se", "direction", "sample_size"]
+            output_data.columns == ["geo_id", "val", "se", "sample_size"]
         ).all()
         assert (output_data.geo_id == ["a"]).all()
         assert np.array_equal(output_data.val.values, np.array([0.5]))
         assert np.isnan(output_data.se.values).all()
-        assert np.isnan(output_data.direction.values).all()
         assert np.isnan(output_data.sample_size.values).all()
 
         expected_name = f"20200504_geography_{Config.signal_name}.csv"
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
         assert (
-            output_data.columns == ["geo_id", "val", "se", "direction", "sample_size"]
+            output_data.columns == ["geo_id", "val", "se", "sample_size"]
         ).all()
         assert (output_data.geo_id == ["a", "b"]).all()
         assert np.array_equal(output_data.val.values, np.array([1.5, 3]))
         assert np.isnan(output_data.se.values).all()
-        assert np.isnan(output_data.direction.values).all()
         assert np.isnan(output_data.sample_size.values).all()
 
         td.cleanup()
 
@@ -188,9 +221,9 @@ def test_write_to_csv_with_se_results(self):
         obfuscated_name = PARAMS["indicator"]["obfuscated_prefix"]
         signal_name = obfuscated_name + "_" + Config.signal_weekday_name
         updater = ClaimsHospIndicatorUpdater(
-            "02-01-2020",
-            "06-01-2020",
-            "06-12-2020",
+            self.start_date,
+            self.end_date,
+            self.drop_date,
             self.geo,
             self.parallel,
             True,
@@ -221,28 +254,36 @@ def test_write_to_csv_with_se_results(self):
             "geo_level": "geography",
         }
 
+        output_df = self.prepare_df(res0)
+        filtered_output_df = updater.preprocess_output(output_df)
+
         td = TemporaryDirectory()
-        updater.write_to_csv(res0, td.name)
+
+        create_export_csv(filtered_output_df, td.name,
+                          start_date=self.start_date,
+                          end_date=self.end_date,
+                          geo_res=res0["geo_level"],
+                          sensor=signal_name,
+                          logger=TEST_LOGGER)
 
         # check outputs
         expected_name = f"20200501_geography_{signal_name}.csv"
         assert exists(join(td.name, expected_name))
         output_data = pd.read_csv(join(td.name, expected_name))
         assert (
-            output_data.columns == ["geo_id", "val", "se", "direction", "sample_size"]
+            output_data.columns == ["geo_id", "val", "se", "sample_size"]
         ).all()
         assert (output_data.geo_id == ["a", "b"]).all()
         assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
         assert np.array_equal(output_data.se.values, np.array([0.1, 0.5]))
-        assert np.isnan(output_data.direction.values).all()
         assert np.isnan(output_data.sample_size.values).all()
 
         td.cleanup()
 
-    def test_write_to_csv_wrong_results(self):
+    def test_preprocess_wrong_results(self):
         updater = ClaimsHospIndicatorUpdater(
-            "02-01-2020",
-            "06-01-2020",
-            "06-12-2020",
+            self.start_date,
+            self.end_date,
+            self.drop_date,
             self.geo,
             self.parallel,
             self.weekday,
@@ -273,24 +314,23 @@ def test_write_to_csv_wrong_results(self):
             "geo_level": "geography",
         }
 
-        td = TemporaryDirectory()
-
         # nan value for included loc-date
         res1 = deepcopy(res0)
         res1["rates"]["a"][1] = np.nan
+        output_df = self.prepare_df(res1)
         with pytest.raises(AssertionError):
-            updater.write_to_csv(res1, td.name)
+            updater.preprocess_output(output_df)
 
         # nan se for included loc-date
         res2 = deepcopy(res0)
         res2["se"]["a"][1] = np.nan
+        output_df = self.prepare_df(res2)
         with pytest.raises(AssertionError):
-            updater.write_to_csv(res2, td.name)
+            updater.preprocess_output(output_df)
 
         # large se value
         res3 = deepcopy(res0)
         res3["se"]["a"][0] = 10
+        output_df = self.prepare_df(res3)
         with pytest.raises(AssertionError):
-            updater.write_to_csv(res3, td.name)
-
-        td.cleanup()
+            updater.preprocess_output(output_df)
diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py
index 8ad1d6d10..ec93d4979 100644
--- a/google_symptoms/delphi_google_symptoms/run.py
+++ b/google_symptoms/delphi_google_symptoms/run.py
@@ -101,7 +101,9 @@ def run_module(params, logger=None):
                 start_date=SMOOTHERS_MAP[smoother][1](export_start_date),
                 metric=metric.lower(),
                 geo_res=geo_res,
-                sensor=sensor_name)
+                sensor=sensor_name,
+                logger=logger,
+            )
             if not exported_csv_dates.empty:
                 logger.info("Exported CSV",
                             csv_export_count=exported_csv_dates.size,
diff --git a/hhs_hosp/delphi_hhs/run.py b/hhs_hosp/delphi_hhs/run.py
index 22c259f30..12809cc56 100644
--- a/hhs_hosp/delphi_hhs/run.py
+++ b/hhs_hosp/delphi_hhs/run.py
@@ -122,11 +122,9 @@ def run_module(params):
             sensor_name = sensor + smoother[1]
             # don't export first 6 days for smoothed signals since they'll be nan.
             start_date = min(df.timestamp) + timedelta(6) if smoother[1] else min(df.timestamp)
-            dates = create_export_csv(df,
-                                      params["common"]["export_dir"],
-                                      geo,
-                                      sensor_name,
-                                      start_date=start_date)
+            dates = create_export_csv(
+                df, params["common"]["export_dir"], geo, sensor_name, start_date=start_date, logger=logger
+            )
 
             if len(dates) > 0:
                 stats.append((max(dates), len(dates)))
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index 50ce46cfb..b493a1806 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -91,7 +91,8 @@ def run_module(params: Dict[str, Any]):
                 export_dir=daily_export_dir,
                 start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                 sensor=SENSOR_NAME_MAP[metric],
-                weekly_dates=True
+                weekly_dates=True,
+                logger=logger,
             )
         else:
             for sensor in SENSORS:
@@ -116,7 +117,8 @@ def run_module(params: Dict[str, Any]):
                     export_dir=daily_export_dir,
                     start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                     sensor=sensor_name,
-                    weekly_dates=True
+                    weekly_dates=True,
+                    logger=logger,
                 )
         if len(dates) > 0:
             stats.append((max(dates), len(dates)))
diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py
index b22d03c20..5c9c286c0 100644
--- a/nssp/delphi_nssp/run.py
+++ b/nssp/delphi_nssp/run.py
@@ -128,11 +128,7 @@ def run_module(params):
             df_csv = df[CSV_COLS + ["timestamp"]]
             # actual export
             dates = create_export_csv(
-                df_csv,
-                geo_res=geo,
-                export_dir=export_dir,
-                sensor=signal,
-                weekly_dates=True,
+                df_csv, geo_res=geo, export_dir=export_dir, sensor=signal, weekly_dates=True, logger=logger
             )
             if len(dates) > 0:
                 run_stats.append((max(dates), len(dates)))
diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py
index 60bfc84c7..9228db6ad 100644
--- a/nwss_wastewater/delphi_nwss/run.py
+++ b/nwss_wastewater/delphi_nwss/run.py
@@ -154,9 +154,7 @@ def run_module(params):
         # add se, sample_size, and na codes
         agg_df = add_needed_columns(agg_df)
         # actual export
-        dates = create_export_csv(
-            agg_df, geo_res=geo, export_dir=export_dir, sensor=sensor
-        )
+        dates = create_export_csv(agg_df, geo_res=geo, export_dir=export_dir, sensor=sensor, logger=logger)
         if len(dates) > 0:
             run_stats.append((max(dates), len(dates)))
     ## log this indicator run
diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py
index e6974b6aa..a0b6fada2 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/run.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -63,15 +63,17 @@ def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                            export_start_date, export_end_date,  # export args
                                            threaded_logger):  # logger args
     """Generate sensors, create export CSV then return stats."""
-    threaded_logger.info("Generating signal and exporting to CSV",
-                         geo_res=geo_res,
-                         sensor=sensor_name)
-    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
-                                               first_date, last_date, suffix)
-    dates = create_export_csv(res_df, geo_res=geo_res,
-                              sensor=sensor_name, export_dir=export_dir,
-                              start_date=export_start_date,
-                              end_date=export_end_date)
+    threaded_logger.info("Generating signal and exporting to CSV", geo_res=geo_res, sensor=sensor_name)
+    res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device, first_date, last_date, suffix)
+    dates = create_export_csv(
+        res_df,
+        geo_res=geo_res,
+        sensor=sensor_name,
+        export_dir=export_dir,
+        start_date=export_start_date,
+        end_date=export_end_date,
+        logger=threaded_logger,
+    )
     return dates
 
 def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
@@ -80,16 +82,20 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de
                                         export_start_date, export_end_date,  # export args
                                         threaded_logger):  # logger args
     """Generate sensors, create export CSV then return stats."""
-    threaded_logger.info("Generating signal and exporting to CSV",
-                         geo_res=geo_res,
-                         sensor=sensor_name)
-    res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
-                                            first_date, last_date, suffix)
-    dates = create_export_csv(res_df, geo_res=geo_res,
-                              sensor=sensor_name, export_dir=export_dir,
-                              start_date=export_start_date,
-                              end_date=export_end_date,
-                              remove_null_samples=True)  # for parent geo, remove null sample size
+    threaded_logger.info("Generating signal and exporting to CSV", geo_res=geo_res, sensor=sensor_name)
+    res_df = generate_sensor_for_parent_geo(
+        geo_groups, geo_data, res_key, smooth, device, first_date, last_date, suffix
+    )
+    dates = create_export_csv(
+        res_df,
+        geo_res=geo_res,
+        sensor=sensor_name,
+        export_dir=export_dir,
+        start_date=export_start_date,
+        end_date=export_end_date,
+        remove_null_samples=True,
+        logger=threaded_logger,
+    )  # for parent geo, remove null sample size
     return dates
 
 def run_module(params: Dict[str, Any]):