diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 3a04fc1267457..42bf51477ab8f 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -96,7 +96,11 @@ def all_additional_request_params(self) -> Mapping[str, Any]: raise NotImplementedError() def make_request( - self, endpoint: str, data: Optional[Mapping[str, object]] = None, method: str = "POST" + self, + endpoint: str, + data: Optional[Mapping[str, object]] = None, + method: str = "POST", + include_additional_request_params: bool = True, ) -> Optional[Mapping[str, object]]: """Creates and sends a request to the desired Airbyte REST API endpoint. @@ -122,10 +126,11 @@ def make_request( if data: request_args["json"] = data - request_args = deep_merge_dicts( - request_args, - self.all_additional_request_params, - ) + if include_additional_request_params: + request_args = deep_merge_dicts( + request_args, + self.all_additional_request_params, + ) response = requests.request( **request_args, @@ -275,8 +280,8 @@ class AirbyteCloudResource(BaseAirbyteResource): client_id: str = Field(..., description="The Airbyte Cloud client ID.") client_secret: str = Field(..., description="The Airbyte Cloud client secret.") - _access_token_value: str = PrivateAttr() - _access_token_timestamp: float = PrivateAttr() + _access_token_value: Optional[str] = PrivateAttr(default=None) + _access_token_timestamp: Optional[float] = PrivateAttr(default=None) def setup_for_execution(self, context: InitResourceContext) -> None: # Refresh access token when the resource is initialized @@ -288,7 +293,7 @@ def api_base_url(self) -> str: @property def all_additional_request_params(self) -> Mapping[str, Any]: - # Make sure the access token is refreshed before using it. + # Make sure the access token is refreshed before using it when calling the API. if self._needs_refreshed_access_token(): self._refresh_access_token() return { @@ -333,6 +338,8 @@ def _refresh_access_token(self) -> None: "client_id": self.client_id, "client_secret": self.client_secret, }, + # Must not pass the bearer access token when refreshing it. + include_additional_request_params=False, ) ) self._access_token_value = str(response["access_token"]) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py index 87aef59373a55..aa4513d6a4fb7 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py @@ -208,7 +208,9 @@ def test_assets_with_normalization( def test_assets_cloud() -> None: - ab_resource = AirbyteCloudResource(api_key="some_key", poll_interval=0) + ab_resource = AirbyteCloudResource( + client_id="some_client_id", client_secret="some_client_secret", poll_interval=0 + ) ab_url = ab_resource.api_base_url ab_assets = build_airbyte_assets( @@ -236,6 +238,11 @@ def test_assets_cloud() -> None: f"{ab_url}/jobs/1", json={"jobId": 1, "status": "succeeded", "jobType": "sync"}, ) + rsps.add( + rsps.POST, + f"{ab_url}/applications/token", + json={"access_token": "some_access_token"}, + ) res = materialize_to_memory( ab_assets,