nssp patching code #2000

Open
wants to merge 36 commits into base: main

36 commits
c78ae21  nssp patching code (minhkhul, Jul 23, 2024)
7694c0a  lint (minhkhul, Jul 23, 2024)
a3ed4c2  add test (minhkhul, Jul 24, 2024)
1628d34  add test (minhkhul, Jul 24, 2024)
2536b94  Add patching how-to to readme (minhkhul, Jul 24, 2024)
e4d45e5  adjust current_issue_dir name for weekly data instead of daily. (minhkhul, Jul 25, 2024)
db906fc  lint (minhkhul, Jul 25, 2024)
8f0bb32  adjust test for more cases (minhkhul, Jul 25, 2024)
7f151f5  add custom_run flag (minhkhul, Aug 6, 2024)
c093349  handle custom flag on but bad config (minhkhul, Aug 6, 2024)
a967416  make patch config check readable (minhkhul, Aug 6, 2024)
c020da6  make good_patch_config check comprehensive (minhkhul, Aug 6, 2024)
9a6130b  rewrite good_patch_config for clarity (minhkhul, Aug 7, 2024)
b8a2177  add unit tests for good_patch_config check (minhkhul, Aug 7, 2024)
a7d9443  add test_pull unit test for patching case + cleanup format (minhkhul, Aug 7, 2024)
e29e07e  split test cases + move to pytest (minhkhul, Aug 8, 2024)
0a4bfb6  add test for multi-week patching (minhkhul, Aug 9, 2024)
6c0abad  rename tests for clarity + restructure test_patch tests to clarify pu… (minhkhul, Aug 15, 2024)
7078dd0  make expected issue dates explicit in test_patch_confirm_dir_structur… (minhkhul, Aug 15, 2024)
d435bf0  add log to distinguish grabbing records from socrata vs locally store… (minhkhul, Aug 15, 2024)
8734daa  Update nssp/README.md (minhkhul, Aug 28, 2024)
5a6f8b6  Update nssp/README.md (minhkhul, Aug 28, 2024)
4356494  Add auto-download source backup data + update docs + test (minhkhul, Aug 29, 2024)
ca427a4  adjust custom_run flag to leave room for non-patch custom runs (minhkhul, Aug 30, 2024)
f58b068  move pull logic from run.py into pull.py (minhkhul, Aug 30, 2024)
5e93175  logger to static (minhkhul, Aug 30, 2024)
2d8670d  adjust unit tests (minhkhul, Aug 30, 2024)
f0335f6  more unit test adjustment (minhkhul, Aug 31, 2024)
7e06f94  move get_source_data to pull.py + make get_source_data run when sourc… (minhkhul, Sep 3, 2024)
e678ce6  auto-remove source_dir content after finish patch run (minhkhul, Sep 3, 2024)
bc1d7a7  lint happy (minhkhul, Sep 3, 2024)
84cba84  Update pull.py (minhkhul, Sep 10, 2024)
742737b  Update pull.py - remove stat debug (minhkhul, Sep 10, 2024)
e13d3db  add progress log for source file download (minhkhul, Sep 10, 2024)
9cec6ff  lint (minhkhul, Sep 10, 2024)
5450d8b  lint (minhkhul, Sep 10, 2024)
30 changes: 30 additions & 0 deletions nssp/README.md
@@ -73,3 +73,33 @@ with the percentage of code covered by the tests.

None of the linting or unit tests should fail, and the code lines that are not covered by unit tests should be small and
should not include critical sub-routines.

## Running Patches:
A daily backup of the source data, in the form of CSV files, can be found on `bigchunk-dev-02` under `/common/source_backup/nssp`. Talk to your sysadmin for access.

You can also generate your own backup from source by setting up a cron job that runs the following Python script every day, for when a pipeline outage is going on on our side but the source API is still available:
```
import numpy as np
import pandas as pd
from sodapy import Socrata
from datetime import date

today = date.today()
socrata_token = 'FILL_YOUR_OWN_TOKEN_HERE'
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000  # maximum limit allowed by SODA 2.0
while True:
    page = client.get("rdmq-nq56", limit=limit, offset=offset)
    if not page:
        break  # exit the loop if no more results
    results.extend(page)
    offset += limit
df_ervisits = pd.DataFrame.from_records(results)
df_ervisits.to_csv(f'~/{today}.csv', index=False)
```

[Review comment, marked resolved] Contributor: question: why do we need to make a backup? Also suggest putting this in an indicator-specific dir inside of the testing-utils dir.
When you're ready to create patching data for a specific date range in batch issue format, adjust `params.json` in accordance with instructions in `patch.py`, move the backup csv files into your chosen `source_dir`, then run
```
env/bin/python -m delphi_nssp.patch
```
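
For quick reference, a minimal `patch` block in `params.json` might look like the sketch below; the paths and dates here are placeholders, and the authoritative description of each field lives in the `patch.py` docstring:
```
"patch": {
    "patch_dir": "/path/to/patch_output",
    "start_issue": "2024-04-20",
    "end_issue": "2024-04-21",
    "source_dir": "/path/to/source_backup/nssp"
}
```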
83 changes: 83 additions & 0 deletions nssp/delphi_nssp/patch.py
@@ -0,0 +1,83 @@
"""
This module is used for patching data in the delphi_nssp package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
...
},
"validation": {
...
},
"patch": {
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nssp/AprilPatch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21",
"source_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nssp/source_data"
Copy link
Contributor

@nmdefries nmdefries Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what is the source_dir used for? Isn't NSSP data fetched from an API?

Ideally, I'd like the patch params to be the same between indicators. Is source_dir info that we already have somewhere else (e.g. if it's required for a patch, maybe it's also required for a normal run, in which case it would be stored in the indicator section?)?

Copy link
Contributor Author

@minhkhul minhkhul Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nssp data is fetched from an API when it is run normally. The API does not have versioning (the numbers do gets updated), so we keep a daily stash of what the api returns every day on a separate server, which is used here as source data for issue dates in patching.

}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/nssp/actual_data_file.csv
"""

from datetime import datetime, timedelta
from os import makedirs, path

from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .run import run_module


def patch():
"""
Run the nssp indicator for a range of issue dates.

The range of issue dates is specified in params.json using the following keys:
- "patch": Only used for patching data
- "start_date": str, YYYY-MM-DD format, first issue date
- "end_date": str, YYYY-MM-DD format, last issue date
- "patch_dir": str, directory to write all issues output
- "source_dir": str, directory to read source data from.
"""
    params = read_params()
    logger = get_structured_logger("delphi_nssp.patch", filename=params["common"]["log_filename"])

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
    logger.info(f"""Source from: {params["patch"]["source_dir"]}""")
    logger.info(f"""Output to: {params["patch"]["patch_dir"]}""")

    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    current_issue = start_issue
    while current_issue <= end_issue:
        logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")

        current_issue_source_csv = (
            f"""{params.get("patch", {}).get("source_dir")}/{current_issue.strftime("%Y-%m-%d")}.csv"""
        )
        if not path.isfile(current_issue_source_csv):
            logger.info(f"No source data at {current_issue_source_csv}")
            current_issue += timedelta(days=1)
            continue

        params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")

        current_issue_week = Week.fromdate(current_issue)
        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_week}/nssp"""
        makedirs(f"{current_issue_dir}", exist_ok=True)
        params["common"]["export_dir"] = f"""{current_issue_dir}"""

        run_module(params, logger)
        current_issue += timedelta(days=1)


if __name__ == "__main__":
    patch()
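
For context (this sketch is not part of the diff), the `issue_` directory names above come from `epiweeks.Week.fromdate`, which collapses daily issue dates onto their CDC (MMWR) epiweek. That is why, in the unit test further below, issues 2021-01-01 and 2021-01-02 both map to `issue_202053`, and `issue_202101` is never created:
```
# Illustrative sketch only, not part of this PR: how issue dirs are named.
from datetime import date
from epiweeks import Week

for day in (date(2021, 1, 1), date(2021, 1, 2), date(2021, 1, 3)):
    print(day, "->", f"issue_{Week.fromdate(day)}")
# 2021-01-01 -> issue_202053   (MMWR week 53 of 2020)
# 2021-01-02 -> issue_202053
# 2021-01-03 -> issue_202101   (first day of MMWR week 1 of 2021)
```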
34 changes: 21 additions & 13 deletions nssp/delphi_nssp/pull.py
@@ -27,7 +27,7 @@ def warn_string(df, type_dict):
     return warn


-def pull_nssp_data(socrata_token: str):
+def pull_nssp_data(socrata_token: str, issue_date: str = None, source_dir: str = None) -> pd.DataFrame:
     """Pull the latest NSSP ER visits data, and conforms it into a dataset.

     The output dataset has:
@@ -39,6 +39,11 @@
     ----------
     socrata_token: str
         My App Token for pulling the NWSS data (could be the same as the nchs data)
+    issue_date: Optional[str]
+        (patching mode only) YYYY-MM-DD formatted date of the issue to pull data for
+    source_dir: Optional[str]
+        (patching mode only) Path to the directory containing the source data csv files
+        The files in source_dir are expected to be named yyyy-mm-dd.csv
     test_file: Optional[str]
         When not null, name of file from which to read test data

@@ -47,18 +52,21 @@
     pd.DataFrame
         Dataframe as described above.
     """
-    # Pull data from Socrata API
-    client = Socrata("data.cdc.gov", socrata_token)
-    results = []
-    offset = 0
-    limit = 50000  # maximum limit allowed by SODA 2.0
-    while True:
-        page = client.get("rdmq-nq56", limit=limit, offset=offset)
-        if not page:
-            break  # exit the loop if no more results
-        results.extend(page)
-        offset += limit
-    df_ervisits = pd.DataFrame.from_records(results)
+    if not issue_date:
+        # Pull data from Socrata API
+        client = Socrata("data.cdc.gov", socrata_token)
+        results = []
+        offset = 0
+        limit = 50000  # maximum limit allowed by SODA 2.0
+        while True:
+            page = client.get("rdmq-nq56", limit=limit, offset=offset)
+            if not page:
+                break  # exit the loop if no more results
+            results.extend(page)
+            offset += limit
+        df_ervisits = pd.DataFrame.from_records(results)
+    else:
+        df_ervisits = pd.read_csv(f"{source_dir}/{issue_date}.csv")

     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
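
As a usage sketch (not part of the diff; the token, issue date, and paths below are placeholders), the two branches of `pull_nssp_data` are selected purely by whether `issue_date` is passed:
```
# Illustrative only: placeholder token, issue date, and source_dir.
from delphi_nssp.pull import pull_nssp_data

# Normal run: no issue_date, so records are paged down from the Socrata API.
df_live = pull_nssp_data(socrata_token="MY_APP_TOKEN")

# Patch run: issue_date given, so the locally stashed daily snapshot is read.
df_patch = pull_nssp_data(
    socrata_token="MY_APP_TOKEN",  # not used in this branch, but still required
    issue_date="2024-04-20",
    source_dir="/common/source_backup/nssp",
)
```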
17 changes: 10 additions & 7 deletions nssp/delphi_nssp/run.py
@@ -62,7 +62,7 @@ def logging(start_time, run_stats, logger):
     )


-def run_module(params):
+def run_module(params, logger=None):
     """
     Run the indicator.

@@ -72,18 +72,21 @@ def run_module(params):
         Nested dictionary of parameters.
     """
     start_time = time.time()
-    logger = get_structured_logger(
-        __name__,
-        filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True),
-    )
+    issue_date = params.get("patch", {}).get("current_issue", None)
+    source_dir = params.get("patch", {}).get("source_dir", None)
+    if not logger:
+        logger = get_structured_logger(
+            __name__,
+            filename=params["common"].get("log_filename"),
+            log_exceptions=params["common"].get("log_exceptions", True),
+        )
     export_dir = params["common"]["export_dir"]
     socrata_token = params["indicator"]["socrata_token"]

     run_stats = []
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
-    df_pull = pull_nssp_data(socrata_token)
+    df_pull = pull_nssp_data(socrata_token, issue_date, source_dir)
     ## aggregate
     geo_mapper = GeoMapper()
     for signal in SIGNALS:
2 changes: 2 additions & 0 deletions nssp/tests/source_dir/2021-01-02.csv
@@ -0,0 +1,2 @@
week_end,geography,county,percent_visits_combined,percent_visits_covid,percent_visits_influenza,percent_visits_rsv,percent_visits_smoothed,percent_visits_smoothed_covid,percent_visits_smoothed_1,percent_visits_smoothed_rsv,ed_trends_covid,ed_trends_influenza,ed_trends_rsv,hsa,hsa_counties,hsa_nci_id,fips,trend_source
2022-10-01T00:00:00.000,United States,All,2.84,1.84,0.48,0.55,2.83,2.07,0.34,0.44,Decreasing,Increasing,Increasing,All,All,All,0,United States
38 changes: 38 additions & 0 deletions nssp/tests/test_patch.py
@@ -0,0 +1,38 @@
import unittest
from unittest.mock import patch as mock_patch, call
from delphi_nssp.patch import patch
import os
import shutil

class TestPatchModule(unittest.TestCase):
    def test_patch(self):
        with mock_patch('delphi_nssp.patch.run_module') as mock_run_module, \
             mock_patch('delphi_nssp.patch.get_structured_logger') as mock_get_structured_logger, \
             mock_patch('delphi_nssp.patch.read_params') as mock_read_params:

            mock_read_params.return_value = {
                "common": {
                    "log_filename": "test.log"
                },
                "patch": {
                    "start_issue": "2021-01-01",
                    "end_issue": "2021-01-02",
                    "patch_dir": "./patch_dir",
                    "source_dir": "./source_dir"
                }
            }

            patch()

            self.assertIn('current_issue', mock_read_params.return_value['patch'])
            self.assertEqual(mock_read_params.return_value['patch']['current_issue'], '2021-01-02')

            self.assertTrue(os.path.isdir('./patch_dir'))
            self.assertTrue(os.path.isdir('./patch_dir/issue_202053/nssp'))
            self.assertFalse(os.path.isdir('./patch_dir/issue_202101/nssp'))

            # Clean up the created directories after the test
            shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])

if __name__ == '__main__':
    unittest.main()