Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unstructured logging variable formatting #2029

Merged
merged 48 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9d7d4c5
first implementation
aysim319 Aug 20, 2024
2c6623c
found missing and making msg consistent
aysim319 Aug 20, 2024
b10f0e5
suggested changes
aysim319 Aug 22, 2024
a2339ae
suggested change and making params more consistent
aysim319 Aug 22, 2024
114feb0
more suggested change
aysim319 Aug 22, 2024
43eff03
test and fix tests
aysim319 Aug 22, 2024
0c60b41
lint
aysim319 Aug 22, 2024
a53c2f1
lint
aysim319 Aug 27, 2024
39f26a2
Update doctor_visits/delphi_doctor_visits/run.py
aysim319 Aug 27, 2024
7262d29
suggested change
aysim319 Aug 29, 2024
8033bd6
suggested change
aysim319 Aug 29, 2024
43e1985
suggested change
aysim319 Aug 29, 2024
68a56d8
suggested change
aysim319 Aug 29, 2024
49af0b8
suggested changes
aysim319 Aug 29, 2024
539f0de
fixing order
aysim319 Aug 30, 2024
3bb80a9
lint
aysim319 Aug 30, 2024
1ae613c
make change to test
aysim319 Aug 30, 2024
4d7a6d0
rename argument
aysim319 Aug 30, 2024
bd89a30
Update _delphi_utils_python/README.md
aysim319 Aug 30, 2024
d085954
Update _delphi_utils_python/README.md
aysim319 Aug 30, 2024
b35a91e
Update _delphi_utils_python/delphi_utils/flash_eval/eval_day.py
aysim319 Aug 30, 2024
3ddd35d
Update _delphi_utils_python/delphi_utils/runner.py
aysim319 Aug 30, 2024
18f40af
suggested changes
aysim319 Sep 3, 2024
29c927b
more suggeseted changes
aysim319 Sep 3, 2024
fc2fa46
lint
aysim319 Sep 4, 2024
d0ef8aa
lint
aysim319 Sep 6, 2024
dd9e5fc
suggested change
aysim319 Sep 6, 2024
5a27f33
Update _delphi_utils_python/README.md
aysim319 Sep 6, 2024
6db7d61
suggesed changes
aysim319 Sep 6, 2024
5e04164
Merge remote-tracking branch 'origin/2025-fix-unstruc-logging' into 2…
aysim319 Sep 6, 2024
89e7245
lint
aysim319 Sep 6, 2024
e050a51
suggested changes
aysim319 Sep 6, 2024
37122f4
suggested changes
aysim319 Sep 9, 2024
e8631f8
more changes
aysim319 Sep 10, 2024
7a31f8e
more changes
aysim319 Sep 10, 2024
79b1ede
lint
aysim319 Sep 10, 2024
f8220a5
lint
aysim319 Sep 10, 2024
3d44e67
lint conflict
aysim319 Sep 10, 2024
a3d0aab
lint
aysim319 Sep 10, 2024
0b8d527
lint
aysim319 Sep 10, 2024
10df87a
lint
aysim319 Sep 10, 2024
9947332
suggested changes
aysim319 Sep 11, 2024
dcdbed8
lint
aysim319 Sep 11, 2024
b0c31ae
suggested changes
aysim319 Sep 17, 2024
0fb33fe
Delete fluview/tests/test_data/flu_metadata.json
melange396 Sep 18, 2024
bc6a1dc
Delete fluview/delphi_fluview/pull.py
melange396 Sep 18, 2024
54d183f
Delete fluview/tests/conftest.py
melange396 Sep 18, 2024
7c7f15a
Update runner.py
melange396 Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion _delphi_utils_python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ Source code can be found here:

## Logger Usage

aysim319 marked this conversation as resolved.
Show resolved Hide resolved
To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc).

### Commonly used argument names:
aysim319 marked this conversation as resolved.
Show resolved Hide resolved
- data_source
- geo_type
- signal
- issue_date
- filename

