From 8dd4ff25bff5dae7bdfef86a27e44bf18393a100 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Tue, 27 Aug 2024 10:11:10 -0400 Subject: [PATCH 1/7] first implementation --- .../delphi_google_symptoms/pull.py | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 8a485de4f..553853d44 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -1,13 +1,14 @@ """Retrieve data and wrangle into appropriate format.""" # -*- coding: utf-8 -*- import re +import time from datetime import date, datetime # pylint: disable=unused-import import numpy as np import pandas as pd import pandas_gbq from google.oauth2 import service_account - +from google.api_core.exceptions import BadRequest from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS from .date_utils import generate_query_dates @@ -184,16 +185,21 @@ def pull_gs_data_one_geolevel(level, date_range): pd.DataFrame """ query = produce_query(level, date_range) - - df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS) - if len(df) == 0: - df = pd.DataFrame( - columns=["open_covid_region_code", "date"] + - list(colname_map.keys()) - ) - - df = preprocess(df, level) - + df = pd.DataFrame() + try: + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS) + except BadRequest as e: + if e.reason == "backendError": + time.sleep(5) + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) + else: + if len(df) == 0: + df = pd.DataFrame( + columns=["open_covid_region_code", "date"] + + list(colname_map.keys()) + ) + + df = preprocess(df, level) return df From 7674e7d2ca9b6b6cdde944223bbdba2a5a39ceb9 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 16 Sep 2024 12:26:08 -0400 Subject: [PATCH 2/7] add testing and more robust conditions --- .../delphi_google_symptoms/constants.py | 2 + .../delphi_google_symptoms/pull.py | 37 ++++++++++-------- google_symptoms/tests/test_pull.py | 38 +++++++++++++++++++ 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/constants.py b/google_symptoms/delphi_google_symptoms/constants.py index 795ac3df7..bd5c0361c 100644 --- a/google_symptoms/delphi_google_symptoms/constants.py +++ b/google_symptoms/delphi_google_symptoms/constants.py @@ -113,3 +113,5 @@ FULL_BKFILL_START_DATE = datetime.strptime("2020-02-20", "%Y-%m-%d") PAD_DAYS = 7 + +NUM_RETRIES = 3 diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 553853d44..c133ee146 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -7,9 +7,10 @@ import numpy as np import pandas as pd import pandas_gbq +from google.api_core.exceptions import BadRequest, ServerError from google.oauth2 import service_account -from google.api_core.exceptions import BadRequest -from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS + +from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, NUM_RETRIES, SYMPTOM_SETS from .date_utils import generate_query_dates # Create map of BigQuery symptom column names to desired column names. @@ -185,21 +186,25 @@ def pull_gs_data_one_geolevel(level, date_range): pd.DataFrame """ query = produce_query(level, date_range) - df = pd.DataFrame() - try: - df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes = DTYPE_CONVERSIONS) - except BadRequest as e: - if e.reason == "backendError": - time.sleep(5) + df = None + for num_try in range(NUM_RETRIES): + try: df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) - else: - if len(df) == 0: - df = pd.DataFrame( - columns=["open_covid_region_code", "date"] + - list(colname_map.keys()) - ) - - df = preprocess(df, level) + except Exception as e: + # sometimes google throws out 400 error when it's 500 + # https://github.com/googleapis/python-bigquery/issues/23 + if num_try < NUM_RETRIES - 1 and ( + (isinstance(e, BadRequest) and e.reason == "backendError") or isinstance(e, ServerError) + ): + # time.sleep(5) + continue + else: + raise e + + if len(df) == 0: + df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) + + df = preprocess(df, level) return df diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index 16792ab16..5f604157b 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -2,7 +2,10 @@ import mock from freezegun import freeze_time from datetime import date, datetime +from google.api_core.exceptions import BadRequest, ServerError + import pandas as pd +from google.rpc import error_details_pb2 from pandas.testing import assert_frame_equal from delphi_google_symptoms.pull import ( @@ -120,6 +123,41 @@ def test_pull_one_gs_no_dates(self, mock_read_gbq): expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) + def test_pull_one_gs_retry_success(self): + info = error_details_pb2.ErrorInfo( + reason="backendError", + ) + badRequestException = BadRequest(message="message", error_info=info) + serverErrorException = ServerError(message="message") + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + mock_read_gbq.side_effect = [badRequestException, serverErrorException, pd.DataFrame()] + + output = pull_gs_data_one_geolevel("state", ["", ""]) + expected = pd.DataFrame(columns=new_keep_cols) + assert_frame_equal(output, expected, check_dtype = False) + assert mock_read_gbq.call_count == 3 + + def test_pull_one_gs_retry_too_many(self): + info = error_details_pb2.ErrorInfo( + reason="backendError", + ) + badRequestException = BadRequest(message="message", error_info=info) + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + with pytest.raises(BadRequest): + mock_read_gbq.side_effect = [badRequestException, badRequestException, badRequestException, pd.DataFrame()] + pull_gs_data_one_geolevel("state", ["", ""]) + + + def test_pull_one_gs_retry_bad(self): + badRequestException = BadRequest(message="message", ) + + with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: + with pytest.raises(BadRequest): + mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()] + pull_gs_data_one_geolevel("state", ["", ""]) + def test_preprocess_no_data(self): output = preprocess(pd.DataFrame(columns=keep_cols), "state") expected = pd.DataFrame(columns=new_keep_cols) From 984a67d58483f7559d8ca076c71578739623e9b6 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Mon, 16 Sep 2024 12:36:30 -0400 Subject: [PATCH 3/7] revert unneeded change --- google_symptoms/delphi_google_symptoms/pull.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index c133ee146..971ee6c92 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -186,7 +186,7 @@ def pull_gs_data_one_geolevel(level, date_range): pd.DataFrame """ query = produce_query(level, date_range) - df = None + df = pd.DataFrame() for num_try in range(NUM_RETRIES): try: df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) From c9abe5e1c54cfe1776e95f6aff6c5889ab367584 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 19 Sep 2024 13:22:34 -0400 Subject: [PATCH 4/7] only retry once and added other applicable error --- .../delphi_google_symptoms/constants.py | 2 -- google_symptoms/delphi_google_symptoms/pull.py | 16 +++++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/constants.py b/google_symptoms/delphi_google_symptoms/constants.py index bd5c0361c..795ac3df7 100644 --- a/google_symptoms/delphi_google_symptoms/constants.py +++ b/google_symptoms/delphi_google_symptoms/constants.py @@ -113,5 +113,3 @@ FULL_BKFILL_START_DATE = datetime.strptime("2020-02-20", "%Y-%m-%d") PAD_DAYS = 7 - -NUM_RETRIES = 3 diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 971ee6c92..8f27f6a03 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -7,10 +7,10 @@ import numpy as np import pandas as pd import pandas_gbq -from google.api_core.exceptions import BadRequest, ServerError +from google.api_core.exceptions import BadRequest, ServerError, InternalServerError from google.oauth2 import service_account -from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, NUM_RETRIES, SYMPTOM_SETS +from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS from .date_utils import generate_query_dates # Create map of BigQuery symptom column names to desired column names. @@ -187,16 +187,18 @@ def pull_gs_data_one_geolevel(level, date_range): """ query = produce_query(level, date_range) df = pd.DataFrame() - for num_try in range(NUM_RETRIES): + num_try = 0 + # recommends to only try once for 500/503 error + while num_try < 1: try: df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) except Exception as e: # sometimes google throws out 400 error when it's 500 # https://github.com/googleapis/python-bigquery/issues/23 - if num_try < NUM_RETRIES - 1 and ( - (isinstance(e, BadRequest) and e.reason == "backendError") or isinstance(e, ServerError) - ): - # time.sleep(5) + if (isinstance(e, BadRequest) and e.reason == "backendError") or isinstance(e, ServerError) or \ + isinstance(e, InternalServerError): + time.sleep((2 ** num_try) + random.random(0, 1000)/ 1000.0) + num_try = NUM_RETRIES - 1 continue else: raise e From 9d711f43a668c3a03269aa6d871312c8f8d1322e Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 19 Sep 2024 13:24:15 -0400 Subject: [PATCH 5/7] lint --- google_symptoms/delphi_google_symptoms/pull.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 8f27f6a03..b9bec7abb 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -7,7 +7,7 @@ import numpy as np import pandas as pd import pandas_gbq -from google.api_core.exceptions import BadRequest, ServerError, InternalServerError +from google.api_core.exceptions import BadRequest, InternalServerError, ServerError from google.oauth2 import service_account from .constants import COMBINED_METRIC, DC_FIPS, DTYPE_CONVERSIONS, METRICS, SYMPTOM_SETS @@ -195,9 +195,12 @@ def pull_gs_data_one_geolevel(level, date_range): except Exception as e: # sometimes google throws out 400 error when it's 500 # https://github.com/googleapis/python-bigquery/issues/23 - if (isinstance(e, BadRequest) and e.reason == "backendError") or isinstance(e, ServerError) or \ - isinstance(e, InternalServerError): - time.sleep((2 ** num_try) + random.random(0, 1000)/ 1000.0) + if ( + (isinstance(e, BadRequest) and e.reason == "backendError") + or isinstance(e, ServerError) + or isinstance(e, InternalServerError) + ): + time.sleep((2**num_try) + random.random(0, 1000) / 1000.0) num_try = NUM_RETRIES - 1 continue else: From 1630ec4cc1b115cbce23a4551c6939275dd1121a Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 19 Sep 2024 14:16:10 -0400 Subject: [PATCH 6/7] fixed test --- .../delphi_google_symptoms/pull.py | 36 +++++++++---------- google_symptoms/tests/test_pull.py | 6 ++-- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index b9bec7abb..9f3610086 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -1,5 +1,6 @@ """Retrieve data and wrangle into appropriate format.""" # -*- coding: utf-8 -*- +import random import re import time from datetime import date, datetime # pylint: disable=unused-import @@ -186,25 +187,24 @@ def pull_gs_data_one_geolevel(level, date_range): pd.DataFrame """ query = produce_query(level, date_range) - df = pd.DataFrame() - num_try = 0 + df = None + # recommends to only try once for 500/503 error - while num_try < 1: - try: - df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) - except Exception as e: - # sometimes google throws out 400 error when it's 500 - # https://github.com/googleapis/python-bigquery/issues/23 - if ( - (isinstance(e, BadRequest) and e.reason == "backendError") - or isinstance(e, ServerError) - or isinstance(e, InternalServerError) - ): - time.sleep((2**num_try) + random.random(0, 1000) / 1000.0) - num_try = NUM_RETRIES - 1 - continue - else: - raise e + try: + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) + except Exception as e: + # sometimes google throws out 400 error when it's 500 + # https://github.com/googleapis/python-bigquery/issues/23 + if ( + (isinstance(e, BadRequest) and e.reason == "backendError") + or isinstance(e, ServerError) + or isinstance(e, InternalServerError) + ): + time.sleep(2 + random.randint(0, 1000) / 1000.0) + else: + raise e + if df is None: + df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) if len(df) == 0: df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index 5f604157b..4367995b8 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -131,12 +131,12 @@ def test_pull_one_gs_retry_success(self): serverErrorException = ServerError(message="message") with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: - mock_read_gbq.side_effect = [badRequestException, serverErrorException, pd.DataFrame()] + mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()] output = pull_gs_data_one_geolevel("state", ["", ""]) expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) - assert mock_read_gbq.call_count == 3 + assert mock_read_gbq.call_count == 2 def test_pull_one_gs_retry_too_many(self): info = error_details_pb2.ErrorInfo( @@ -146,7 +146,7 @@ def test_pull_one_gs_retry_too_many(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: with pytest.raises(BadRequest): - mock_read_gbq.side_effect = [badRequestException, badRequestException, badRequestException, pd.DataFrame()] + mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()] pull_gs_data_one_geolevel("state", ["", ""]) From 21873cc37d6841830d6cbe462acdcda24caa144f Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Fri, 20 Sep 2024 15:18:39 -0400 Subject: [PATCH 7/7] lint --- google_symptoms/delphi_google_symptoms/pull.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 9f3610086..a8c4cdfde 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -192,13 +192,14 @@ def pull_gs_data_one_geolevel(level, date_range): # recommends to only try once for 500/503 error try: df = pandas_gbq.read_gbq(query, progress_bar_type=None, dtypes=DTYPE_CONVERSIONS) + # pylint: disable=W0703 except Exception as e: # sometimes google throws out 400 error when it's 500 # https://github.com/googleapis/python-bigquery/issues/23 if ( + # pylint: disable=E1101 (isinstance(e, BadRequest) and e.reason == "backendError") - or isinstance(e, ServerError) - or isinstance(e, InternalServerError) + or isinstance(e, (ServerError, InternalServerError)) ): time.sleep(2 + random.randint(0, 1000) / 1000.0) else: