Skip to content

Commit

Permalink
[1/n][dagster-airbyte] Scaffold AirbyteCloudWorkspace and AirbyteClie…
Browse files Browse the repository at this point in the history
…nt for rework (#26204)

## Summary & Motivation

Builds out a very barebones resource and client classes for the new
version of the Airbyte cloud integration.

## How I Tested These Changes

Tests will be added in subsequent PRs.
  • Loading branch information
maximearmstrong authored Dec 5, 2024
1 parent 11b0bff commit a0f9496
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
get_dagster_logger,
resource,
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import infer_schema_from_config_class
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import deep_merge_dicts
from pydantic import Field, PrivateAttr
Expand Down Expand Up @@ -791,3 +794,115 @@ def airbyte_cloud_resource(context) -> AirbyteCloudResource:
"""
return AirbyteCloudResource.from_resource_context(context)


# -------------
# Resources v2
# -------------


@whitelist_for_serdes
@record
class AirbyteConnection:
"""Represents an Airbyte connection, based on data as returned from the API."""

@classmethod
def from_connection_details(
cls,
connection_details: Mapping[str, Any],
) -> "AirbyteConnection":
raise NotImplementedError()


@whitelist_for_serdes
@record
class AirbyteDestination:
"""Represents an Airbyte destination, based on data as returned from the API."""

@classmethod
def from_destination_details(
cls,
destination_details: Mapping[str, Any],
) -> "AirbyteDestination":
raise NotImplementedError()


@record
class AirbyteWorkspaceData:
"""A record representing all content in an Airbyte workspace.
This applies to both Airbyte OSS and Cloud.
"""

connections_by_id: Mapping[str, AirbyteConnection]
destinations_by_id: Mapping[str, AirbyteDestination]


@experimental
class AirbyteCloudClient:
"""This class exposes methods on top of the Airbyte APIs for Airbyte Cloud."""

_access_token_value: Optional[str] = PrivateAttr(default=None)
_access_token_timestamp: Optional[float] = PrivateAttr(default=None)

def __init__(
self,
workspace_id: str,
client_id: str,
client_secret: str,
):
self.workspace_id = workspace_id
self.client_id = client_id
self.client_secret = client_secret

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

@property
def api_base_url(self) -> str:
raise NotImplementedError()

def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()

def get_connections(self) -> Mapping[str, Any]:
"""Fetches all connections of an Airbyte workspace from the Airbyte API."""
raise NotImplementedError()

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Airbyte API."""
raise NotImplementedError()


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
"""This class represents a Airbyte Cloud workspace and provides utilities
to interact with Airbyte APIs.
"""

workspace_id: str = Field(..., description="The Airbyte Cloud workspace ID")
client_id: str = Field(..., description="The Airbyte Cloud client ID.")
client_secret: str = Field(..., description="The Airbyte Cloud client secret.")

_client: AirbyteCloudClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> AirbyteCloudClient:
return AirbyteCloudClient(
workspace_id=self.workspace_id,
client_id=self.client_id,
client_secret=self.client_secret,
)

def fetch_airbyte_workspace_data(
self,
) -> AirbyteWorkspaceData:
"""Retrieves all Airbyte content from the workspace and returns it as a AirbyteWorkspaceData object.
Returns:
AirbyteWorkspaceData: A snapshot of the Airbyte workspace's content.
"""
raise NotImplementedError()

0 comments on commit a0f9496

Please sign in to comment.