feat(datasets): add pandas.DeltaSharingDataset #832

Open · wants to merge 8 commits into base: main
@@ -19,3 +19,4 @@ kedro_datasets_experimental
prophet.ProphetModelDataset
pytorch.PyTorchDataset
rioxarray.GeoTIFFDataset
pandas.DeltaSharingDataset
14 changes: 14 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/pandas/__init__.py
@@ -0,0 +1,14 @@
"""``AbstractDataset`` implementations that produce pandas DataFrames."""

from typing import Any

import lazy_loader as lazy

DeltaSharingDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
    __name__,
    submod_attrs={
        "delta_sharing_dataset": ["DeltaSharingDataset"],
    },
)
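The `lazy.attach` call above defers importing `delta_sharing_dataset` (and its heavy `delta_sharing` dependency) until `DeltaSharingDataset` is first accessed. As a rough illustration of the underlying PEP 562 mechanism, here is a stdlib-only sketch that resolves an attribute to its owning submodule on first request; the `lazy_getattr` helper and the `json.decoder` stand-in are illustrative only, not part of this PR:

```python
import importlib

# Map submodule -> attributes it defines, mirroring the submod_attrs
# argument passed to lazy.attach in __init__.py above.
_SUBMOD_ATTRS = {"decoder": ["JSONDecoder"]}


def lazy_getattr(name: str, package: str = "json"):
    """Import the owning submodule only when `name` is first requested."""
    for submod, attrs in _SUBMOD_ATTRS.items():
        if name in attrs:
            module = importlib.import_module(f"{package}.{submod}")
            return getattr(module, name)
    raise AttributeError(f"module {package!r} has no attribute {name!r}")


decoder_cls = lazy_getattr("JSONDecoder")
print(decoder_cls.__name__)  # JSONDecoder
```

`lazy_loader` wires an equivalent lookup into the module's `__getattr__`, so `from ...pandas import DeltaSharingDataset` stays cheap until the class is actually used.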
@@ -0,0 +1,157 @@
from __future__ import annotations

from typing import Any

import delta_sharing
import pandas as pd
from kedro.io.core import AbstractDataset, DatasetError

from kedro_datasets._typing import TablePreview


class DeltaSharingDataset(AbstractDataset):
    """``DeltaSharingDataset`` loads data from a Delta Sharing shared table using the Delta Sharing open protocol.
    This dataset handles loading data into a pandas DataFrame. Saving to Delta Sharing is not supported.

    Delta Sharing is an open protocol for secure real-time exchange of large datasets, which enables
    organizations to share data in real time regardless of which computing platforms they use. It is a
    simple REST protocol that securely shares access to part of a cloud dataset and leverages modern cloud
    storage systems, such as S3, ADLS, or GCS, to reliably transfer data.
Contributor review comment — suggested change:

-    storage systems, such as S3, ADLS, or GCS, to reliably transfer data.
+    storage systems, such as S3, ADLS, or GCS, to reliably transfer data. More information about the Delta Sharing project can be found in [Delta Sharing](https://github.com/delta-io/delta-sharing)


    Example usage for the YAML API:

    .. code-block:: yaml

        my_delta_sharing_dataset:
          type: pandas.DeltaSharingDataset
          share: <share-name>
          schema: <schema-name>
          table: <table-name>
          credentials:
            profile_file: <profile-file-path>
          load_args:
            version: <version>
            limit: <limit>
            use_delta_format: <use_delta_format>

    Example usage for the Python API:

    .. code-block:: pycon

        >>> from kedro_datasets_experimental.pandas import DeltaSharingDataset
        >>>
        >>> credentials = {"profile_file": "conf/local/config.share"}
        >>> load_args = {"version": 1, "limit": 10, "use_delta_format": True}
        >>> dataset = DeltaSharingDataset(
        ...     share="example_share",
        ...     schema="example_schema",
        ...     table="example_table",
        ...     credentials=credentials,
        ...     load_args=load_args,
        ... )
        >>> data = dataset.load()
        >>> print(data)
Contributor review comment on lines +37 to +53 — suggested change (add before the Python API example):

    To start quickly, you can try this dataset with the [hosted service](https://github.com/delta-io/delta-sharing?tab=readme-ov-file#accessing-shared-data) for a sample dataset. You will need to download the credentials from this [link](https://databricks-datasets-oregon.s3-us-west-2.amazonaws.com/delta-sharing/share/open-datasets.share).

Member review comment:

    Or maybe

        from kedro_datasets_experimental.delta_sharing import PandasDeltaSharingDataset, SparkDeltaSharingDataset

    ?

Contributor review comment:

    I would actually expect this to be just a type for a generic delta sharing dataset.

Contributor review comment:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.config(
        "spark.jars.packages", "io.delta:delta-sharing-spark_2.12:3.1.0"
    ).getOrCreate()

    I tried setting up Spark with delta-sharing support, and it seems to work quite well by changing just one line in the `_load` method: `load_as_spark` instead of `load_as_pandas`.

    """

    DEFAULT_LOAD_ARGS: dict[str, Any] = {
        "version": None,  # Load the latest version by default
        "limit": None,  # No limit by default
        "use_delta_format": False,  # Default to not using Delta format
    }

    def __init__(  # noqa: PLR0913
        self,
        *,
        share: str,
        schema: str,
        table: str,
        credentials: dict[str, Any],
        load_args: dict[str, Any] | None = None,
    ) -> None:
        """Creates a new instance of ``DeltaSharingDataset``.

        Args:
            share (str): Share name in Delta Sharing.
            schema (str): Schema name in Delta Sharing.
            table (str): Table name in Delta Sharing.
            credentials (dict[str, Any]): Credentials for accessing the Delta Sharing profile. Must include:
                - `profile_file` (str): Path to the Delta Sharing profile file.
            load_args (dict[str, Any], optional): Additional options for loading data.
                - `version` (int, optional): A non-negative integer specifying the version of the table snapshot to load.
                  Defaults to None, which loads the latest version. This parameter allows you to access historical
                  versions of the shared table.
                - `limit` (int, optional): A non-negative integer specifying the maximum number of rows to load.
                - `use_delta_format` (bool, optional): Whether to use the Delta format for loading data. Defaults to False.

        Raises:
            DatasetError: If the profile file is not specified in credentials.
        """
        self.share = share
        self.schema = schema
        self.table = table
        self.profile_file = credentials.get("profile_file")

        if not self.profile_file:
            raise DatasetError("Profile file must be specified in credentials.")

        # Merge the provided load_args with DEFAULT_LOAD_ARGS
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
        self.version = self._load_args["version"]
        self.limit = self._load_args["limit"]
        self.use_delta_format = self._load_args["use_delta_format"]

        self.table_path = f"{self.share}.{self.schema}.{self.table}"
        self.table_url = f"{self.profile_file}#{self.table_path}"

    def _load(self) -> pd.DataFrame:
        """Load data from the Delta Sharing shared table into a pandas DataFrame.

        Returns:
            pd.DataFrame: Loaded data.
        """
        return delta_sharing.load_as_pandas(
            self.table_url,
            version=self.version,
            limit=self.limit,
            use_delta_format=self.use_delta_format,
        )

    def _save(self, data: pd.DataFrame) -> None:
        """Saving to Delta Sharing shared tables is not supported.

        Args:
            data (pd.DataFrame): Data to save.

        Raises:
            DatasetError: Always, since saving to Delta Sharing shared tables is not supported.
        """
        raise DatasetError("Saving to Delta Sharing shared tables is not supported.")

    def preview(self, nrows: int = 5) -> TablePreview:
        """Generate a preview of the dataset with a specified number of rows.

        Args:
            nrows (int, optional): The number of rows to include in the preview. Defaults to 5.

        Returns:
            TablePreview: A dictionary containing the data in a split format.
        """
        dataset_copy = self._copy()
        dataset_copy.limit = nrows
        data = dataset_copy._load()
        return data.to_dict(orient="split")

    def _describe(self) -> dict[str, Any]:
        """Describe the dataset configuration.

        Returns:
            dict[str, Any]: Dataset configuration.
        """
        return {
            "share": self.share,
            "schema": self.schema,
            "table": self.table,
            "profile_file": self.profile_file,
            "version": self.version,
            "limit": self.limit,
            "use_delta_format": self.use_delta_format,
        }
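The constructor's configuration handling can be checked without contacting a Delta Sharing server: user-supplied `load_args` override `DEFAULT_LOAD_ARGS` key by key via a dict merge, and the table URL is the profile path joined to `share.schema.table` with a `#`. A dependency-free sketch of that logic, using hypothetical example values:

```python
# Mirror of the merge and URL construction in DeltaSharingDataset.__init__,
# with illustrative values; no delta_sharing import is needed.
DEFAULT_LOAD_ARGS = {"version": None, "limit": None, "use_delta_format": False}

load_args = {"limit": 10}  # user overrides only the row limit
merged = {**DEFAULT_LOAD_ARGS, **load_args}
# Unspecified keys keep their defaults; user-provided keys win.
assert merged == {"version": None, "limit": 10, "use_delta_format": False}

profile_file = "conf/local/config.share"
table_path = "example_share.example_schema.example_table"
table_url = f"{profile_file}#{table_path}"
print(table_url)
# conf/local/config.share#example_share.example_schema.example_table
```

This `profile#share.schema.table` form is the URL shape the `delta_sharing` loader functions consume, which is why `__init__` precomputes it once.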
2 changes: 2 additions & 0 deletions kedro-datasets/pyproject.toml
@@ -191,6 +191,7 @@ pytorch = ["kedro-datasets[pytorch-dataset]"]
rioxarray-geotiffdataset = ["rioxarray>=0.15.0"]
rioxarray = ["kedro-datasets[rioxarray-geotiffdataset]"]

delta-sharing = ["kedro-datasets[delta-sharing-dataset]"]
# Docs requirements
docs = [
"kedro-sphinx-theme==2024.4.0",
@@ -294,6 +295,7 @@ experimental = [
"rioxarray",
"torch",
"prophet>=1.1.5",
"delta-sharing"
]

# All requirements