From 683366e2a5dadd0ec1622a42c3175392dfdfc4fe Mon Sep 17 00:00:00 2001 From: Anupam Saini <4581797+anupam-saini@users.noreply.github.com> Date: Tue, 30 Jan 2024 23:09:33 -0500 Subject: [PATCH 1/5] Refresh Auth token on expiry --- pyiceberg/catalog/rest.py | 27 +++++++++++++++++++++++++++ pyproject.toml | 5 +++++ tests/catalog/test_rest.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 765f04b128..df255017e0 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -30,6 +30,7 @@ from pydantic import Field, ValidationError from requests import HTTPError, Session +from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt from pyiceberg import __version__ from pyiceberg.catalog import ( @@ -211,6 +212,11 @@ def __init__(self, name: str, **properties: str): self._fetch_config() self._session = self._create_session() + @staticmethod + def _retry_hook(retry_state: RetryCallState) -> None: + rest_catalog: RestCatalog = retry_state.args[0] + rest_catalog._refresh_token() # pylint: disable=protected-access + def _create_session(self) -> Session: """Create a request session with provided catalog configuration.""" session = Session() @@ -438,6 +444,16 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: catalog=self, ) + def _refresh_token(self) -> None: + session: Session = self._session + # If we have credentials, fetch a new token + if CREDENTIAL in self.properties: + self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL]) + # Set Auth token for subsequent calls in the session + if token := self.properties.get(TOKEN): + session.headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}" + + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def create_table( self, identifier: Union[str, Identifier], @@ -472,6 +488,7 @@ def create_table( table_response = TableResponse(**response.json()) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -503,6 +520,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: table_response = TableResponse(**response.json()) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -513,6 +531,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers] + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) response = self._session.get( @@ -526,6 +545,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: table_response = TableResponse(**response.json()) return self._response_to_table(identifier_tuple, table_response) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) response = self._session.delete( @@ -538,9 +558,11 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchTableError}) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) payload = { @@ -585,6 +607,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) return CommitTableResponse(**response.json()) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} @@ -594,6 +617,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceAlreadyExistsError}) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -603,6 +627,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( @@ -620,6 +645,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi namespaces = ListNamespaceResponse(**response.json()) return [namespace_tuple + child_namespace for child_namespace in namespaces.namespaces] + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -631,6 +657,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper return NamespaceResponse(**response.json()).properties + @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: diff --git a/pyproject.toml b/pyproject.toml index f823155382..a0daa03abb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ sortedcontainers = "2.4.0" fsspec = ">=2023.1.0,<2024.1.0" pyparsing = ">=3.1.0,<4.0.0" zstandard = ">=0.13.0,<1.0.0" +tenacity = ">=8.2.3,<9.0.0" pyarrow = { version = ">=9.0.0,<16.0.0", optional = true } pandas = { version = ">=1.0.0,<3.0.0", optional = true } duckdb = { version = ">=0.5.0,<1.0.0", optional = true } @@ -295,6 +296,10 @@ ignore_missing_imports = true module = "setuptools.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "tenacity.*" +ignore_missing_imports = true + [tool.coverage.run] source = ['pyiceberg/'] diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 248cc14d88..76865228ca 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -26,6 +26,7 @@ from pyiceberg.catalog import PropertiesUpdateSummary, Table, load_catalog from pyiceberg.catalog.rest import AUTH_URL, RestCatalog from pyiceberg.exceptions import ( + AuthorizationExpiredError, NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, @@ -517,6 +518,34 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non assert "Table already exists" in str(e.value) +def test_create_table_419(rest_mock: Mocker, table_schema_simple: Schema) -> None: + rest_mock.post( + f"{TEST_URI}v1/namespaces/fokko/tables", + json={ + "error": { + "message": "Authorization expired.", + "type": "AuthorizationExpiredError", + "code": 419, + } + }, + status_code=419, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(AuthorizationExpiredError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table( + identifier=("fokko", "fokko2"), + schema=table_schema_simple, + location=None, + partition_spec=PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id") + ), + sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())), + properties={"owner": "fokko"}, + ) + assert "Authorization expired" in str(e.value) + + def test_register_table_200( rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any] ) -> None: From e5c6ede3d3adeefbf380c3ad4658f8c36fcfed41 Mon Sep 17 00:00:00 2001 From: Anupam Saini <4581797+anupam-saini@users.noreply.github.com> Date: Tue, 30 Jan 2024 23:14:01 -0500 Subject: [PATCH 2/5] Check call count --- tests/catalog/test_rest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 76865228ca..dffe140383 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -544,6 +544,7 @@ def test_create_table_419(rest_mock: Mocker, table_schema_simple: Schema) -> Non properties={"owner": "fokko"}, ) assert "Authorization expired" in str(e.value) + assert rest_mock.call_count == 3 def test_register_table_200( From b0b65446237046417ef7e87493a39a073c2aba68 Mon Sep 17 00:00:00 2001 From: Anupam Saini <4581797+anupam-saini@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:37:39 -0500 Subject: [PATCH 3/5] Add test to cover retry logic --- tests/catalog/test_rest.py | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index dffe140383..7ae0d19558 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -267,6 +267,48 @@ def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: ] +def test_list_namespaces_419(rest_mock: Mocker) -> None: + new_token = "new_jwt_token" + new_header = dict(TEST_HEADERS) + new_header["Authorization"] = f"Bearer {new_token}" + + rest_mock.post( + f"{TEST_URI}v1/namespaces", + json={ + "error": { + "message": "Authorization expired.", + "type": "AuthorizationExpiredError", + "code": 419, + } + }, + status_code=419, + request_headers=TEST_HEADERS, + ) + rest_mock.post( + f"{TEST_URI}v1/oauth/tokens", + json={ + "access_token": new_token, + "token_type": "Bearer", + "expires_in": 86400, + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + }, + status_code=200, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces", + json={"namespaces": [["default"], ["examples"], ["fokko"], ["system"]]}, + status_code=200, + request_headers=new_header, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, credential=TEST_CREDENTIALS) + assert catalog.list_namespaces() == [ + ("default",), + ("examples",), + ("fokko",), + ("system",), + ] + + def test_create_namespace_200(rest_mock: Mocker) -> None: namespace = "leden" rest_mock.post( From 29c2632642783a9e3ca759e49a759a1405446d54 Mon Sep 17 00:00:00 2001 From: Anupam Saini <4581797+anupam-saini@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:38:20 +0000 Subject: [PATCH 4/5] Update poetry.lock with tenacity --- poetry.lock | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/poetry.lock b/poetry.lock index f8d7a138e3..13e590ea6b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -48,7 +48,7 @@ boto3 = ["boto3 (>=1.33.2,<1.34.28)"] name = "aiohttp" version = "3.9.2" description = "Async http client/server framework (asyncio)" -optional = false +optional = true python-versions = ">=3.8" files = [ {file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:772fbe371788e61c58d6d3d904268e48a594ba866804d08c995ad71b144f94cb"}, @@ -158,7 +158,7 @@ typing_extensions = {version = ">=4.0", markers = "python_version < \"3.10\""} name = "aiosignal" version = "1.3.1" description = "aiosignal: a list of registered asynchronous callbacks" -optional = false +optional = true python-versions = ">=3.7" files = [ {file = "aiosignal-1.3.1-py3-none-any.whl", hash = "sha256:f8376fb07dd1e86a584e4fcdec80b36b7f81aac666ebc724e2c090300dd83b17"}, @@ -186,7 +186,7 @@ typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.9\""} name = "async-timeout" version = "4.0.3" description = "Timeout context manager for asyncio programs" -optional = false +optional = true python-versions = ">=3.7" files = [ {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, @@ -1064,7 +1064,7 @@ Flask = ">=0.9" name = "frozenlist" version = "1.4.1" description = "A list-like structure which implements collections.abc.MutableSequence" -optional = false +optional = true python-versions = ">=3.8" files = [ {file = "frozenlist-1.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f9aa1878d1083b276b0196f2dfbe00c9b7e752475ed3b682025ff20c1c1f51ac"}, @@ -2149,7 +2149,7 @@ files = [ name = "multidict" version = "6.0.4" description = "multidict implementation" -optional = false +optional = true python-versions = ">=3.7" files = [ {file = "multidict-6.0.4-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:0b1a97283e0c85772d613878028fec909f003993e1007eafa715b24b377cb9b8"}, @@ -3884,6 +3884,20 @@ files = [ [package.dependencies] mpmath = ">=0.19" +[[package]] +name = "tenacity" +version = "8.2.3" +description = "Retry code until it succeeds" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tenacity-8.2.3-py3-none-any.whl", hash = "sha256:ce510e327a630c9e1beaf17d42e6ffacc88185044ad85cf74c0a8887c6a0f88c"}, + {file = "tenacity-8.2.3.tar.gz", hash = "sha256:5398ef0d78e63f40007c1fb4c0bff96e1911394d2fa8d194f77619c05ff6cc8a"}, +] + +[package.extras] +doc = ["reno", "sphinx", "tornado (>=4.5)"] + [[package]] name = "thrift" version = "0.16.0" @@ -4099,7 +4113,7 @@ files = [ name = "yarl" version = "1.9.4" description = "Yet another URL library" -optional = false +optional = true python-versions = ">=3.7" files = [ {file = "yarl-1.9.4-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:a8c1df72eb746f4136fe9a2e72b0c9dc1da1cbd23b5372f94b5820ff8ae30e0e"}, @@ -4293,4 +4307,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "5becf154a072fa31d9cefaee2adaacca1e2315579fd821a4567f41b075ed3b1d" +content-hash = "ae943269ea4f813548d2379b34bd8ca54d5f9807b32eb47cf131be89b0dd4229" From 4daac0d443129651303659b25718dbdcfa035f5b Mon Sep 17 00:00:00 2001 From: Anupam Saini <4581797+anupam-saini@users.noreply.github.com> Date: Mon, 5 Feb 2024 13:05:52 +0000 Subject: [PATCH 5/5] Fix tests for Python <= 3.9 --- pyiceberg/catalog/rest.py | 61 +++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index df255017e0..6a75328cae 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -119,6 +119,19 @@ class Endpoints: NAMESPACE_SEPARATOR = b"\x1F".decode(UTF8) +def _retry_hook(retry_state: RetryCallState) -> None: + rest_catalog: RestCatalog = retry_state.args[0] + rest_catalog._refresh_token() # pylint: disable=protected-access + + +_RETRY_ARGS = { + "retry": retry_if_exception_type(AuthorizationExpiredError), + "stop": stop_after_attempt(2), + "before": _retry_hook, + "reraise": True, +} + + class TableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") metadata: TableMetadata @@ -212,11 +225,6 @@ def __init__(self, name: str, **properties: str): self._fetch_config() self._session = self._create_session() - @staticmethod - def _retry_hook(retry_state: RetryCallState) -> None: - rest_catalog: RestCatalog = retry_state.args[0] - rest_catalog._refresh_token() # pylint: disable=protected-access - def _create_session(self) -> Session: """Create a request session with provided catalog configuration.""" session = Session() @@ -231,13 +239,7 @@ def _create_session(self) -> Session: elif ssl_client_cert := ssl_client.get(CERT): session.cert = ssl_client_cert - # If we have credentials, but not a token, we want to fetch a token - if TOKEN not in self.properties and CREDENTIAL in self.properties: - self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL]) - - # Set Auth token for subsequent calls in the session - if token := self.properties.get(TOKEN): - session.headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}" + self._refresh_token(session, self.properties.get(TOKEN)) # Set HTTP headers session.headers["Content-type"] = "application/json" @@ -444,16 +446,18 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: catalog=self, ) - def _refresh_token(self) -> None: - session: Session = self._session - # If we have credentials, fetch a new token - if CREDENTIAL in self.properties: + def _refresh_token(self, session: Optional[Session] = None, new_token: Optional[str] = None) -> None: + session = session or self._session + if new_token is not None: + self.properties[TOKEN] = new_token + elif CREDENTIAL in self.properties: self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL]) + # Set Auth token for subsequent calls in the session if token := self.properties.get(TOKEN): session.headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}" - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def create_table( self, identifier: Union[str, Identifier], @@ -488,7 +492,7 @@ def create_table( table_response = TableResponse(**response.json()) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -520,7 +524,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: table_response = TableResponse(**response.json()) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -531,7 +535,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers] - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) response = self._session.get( @@ -545,7 +549,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: table_response = TableResponse(**response.json()) return self._response_to_table(identifier_tuple, table_response) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) response = self._session.delete( @@ -558,11 +562,11 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchTableError}) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) payload = { @@ -577,6 +581,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U return self.load_table(to_identifier) + @retry(**_RETRY_ARGS) def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. @@ -607,7 +612,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) return CommitTableResponse(**response.json()) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} @@ -617,7 +622,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceAlreadyExistsError}) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -627,7 +632,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( @@ -645,7 +650,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi namespaces = ListNamespaceResponse(**response.json()) return [namespace_tuple + child_namespace for child_namespace in namespaces.namespaces] - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) @@ -657,7 +662,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper return NamespaceResponse(**response.json()).properties - @retry(retry=retry_if_exception_type(AuthorizationExpiredError), stop=stop_after_attempt(2), before=_retry_hook, reraise=True) + @retry(**_RETRY_ARGS) def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: