
Commit

suggested changes
aysim319 committed Sep 18, 2024
1 parent 2f94d15 commit 57fc591
Showing 16 changed files with 83 additions and 7,555 deletions.
@@ -6,21 +6,20 @@
 from epiweeks import Week
 
 
-def _date_to_api_string(d: datetime, time_type: str = "day") -> str:
+def date_to_api_string(d: datetime, time_type: str = "day") -> str:
     """Convert a date object to a YYYYMMDD or YYYYMM string expected by the API."""
-    # pylint: disable=R1705
     if time_type == "day":
         return d.strftime("%Y%m%d")
-    elif time_type == "week":
+    if time_type == "week":
         return Week.fromdate(d).cdcformat()
     raise ValueError(f"Unknown time_type: {time_type}")
 
 
-def _parse_datetimes(df: pd.DataFrame, col: str, date_format: str = "%Y%m%d") -> pd.Series:
+def convert_apitime_column_to_datetimes(df: pd.DataFrame, col: str, date_format: str = "%Y%m%d") -> pd.Series:
     """Convert a DataFrame date or epiweek column into datetimes.
 
-    Assumes the column is string type. Dates are assumed to be in the YYYYMMDD
-    format by default. Weeks are assumed to be in the epiweek CDC format YYYYWW
+    Dates are assumed to be in the YYYYMMDD format by default.
+    Weeks are assumed to be in the epiweek CDC format YYYYWW
     format and return the date of the first day of the week.
     """
     df[col] = df[col].astype("str")
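For context on the renamed helpers, a minimal usage sketch, assuming the definitions above are in scope (the example dates are hypothetical):

    # Usage sketch with made-up values; assumes pandas and epiweeks are installed.
    from datetime import datetime

    import pandas as pd

    d = datetime(2024, 9, 18)
    date_to_api_string(d)                    # "20240918" -- the "day" branch (strftime)
    date_to_api_string(d, time_type="week")  # e.g. "202438" -- CDC epiweek format

    df = pd.DataFrame({"time_value": ["20240918", "20240919"]})
    convert_apitime_column_to_datetimes(df, "time_value")  # parses YYYYMMDD strings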
55 changes: 7 additions & 48 deletions _delphi_utils_python/delphi_utils/validator/datafetcher.py
@@ -120,18 +120,9 @@ def get_geo_signal_combos(data_source, api_key):
 
     response = Epidata.covidcast_meta()
 
-    # pylint: disable=R1720
-    if response["result"] != 1:
-        # Something failed in the API and we did not get real metadata
-        raise RuntimeError(
-            "Error when fetching metadata from the API", response["message"]
-        )
-    # pylint: disable=I0021
-    else:
-        meta = pd.DataFrame.from_dict(response["epidata"])
-        # note: this will fail for signals with weekly data, but currently not supported for validation
-        meta = meta[meta["time_type"] == "day"]
+    meta = pd.DataFrame.from_dict(Epidata.check(response))
+    # note: this will fail for signals with weekly data, but currently not supported for validation
+    meta = meta[meta["time_type"] == "day"]
 
     source_meta = meta[meta['data_source'] == data_source]
     # Need to convert np.records to tuples so they are hashable and can be used in sets and dicts.
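The manual result-code checks are collapsed into Epidata.check here and below. A rough sketch of the contract this diff appears to rely on (not the actual delphi_epidata implementation) is:

    # Approximation only: Epidata.check is assumed to raise on a failed
    # response and otherwise return the "epidata" payload; the real
    # library's behavior may differ in details.
    def check(response: dict) -> list:
        if response["result"] != 1:
            raise RuntimeError("Error when fetching data from the API", response["message"])
        return response["epidata"]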
@@ -177,52 +168,20 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type
     """
     if start_date > end_date:
         raise ValueError(
-            "end_day must be on or after start_day, but "
-            f"start_day = '{start_date}', end_day = '{end_date}'"
+            "end_date must be on or after start_date, but " f"start_date = '{start_date}', end_date = '{end_date}'"
         )
     response = Epidata.covidcast(
         data_source,
         signal_type,
         time_type="day",
         geo_type=geo_type,
-        time_values=Epidata.range(
-            start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
-        ),
+        time_values=Epidata.range(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")),
         geo_value="*",
     )
-    if response["result"] != 1:
-        # Something failed in the API and we did not get real signal data
-        raise RuntimeError(
-            "Error when fetching signal data from the API", response["message"]
-        )
-
-    # pylint: disable=E1124
-    if response["message"] not in {"success", "no results"}:
-        # pylint: disable=E1123
-        warnings.warn(
-            "Problem obtaining data",
-            # pylint: disable=E0602
-            RuntimeWarning,
-            message=response["message"],
-            data_source=data_source,
-            signal=signal,
-            time_value=params["time_values"],
-            geo_type=geo_type,
-        )
-    response = Epidata.covidcast(
-        data_source,
-        signal_type,
-        time_type="day",
-        geo_type=geo_type,
-        time_values=Epidata.range(
-            start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
-        ),
-        geo_value="*",
-    )
+    api_df = pd.DataFrame.from_dict(Epidata.check(response))
 
-    api_df = None
-    if len(response["epidata"]) > 0:
-        api_df = pd.DataFrame.from_dict(response["epidata"])
+    if isinstance(api_df, pd.DataFrame) and len(api_df) > 0:
         # note: this will fail for signals with weekly data, but currently not supported for validation
         api_df["issue"] = pd.to_datetime(api_df["issue"], format="%Y%m%d")
         api_df["time_value"] = pd.to_datetime(api_df["time_value"], format="%Y%m%d")
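One behavioral detail of the new flow, sketched under the assumption that Epidata.check returns an empty list for a "no results" response: an empty payload still yields a DataFrame, so the isinstance/len guard above skips the datetime conversions cleanly.

    # Sketch: what the guard above sees for an empty payload (assumption:
    # Epidata.check returns [] rather than raising on "no results").
    import pandas as pd

    api_df = pd.DataFrame.from_dict([])
    assert isinstance(api_df, pd.DataFrame) and len(api_df) == 0  # conversion block is skipped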
14 changes: 5 additions & 9 deletions google_symptoms/delphi_google_symptoms/date_utils.py
@@ -4,9 +4,9 @@
 from itertools import product
 from typing import Dict, List, Union
 
+import pandas as pd
 from delphi_epidata import Epidata
 from delphi_utils.validator.utils import lag_converter
-from pandas import to_datetime
 
 from .constants import COMBINED_METRIC, FULL_BKFILL_START_DATE, PAD_DAYS, SMOOTHERS

@@ -68,11 +68,6 @@ def generate_num_export_days(params: Dict, logger) -> [int]:
         params["indicator"].get("export_end_date", datetime.strftime(date.today(), "%Y-%m-%d")), "%Y-%m-%d"
     )
 
-    # Generate a list of signals we expect to produce
-    sensor_names = set(
-        "_".join([metric, smoother, "search"]) for metric, smoother in product(COMBINED_METRIC, SMOOTHERS)
-    )
-
     num_export_days = params["indicator"]["num_export_days"]
     custom_run = False if not params["common"].get("custom_run") else params["common"].get("custom_run", False)
@@ -83,7 +78,8 @@
     )
     Epidata.auth = ("epidata", params["indicator"]["api_credentials"])
     # Fetch metadata to check how recent each signal is
-    metadata = Epidata.covidcast_meta()
+    response = Epidata.covidcast_meta()
+    metadata = pd.DataFrame.from_dict(Epidata.check(response))
     # Filter to only those we currently want to produce, ignore any old or deprecated signals
     gs_metadata = metadata[(metadata.data_source == "google-symptoms") & (metadata.signal.isin(sensor_names))]

@@ -92,7 +88,7 @@
         logger.warning("Signals missing in the epidata; backfilling full history")
         num_export_days = (export_end_date - FULL_BKFILL_START_DATE).days + 1
     else:
-        latest_date_diff = (datetime.today() - to_datetime(min(gs_metadata.max_time))).days + 1
+        latest_date_diff = (datetime.today() - pd.to_datetime(min(gs_metadata.max_time))).days + 1
 
         expected_date_diff = params["validation"]["common"].get("span_length", 14)

Expand All @@ -102,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]:
expected_date_diff += global_max_expected_lag

if latest_date_diff > expected_date_diff:
logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}")
logger.info(f"Missing dates from: {pd.to_datetime(min(gs_metadata.max_time)).date()}")

num_export_days = expected_date_diff

Expand Down
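A small sketch of the freshness arithmetic in generate_num_export_days, with a made-up metadata frame standing in for the covidcast_meta result:

    # Illustration only: gs_metadata and its max_time values are fabricated.
    from datetime import datetime

    import pandas as pd

    gs_metadata = pd.DataFrame({"max_time": ["2024-09-10", "2024-09-12"]})
    latest_date_diff = (datetime.today() - pd.to_datetime(min(gs_metadata.max_time))).days + 1
    # If latest_date_diff exceeds span_length plus the maximum expected lag,
    # num_export_days is widened so the missing dates are re-exported.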
19 changes: 9 additions & 10 deletions google_symptoms/tests/conftest.py
@@ -2,7 +2,7 @@
 import logging
 from pathlib import Path
 import re
-
+import json
 import copy
 import pytest
 import mock
@@ -41,6 +41,7 @@
 # from `bigquery-public-data.covid19_symptom_search.counties_daily_2020` # Counties by day; includes state and county name, + FIPS code
 # where timestamp(date) between timestamp("2020-07-15") and timestamp("2020-08-22")
 
+
 good_input = {
     "state": f"{TEST_DIR}/test_data/small_states_2020_07_15_2020_08_22.csv",
     "county": f"{TEST_DIR}/test_data/small_counties_2020_07_15_2020_08_22.csv"
@@ -58,18 +59,13 @@
 county_data = pd.read_csv(
     good_input["county"], parse_dates=["date"])[keep_cols]
 
-state_data_gap = pd.read_csv(patch_input["state"], parse_dates=["date"])[keep_cols]
-
-covidcast_backfill_metadata = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata_backfill.csv",
-                                          parse_dates=["max_time", "min_time", "max_issue", "last_update"])
-covidcast_metadata = pd.read_csv(f"{TEST_DIR}/test_data/covid_metadata.csv",
-                                 parse_dates=["max_time", "min_time", "max_issue", "last_update"])
+state_data_gap = pd.read_csv(patch_input["state"], parse_dates=["date"])[keep_cols]
 
 NEW_DATE = "2024-02-20"
 
 @pytest.fixture(scope="session")
 def logger():
     return logging.getLogger()
 
 @pytest.fixture(scope="session")
 def params():
     params = {
@@ -125,7 +121,7 @@ def run_as_module(params):
         return_value=None), \
         mock.patch("pandas_gbq.read_gbq") as mock_read_gbq, \
         mock.patch("delphi_google_symptoms.pull.initialize_credentials", return_value=None), \
-        mock.patch("delphi_google_symptoms.date_utils.Epidata.covidcast_meta", return_value=None) as mock_covidcast_meta:
+        mock.patch("delphi_google_symptoms.date_utils.Epidata.covidcast_meta") as mock_covidcast_meta:
         def side_effect(*args, **kwargs):
             if "symptom_search_sub_region_1_daily" in args[0]:
                 df = state_data
@@ -140,5 +136,8 @@ def side_effect(*args, **kwargs):
             else:
                 return pd.DataFrame()
 
-        mock_read_gbq.side_effect = side_effect
-        delphi_google_symptoms.run.run_module(params)
+        with open(f"{TEST_DIR}/test_data/covid_metadata.json", "r") as f:
+            covidcast_meta = json.load(f)
+        mock_covidcast_meta = covidcast_meta
+        mock_read_gbq.side_effect = side_effect
+        delphi_google_symptoms.run.run_module(params)
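A side note on the mock wiring above: plain assignment (mock_covidcast_meta = covidcast_meta) rebinds the local name rather than configuring the patched object. If the intent is for the patched Epidata.covidcast_meta() to return the fixture JSON, the conventional unittest.mock pattern would be a sketch like:

    # Sketch only; assumes the mock.patch(...) as mock_covidcast_meta context above.
    with open(f"{TEST_DIR}/test_data/covid_metadata.json", "r") as f:
        covidcast_meta = json.load(f)
    mock_covidcast_meta.return_value = covidcast_meta  # calls now yield the fixture data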