diff --git a/_delphi_utils_python/README.md b/_delphi_utils_python/README.md
index 0ac6350bf..e1b091cae 100644
--- a/_delphi_utils_python/README.md
+++ b/_delphi_utils_python/README.md
@@ -22,13 +22,22 @@ Source code can be found here:
 
 ## Logger Usage
 
+To make our structured logging as useful as possible, particularly in the context of how we use logs in Elastic, the `event` argument (typically the first unnamed argument) should be a static string (to make filtering easier). Each dynamic or varying value should be passed in its own meaningfully and consistently named argument to the logger call (for use in filtering, thresholding, grouping, and visualization).
+
+### Commonly used argument names
+- data_source
+- geo_type
+- signal
+- issue_date
+- filename
+
 Single-thread usage.
 
 ```py
 from delphi_utils.logger import get_structured_logger
 
 logger = get_structured_logger('my_logger')
-logger.info('Hello, world!')
+logger.info('Hello', name='World')
 ```
 
 Multi-thread usage.
diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 37f8faf98..8ac5de48e 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -22,8 +22,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     for mask in masks:
         if not logger is None and df.loc[mask].size > 0:
             logger.info(
-                "Filtering contradictory missing code in " +
-                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+                "Filtering contradictory missing code",
+                sensor=sensor,
+                metric=metric,
+                date=date.strftime(format="%Y-%m-%d"),
             )
             df = df.loc[~mask]
         elif logger is None and df.loc[mask].size > 0:
diff --git a/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py b/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py
index 660fca042..3c8012803 100644
--- a/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py
+++ b/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py
@@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
             p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
         else:
             break
-    name = f"Signal: {signal} Lag: {lag}"
-    logger.info(name, payload=p_text)
+    logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)
 
 
 def evd_ranking_fn(ts_streams, EVD_max, EVD_min):
diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py
index c0e4502a8..30fd78059 100644
--- a/_delphi_utils_python/delphi_utils/logger.py
+++ b/_delphi_utils_python/delphi_utils/logger.py
@@ -1,11 +1,11 @@
 """Structured logger utility for creating JSON logs.
 
-See the delphi_utils README.md for usage examples.
+To make our structured logging as useful as possible, particularly in the context of how we use logs in Elastic,
+the `event` argument (typically the first unnamed argument) should be a static string (to make filtering easier).
+Each dynamic or varying value should be passed in its own meaningfully and consistently named argument
+to the logger call (for use in filtering, thresholding, grouping, and visualization).
 
-The Delphi group uses two ~identical versions of this file.
-Try to keep them in sync with edits, for sanity.
-  https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
-  https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
+See the delphi_utils README.md for usage examples.
""" import contextlib diff --git a/_delphi_utils_python/delphi_utils/runner.py b/_delphi_utils_python/delphi_utils/runner.py index abc28ba19..9083371aa 100644 --- a/_delphi_utils_python/delphi_utils/runner.py +++ b/_delphi_utils_python/delphi_utils/runner.py @@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], #Get version and indicator name for startup ind_name = indicator_fn.__module__.replace(".run", "") + #Check for version.cfg in indicator directory if os.path.exists("version.cfg"): with open("version.cfg") as ver_file: @@ -59,9 +60,15 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], if "current_version" in line: current_version = str.strip(line) current_version = current_version.replace("current_version = ", "") - #Logging - Starting Indicator - logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}") - else: logger.info(f"Started {ind_name} without version.cfg") + logger.info( + "Started a covidcast-indicator", + indicator_name=ind_name, + current_version=current_version, + ) + else: + logger.info( + "Started a covidcast-indicator without version.cfg", indicator_name=ind_name + ) indicator_fn(params) validator = validator_fn(params) @@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], break time.sleep(1) else: - logger.error(f"Flash step timed out ({timer} s), terminating", - elapsed_time_in_seconds = round(time.time() - start, 2)) + logger.error( + "Flash step timed out, terminating", + elapsed_time_in_seconds=round(time.time() - start, 2), + ) t1.terminate() t1.join() if validator: diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 3e72f1d7f..c9c1f8483 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -323,12 +323,13 @@ def test_export_df_with_missingness(self, tmp_path): @mock.patch("delphi_utils.logger") def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path): - + sensor = "test" + geo_res = "state" create_export_csv( df=self.DF3.copy(), export_dir=tmp_path, - geo_res="state", - sensor="test", + sensor=sensor, + geo_res=geo_res, logger=mock_logger ) assert set(listdir(tmp_path)) == set( @@ -339,8 +340,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path): ] ) assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0 + date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d") mock_logger.info.assert_called_once_with( - "Filtering contradictory missing code in test_None_2020-02-15." 
+ "Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str ) def test_export_sort(self, tmp_path): diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 92a03e6c5..9c15b221d 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -25,7 +25,7 @@ def retrieve_files(params, filedate, logger): if files["denom"] is None: ## download recent files from FTP server - logger.info("downloading recent files through SFTP") + logger.info("Downloading recent files through SFTP") download_counts(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"]) denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate) @@ -157,18 +157,20 @@ def run_module(params: Dict[str, Dict[str, Any]]): startdate, enddate = process_dates(params, startdate_dt, enddate_dt) - logger.info("generating signal and exporting to CSV", - first_sensor_date = startdate, - last_sensor_date = enddate, - drop_date = dropdate, - n_backfill_days = n_backfill_days, - n_waiting_days = n_waiting_days, - geos = params["indicator"]["geos"], - export_dir = params["common"]["export_dir"], - parallel = params["indicator"]["parallel"], - weekday = params["indicator"]["weekday"], - types = params["indicator"]["types"], - se = params["indicator"]["se"]) + logger.info( + "Generating signal and exporting to CSV", + first_sensor_date=startdate, + last_sensor_date=enddate, + drop_date=dropdate, + n_backfill_days=n_backfill_days, + n_waiting_days=n_waiting_days, + geos=params["indicator"]["geos"], + export_dir=params["common"]["export_dir"], + parallel=params["indicator"]["parallel"], + weekday=params["indicator"]["weekday"], + types=params["indicator"]["types"], + se=params["indicator"]["se"], + ) ## start generating stats = [] @@ -176,9 +178,9 @@ def run_module(params: Dict[str, Dict[str, Any]]): for numtype in params["indicator"]["types"]: for weekday in params["indicator"]["weekday"]: if weekday: - logger.info("starting weekday adj", geo = geo, numtype = numtype) + logger.info("Starting weekday adj", geo_type=geo, numtype=numtype) else: - logger.info("starting no adj", geo = geo, numtype = numtype) + logger.info("Starting no adj", geo_type=geo, numtype=numtype) su_inst = CHCSensorUpdater( startdate, enddate, @@ -211,7 +213,7 @@ def run_module(params: Dict[str, Dict[str, Any]]): ) stats.extend(more_stats) - logger.info("finished processing", geo = geo) + logger.info("Finished processing", geo_type=geo) elapsed_time_in_seconds = round(time.time() - start_time, 2) min_max_date = stats and min(s[0] for s in stats) diff --git a/changehc/delphi_changehc/sensor.py b/changehc/delphi_changehc/sensor.py index 0449f07df..9a1fd29e0 100644 --- a/changehc/delphi_changehc/sensor.py +++ b/changehc/delphi_changehc/sensor.py @@ -118,10 +118,10 @@ def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den") se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)') rate_data['se'] = se_valid - logger.debug("{0}: {1:.3f},[{2:.3f}]".format( - geo_id, rate_data['rate'][-1], rate_data['se'][-1] - )) - return {"geo_id": geo_id, - "rate": 100 * rate_data['rate'], - "se": 100 * rate_data['se'], - "incl": include} + logger.debug( + ".fit() DEBUG - last rate/se for geo", + geo_value=geo_id, + value=rate_data["rate"][-1], + se=rate_data["se"][-1], + ) + return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include} diff --git a/changehc/delphi_changehc/update_sensor.py 
index edae85517..7c78dc020 100644
--- a/changehc/delphi_changehc/update_sensor.py
+++ b/changehc/delphi_changehc/update_sensor.py
@@ -41,7 +41,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
     assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
     assert not df["se"].isna().any(), " se contains nan values"
     if write_se:
-        logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
+        logger.info("WARNING: WRITING SEs", filename=out_name)
     else:
         df["se"] = np.nan
 
@@ -49,9 +49,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
     suspicious_val_mask = df["val"].gt(90)
     if not df[suspicious_val_mask].empty:
         for geo in df.loc[suspicious_val_mask, "geo_id"]:
-            logger.warning("value suspiciously high, {0}: {1}".format(
-                geo, out_name
-            ))
+            logger.warning("Value suspiciously high", geo_value=geo, filename=out_name)
 
     dates = create_export_csv(
         df,
@@ -62,10 +60,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
         sensor=out_name,
         write_empty_days=True
     )
-    logger.debug("wrote {0} rows for {1} {2}".format(
-        df.size, df["geo_id"].unique().size, geo_level
-    ))
-    logger.debug("wrote files to {0}".format(output_path))
+    logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, num_geo_ids=df["geo_id"].unique().size)
+    logger.debug("Wrote files", export_dir=output_path)
     return dates
 
 
@@ -148,8 +144,9 @@ def geo_reindex(self, data):
         geo = self.geo
         gmpr = GeoMapper()
         if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
-            self.logger.error("{0} is invalid, pick one of 'county', "
-                              "'state', 'msa', 'hrr', 'hss','nation'".format(geo))
+            self.logger.error(
+                "Geo is invalid, pick one of 'county', 'state', 'msa', 'hrr', 'hhs', 'nation'", geo_type=geo
+            )
             return False
         if geo == "county":
             data_frame = gmpr.fips_to_megacounty(data,
@@ -224,7 +221,7 @@ def update_sensor(self,
                 dfs.append(res)
         else:
             n_cpu = min(10, cpu_count())
-            self.logger.debug("starting pool with {0} workers".format(n_cpu))
+            self.logger.debug("Starting pool", n_workers=n_cpu)
             with Pool(n_cpu) as pool:
                 pool_results = []
                 for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py
index 7ef25a608..d2e7ee2f3 100644
--- a/changehc/tests/test_update_sensor.py
+++ b/changehc/tests/test_update_sensor.py
@@ -9,6 +9,7 @@
 import pandas as pd
 import numpy as np
 from boto3 import Session
+from delphi_utils import get_structured_logger
 from moto import mock_s3
 import pytest
 
@@ -28,7 +29,7 @@
 DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 OUTPATH="test_data/"
-TEST_LOGGER = logging.getLogger()
+TEST_LOGGER = get_structured_logger()
 
 class TestCHCSensorUpdater:
     """Tests for updating the sensors."""
diff --git a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
index 2ce093488..ee6e98286 100644
--- a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
+++ b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -57,7 +57,7 @@ def download(ftp_credentials, out_path, logger):
     """Pull the latest raw files."""
     current_time = datetime.datetime.now()
     seconds_in_day = 24 * 60 * 60
-    logger.info("starting download", time=current_time)
+    logger.info("Starting download")
 
     # open client
     client = paramiko.SSHClient()
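The `TEST_LOGGER` change above is load-bearing: the stdlib logger and the structured logger differ in how they treat keyword arguments. A minimal sketch of the difference, assuming the structlog-backed logger that `delphi_utils.get_structured_logger` returns (illustrative only, not part of this diff):

```py
import logging

from delphi_utils.logger import get_structured_logger

# The stdlib logger rejects arbitrary keyword arguments on its log calls:
std_logger = logging.getLogger()
# std_logger.info("Wrote rows", num_rows=10)  # would raise TypeError

# The structured logger binds keyword arguments into the JSON event dict:
logger = get_structured_logger("example")
logger.info("Wrote rows", num_rows=10, geo_type="county")
# emits roughly: {"event": "Wrote rows", "num_rows": 10, "geo_type": "county", ...}
```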
diff --git a/claims_hosp/delphi_claims_hosp/indicator.py b/claims_hosp/delphi_claims_hosp/indicator.py
index 4ad3ef7df..c5ac4e886 100644
--- a/claims_hosp/delphi_claims_hosp/indicator.py
+++ b/claims_hosp/delphi_claims_hosp/indicator.py
@@ -143,7 +143,12 @@ def fit(y_data, first_date, geo_id, num_col="num", den_col="den"):
         se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
         rate_data['se'] = se_valid
 
-        logging.debug("%s: %05.3f, [%05.3f]",
-                      geo_id, rate_data['rate'][-1], rate_data['se'][-1])
-        return {"geo_id": geo_id, "rate": 100 * rate_data['rate'],
-                "se": 100 * rate_data['se'], "incl": include}
+        # NOTE: this module logs through the stdlib logger, which does not
+        # accept arbitrary keyword arguments, so keep %-style formatting here.
+        logging.debug(
+            "Last rate/se for geo %s: %.3f [%.3f]",
+            geo_id,
+            rate_data["rate"][-1],
+            rate_data["se"][-1],
+        )
+        return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include}
diff --git a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
index 0ab93ebcc..19a962884 100644
--- a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
+++ b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
             dfs_list.append(dfs)
         else:
             dfs.to_csv(out_path, index=False)
-            logger.info(f"Wrote {out_path}")
+            logger.info("Wrote modified csv", filename=out_path)
     return files, dfs_list
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 53c4cd33b..a9752072c 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -116,9 +116,9 @@ def run_module(params):
     for geo in params["indicator"]["geos"]:
         for weekday in params["indicator"]["weekday"]:
             if weekday:
-                logger.info("starting weekday adj", geo = geo)
+                logger.info("Starting weekday adj", geo_type=geo)
             else:
-                logger.info("starting no weekday adj", geo = geo)
+                logger.info("Starting no weekday adj", geo_type=geo)
 
             signal_name = Config.signal_weekday_name if weekday else Config.signal_name
             if params["indicator"]["write_se"]:
@@ -126,7 +126,7 @@ def run_module(params):
                     "supply obfuscated prefix in params.json"
                 signal_name = params["indicator"]["obfuscated_prefix"] + "_" + signal_name
 
-            logger.info("Updating signal name", signal_name = signal_name)
+            logger.info("Updating signal name", signal=signal_name)
             updater = ClaimsHospIndicatorUpdater(
                 startdate,
                 enddate,
@@ -135,16 +135,16 @@ def run_module(params):
                 params["indicator"]["parallel"],
                 weekday,
                 params["indicator"]["write_se"],
-                signal_name
+                signal_name,
+                logger,
             )
             updater.update_indicator(
                 claims_file,
                 params["common"]["export_dir"],
-                logger,
             )
             max_dates.append(updater.output_dates[-1])
             n_csv_export.append(len(updater.output_dates))
-            logger.info("finished updating", geo = geo)
+            logger.info("Finished updating", geo_type=geo)
 
     # Remove all the raw files
     for fn in os.listdir(params["indicator"]["input_dir"]):
diff --git a/claims_hosp/delphi_claims_hosp/update_indicator.py b/claims_hosp/delphi_claims_hosp/update_indicator.py
index df3f3308f..5ba8ddd22 100644
--- a/claims_hosp/delphi_claims_hosp/update_indicator.py
+++ b/claims_hosp/delphi_claims_hosp/update_indicator.py
@@ -7,7 +7,6 @@
 """
 
 # standard packages
-import logging
 from multiprocessing import Pool, cpu_count
 
 # third party
@@ -28,8 +27,7 @@ class ClaimsHospIndicatorUpdater:
 
     # pylint: disable=too-many-instance-attributes, too-many-arguments
    # all variables are used
-    def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday,
-                 write_se, signal_name):
+    def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday, write_se, signal_name, logger):
         """
         Initialize updater for the claims-based hospitalization indicator.
@@ -53,6 +51,7 @@ def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday,
         # init in shift_dates, declared here for pylint
         self.burnindate, self.fit_dates, self.burn_in_dates, self.output_dates = \
             [None] * 4
+        self.logger = logger
 
         assert (
             self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD)
@@ -114,9 +113,9 @@ def geo_reindex(self, data):
         elif self.geo == "hrr":
             data_frame = data  # data is already adjusted in aggregation step above
         else:
-            logging.error(
-                "%s is invalid, pick one of 'county', 'state', 'msa', 'hrr', 'hhs', nation'",
-                self.geo)
+            self.logger.error(
+                "Geo is invalid, pick one of 'county', 'state', 'msa', 'hrr', 'hhs', 'nation'", geo_type=self.geo
+            )
             return False
 
         unique_geo_ids = pd.unique(data_frame[self.geo])
@@ -133,7 +132,7 @@ def geo_reindex(self, data):
         data_frame.fillna(0, inplace=True)
         return data_frame
 
-    def update_indicator(self, input_filepath, outpath, logger):
+    def update_indicator(self, input_filepath, outpath):
         """
         Generate and output indicator values.
 
@@ -159,7 +158,7 @@ def update_indicator(self, input_filepath, outpath, logger):
                 ["num"],
                 Config.DATE_COL,
                 [1, 1e5],
-                logger,
+                self.logger,
             )
             if self.weekday
             else None
@@ -182,7 +181,7 @@ def update_indicator(self, input_filepath, outpath, logger):
             valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"])
         else:
             n_cpu = min(Config.MAX_CPU_POOL, cpu_count())
-            logging.debug("starting pool with %d workers", n_cpu)
+            self.logger.debug("Starting pool", n_workers=n_cpu)
             with Pool(n_cpu) as pool:
                 pool_results = []
                 for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
@@ -217,7 +216,7 @@ def update_indicator(self, input_filepath, outpath, logger):
             }
 
         self.write_to_csv(output_dict, outpath)
-        logging.debug("wrote files to %s", outpath)
+        self.logger.debug("Wrote files", export_dir=outpath)
 
     def write_to_csv(self, output_dict, output_path="./receiving"):
         """
@@ -229,8 +228,7 @@ def write_to_csv(self, output_dict, output_path="./receiving"):
         """
         if self.write_se:
-            logging.info("========= WARNING: WRITING SEs TO %s =========",
-                         self.signal_name)
+            self.logger.info("WARNING: WRITING SEs", signal=self.signal_name)
 
         geo_level = output_dict["geo_level"]
         dates = output_dict["dates"]
@@ -255,7 +253,7 @@ def write_to_csv(self, output_dict, output_path="./receiving"):
                     assert not np.isnan(val), "value for included value is nan"
                     assert not np.isnan(se), "se for included rate is nan"
                     if val > 90:
-                        logging.warning("value suspicious, %s: %d", geo_id, val)
+                        self.logger.warning("Value suspicious", geo_type=geo_level, geo_value=geo_id, value=val)
                     assert se < 5, f"se suspicious, {geo_id}: {se}"
                     if self.write_se:
                         assert val > 0 and se > 0, "p=0, std_err=0 invalid"
@@ -267,4 +265,4 @@ def write_to_csv(self, output_dict, output_path="./receiving"):
                             "%s,%f,%s,%s,%s\n" % (geo_id, val, "NA", "NA", "NA"))
                         out_n += 1
 
-        logging.debug("wrote %d rows for %d %s", out_n, len(geo_ids), geo_level)
+        self.logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level, num_geo_ids=len(geo_ids))
diff --git a/claims_hosp/tests/test_update_indicator.py b/claims_hosp/tests/test_update_indicator.py
index 1471be655..5ca527287 100644
--- a/claims_hosp/tests/test_update_indicator.py
+++ b/claims_hosp/tests/test_update_indicator.py
@@ -51,7 +51,8 @@ def test_shift_dates(self):
             self.parallel,
             self.weekday,
             self.write_se,
-            Config.signal_name
+            Config.signal_name,
+            TEST_LOGGER
         )
         ## Test init
         assert updater.startdate.month == 2
@@ -72,7 +73,8 @@ def test_geo_reindex(self):
             self.parallel,
             self.weekday,
             self.write_se,
-            Config.signal_name
+            Config.signal_name,
+            TEST_LOGGER
         )
         updater.shift_dates()
         data_frame = updater.geo_reindex(self.small_test_data.reset_index())
@@ -90,13 +92,13 @@ def test_update_indicator(self):
             self.parallel,
             self.weekday,
             self.write_se,
-            Config.signal_name
+            Config.signal_name,
+            TEST_LOGGER
         )
 
         updater.update_indicator(
             DATA_FILEPATH,
-            td.name,
-            TEST_LOGGER
+            td.name
         )
 
         assert len(os.listdir(td.name)) == len(
@@ -112,7 +114,8 @@ def test_write_to_csv_results(self):
             self.parallel,
             self.weekday,
             self.write_se,
-            Config.signal_name
+            Config.signal_name,
+            TEST_LOGGER
         )
 
         res0 = {
@@ -192,7 +195,8 @@ def test_write_to_csv_with_se_results(self):
             self.parallel,
             True,
             True,
-            signal_name
+            signal_name,
+            TEST_LOGGER
         )
 
         res0 = {
@@ -243,7 +247,8 @@ def test_write_to_csv_wrong_results(self):
             self.parallel,
             self.weekday,
             self.write_se,
-            Config.signal_name
+            Config.signal_name,
+            TEST_LOGGER
         )
 
         res0 = {
diff --git a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py
index 9d51768be..6cb42364a 100644
--- a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py
+++ b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py
@@ -59,7 +59,7 @@ def download(ftp_credentials, out_path, logger, issue_date=None):
     else:
         current_time = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
 
-    logger.info("starting download", time=current_time)
+    logger.info("Starting download")
     seconds_in_day = 24 * 60 * 60
 
     # open client
diff --git a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py b/doctor_visits/delphi_doctor_visits/modify_claims_drops.py
index daed93d58..3be9393e4 100644
--- a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py
+++ b/doctor_visits/delphi_doctor_visits/modify_claims_drops.py
@@ -48,5 +48,5 @@ def modify_and_write(f, logger, test_mode=False):
 
     if not test_mode:
         dfs.to_csv(out_path, index=False)
-        logger.info(f"Wrote {out_path}")
+        logger.info("Wrote modified csv", filename=out_path)
     return dfs
diff --git a/doctor_visits/delphi_doctor_visits/patch.py b/doctor_visits/delphi_doctor_visits/patch.py
index 32b6d308f..32c62dd6b 100644
--- a/doctor_visits/delphi_doctor_visits/patch.py
+++ b/doctor_visits/delphi_doctor_visits/patch.py
@@ -45,16 +45,19 @@ def patch():
     start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
     end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
 
-    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
-    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
-    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=start_issue.strftime("%Y-%m-%d"),
+        end_issue=end_issue.strftime("%Y-%m-%d"),
+    )
 
     makedirs(params["patch"]["patch_dir"], exist_ok=True)
 
     current_issue = start_issue
 
     while current_issue <= end_issue:
-        logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")
+        logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))
 
         params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py
index 3c941534a..2dccffc8c 100644
--- a/doctor_visits/delphi_doctor_visits/run.py
+++ b/doctor_visits/delphi_doctor_visits/run.py
@@ -88,32 +88,33 @@ def run_module(params, logger=None): # pylint: disable=too-many-statements
     startdate_dt = enddate_dt - timedelta(days=n_backfill_days)
     enddate = str(enddate_dt.date())
     startdate = str(startdate_dt.date())
-    logger.info("drop date:\t\t%s", dropdate)
-    logger.info("first sensor date:\t%s", startdate)
-    logger.info("last sensor date:\t%s", enddate)
-    logger.info("n_backfill_days:\t%s", n_backfill_days)
-    logger.info("n_waiting_days:\t%s", n_waiting_days)
+
+    logger.info(
+        "Using params",
+        startdate=startdate,
+        enddate=enddate,
+        dropdate=dropdate,
+        n_backfill_days=n_backfill_days,
+        n_waiting_days=n_waiting_days,
+        export_dir=export_dir,
+        parallel=params["indicator"]["parallel"],
+        weekday=params["indicator"]["weekday"],
+        write_se=se,
+        prefix=prefix,
+    )
 
     ## geographies
     geos = ["state", "msa", "hrr", "county", "hhs", "nation"]
-
-    ## print out other vars
-    logger.info("outpath:\t\t%s", export_dir)
-    logger.info("parallel:\t\t%s", params["indicator"]["parallel"])
-    logger.info("weekday:\t\t%s", params["indicator"]["weekday"])
-    logger.info("write se:\t\t%s", se)
-    logger.info("obfuscated prefix:\t%s", prefix)
-
     max_dates = []
     n_csv_export = []
     ## start generating
     for geo in geos:
         for weekday in params["indicator"]["weekday"]:
             if weekday:
-                logger.info("starting %s, weekday adj", geo)
+                logger.info("Starting with weekday adj", geo_type=geo)
             else:
-                logger.info("starting %s, no adj", geo)
+                logger.info("Starting with no adj", geo_type=geo)
             sensor = update_sensor(
                 filepath=claims_file,
                 startdate=startdate,
@@ -137,8 +138,8 @@ def run_module(params, logger=None): # pylint: disable=too-many-statements
             write_to_csv(sensor, geo, se, out_name, logger, export_dir)
             max_dates.append(sensor.date.max())
             n_csv_export.append(sensor.date.unique().shape[0])
-            logger.debug(f"wrote files to {export_dir}")
-            logger.info("finished updating", geo = geo)
+            logger.debug("Wrote files", export_dir=export_dir)
+            logger.info("Finished updating", geo_type=geo)
 
     # Remove all the raw files
     for fn in os.listdir(params["indicator"]["input_dir"]):
diff --git a/doctor_visits/delphi_doctor_visits/sensor.py b/doctor_visits/delphi_doctor_visits/sensor.py
index b5a645ea8..91faa20cf 100644
--- a/doctor_visits/delphi_doctor_visits/sensor.py
+++ b/doctor_visits/delphi_doctor_visits/sensor.py
@@ -239,7 +239,7 @@ def fit(y_data,
         se[include] = np.sqrt(
             np.divide((new_rates[include] * (1 - new_rates[include])), den[include]))
 
-        logger.debug(f"{geo_id}: {new_rates[-1]:.3f},[{se[-1]:.3f}]")
+        logger.debug(".fit() DEBUG - last rate/se for geo", geo_value=geo_id, value=new_rates[-1], se=se[-1])
 
         included_indices = [x for x in final_sensor_idxs if include[x]]
diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py
index 125c0df18..4cac1e81c 100644
--- a/doctor_visits/delphi_doctor_visits/update_sensor.py
+++ b/doctor_visits/delphi_doctor_visits/update_sensor.py
@@ -34,7 +34,7 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, outpu
         output_path: outfile path to write the csv (default is current directory)
     """
     if se:
-        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
+        logger.info("WARNING: WRITING SEs", filename=out_name)
 
     out_n = 0
     for d in set(output_df["date"]):
@@ -64,7 +64,7 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, outpu
                 outfile.write(
"NA", "NA")) out_n += 1 - logger.debug(f"wrote {out_n} rows for {geo_level}") + logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level) def update_sensor( @@ -177,7 +177,7 @@ def update_sensor( else: n_cpu = min(10, cpu_count()) - logger.debug(f"starting pool with {n_cpu} workers") + logger.debug("Starting pool", n_workers=n_cpu) with Pool(n_cpu) as pool: pool_results = [] diff --git a/google_symptoms/delphi_google_symptoms/date_utils.py b/google_symptoms/delphi_google_symptoms/date_utils.py index ebfe5109a..2ad6244e9 100644 --- a/google_symptoms/delphi_google_symptoms/date_utils.py +++ b/google_symptoms/delphi_google_symptoms/date_utils.py @@ -98,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]: expected_date_diff += global_max_expected_lag if latest_date_diff > expected_date_diff: - logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}") + logger.info("Missing date", date=to_datetime(min(gs_metadata.max_time)).date()) num_export_days = expected_date_diff diff --git a/google_symptoms/delphi_google_symptoms/patch.py b/google_symptoms/delphi_google_symptoms/patch.py index 01d099c4f..85df89394 100755 --- a/google_symptoms/delphi_google_symptoms/patch.py +++ b/google_symptoms/delphi_google_symptoms/patch.py @@ -58,16 +58,19 @@ def patch(params): issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") - logger.info(f"""Start patching {params["patch"]["patch_dir"]}""") - logger.info(f"""Start issue: {issue_date.strftime("%Y-%m-%d")}""") - logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""") + logger.info( + "Starting patching", + patch_directory=params["patch"]["patch_dir"], + start_issue=issue_date.strftime("%Y-%m-%d"), + end_issue=end_issue.strftime("%Y-%m-%d"), + ) makedirs(params["patch"]["patch_dir"], exist_ok=True) patch_dates = generate_patch_dates(params) while issue_date <= end_issue: - logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""") + logger.info("Running issue", issue_date=issue_date.strftime("%Y-%m-%d")) # Output dir setup current_issue_yyyymmdd = issue_date.strftime("%Y%m%d") diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 8303a9a8a..8ad1d6d10 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -80,10 +80,8 @@ def run_module(params, logger=None): if len(df_pull) == 0: continue for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): - logger.info("generating signal and exporting to CSV", - geo_res=geo_res, - metric=metric, - smoother=smoother) + sensor_name = "_".join([smoother, "search"]) + logger.info("Generating signal and exporting to CSV", geo_type=geo_res, signal=f"{metric}_{sensor_name}") df = df_pull df["val"] = df[metric].astype(float) df["val"] = df[["geo_id", "val"]].groupby( @@ -94,9 +92,8 @@ def run_module(params, logger=None): # Drop early entries where data insufficient for smoothing df = df.loc[~df["val"].isnull(), :] df = df.reset_index() - sensor_name = "_".join([smoother, "search"]) if len(df) == 0: - logger.info("No data for %s_%s_%s", geo_res, metric.lower(), sensor_name) + logger.info("No data for signal", geo_type=geo_res, signal=f"{metric}_{sensor_name}") continue exported_csv_dates = create_export_csv( df, diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py index 7c5a3ffac..dfd987f89 100644 --- a/nssp/delphi_nssp/run.py +++ 
+++ b/nssp/delphi_nssp/run.py
@@ -90,7 +90,7 @@ def run_module(params):
         for geo in GEOS:
             df = df_pull.copy()
             df["val"] = df[signal]
-            logger.info("Generating signal and exporting to CSV", metric=signal)
+            logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
             if geo == "nation":
                 df = df[df["geography"] == "United States"]
                 df["geo_id"] = "us"
diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py
index 378849ba5..60bfc84c7 100644
--- a/nwss_wastewater/delphi_nwss/run.py
+++ b/nwss_wastewater/delphi_nwss/run.py
@@ -146,7 +146,7 @@ def run_module(params):
         df = generate_weights(df, sensor)
 
         for geo in GEOS:
-            logger.info("Generating signal and exporting to CSV", metric=sensor)
+            logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=sensor)
             if geo == "nation":
                 agg_df = weighted_nation_sum(df, sensor)
             else:
diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py
index d9f23f2ec..560f89456 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/pull.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -56,7 +56,7 @@ def get_from_s3(start_date, end_date, bucket, logger):
     seen_files = set()
     for search_date in [start_date + timedelta(days=x) for x in range(n_days)]:
         if search_date in s3_files.keys():
-            logger.info(f"Pulling data received on {search_date.date()}")
+            logger.info("Pulling data received on date", search_date=search_date.date())
 
             # Fetch data received on the same day
             for fn in s3_files[search_date]:
@@ -110,11 +110,11 @@ def fix_date(df, logger):
     df.insert(2, "timestamp", df["TestDate"])
 
     mask = df["TestDate"] <= df["StorageDate"]
-    logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f}% of unusual data")
+    logger.info("Removing unusual data", percent=round((len(df) - np.sum(mask)) * 100 / len(df), 2))
     df = df[mask]
 
     mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90)
-    logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f}% of outdated data")
+    logger.info("Fixing outdated data", percent=round((np.sum(mask) * 100 / len(df)), 2))
     df["timestamp"].values[mask] = df["StorageDate"].values[mask]
 
     return df
diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py
index a59e0c101..e6974b6aa 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/run.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -164,7 +164,7 @@ def run_module(params: Dict[str, Any]):
     n_cpu = min(8, cpu_count())  # for parallelization
     with pool_and_threadedlogger(logger, n_cpu) as (pool, threaded_logger):  # for using loggers in multiple threads
-        logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
+        logger.info("Parallelizing sensor generation", n_workers=n_cpu)
         pool_results = []
         for geo_res in NONPARENT_GEO_RESOLUTIONS:
             geo_data, res_key = geo_map(geo_res, data)
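Taken together, the pattern this diff applies across indicators is a static `event` string plus individually named dynamic values, per the README convention above. A minimal sketch of a conforming call (the argument values are hypothetical, for illustration only):

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger("my_indicator")

# The static event string is easy to filter on in Elastic
# (event:"Generating signal and exporting to CSV"); the named
# arguments support grouping, thresholding, and visualization.
logger.info(
    "Generating signal and exporting to CSV",
    data_source="chng",  # hypothetical example values
    geo_type="county",
    signal="smoothed_outpatient_covid",
)
```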