Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2034 add retry loop #2057

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions google_symptoms/delphi_google_symptoms/pull.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Retrieve data and wrangle into appropriate format."""
# -*- coding: utf-8 -*-
import random
import re
import time
from datetime import date, datetime # pylint: disable=unused-import

import numpy as np
import pandas as pd
import pandas_gbq
from google.api_core.exceptions import BadRequest, InternalServerError, ServerError
from google.oauth2 import service_account

from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS
Expand Down Expand Up @@ -184,16 +187,30 @@ def pull_gs_data_one_geolevel(level, date_range):
pd.DataFrame
"""
query = produce_query(level, date_range)
df = None

# recommends to only try once for 500/503 error
try:
df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS)
# pylint: disable=W0703
except Exception as e:
# sometimes google throws out 400 error when it's 500
# https://github.com/googleapis/python-bigquery/issues/23
if (
# pylint: disable=E1101
(isinstance(e, BadRequest) and e.reason == "backendError")
or isinstance(e, (ServerError, InternalServerError))
):
time.sleep(2 + random.randint(0, 1000) / 1000.0)
else:
raise e
if df is None:
df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS)

df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS)
if len(df) == 0:
df = pd.DataFrame(
columns=["open_covid_region_code", "date"] +
list(colname_map.keys())
)
df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys()))

df = preprocess(df, level)

return df


Expand Down
38 changes: 38 additions & 0 deletions google_symptoms/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import mock
from freezegun import freeze_time
from datetime import date, datetime
from google.api_core.exceptions import BadRequest, ServerError

import pandas as pd
from google.rpc import error_details_pb2
from pandas.testing import assert_frame_equal

from delphi_google_symptoms.pull import (
Expand Down Expand Up @@ -120,6 +123,41 @@ def test_pull_one_gs_no_dates(self, mock_read_gbq):
expected = pd.DataFrame(columns=new_keep_cols)
assert_frame_equal(output, expected, check_dtype = False)

def test_pull_one_gs_retry_success(self):
info = error_details_pb2.ErrorInfo(
reason="backendError",
)
badRequestException = BadRequest(message="message", error_info=info)
serverErrorException = ServerError(message="message")

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()]

output = pull_gs_data_one_geolevel("state", ["", ""])
expected = pd.DataFrame(columns=new_keep_cols)
assert_frame_equal(output, expected, check_dtype = False)
assert mock_read_gbq.call_count == 2

def test_pull_one_gs_retry_too_many(self):
info = error_details_pb2.ErrorInfo(
reason="backendError",
)
badRequestException = BadRequest(message="message", error_info=info)

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
with pytest.raises(BadRequest):
mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()]
pull_gs_data_one_geolevel("state", ["", ""])


def test_pull_one_gs_retry_bad(self):
badRequestException = BadRequest(message="message", )

with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
with pytest.raises(BadRequest):
mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()]
pull_gs_data_one_geolevel("state", ["", ""])

def test_preprocess_no_data(self):
output = preprocess(pd.DataFrame(columns=keep_cols), "state")
expected = pd.DataFrame(columns=new_keep_cols)
Expand Down
Loading