diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 8a485de4f..a8c4cdfde 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -1,11 +1,14 @@ """Retrieve data and wrangle into appropriate format.""" # -*- coding: utf-8 -*- +import random import re +import time from datetime import date, datetime # pylint: disable=unused-import import numpy as np import pandas as pd import pandas_gbq +from google.api_core.exceptions import BadRequest, InternalServerError, ServerError from google.oauth2 import service_account from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS @@ -184,16 +187,30 @@ def pull_gs_data_one_geolevel(level, date_range): pd.DataFrame """ query = produce_query(level, date_range) + df = None + + # recommends to only try once for 500/503 error + try: + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) + # pylint: disable=W0703 + except Exception as e: + # sometimes google throws out 400 error when it's 500 + # https://github.com/googleapis/python-bigquery/issues/23 + if ( + # pylint: disable=E1101 + (isinstance(e, BadRequest) and e.reason == "backendError") + or isinstance(e, (ServerError, InternalServerError)) + ): + time.sleep(2 + random.randint(0, 1000) / 1000.0) + else: + raise e + if df is None: + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) - df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS) if len(df) == 0: - df = pd.DataFrame( - columns=["open_covid_region_code", "date"] + - list(colname_map.keys()) - ) + df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) df = preprocess(df, level) - return df diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index 16792ab16..4367995b8 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -2,7 +2,10 @@ import mock from freezegun import freeze_time from datetime import date, datetime +from google.api_core.exceptions import BadRequest, ServerError + import pandas as pd +from google.rpc import error_details_pb2 from pandas.testing import assert_frame_equal from delphi_google_symptoms.pull import ( @@ -120,6 +123,41 @@ def test_pull_one_gs_no_dates(self, mock_read_gbq): expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) + def test_pull_one_gs_retry_success(self): + info = error_details_pb2.ErrorInfo( + reason="backendError", + ) + badRequestException = BadRequest(message="message", error_info=info) + serverErrorException = ServerError(message="message") + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()] + + output = pull_gs_data_one_geolevel("state", ["", ""]) + expected = pd.DataFrame(columns=new_keep_cols) + assert_frame_equal(output, expected, check_dtype = False) + assert mock_read_gbq.call_count == 2 + + def test_pull_one_gs_retry_too_many(self): + info = error_details_pb2.ErrorInfo( + reason="backendError", + ) + badRequestException = BadRequest(message="message", error_info=info) + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + with pytest.raises(BadRequest): + mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()] + pull_gs_data_one_geolevel("state", ["", ""]) + + + def test_pull_one_gs_retry_bad(self): + badRequestException = BadRequest(message="message", ) + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + with pytest.raises(BadRequest): + mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()] + pull_gs_data_one_geolevel("state", ["", ""]) + def test_preprocess_no_data(self): output = preprocess(pd.DataFrame(columns=keep_cols), "state") expected = pd.DataFrame(columns=new_keep_cols)