Single-thread usage.

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger('my_logger')
logger.info('Hello, world!')
logger.info('Hello', name='World')
```

Multi-thread usage.
Expand Down
6 changes: 4 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
for mask in masks:
if not logger is None and df.loc[mask].size > 0:
logger.info(
"Filtering contradictory missing code in " +
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
"Filtering contradictory missing code",
sensor=sensor,
metric=metric,
date=date.strftime(format="%Y-%m-%d"),
)
df = df.loc[~mask]
elif logger is None and df.loc[mask].size > 0:
Expand Down
3 changes: 1 addition & 2 deletions _delphi_utils_python/delphi_utils/flash_eval/eval_day.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
else:
break
name = f"Signal: {signal} Lag: {lag}"
logger.info(name, payload=p_text)
logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)


def evd_ranking_fn(ts_streams, EVD_max, EVD_min):
Expand Down
10 changes: 5 additions & 5 deletions _delphi_utils_python/delphi_utils/logger.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Structured logger utility for creating JSON logs.

See the delphi_utils README.md for usage examples.
To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic,
the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier),
and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument
to the logger call (for use in filtering, thresholding, grouping, visualization, etc)

The Delphi group uses two ~identical versions of this file.
Try to keep them in sync with edits, for sanity.
https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
See the delphi_utils README.md for usage examples.
"""

import contextlib
Expand Down
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],

#Get version and indicator name for startup
ind_name = indicator_fn.__module__.replace(".run", "")

#Check for version.cfg in indicator directory
if os.path.exists("version.cfg"):
with open("version.cfg") as ver_file:
Expand All @@ -59,9 +60,15 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
if "current_version" in line:
current_version = str.strip(line)
current_version = current_version.replace("current_version = ", "")
#Logging - Starting Indicator
logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")
else: logger.info(f"Started {ind_name} without version.cfg")
logger.info(
"Started a covidcast-indicator",
indicator_name=ind_name,
current_version=current_version,
)
else:
logger.info(
"Started a covidcast-indicator without version.cfg", indicator_name=ind_name
)

indicator_fn(params)
validator = validator_fn(params)
Expand All @@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
break
time.sleep(1)
else:
logger.error(f"Flash step timed out ({timer} s), terminating",
elapsed_time_in_seconds = round(time.time() - start, 2))
logger.error(
"Flash step timed out, terminating",
elapsed_time_in_seconds=round(time.time() - start, 2),
)
t1.terminate()
t1.join()
if validator:
Expand Down
10 changes: 6 additions & 4 deletions _delphi_utils_python/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,13 @@ def test_export_df_with_missingness(self, tmp_path):

@mock.patch("delphi_utils.logger")
def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):

sensor = "test"
geo_res = "state"
create_export_csv(
df=self.DF3.copy(),
export_dir=tmp_path,
geo_res="state",
sensor="test",
sensor=sensor,
geo_res=geo_res,
logger=mock_logger
)
assert set(listdir(tmp_path)) == set(
Expand All @@ -339,8 +340,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
]
)
assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0
date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d")
mock_logger.info.assert_called_once_with(
"Filtering contradictory missing code in test_None_2020-02-15."
"Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str
)

def test_export_sort(self, tmp_path):
Expand Down
34 changes: 18 additions & 16 deletions changehc/delphi_changehc/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def retrieve_files(params, filedate, logger):
if files["denom"] is None:

## download recent files from FTP server
logger.info("downloading recent files through SFTP")
logger.info("Downloading recent files through SFTP")
download_counts(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])

denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
Expand Down Expand Up @@ -157,28 +157,30 @@ def run_module(params: Dict[str, Dict[str, Any]]):

startdate, enddate = process_dates(params, startdate_dt, enddate_dt)

logger.info("generating signal and exporting to CSV",
first_sensor_date = startdate,
last_sensor_date = enddate,
drop_date = dropdate,
n_backfill_days = n_backfill_days,
n_waiting_days = n_waiting_days,
geos = params["indicator"]["geos"],
export_dir = params["common"]["export_dir"],
parallel = params["indicator"]["parallel"],
weekday = params["indicator"]["weekday"],
types = params["indicator"]["types"],
se = params["indicator"]["se"])
logger.info(
"Generating signal and exporting to CSV",
first_sensor_date=startdate,
last_sensor_date=enddate,
drop_date=dropdate,
n_backfill_days=n_backfill_days,
n_waiting_days=n_waiting_days,
geos=params["indicator"]["geos"],
export_dir=params["common"]["export_dir"],
parallel=params["indicator"]["parallel"],
weekday=params["indicator"]["weekday"],
types=params["indicator"]["types"],
se=params["indicator"]["se"],
)

## start generating
stats = []
for geo in params["indicator"]["geos"]:
for numtype in params["indicator"]["types"]:
for weekday in params["indicator"]["weekday"]:
if weekday:
logger.info("starting weekday adj", geo = geo, numtype = numtype)
logger.info("Starting weekday adj", geo_type=geo, numtype=numtype)
else:
logger.info("starting no adj", geo = geo, numtype = numtype)
logger.info("Starting no adj", geo_type=geo, numtype=numtype)
su_inst = CHCSensorUpdater(
startdate,
enddate,
Expand Down Expand Up @@ -211,7 +213,7 @@ def run_module(params: Dict[str, Dict[str, Any]]):
)
stats.extend(more_stats)

logger.info("finished processing", geo = geo)
logger.info("Finished processing", geo_type=geo)

elapsed_time_in_seconds = round(time.time() - start_time, 2)
min_max_date = stats and min(s[0] for s in stats)
Expand Down
14 changes: 7 additions & 7 deletions changehc/delphi_changehc/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den")
se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
rate_data['se'] = se_valid

logger.debug("{0}: {1:.3f},[{2:.3f}]".format(
geo_id, rate_data['rate'][-1], rate_data['se'][-1]
))
return {"geo_id": geo_id,
"rate": 100 * rate_data['rate'],
"se": 100 * rate_data['se'],
"incl": include}
logger.debug(
".fit() DEBUG - last rate/se for geo",
geo_value=geo_id,
value=rate_data["rate"][-1],
se=rate_data["se"][-1],
)
return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include}
19 changes: 8 additions & 11 deletions changehc/delphi_changehc/update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
assert not df["se"].isna().any(), " se contains nan values"
if write_se:
logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
logger.info("WARNING: WRITING SEs", filename=out_name)
else:
df["se"] = np.nan

assert not df["val"].isna().any(), " val contains nan values"
suspicious_val_mask = df["val"].gt(90)
if not df[suspicious_val_mask].empty:
for geo in df.loc[suspicious_val_mask, "geo_id"]:
logger.warning("value suspiciously high, {0}: {1}".format(
geo, out_name
))
logger.warning("Value suspiciously high", geo_value=geo, filename=out_name)

dates = create_export_csv(
df,
Expand All @@ -62,10 +60,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
sensor=out_name,
write_empty_days=True
)
logger.debug("wrote {0} rows for {1} {2}".format(
df.size, df["geo_id"].unique().size, geo_level
))
logger.debug("wrote files to {0}".format(output_path))
logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, num_geo_ids=df["geo_id"].unique().size)
logger.debug("Wrote files", export_dir=output_path)
return dates


Expand Down Expand Up @@ -148,8 +144,9 @@ def geo_reindex(self, data):
geo = self.geo
gmpr = GeoMapper()
if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
self.logger.error("{0} is invalid, pick one of 'county', "
"'state', 'msa', 'hrr', 'hss','nation'".format(geo))
self.logger.error(
"Geo is invalid, pick one of 'county', " "'state', 'msa', 'hrr', 'hss','nation'", geo_type=geo
)
return False
if geo == "county":
data_frame = gmpr.fips_to_megacounty(data,
Expand Down Expand Up @@ -224,7 +221,7 @@ def update_sensor(self,
dfs.append(res)
else:
n_cpu = min(10, cpu_count())
self.logger.debug("starting pool with {0} workers".format(n_cpu))
self.logger.debug("Starting pool", n_workers=n_cpu)
with Pool(n_cpu) as pool:
pool_results = []
for geo_id, sub_data in data_frame.groupby(level=0,as_index=False):
Expand Down
3 changes: 2 additions & 1 deletion changehc/tests/test_update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
import numpy as np
from boto3 import Session
from delphi_utils import get_structured_logger
from moto import mock_s3
import pytest

Expand All @@ -28,7 +29,7 @@
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
OUTPATH="test_data/"
TEST_LOGGER = logging.getLogger()
TEST_LOGGER = get_structured_logger()

class TestCHCSensorUpdater:
"""Tests for updating the sensors."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def download(ftp_credentials, out_path, logger):
"""Pull the latest raw files."""
current_time = datetime.datetime.now()
seconds_in_day = 24 * 60 * 60
logger.info("starting download", time=current_time)
logger.info("Starting download")
aysim319 marked this conversation as resolved.
Show resolved Hide resolved

# open client
client = paramiko.SSHClient()
Expand Down
11 changes: 7 additions & 4 deletions claims_hosp/delphi_claims_hosp/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ def fit(y_data, first_date, geo_id, num_col="num", den_col="den"):
se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)')
rate_data['se'] = se_valid

logging.debug("%s: %05.3f, [%05.3f]",
geo_id, rate_data['rate'][-1], rate_data['se'][-1])
return {"geo_id": geo_id, "rate": 100 * rate_data['rate'],
"se": 100 * rate_data['se'], "incl": include}
logging.debug(
".fit() DEBUG - last rate/se for geo",
geo_value=geo_id,
value=rate_data["rate"][-1],
se=rate_data["se"][-1],
)
return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include}
2 changes: 1 addition & 1 deletion claims_hosp/delphi_claims_hosp/modify_claims_drops.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False):
dfs_list.append(dfs)
else:
dfs.to_csv(out_path, index=False)
logger.info(f"Wrote {out_path}")
logger.info("Wrote modified csv", filename=out_path)
return files, dfs_list
12 changes: 6 additions & 6 deletions claims_hosp/delphi_claims_hosp/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,17 @@ def run_module(params):
for geo in params["indicator"]["geos"]:
for weekday in params["indicator"]["weekday"]:
if weekday:
logger.info("starting weekday adj", geo = geo)
logger.info("Starting weekday adj", geo_type=geo)
else:
logger.info("starting no weekday adj", geo = geo)
logger.info("Starting no weekday adj", geo_type=geo)

signal_name = Config.signal_weekday_name if weekday else Config.signal_name
if params["indicator"]["write_se"]:
assert params["indicator"]["obfuscated_prefix"] is not None, \
"supply obfuscated prefix in params.json"
signal_name = params["indicator"]["obfuscated_prefix"] + "_" + signal_name

logger.info("Updating signal name", signal_name = signal_name)
logger.info("Updating signal name", signal=signal_name)
updater = ClaimsHospIndicatorUpdater(
startdate,
enddate,
Expand All @@ -135,16 +135,16 @@ def run_module(params):
params["indicator"]["parallel"],
weekday,
params["indicator"]["write_se"],
signal_name
signal_name,
logger,
)
updater.update_indicator(
claims_file,
params["common"]["export_dir"],
logger,
)
max_dates.append(updater.output_dates[-1])
n_csv_export.append(len(updater.output_dates))
logger.info("finished updating", geo = geo)
logger.info("Finished updating", geo_type=geo)

# Remove all the raw files
for fn in os.listdir(params["indicator"]["input_dir"]):
Expand Down
Loading
Loading