Skip to content

Commit

Permalink
[10/n][dagster-airbyte] Implement get_job_details method in AirbyteCl…
Browse files Browse the repository at this point in the history
…oudClient (#26429)

## Summary & Motivation

Implement endpoint to fetch job details in Airbyte Cloud. To be used in
subsequent PRs in sync and poll process.

## How I Tested These Changes

Additional tests with BK
  • Loading branch information
maximearmstrong authored Dec 26, 2024
1 parent 9468df4 commit 9c22ef4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,11 @@ def start_sync_job(self, connection_id: str) -> Mapping[str, Any]:
},
)

def get_job_details(self, job_id: int) -> Mapping[str, Any]:
return self._make_request(
method="GET", endpoint=f"jobs/{job_id}", base_url=self.rest_api_base_url
)


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TEST_STREAM_NAME = "test_stream"
TEST_SELECTED = True
TEST_JSON_SCHEMA = {}
TEST_JOB_ID = 12345

TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps(
table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}",
Expand Down Expand Up @@ -165,7 +166,7 @@
# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getjob
SAMPLE_JOB_RESPONSE = {
"jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"jobId": TEST_JOB_ID,
"status": "running",
"jobType": "sync",
"startTime": "2023-03-25T01:30:50Z",
Expand Down Expand Up @@ -226,4 +227,10 @@ def all_api_mocks_fixture(
json=SAMPLE_JOB_RESPONSE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=SAMPLE_JOB_RESPONSE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TEST_CLIENT_SECRET,
TEST_CONNECTION_ID,
TEST_DESTINATION_ID,
TEST_JOB_ID,
TEST_WORKSPACE_ID,
)

Expand Down Expand Up @@ -128,8 +129,9 @@ def test_basic_resource_request(
client.get_connection_details(connection_id=TEST_CONNECTION_ID)
client.get_destination_details(destination_id=TEST_DESTINATION_ID)
client.start_sync_job(connection_id=TEST_CONNECTION_ID)
client.get_job_details(job_id=TEST_JOB_ID)

assert len(all_api_mocks.calls) == 5
assert len(all_api_mocks.calls) == 6
# The first call is to create the access token
api_calls = assert_token_call_and_split_calls(calls=all_api_mocks.calls)
# The next calls are actual API calls
Expand All @@ -139,3 +141,4 @@ def test_basic_resource_request(
)
assert_rest_api_call(call=api_calls[2], endpoint=f"destinations/{TEST_DESTINATION_ID}")
assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID)
assert_rest_api_call(call=api_calls[4], endpoint=f"jobs/{TEST_JOB_ID}")

0 comments on commit 9c22ef4

Please sign in to comment.