From 9c22ef4384082808bc9cc3f6d3c59c3bf0a3554d Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 26 Dec 2024 10:44:37 -0500 Subject: [PATCH] [10/n][dagster-airbyte] Implement get_job_details method in AirbyteCloudClient (#26429) ## Summary & Motivation Implement endpoint to fetch job details in Airbyte Cloud. To be used in subsequent PRs in sync and poll process. ## How I Tested These Changes Additional tests with BK --- .../dagster-airbyte/dagster_airbyte/resources.py | 5 +++++ .../dagster_airbyte_tests/experimental/conftest.py | 9 ++++++++- .../dagster_airbyte_tests/experimental/test_resources.py | 5 ++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 2be0677807f1b..02f483833aeb1 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -1003,6 +1003,11 @@ def start_sync_job(self, connection_id: str) -> Mapping[str, Any]: }, ) + def get_job_details(self, job_id: int) -> Mapping[str, Any]: + return self._make_request( + method="GET", endpoint=f"jobs/{job_id}", base_url=self.rest_api_base_url + ) + @experimental class AirbyteCloudWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 67d366c7c5717..67eb7d7fe2700 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -27,6 +27,7 @@ TEST_STREAM_NAME = "test_stream" TEST_SELECTED = True TEST_JSON_SCHEMA = {} +TEST_JOB_ID = 12345 TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps( table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}", @@ -165,7 +166,7 @@ # Taken from Airbyte REST API documentation # https://reference.airbyte.com/reference/getjob SAMPLE_JOB_RESPONSE = { - "jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "jobId": TEST_JOB_ID, "status": "running", "jobType": "sync", "startTime": "2023-03-25T01:30:50Z", @@ -226,4 +227,10 @@ def all_api_mocks_fixture( json=SAMPLE_JOB_RESPONSE, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.GET, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", + json=SAMPLE_JOB_RESPONSE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 144bad731c115..b051434862885 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -18,6 +18,7 @@ TEST_CLIENT_SECRET, TEST_CONNECTION_ID, TEST_DESTINATION_ID, + TEST_JOB_ID, TEST_WORKSPACE_ID, ) @@ -128,8 +129,9 @@ def test_basic_resource_request( client.get_connection_details(connection_id=TEST_CONNECTION_ID) client.get_destination_details(destination_id=TEST_DESTINATION_ID) client.start_sync_job(connection_id=TEST_CONNECTION_ID) + client.get_job_details(job_id=TEST_JOB_ID) - assert len(all_api_mocks.calls) == 5 + assert len(all_api_mocks.calls) == 6 # The first call is to create the access token api_calls = assert_token_call_and_split_calls(calls=all_api_mocks.calls) # The next calls are actual API calls @@ -139,3 +141,4 @@ def test_basic_resource_request( ) assert_rest_api_call(call=api_calls[2], endpoint=f"destinations/{TEST_DESTINATION_ID}") assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID) + assert_rest_api_call(call=api_calls[4], endpoint=f"jobs/{TEST_JOB_ID}")