Fix unstructured logging variable formatting (#2029)
Co-authored-by: george <[email protected]>
Co-authored-by: minhkhul <[email protected]>
3 people authored Sep 18, 2024
1 parent 5c72cbd commit 12a9712
Showing 29 changed files with 163 additions and 132 deletions.
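The change applies one pattern throughout: each interpolated log message becomes a static event string, and every varying value moves into a named keyword argument. A minimal before/after sketch of the pattern, modeled on the runner.py hunk below:

```py
# Before: the event string varies per run, so Elastic cannot filter or group on it
logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")

# After: static event string; dynamic values become named, filterable fields
logger.info(
    "Started a covidcast-indicator",
    indicator_name=ind_name,
    current_version=current_version,
)
```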
11 changes: 10 additions & 1 deletion _delphi_utils_python/README.md
@@ -22,13 +22,22 @@ Source code can be found here:

## Logger Usage

To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc).

### Commonly used argument names:
- data_source
- geo_type
- signal
- issue_date
- filename
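
For example, a call following this convention might look like the sketch below (all values hypothetical):

```py
logger.info(
    "Exported signal CSV",
    data_source="chng",
    signal="smoothed_outpatient_covid",
    geo_type="county",
    issue_date="2024-09-18",
    filename="20240918_county_smoothed_outpatient_covid.csv",
)
```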

Single-thread usage.

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger('my_logger')
logger.info('Hello, world!')
logger.info('Hello', name='World')
```

Multi-thread usage.
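
The full multi-thread example is truncated in this view; as a generic sketch, under the assumption that each worker process may safely build its own logger by name (the repository's own example may differ):

```py
from multiprocessing import Pool

from delphi_utils.logger import get_structured_logger

def task(n):
    # Each worker builds its own logger instance; kwargs stay structured.
    logger = get_structured_logger("my_logger")
    logger.info("Processed item", item=n)
    return n * n

if __name__ == "__main__":
    logger = get_structured_logger("my_logger")
    with Pool(4) as pool:
        results = pool.map(task, range(10))
    logger.info("Finished", num_results=len(results))
```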
6 changes: 4 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
@@ -22,8 +22,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    for mask in masks:
        if not logger is None and df.loc[mask].size > 0:
            logger.info(
                "Filtering contradictory missing code in " +
                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                "Filtering contradictory missing code",
                sensor=sensor,
                metric=metric,
                date=date.strftime(format="%Y-%m-%d"),
            )
            df = df.loc[~mask]
        elif logger is None and df.loc[mask].size > 0:
3 changes: 1 addition & 2 deletions _delphi_utils_python/delphi_utils/flash_eval/eval_day.py
@@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
            p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
        else:
            break
    name = f"Signal: {signal} Lag: {lag}"
    logger.info(name, payload=p_text)
    logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)


def evd_ranking_fn(ts_streams, EVD_max, EVD_min):
10 changes: 5 additions & 5 deletions _delphi_utils_python/delphi_utils/logger.py
@@ -1,11 +1,11 @@
"""Structured logger utility for creating JSON logs.
See the delphi_utils README.md for usage examples.
To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic,
the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier),
and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument
to the logger call (for use in filtering, thresholding, grouping, visualization, etc)
The Delphi group uses two ~identical versions of this file.
Try to keep them in sync with edits, for sanity.
https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
See the delphi_utils README.md for usage examples.
"""

import contextlib
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/runner.py
@@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],

    #Get version and indicator name for startup
    ind_name = indicator_fn.__module__.replace(".run", "")

    #Check for version.cfg in indicator directory
    if os.path.exists("version.cfg"):
        with open("version.cfg") as ver_file:
@@ -59,9 +60,15 @@
            if "current_version" in line:
                current_version = str.strip(line)
                current_version = current_version.replace("current_version = ", "")
        #Logging - Starting Indicator
        logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")
    else: logger.info(f"Started {ind_name} without version.cfg")
        logger.info(
            "Started a covidcast-indicator",
            indicator_name=ind_name,
            current_version=current_version,
        )
    else:
        logger.info(
            "Started a covidcast-indicator without version.cfg", indicator_name=ind_name
        )

    indicator_fn(params)
    validator = validator_fn(params)
Expand All @@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
                break
            time.sleep(1)
        else:
            logger.error(f"Flash step timed out ({timer} s), terminating",
                elapsed_time_in_seconds = round(time.time() - start, 2))
            logger.error(
                "Flash step timed out, terminating",
                elapsed_time_in_seconds=round(time.time() - start, 2),
            )
            t1.terminate()
            t1.join()
    if validator:
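
(Aside: Python runs a loop's `else` clause only when the loop completes without `break`, so in the hunk above the timeout log and `terminate()` fire exactly when the flash step never finished in time. A standalone sketch, with `done` and `handle_timeout` as hypothetical stand-ins:)

```py
import time

def poll_with_timeout(done, timer, handle_timeout):
    """Poll `done()` once per second; give up after `timer` seconds."""
    for _ in range(timer):
        if done():
            break
        time.sleep(1)
    else:
        # Reached only if the loop never hit `break`, i.e. on timeout.
        handle_timeout()
```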
10 changes: 6 additions & 4 deletions _delphi_utils_python/tests/test_export.py
@@ -323,12 +323,13 @@ def test_export_df_with_missingness(self, tmp_path):

    @mock.patch("delphi_utils.logger")
    def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):

        sensor = "test"
        geo_res = "state"
        create_export_csv(
            df=self.DF3.copy(),
            export_dir=tmp_path,
            geo_res="state",
            sensor="test",
            sensor=sensor,
            geo_res=geo_res,
            logger=mock_logger
        )
        assert set(listdir(tmp_path)) == set(
@@ -339,8 +340,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
            ]
        )
        assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0
        date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d")
        mock_logger.info.assert_called_once_with(
            "Filtering contradictory missing code in test_None_2020-02-15."
            "Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str
        )

    def test_export_sort(self, tmp_path):
34 changes: 18 additions & 16 deletions changehc/delphi_changehc/run.py
@@ -25,7 +25,7 @@ def retrieve_files(params, filedate, logger):
    if files["denom"] is None:

        ## download recent files from FTP server
        logger.info("downloading recent files through SFTP")
        logger.info("Downloading recent files through SFTP")
        download_counts(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])

        denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
@@ -157,28 +157,30 @@ def run_module(params: Dict[str, Dict[str, Any]]):

    startdate, enddate = process_dates(params, startdate_dt, enddate_dt)

    logger.info("generating signal and exporting to CSV",
                first_sensor_date = startdate,
                last_sensor_date = enddate,
                drop_date = dropdate,
                n_backfill_days = n_backfill_days,
                n_waiting_days = n_waiting_days,
                geos = params["indicator"]["geos"],
                export_dir = params["common"]["export_dir"],
                parallel = params["indicator"]["parallel"],
                weekday = params["indicator"]["weekday"],
                types = params["indicator"]["types"],
                se = params["indicator"]["se"])
    logger.info(
        "Generating signal and exporting to CSV",
        first_sensor_date=startdate,
        last_sensor_date=enddate,
        drop_date=dropdate,
        n_backfill_days=n_backfill_days,
        n_waiting_days=n_waiting_days,
        geos=params["indicator"]["geos"],
        export_dir=params["common"]["export_dir"],
        parallel=params["indicator"]["parallel"],
        weekday=params["indicator"]["weekday"],
        types=params["indicator"]["types"],
        se=params["indicator"]["se"],
    )

    ## start generating
    stats = []
    for geo in params["indicator"]["geos"]:
        for numtype in params["indicator"]["types"]:
            for weekday in params["indicator"]["weekday"]:
                if weekday:
                    logger.info("starting weekday adj", geo = geo, numtype = numtype)
                    logger.info("Starting weekday adj", geo_type=geo, numtype=numtype)
                else:
                    logger.info("starting no adj", geo = geo, numtype = numtype)
                    logger.info("Starting no adj", geo_type=geo, numtype=numtype)
                su_inst = CHCSensorUpdater(
                    startdate,
                    enddate,
@@ -211,7 +213,7 @@ def run_module(params: Dict[str, Dict[str, Any]]):
        )
        stats.extend(more_stats)

        logger.info("finished processing", geo = geo)
        logger.info("Finished processing", geo_type=geo)

    elapsed_time_in_seconds = round(time.time() - start_time, 2)
    min_max_date = stats and min(s[0] for s in stats)
14 changes: 7 additions & 7 deletions changehc/delphi_changehc/sensor.py
@@ -118,10 +118,10 @@ def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den")
        se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
        rate_data['se'] = se_valid

        logger.debug("{0}: {1:.3f},[{2:.3f}]".format(
            geo_id, rate_data['rate'][-1], rate_data['se'][-1]
        ))
        return {"geo_id": geo_id,
                "rate": 100 * rate_data['rate'],
                "se": 100 * rate_data['se'],
                "incl": include}
        logger.debug(
            ".fit() DEBUG - last rate/se for geo",
            geo_value=geo_id,
            value=rate_data["rate"][-1],
            se=rate_data["se"][-1],
        )
        return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include}
19 changes: 8 additions & 11 deletions changehc/delphi_changehc/update_sensor.py
@@ -41,17 +41,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
    assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
    assert not df["se"].isna().any(), " se contains nan values"
    if write_se:
        logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
        logger.info("WARNING: WRITING SEs", filename=out_name)
    else:
        df["se"] = np.nan

    assert not df["val"].isna().any(), " val contains nan values"
    suspicious_val_mask = df["val"].gt(90)
    if not df[suspicious_val_mask].empty:
        for geo in df.loc[suspicious_val_mask, "geo_id"]:
            logger.warning("value suspiciously high, {0}: {1}".format(
                geo, out_name
            ))
            logger.warning("Value suspiciously high", geo_value=geo, filename=out_name)

    dates = create_export_csv(
        df,
@@ -62,10 +60,8 @@
        sensor=out_name,
        write_empty_days=True
    )
    logger.debug("wrote {0} rows for {1} {2}".format(
        df.size, df["geo_id"].unique().size, geo_level
    ))
    logger.debug("wrote files to {0}".format(output_path))
    logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, num_geo_ids=df["geo_id"].unique().size)
    logger.debug("Wrote files", export_dir=output_path)
    return dates


@@ -148,8 +144,9 @@ def geo_reindex(self, data):
        geo = self.geo
        gmpr = GeoMapper()
        if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
            self.logger.error("{0} is invalid, pick one of 'county', "
                              "'state', 'msa', 'hrr', 'hss','nation'".format(geo))
            self.logger.error(
                "Geo is invalid, pick one of 'county', " "'state', 'msa', 'hrr', 'hss','nation'", geo_type=geo
            )
            return False
        if geo == "county":
            data_frame = gmpr.fips_to_megacounty(data,
@@ -224,7 +221,7 @@ def update_sensor(self,
                dfs.append(res)
            else:
                n_cpu = min(10, cpu_count())
                self.logger.debug("starting pool with {0} workers".format(n_cpu))
                self.logger.debug("Starting pool", n_workers=n_cpu)
                with Pool(n_cpu) as pool:
                    pool_results = []
                    for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
3 changes: 2 additions & 1 deletion changehc/tests/test_update_sensor.py
@@ -9,6 +9,7 @@
import pandas as pd
import numpy as np
from boto3 import Session
from delphi_utils import get_structured_logger
from moto import mock_s3
import pytest

@@ -28,7 +29,7 @@
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
OUTPATH="test_data/"
TEST_LOGGER = logging.getLogger()
TEST_LOGGER = get_structured_logger()

class TestCHCSensorUpdater:
    """Tests for updating the sensors."""
@@ -57,7 +57,7 @@ def download(ftp_credentials, out_path, logger):
    """Pull the latest raw files."""
    current_time = datetime.datetime.now()
    seconds_in_day = 24 * 60 * 60
    logger.info("starting download", time=current_time)
    logger.info("Starting download")

    # open client
    client = paramiko.SSHClient()
11 changes: 7 additions & 4 deletions claims_hosp/delphi_claims_hosp/indicator.py
@@ -143,7 +143,10 @@ def fit(y_data, first_date, geo_id, num_col="num", den_col="den"):
        se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
        rate_data['se'] = se_valid

        logging.debug("%s: %05.3f, [%05.3f]",
                      geo_id, rate_data['rate'][-1], rate_data['se'][-1])
        return {"geo_id": geo_id, "rate": 100 * rate_data['rate'],
                "se": 100 * rate_data['se'], "incl": include}
        logging.debug(
            ".fit() DEBUG - last rate/se for geo",
            geo_value=geo_id,
            value=rate_data["rate"][-1],
            se=rate_data["se"][-1],
        )
        return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include}
2 changes: 1 addition & 1 deletion claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
            dfs_list.append(dfs)
        else:
            dfs.to_csv(out_path, index=False)
            logger.info(f"Wrote {out_path}")
            logger.info("Wrote modified csv", filename=out_path)
    return files, dfs_list
12 changes: 6 additions & 6 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -116,17 +116,17 @@ def run_module(params):
    for geo in params["indicator"]["geos"]:
        for weekday in params["indicator"]["weekday"]:
            if weekday:
                logger.info("starting weekday adj", geo = geo)
                logger.info("Starting weekday adj", geo_type=geo)
            else:
                logger.info("starting no weekday adj", geo = geo)
                logger.info("Starting no weekday adj", geo_type=geo)

            signal_name = Config.signal_weekday_name if weekday else Config.signal_name
            if params["indicator"]["write_se"]:
                assert params["indicator"]["obfuscated_prefix"] is not None, \
                    "supply obfuscated prefix in params.json"
                signal_name = params["indicator"]["obfuscated_prefix"] + "_" + signal_name

            logger.info("Updating signal name", signal_name = signal_name)
            logger.info("Updating signal name", signal=signal_name)
            updater = ClaimsHospIndicatorUpdater(
                startdate,
                enddate,
@@ -135,16 +135,16 @@ def run_module(params):
                params["indicator"]["parallel"],
                weekday,
                params["indicator"]["write_se"],
                signal_name
                signal_name,
                logger,
            )
            updater.update_indicator(
                claims_file,
                params["common"]["export_dir"],
                logger,
            )
            max_dates.append(updater.output_dates[-1])
            n_csv_export.append(len(updater.output_dates))
            logger.info("finished updating", geo = geo)
            logger.info("Finished updating", geo_type=geo)

    # Remove all the raw files
    for fn in os.listdir(params["indicator"]["input_dir"]):