feat(datasets): add pandas.DeltaSharingDataset #832

Open

wants to merge 8 commits into main
14 changes: 14 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/pandas/__init__.py
@@ -0,0 +1,14 @@
"""``AbstractDataset`` implementations that produce pandas DataFrames."""

from typing import Any

import lazy_loader as lazy

DeltaSharingDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
    __name__,
    submod_attrs={
        "delta_sharing_dataset": ["DeltaSharingDataset"],
    },
)
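For context, `lazy_loader.attach` defers the import of the `delta_sharing_dataset` submodule until the attribute is first accessed, so importing the package stays cheap when the optional dependency is missing. A minimal sketch of that behaviour (the package path is the one added in this PR; the exact exception raised when `delta-sharing` is not installed is an assumption):

# Importing the package does not yet import delta_sharing.
from kedro_datasets_experimental import pandas as experimental_pandas

# Accessing the attribute triggers the deferred import of
# kedro_datasets_experimental.pandas.delta_sharing_dataset.
try:
    dataset_cls = experimental_pandas.DeltaSharingDataset
except ImportError:
    # Assumed failure mode when the optional delta-sharing package is absent.
    dataset_cls = None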
164 changes: 164 additions & 0 deletions kedro-datasets/kedro_datasets_experimental/pandas/delta_sharing_dataset.py
@@ -0,0 +1,164 @@
from __future__ import annotations

from typing import Any

import delta_sharing
import pandas as pd
from kedro.io.core import AbstractDataset, DatasetError

from kedro_datasets._typing import TablePreview


class DeltaSharingDataset(AbstractDataset):
"""``DeltaSharingDataset`` loads data from a Delta Sharing shared table using the Delta Sharing open protocol.
This dataset handles loading data into a Pandas DataFrame. Saving to Delta Sharing is not supported.

Delta Sharing is an open protocol for secure real-time exchange of large datasets, which enables
organizations to share data in real time regardless of which computing platforms they use. It is a
simple REST protocol that securely shares access to part of a cloud dataset and leverages modern cloud
storage systems, such as S3, ADLS, or GCS, to reliably transfer data.
Contributor

Suggested change:

storage systems, such as S3, ADLS, or GCS, to reliably transfer data.
storage systems, such as S3, ADLS, or GCS, to reliably transfer data. More information about the Delta Sharing project can be found in [Delta Sharing](https://github.com/delta-io/delta-sharing)


    Example usage for the YAML API:

    .. code-block:: yaml

        my_delta_sharing_dataset:
          type: pandas.DeltaSharingDataset
          share: <share-name>
          schema: <schema-name>
          table: <table-name>
          credentials:
            profile_file: <profile-file-path>
          load_args:
            version: <version>
            limit: <limit>
            use_delta_format: <use_delta_format>

    Example usage for the Python API:

    .. code-block:: pycon

        >>> from kedro_datasets_experimental.pandas import DeltaSharingDataset
        >>> import pandas as pd
        >>>
        >>> credentials = {
        ...     "profile_file": "conf/local/config.share"
        ... }
        >>> load_args = {
        ...     "version": 1,
        ...     "limit": 10,
        ...     "use_delta_format": True
        ... }
        >>> dataset = DeltaSharingDataset(
        ...     share="example_share",
        ...     schema="example_schema",
        ...     table="example_table",
        ...     credentials=credentials,
        ...     load_args=load_args
        ... )
        >>> data = dataset.load()
        >>> print(data)
    """
Contributor

Is it possible to create an example that can run locally, or is it expected to connect to somewhere?

Author

Thank you for your feedback, really appreciate it.

For accessing the Delta Sharing server, Delta Sharing provides a profile file that allows you to connect to a public example server. You can download the necessary credentials from this link. More information about the Delta Sharing project can be found in their GitHub repository.

Regarding the runnable example, I'm not sure if you'd prefer it in the code or in the comments, but I'll provide it here for convenience:

from kedro_datasets_experimental.pandas import DeltaSharingDataset

# Define the credentials for accessing the Delta Sharing profile
credentials = {
    "profile_file": "open-datasets.share"  # Path to the profile file downloaded from link above
}

# Load arguments specifying any additional options for loading data
load_args = {
    "limit": 10,  # Limit the number of rows loaded
}

# Create an instance of the DeltaSharingDataset with the specified parameters
dataset = DeltaSharingDataset(
    share="delta_sharing",  # Name of the share in Delta Sharing
    schema="default",       # Schema name in Delta Sharing
    table="nyctaxi_2019",  # Table name to load data from
    credentials=credentials,  # Pass the credentials for access
    load_args=load_args      # Pass the loading options
)

# Load the data into a Pandas DataFrame
data = dataset.load()
print(data)  # Display the loaded data

The public example share includes the following datasets if you'd like to test different examples:

  • COVID_19_NYT
  • boston-housing
  • flight-asa_2008
  • lending_club
  • nyctaxi_2019
  • nyctaxi_2019_part
  • owid-covid-data

Please let me know if you need any further information or if you'd like me to incorporate this example directly into the code!

Contributor

I'd prefer this as a docstring. Docstrings are rendered as docs automatically, for example: https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-5.1.0/api/kedro_datasets_experimental.langchain.ChatAnthropicDataset.html

I will try to run this code and convert it to a docstring.


    DEFAULT_LOAD_ARGS: dict[str, Any] = {
        "version": None,  # Load the latest version by default
        "limit": None,  # No limit by default
        "use_delta_format": False  # Default to not using Delta format
    }

    def __init__(  # noqa: PLR0913
        self,
        *,
        share: str,
        schema: str,
        table: str,
        credentials: dict[str, Any],
        load_args: dict[str, Any] | None = None
    ) -> None:
        """Creates a new instance of ``DeltaSharingDataset``.

        Args:
            share (str): Share name in Delta Sharing.
            schema (str): Schema name in Delta Sharing.
            table (str): Table name in Delta Sharing.
            credentials (dict[str, Any]): Credentials for accessing the Delta Sharing profile. Must include:
                - `profile_file` (str): Path to the Delta Sharing profile file.
            load_args (dict[str, Any], optional): Additional options for loading data.
                - `version` (int, optional): A non-negative integer specifying the version of the table
                  snapshot to load. Defaults to None, which loads the latest version. This parameter
                  allows you to access historical versions of the shared table.
                - `limit` (int, optional): A non-negative integer specifying the maximum number of rows to load.
                - `use_delta_format` (bool, optional): Whether to use the Delta format for loading data.
                  Defaults to False.

        Raises:
            DatasetError: If the profile file is not specified in credentials.
        """
        self.share = share
        self.schema = schema
        self.table = table
        self.profile_file = credentials.get("profile_file")

        # Merge the provided load_args with DEFAULT_LOAD_ARGS
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
        self.version = self._load_args["version"]
        self.limit = self._load_args["limit"]
        self.use_delta_format = self._load_args["use_delta_format"]

        if not self.profile_file:
            raise DatasetError("Profile file must be specified in credentials.")

        self.table_path = f"{self.share}.{self.schema}.{self.table}"
        self.table_url = f"{self.profile_file}#{self.table_path}"

    def _load(self) -> pd.DataFrame:
        """Load data from the Delta Sharing shared table into a Pandas DataFrame.

        Returns:
            pd.DataFrame: Loaded data.
        """
        return delta_sharing.load_as_pandas(
            self.table_url,
            version=self.version,
            limit=self.limit,
            use_delta_format=self.use_delta_format
        )

    def _save(self, data: pd.DataFrame) -> None:
        """Saving to Delta Sharing shared tables is not supported.

        Args:
            data (pd.DataFrame): Data to save.

        Raises:
            NotImplementedError: Saving to Delta Sharing shared tables is not supported.
        """
        raise NotImplementedError("Saving to Delta Sharing shared tables is not supported.")
Contributor

This should be a DatasetError, which can be imported from kedro.io.core; other datasets that don't support one of the operations do the same.

Author

Totally agree! I have updated the code to use DatasetError imported from kedro.io.core instead of NotImplementedError, following the pattern in other datasets. Thank you for your feedback!
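For reference, a minimal sketch of what the updated method might look like after that change (this is an assumption about the revised code; the diff shown above still raises NotImplementedError):

    def _save(self, data: pd.DataFrame) -> None:
        """Saving to Delta Sharing shared tables is not supported.

        Raises:
            DatasetError: Always, since shared tables are read-only.
        """
        # DatasetError is already imported from kedro.io.core at the top of the module.
        raise DatasetError("Saving to Delta Sharing shared tables is not supported.")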

Author

Additionally, I added the requested information to the release notes and the API .rst file as you suggested. If there's anything else you need, feel free to let me know!

    def preview(self, nrows: int = 5) -> TablePreview:
        """Generate a preview of the dataset with a specified number of rows.

        Args:
            nrows (int, optional): The number of rows to include in the preview. Defaults to 5.

        Returns:
            TablePreview: A dictionary containing the data in a split format.
        """
        dataset_copy = self._copy()
        dataset_copy.limit = nrows
        data = dataset_copy._load()
        return data.to_dict(orient="split")

    def _describe(self) -> dict[str, Any]:
        """Describe the dataset configuration.

        Returns:
            dict[str, Any]: Dataset configuration.
        """
        return {
            "share": self.share,
            "schema": self.schema,
            "table": self.table,
            "profile_file": self.profile_file,
            "version": self.version,
            "limit": self.limit,
            "use_delta_format": self.use_delta_format
        }
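
For illustration, `preview` returns the DataFrame in pandas' "split" orientation. A rough sketch of the shape of the result, continuing from the runnable example above (the column names and values are made up for illustration, not real nyctaxi_2019 data):

# dataset is the DeltaSharingDataset instance from the runnable example above.
preview = dataset.preview(nrows=2)

# Hypothetical result in the "split" orientation: index, columns and data
# are kept as separate lists.
# {
#     "index": [0, 1],
#     "columns": ["vendor_id", "fare_amount"],
#     "data": [[1, 7.5], [2, 12.0]],
# }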
4 changes: 3 additions & 1 deletion kedro-datasets/pyproject.toml
@@ -189,6 +189,7 @@ pytorch = ["kedro-datasets[pytorch-dataset]"]
rioxarray-geotiffdataset = ["rioxarray>=0.15.0"]
rioxarray = ["kedro-datasets[rioxarray-geotiffdataset]"]

delta-sharing = ["kedro-datasets[delta-sharing-dataset]"]
# Docs requirements
docs = [
"kedro-sphinx-theme==2024.4.0",
@@ -290,7 +291,8 @@ experimental = [
"netcdf4>=1.6.4",
"xarray>=2023.1.0",
"rioxarray",
"torch"
"torch",
"delta-sharing>=1.1.1"
Contributor

Why version 1.1.1?

Author

I've removed the version pinning since it was only necessary for testing purposes.

Author

Thank you for the kind words! The direction that Delta is taking was the exact reason that inspired me to contribute to the project and combine these two amazing initiatives. Regarding the runnable example, I have shared it in a separate comment. Please let me know if there's anything else you need!

]

# All requirements