From 0e5a4c49764eaac934378c784732f3f7f53323d4 Mon Sep 17 00:00:00 2001
From: Minura Punchihewa
Date: Fri, 11 Oct 2024 16:46:11 +0530
Subject: [PATCH 01/15] added the experimental ExternalTableDataset

Signed-off-by: Minura Punchihewa
---
 .../databricks/__init__.py                    |  12 ++
 .../databricks/external_table_dataset.py      | 175 ++++++++++++++++++
 2 files changed, 187 insertions(+)
 create mode 100644 kedro-datasets/kedro_datasets_experimental/databricks/__init__.py
 create mode 100644 kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py

diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/__init__.py b/kedro-datasets/kedro_datasets_experimental/databricks/__init__.py
new file mode 100644
index 000000000..ec5a41a6f
--- /dev/null
+++ b/kedro-datasets/kedro_datasets_experimental/databricks/__init__.py
@@ -0,0 +1,12 @@
+"""Provides an interface to Unity Catalog External Tables."""
+
+from typing import Any
+
+import lazy_loader as lazy
+
+# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
+ExternalTableDataset: Any
+
+__getattr__, __dir__, __all__ = lazy.attach(
+    __name__, submod_attrs={"external_table_dataset": ["ExternalTableDataset"]}
+)
diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
new file mode 100644
index 000000000..c85687d97
--- /dev/null
+++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
@@ -0,0 +1,175 @@
+"""``ExternalTableDataset`` implementation to access external tables
+in Databricks.
+"""
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import Any
+
+import pandas as pd
+import pandas as pd
+from kedro.io.core import (
+    DatasetError
+)
+from pyspark.sql import DataFrame
+
+from kedro_datasets.databricks._base_table_dataset import BaseTable, BaseTableDataset
+
+logger = logging.getLogger(__name__)
+pd.DataFrame.iteritems = pd.DataFrame.items
+
+
+@dataclass(frozen=True)
+class ExternalTable(BaseTable):
+    """Stores the definition of an external table."""
+
+    def _validate_location(self) -> None:
+        """Validates that a location is provided if the table does not exist.
+
+        Raises:
+            DatasetError: If the table does not exist and no location is provided.
+        """
+        if not self.exists() and not self.location:
+            raise DatasetError(
+                "If the external table does not exist, the `location` parameter must be provided. "
+                "This should be a valid path in an external location that has already been created."
+            )
+
+    def _validate_write_mode(self) -> None:
+        """Validates that the write mode is compatible with the format.
+
+        Raises:
+            DatasetError: If the write mode is not compatible with the format.
+        """
+        super()._validate_write_mode()
+
+        if self.write_mode == "upsert" and self.format != "delta":
+            raise DatasetError(
+                f"Format '{self.format}' is not supported for upserts. "
+                f"Please use 'delta' format."
+            )
+
+        if self.write_mode == "overwrite" and self.format != "delta" and not self.location:
+            raise DatasetError(
+                f"Format '{self.format}' is supported for overwrites only if the location is provided. "
+                f"Please provide a valid path in an external location."
+            )
+
+
+class ExternalTableDataset(BaseTableDataset):
+    """``ExternalTableDataset`` loads and saves data into external tables in Databricks.
+    Load and save can be in Spark or Pandas dataframes, specified in dataframe_type.
+    Example usage for the
+    `YAML API `_:
+    .. 
code-block:: yaml + names_and_ages@spark: + type: databricks.ExternalTableDataset + format: parquet + table: names_and_ages + names_and_ages@pandas: + type: databricks.ExternalTableDataset + format: parquet + table: names_and_ages + dataframe_type: pandas + Example usage for the + `Python API `_: + .. code-block:: pycon + >>> from kedro_datasets.databricks import ExternalTableDataset + >>> from pyspark.sql import SparkSession + >>> from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType + >>> import importlib_metadata + >>> + >>> DELTA_VERSION = importlib_metadata.version("delta-spark") + >>> schema = StructType( + ... [StructField("name", StringType(), True), StructField("age", IntegerType(), True)] + ... ) + >>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)] + >>> spark_df = ( + ... SparkSession.builder.config( + ... "spark.jars.packages", f"io.delta:delta-core_2.12:{DELTA_VERSION}" + ... ) + ... .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + ... .config( + ... "spark.sql.catalog.spark_catalog", + ... "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ... ) + ... .getOrCreate() + ... .createDataFrame(data, schema) + ... ) + >>> dataset = ExternalTableDataset( + ... table="names_and_ages", + ... write_mode="overwrite", + ... location="abfss://container@storageaccount.dfs.core.windows.net/depts/cust" + ... ) + >>> dataset.save(spark_df) + >>> reloaded = dataset.load() + >>> assert Row(name="Bob", age=12) in reloaded.take(4) + """ + + def _create_table( # noqa: PLR0913 + self, + table: str, + catalog: str | None, + database: str, + format: str, + write_mode: str | None, + location: str | None, + dataframe_type: str, + primary_key: str | list[str] | None, + json_schema: dict[str, Any] | None, + partition_columns: list[str] | None, + owner_group: str | None + ) -> ExternalTable: + """Creates a new ``ExternalTable`` instance with the provided attributes. + Args: + table: The name of the table. + catalog: The catalog of the table. + database: The database of the table. + format: The format of the table. + write_mode: The write mode for the table. + dataframe_type: The type of dataframe. + primary_key: The primary key of the table. + json_schema: The JSON schema of the table. + partition_columns: The partition columns of the table. + owner_group: The owner group of the table. + Returns: + ``ExternalTable``: The new ``ExternalTable`` instance. + """ + return ExternalTable( + table=table, + catalog=catalog, + database=database, + write_mode=write_mode, + location=location, + dataframe_type=dataframe_type, + json_schema=json_schema, + partition_columns=partition_columns, + owner_group=owner_group, + primary_key=primary_key, + format=format + ) + + def _save_overwrite(self, data: DataFrame) -> None: + """Overwrites the data in the table with the data provided. + Args: + data (DataFrame): The Spark dataframe to overwrite the table with. 
+        """
+        writer = data.write.format(self._table.format).mode("overwrite").option(
+            "overwriteSchema", "true"
+        )
+
+        if self._table.partition_columns:
+            writer.partitionBy(
+                *self._table.partition_columns if isinstance(self._table.partition_columns, list) else [self._table.partition_columns]
+            )
+
+        if self._table.format == "delta" or (not self._table.exists()):
+            if self._table.location:
+                writer.option("path", self._table.location)
+
+            writer.saveAsTable(self._table.full_table_location() or "")
+
+        else:
+            writer.save(self._table.location)

From 61f7771ed4d31b00a9e97a0a0985bb67f2c242b4 Mon Sep 17 00:00:00 2001
From: Minura Punchihewa
Date: Fri, 11 Oct 2024 17:05:42 +0530
Subject: [PATCH 02/15] fixed lint issues

Signed-off-by: Minura Punchihewa
---
 .../databricks/external_table_dataset.py      | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
index c85687d97..29a545df7 100644
--- a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
+++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
@@ -8,10 +8,7 @@
 from typing import Any
 
 import pandas as pd
-import pandas as pd
-from kedro.io.core import (
-    DatasetError
-)
+from kedro.io.core import DatasetError
 from pyspark.sql import DataFrame
 
 from kedro_datasets.databricks._base_table_dataset import BaseTable, BaseTableDataset
@@ -26,7 +23,7 @@ class ExternalTable(BaseTable):
 
     def _validate_location(self) -> None:
         """Validates that a location is provided if the table does not exist.
-    
+
         Raises:
             DatasetError: If the table does not exist and no location is provided.
         """
@@ -35,10 +32,10 @@ def _validate_location(self) -> None:
                 "If the external table does not exist, the `location` parameter must be provided. "
                 "This should be a valid path in an external location that has already been created."
             )
-    
+
     def _validate_write_mode(self) -> None:
         """Validates that the write mode is compatible with the format.
-    
+
         Raises:
             DatasetError: If the write mode is not compatible with the format.
         """
@@ -49,7 +46,7 @@ def _validate_write_mode(self) -> None:
                 f"Please use 'delta' format."
             )
-    
+
         if self.write_mode == "overwrite" and self.format != "delta" and not self.location:
             raise DatasetError(
                 f"Format '{self.format}' is supported for overwrites only if the location is provided. "
                 f"Please provide a valid path in an external location."
             )
@@ -150,7 +147,7 @@ def _create_table(  # noqa: PLR0913
             primary_key=primary_key,
             format=format
         )
-    
+
     def _save_overwrite(self, data: DataFrame) -> None:
         """Overwrites the data in the table with the data provided.
         Args:
             data (DataFrame): The Spark dataframe to overwrite the table with.
         """
         writer = data.write.format(self._table.format).mode("overwrite").option(
             "overwriteSchema", "true"
         )
-    
+
         if self._table.partition_columns:
             writer.partitionBy(
                 *self._table.partition_columns if isinstance(self._table.partition_columns, list) else [self._table.partition_columns]

From 46858c309caeaeb65974e614e35ee2f8038ef4e6 Mon Sep 17 00:00:00 2001
From: Minura Punchihewa
Date: Fri, 11 Oct 2024 17:08:07 +0530
Subject: [PATCH 03/15] added the missing location attr to the docstring

Signed-off-by: Minura Punchihewa
---
 .../databricks/external_table_dataset.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
index 29a545df7..5c1a2466c 100644
--- a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
+++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py
@@ -126,6 +126,7 @@ def _create_table(  # noqa: PLR0913
             database: The database of the table.
             format: The format of the table.
             write_mode: The write mode for the table.
+            location: The location of the table.
             dataframe_type: The type of dataframe.
             primary_key: The primary key of the table.
             json_schema: The JSON schema of the table.

From e9d097eb288ee892d92d78c3d0f2dd0bebecea83 Mon Sep 17 00:00:00 2001
From: Minura Punchihewa
Date: Fri, 11 Oct 2024 19:42:51 +0530
Subject: [PATCH 04/15] removed unused code from the tests for ManagedTableDataset

Signed-off-by: Minura Punchihewa
---
 .../databricks/test_managed_table_dataset.py  | 167 ------------------
 1 file changed, 167 deletions(-)

diff --git a/kedro-datasets/tests/databricks/test_managed_table_dataset.py b/kedro-datasets/tests/databricks/test_managed_table_dataset.py
index c3cc623f4..6c7acb97b 100644
--- a/kedro-datasets/tests/databricks/test_managed_table_dataset.py
+++ b/kedro-datasets/tests/databricks/test_managed_table_dataset.py
@@ -1,173 +1,6 @@
-import pandas as pd
-import pytest
-from pyspark.sql import SparkSession
-from pyspark.sql.types import IntegerType, StringType, StructField, StructType
-
 from kedro_datasets.databricks import ManagedTableDataset
 
 
-@pytest.fixture
-def sample_spark_df(spark_session: SparkSession):
-    schema = StructType(
-        [
-            StructField("name", StringType(), True),
-            StructField("age", IntegerType(), True),
-        ]
-    )
-
-    data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
-
-    return spark_session.createDataFrame(data, schema)
-
-
-@pytest.fixture
-def upsert_spark_df(spark_session: SparkSession):
-    schema = StructType(
-        [
-            StructField("name", StringType(), True),
-            StructField("age", IntegerType(), True),
-        ]
-    )
-
-    data = [("Alex", 32), ("Evan", 23)]
-
-    return spark_session.createDataFrame(data, schema)
-
-
-@pytest.fixture
-def mismatched_upsert_spark_df(spark_session: SparkSession):
-    schema = StructType(
-        [
-            StructField("name", StringType(), True),
-            StructField("age", IntegerType(), True),
-            StructField("height", IntegerType(), True),
-        ]
-    )
-
-    data = [("Alex", 32, 174), ("Evan", 23, 166)]
-
-    return spark_session.createDataFrame(data, schema)
-
-
-@pytest.fixture
-def subset_spark_df(spark_session: SparkSession):
-    schema = StructType(
-        [
-            StructField("name", StringType(), True),
-            StructField("age", IntegerType(), True),
-            StructField("height", IntegerType(), True),
-        ]
-    )
-
-    data = [("Alex", 32, 174), ("Evan", 23, 166)]
-
-    return 
spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def subset_pandas_df(): - return pd.DataFrame( - {"name": ["Alex", "Evan"], "age": [32, 23], "height": [174, 166]} - ) - - -@pytest.fixture -def subset_expected_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Alex", 32), ("Evan", 23)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def sample_pandas_df(): - return pd.DataFrame( - {"name": ["Alex", "Bob", "Clarke", "Dave"], "age": [31, 12, 65, 29]} - ) - - -@pytest.fixture -def append_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Evan", 23), ("Frank", 13)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_append_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 31), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ("Frank", 13), - ] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_upsert_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 32), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_upsert_multiple_primary_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 31), - ("Alex", 32), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ] - - return spark_session.createDataFrame(data, schema) - - class TestManagedTableDataset: def test_describe(self): unity_ds = ManagedTableDataset(table="test") From b8964640d2ae8bfbb4e423407453e54add763f42 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Fri, 11 Oct 2024 19:43:12 +0530 Subject: [PATCH 05/15] added tests for ExternalTableDataset Signed-off-by: Minura Punchihewa --- .../databricks/test_external_table_dataset.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 kedro-datasets/tests/databricks/test_external_table_dataset.py diff --git a/kedro-datasets/tests/databricks/test_external_table_dataset.py b/kedro-datasets/tests/databricks/test_external_table_dataset.py new file mode 100644 index 000000000..e838c7bd1 --- /dev/null +++ b/kedro-datasets/tests/databricks/test_external_table_dataset.py @@ -0,0 +1,42 @@ +import pytest +from kedro.io.core import DatasetError +from pyspark.sql import DataFrame + +from kedro_datasets_experimental.databricks.external_table_dataset import ExternalTableDataset + + +class TestExternalTableDataset: + def test_location_for_non_existing_table(self): + with pytest.raises(DatasetError): + ExternalTableDataset(table="test") + + def test_invalid_upsert_write_mode(self): + with pytest.raises(DatasetError): + ExternalTableDataset(table="test", write_mode="upsert", format="parquet") + + def test_invalid_overwrite_write_mode(self): + with pytest.raises(DatasetError): + ExternalTableDataset(table="test", write_mode="overwrite", format="parquet") + + def 
test_save_overwrite_without_location(self): + with pytest.raises(DatasetError): + ExternalTableDataset(table="test", write_mode="overwrite", format="delta") + + def test_save_overwrite( + self, + sample_spark_df: DataFrame, + append_spark_df: DataFrame, + external_location: str, + ): + unity_ds = ExternalTableDataset( + database="test", + table="test_save", + write_mode="overwrite", + location=f"{external_location}/test_save_overwrite_external", + ) + unity_ds.save(sample_spark_df) + unity_ds.save(append_spark_df) + + overwritten_table = unity_ds.load() + + assert append_spark_df.exceptAll(overwritten_table).count() == 0 From 8354b001511c0164965de5b3f5c3b077da7f1a58 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Fri, 11 Oct 2024 19:45:48 +0530 Subject: [PATCH 06/15] moved all fixtures to conftest.py Signed-off-by: Minura Punchihewa --- kedro-datasets/tests/databricks/conftest.py | 171 ++++++++++++++++++ .../databricks/test_base_table_dataset.py | 169 +---------------- 2 files changed, 172 insertions(+), 168 deletions(-) diff --git a/kedro-datasets/tests/databricks/conftest.py b/kedro-datasets/tests/databricks/conftest.py index ccc0c78ad..336e8168d 100644 --- a/kedro-datasets/tests/databricks/conftest.py +++ b/kedro-datasets/tests/databricks/conftest.py @@ -4,11 +4,15 @@ discover them automatically. More info here: https://docs.pytest.org/en/latest/fixture.html """ +import os # importlib_metadata needs backport for python 3.8 and older import importlib_metadata +import pandas as pd import pytest from pyspark.sql import SparkSession +from pyspark.sql.types import IntegerType, StringType, StructField, StructType + DELTA_VERSION = importlib_metadata.version("delta-spark") @@ -28,3 +32,170 @@ def spark_session(): spark.sql("create database if not exists test") yield spark spark.sql("drop database test cascade;") + + +@pytest.fixture +def sample_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 32), ("Evan", 23)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def mismatched_upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + StructField("height", IntegerType(), True), + ] + ) + + data = [("Alex", 32, 174), ("Evan", 23, 166)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def subset_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + StructField("height", IntegerType(), True), + ] + ) + + data = [("Alex", 32, 174), ("Evan", 23, 166)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def subset_pandas_df(): + return pd.DataFrame( + {"name": ["Alex", "Evan"], "age": [32, 23], "height": [174, 166]} + ) + + +@pytest.fixture +def subset_expected_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 32), ("Evan", 23)] + + return 
spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def sample_pandas_df(): + return pd.DataFrame( + {"name": ["Alex", "Bob", "Clarke", "Dave"], "age": [31, 12, 65, 29]} + ) + + +@pytest.fixture +def append_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Evan", 23), ("Frank", 13)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_append_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 31), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ("Frank", 13), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 32), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_upsert_multiple_primary_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 31), + ("Alex", 32), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def external_location(): + return os.environ.get("DATABRICKS_EXTERNAL_LOCATION") diff --git a/kedro-datasets/tests/databricks/test_base_table_dataset.py b/kedro-datasets/tests/databricks/test_base_table_dataset.py index 5cc88e8df..ac0c0cc27 100644 --- a/kedro-datasets/tests/databricks/test_base_table_dataset.py +++ b/kedro-datasets/tests/databricks/test_base_table_dataset.py @@ -3,179 +3,12 @@ import pandas as pd import pytest from kedro.io.core import DatasetError, Version, VersionNotFoundError -from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import DataFrame from pyspark.sql.types import IntegerType, StringType, StructField, StructType from kedro_datasets.databricks._base_table_dataset import BaseTableDataset -@pytest.fixture -def sample_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def upsert_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Alex", 32), ("Evan", 23)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def mismatched_upsert_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - StructField("height", IntegerType(), True), - ] - ) - - data = [("Alex", 32, 174), ("Evan", 23, 166)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def subset_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - StructField("height", IntegerType(), True), - ] - ) - - 
data = [("Alex", 32, 174), ("Evan", 23, 166)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def subset_pandas_df(): - return pd.DataFrame( - {"name": ["Alex", "Evan"], "age": [32, 23], "height": [174, 166]} - ) - - -@pytest.fixture -def subset_expected_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Alex", 32), ("Evan", 23)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def sample_pandas_df(): - return pd.DataFrame( - {"name": ["Alex", "Bob", "Clarke", "Dave"], "age": [31, 12, 65, 29]} - ) - - -@pytest.fixture -def append_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [("Evan", 23), ("Frank", 13)] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_append_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 31), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ("Frank", 13), - ] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_upsert_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 32), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def expected_upsert_multiple_primary_spark_df(spark_session: SparkSession): - schema = StructType( - [ - StructField("name", StringType(), True), - StructField("age", IntegerType(), True), - ] - ) - - data = [ - ("Alex", 31), - ("Alex", 32), - ("Bob", 12), - ("Clarke", 65), - ("Dave", 29), - ("Evan", 23), - ] - - return spark_session.createDataFrame(data, schema) - - -@pytest.fixture -def external_location(): - return os.environ.get("DATABRICKS_EXTERNAL_LOCATION") - - class TestBaseTableDataset: def test_full_table(self): unity_ds = BaseTableDataset(catalog="test", database="test", table="test") From 31aecc8c4fab30eb4dba025649b5ebbb93ad2969 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Fri, 11 Oct 2024 20:17:53 +0530 Subject: [PATCH 07/15] fixed lint issues Signed-off-by: Minura Punchihewa --- kedro-datasets/tests/databricks/conftest.py | 1 - kedro-datasets/tests/databricks/test_base_table_dataset.py | 2 -- .../tests/databricks/test_external_table_dataset.py | 4 +++- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/kedro-datasets/tests/databricks/conftest.py b/kedro-datasets/tests/databricks/conftest.py index 336e8168d..6984faabb 100644 --- a/kedro-datasets/tests/databricks/conftest.py +++ b/kedro-datasets/tests/databricks/conftest.py @@ -13,7 +13,6 @@ from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, StructField, StructType - DELTA_VERSION = importlib_metadata.version("delta-spark") diff --git a/kedro-datasets/tests/databricks/test_base_table_dataset.py b/kedro-datasets/tests/databricks/test_base_table_dataset.py index ac0c0cc27..49f2283a1 100644 --- a/kedro-datasets/tests/databricks/test_base_table_dataset.py +++ b/kedro-datasets/tests/databricks/test_base_table_dataset.py @@ -1,5 +1,3 @@ -import os - import pandas as pd import pytest from 
kedro.io.core import DatasetError, Version, VersionNotFoundError diff --git a/kedro-datasets/tests/databricks/test_external_table_dataset.py b/kedro-datasets/tests/databricks/test_external_table_dataset.py index e838c7bd1..369587899 100644 --- a/kedro-datasets/tests/databricks/test_external_table_dataset.py +++ b/kedro-datasets/tests/databricks/test_external_table_dataset.py @@ -2,7 +2,9 @@ from kedro.io.core import DatasetError from pyspark.sql import DataFrame -from kedro_datasets_experimental.databricks.external_table_dataset import ExternalTableDataset +from kedro_datasets_experimental.databricks.external_table_dataset import ( + ExternalTableDataset, +) class TestExternalTableDataset: From 1696d23085dabee514e232f3b40f127fbb5a243a Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Mon, 14 Oct 2024 18:59:07 +0530 Subject: [PATCH 08/15] updated the format for the test for save_overwrite to Parquet Signed-off-by: Minura Punchihewa --- kedro-datasets/tests/databricks/test_external_table_dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/tests/databricks/test_external_table_dataset.py b/kedro-datasets/tests/databricks/test_external_table_dataset.py index 369587899..0e4ef3446 100644 --- a/kedro-datasets/tests/databricks/test_external_table_dataset.py +++ b/kedro-datasets/tests/databricks/test_external_table_dataset.py @@ -33,6 +33,7 @@ def test_save_overwrite( unity_ds = ExternalTableDataset( database="test", table="test_save", + format="parquet", write_mode="overwrite", location=f"{external_location}/test_save_overwrite_external", ) From 761f9634e7fd44f7a0f011c1b922c29119a5a659 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Mon, 14 Oct 2024 19:20:14 +0530 Subject: [PATCH 09/15] moved tests to the kedro_datasets_experimental pkg Signed-off-by: Minura Punchihewa --- .../tests/databricks/__init__.py | 0 .../tests/databricks/conftest.py | 200 ++++++++++++++++++ .../databricks/test_external_table_dataset.py | 0 3 files changed, 200 insertions(+) create mode 100644 kedro-datasets/kedro_datasets_experimental/tests/databricks/__init__.py create mode 100644 kedro-datasets/kedro_datasets_experimental/tests/databricks/conftest.py rename kedro-datasets/{ => kedro_datasets_experimental}/tests/databricks/test_external_table_dataset.py (100%) diff --git a/kedro-datasets/kedro_datasets_experimental/tests/databricks/__init__.py b/kedro-datasets/kedro_datasets_experimental/tests/databricks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kedro-datasets/kedro_datasets_experimental/tests/databricks/conftest.py b/kedro-datasets/kedro_datasets_experimental/tests/databricks/conftest.py new file mode 100644 index 000000000..6984faabb --- /dev/null +++ b/kedro-datasets/kedro_datasets_experimental/tests/databricks/conftest.py @@ -0,0 +1,200 @@ +""" +This file contains the fixtures that are reusable by any tests within +this directory. You don't need to import the fixtures as pytest will +discover them automatically. 
More info here: +https://docs.pytest.org/en/latest/fixture.html +""" +import os + +# importlib_metadata needs backport for python 3.8 and older +import importlib_metadata +import pandas as pd +import pytest +from pyspark.sql import SparkSession +from pyspark.sql.types import IntegerType, StringType, StructField, StructType + +DELTA_VERSION = importlib_metadata.version("delta-spark") + + +@pytest.fixture(scope="class", autouse=True) +def spark_session(): + spark = ( + SparkSession.builder.appName("test") + .config("spark.jars.packages", f"io.delta:delta-core_2.12:{DELTA_VERSION}") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ) + .getOrCreate() + ) + spark.sql("create database if not exists test") + yield spark + spark.sql("drop database test cascade;") + + +@pytest.fixture +def sample_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 32), ("Evan", 23)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def mismatched_upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + StructField("height", IntegerType(), True), + ] + ) + + data = [("Alex", 32, 174), ("Evan", 23, 166)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def subset_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + StructField("height", IntegerType(), True), + ] + ) + + data = [("Alex", 32, 174), ("Evan", 23, 166)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def subset_pandas_df(): + return pd.DataFrame( + {"name": ["Alex", "Evan"], "age": [32, 23], "height": [174, 166]} + ) + + +@pytest.fixture +def subset_expected_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Alex", 32), ("Evan", 23)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def sample_pandas_df(): + return pd.DataFrame( + {"name": ["Alex", "Bob", "Clarke", "Dave"], "age": [31, 12, 65, 29]} + ) + + +@pytest.fixture +def append_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [("Evan", 23), ("Frank", 13)] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_append_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 31), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ("Frank", 13), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_upsert_spark_df(spark_session: SparkSession): + schema = StructType( + [ + 
StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 32), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def expected_upsert_multiple_primary_spark_df(spark_session: SparkSession): + schema = StructType( + [ + StructField("name", StringType(), True), + StructField("age", IntegerType(), True), + ] + ) + + data = [ + ("Alex", 31), + ("Alex", 32), + ("Bob", 12), + ("Clarke", 65), + ("Dave", 29), + ("Evan", 23), + ] + + return spark_session.createDataFrame(data, schema) + + +@pytest.fixture +def external_location(): + return os.environ.get("DATABRICKS_EXTERNAL_LOCATION") diff --git a/kedro-datasets/tests/databricks/test_external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/tests/databricks/test_external_table_dataset.py similarity index 100% rename from kedro-datasets/tests/databricks/test_external_table_dataset.py rename to kedro-datasets/kedro_datasets_experimental/tests/databricks/test_external_table_dataset.py From 25c3e5b1dc5d9a0811f8722fb00036ec2d8154ab Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Sat, 19 Oct 2024 00:13:51 +0530 Subject: [PATCH 10/15] improved spacing in the main docstring Signed-off-by: Minura Punchihewa --- .../databricks/external_table_dataset.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py index 5c1a2466c..ab1ad9dc2 100644 --- a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py @@ -57,6 +57,7 @@ def _validate_write_mode(self) -> None: class ExternalTableDataset(BaseTableDataset): """``ExternalTableDataset`` loads and saves data into external tables in Databricks. Load and save can be in Spark or Pandas dataframes, specified in dataframe_type. + Example usage for the `YAML API `_: .. code-block:: yaml @@ -64,11 +65,13 @@ class ExternalTableDataset(BaseTableDataset): type: databricks.ExternalTableDataset format: parquet table: names_and_ages + names_and_ages@pandas: type: databricks.ExternalTableDataset format: parquet table: names_and_ages dataframe_type: pandas + Example usage for the `Python API `_: From 1577f9afa2df0955ce2df6596f5ea5dc64381c92 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Sat, 19 Oct 2024 00:29:32 +0530 Subject: [PATCH 11/15] updated the release doc Signed-off-by: Minura Punchihewa --- kedro-datasets/RELEASE.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index d90898153..20915e597 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -2,12 +2,24 @@ ## Major features and improvements +- Added the following new **experimental** datasets: + +| Type | Description | Location | +| --------------------------------- | ------------------------------------------------------ | ---------------------------------------- | +| `databricks.ExternalTableDataset` | A dataset for accessing external tables in Databricks. 
| `kedro_datasets_experimental.databricks` | + - Added the following new core datasets: | Type | Description | Location | | ------------------- | ------------------------------------------------------------- | --------------------- | | `ibis.TableDataset` | A dataset for loading and saving files using Ibis's backends. | `kedro_datasets.ibis` | +## Community contributions + +Many thanks to the following Kedroids for contributing PRs to this release: + +- [Minura Punchihewa](https://github.com/MinuraPunchihewa) + # Release 5.0.0 ## Major features and improvements From d8668ea90083cc215325ceffed09792474a8dca1 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 21 Oct 2024 12:03:11 +0100 Subject: [PATCH 12/15] Add contribution to correct release Signed-off-by: Merel Theisen --- kedro-datasets/RELEASE.md | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 5f4e4b78f..fe94686f6 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,7 +1,5 @@ # Upcoming Release -# Release 5.1.0 - ## Major features and improvements - Added the following new **experimental** datasets: @@ -10,6 +8,20 @@ | --------------------------------- | ------------------------------------------------------ | ---------------------------------------- | | `databricks.ExternalTableDataset` | A dataset for accessing external tables in Databricks. | `kedro_datasets_experimental.databricks` | +## Bug fixes and other changes + +## Breaking Changes + +## Community contributions + +Many thanks to the following Kedroids for contributing PRs to this release: + +- [Minura Punchihewa](https://github.com/MinuraPunchihewa) + +# Release 5.1.0 + +## Major features and improvements + - Added the following new core datasets: | Type | Description | Location | @@ -20,12 +32,6 @@ - Changed Ibis datasets to connect to an in-memory DuckDB database if connection configuration is not provided. 
-## Community contributions - -Many thanks to the following Kedroids for contributing PRs to this release: - -- [Minura Punchihewa](https://github.com/MinuraPunchihewa) - # Release 5.0.0 ## Major features and improvements From 586068822a4e45705a975202283e252dc5210a28 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Mon, 21 Oct 2024 17:26:58 +0530 Subject: [PATCH 13/15] added the dataset to the documentation Signed-off-by: Minura Punchihewa --- kedro-datasets/docs/source/api/kedro_datasets_experimental.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/docs/source/api/kedro_datasets_experimental.rst b/kedro-datasets/docs/source/api/kedro_datasets_experimental.rst index 219510954..a29e8449c 100644 --- a/kedro-datasets/docs/source/api/kedro_datasets_experimental.rst +++ b/kedro-datasets/docs/source/api/kedro_datasets_experimental.rst @@ -11,6 +11,7 @@ kedro_datasets_experimental :toctree: :template: autosummary/class.rst + databricks.ExternalTableDataset langchain.ChatAnthropicDataset langchain.ChatCohereDataset langchain.ChatOpenAIDataset From 2451d702d8cf525499bc5b900a5985fd14ac8a53 Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Mon, 21 Oct 2024 17:29:25 +0530 Subject: [PATCH 14/15] listed the dependencies for the dataset Signed-off-by: Minura Punchihewa --- kedro-datasets/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 706d2c80d..c33584112 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -174,6 +174,7 @@ yaml-yamldataset = ["kedro-datasets[pandas-base]", "PyYAML>=4.2, <7.0"] yaml = ["kedro-datasets[yaml-yamldataset]"] # Experimental Datasets +databricks-externaltabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] langchain-chatopenaidataset = ["langchain-openai~=0.1.7"] langchain-openaiembeddingsdataset = ["langchain-openai~=0.1.7"] langchain-chatanthropicdataset = ["langchain-anthropic~=0.1.13", "langchain-community~=0.2.0"] From fe767e2cffdbe123d223d9e3bba48978abb6144c Mon Sep 17 00:00:00 2001 From: Minura Punchihewa Date: Mon, 21 Oct 2024 17:43:05 +0530 Subject: [PATCH 15/15] fixed indentation in the docstring Signed-off-by: Minura Punchihewa --- .../databricks/external_table_dataset.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py index ab1ad9dc2..a4f4a351b 100644 --- a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py @@ -60,7 +60,9 @@ class ExternalTableDataset(BaseTableDataset): Example usage for the `YAML API `_: + .. code-block:: yaml + names_and_ages@spark: type: databricks.ExternalTableDataset format: parquet @@ -73,9 +75,10 @@ class ExternalTableDataset(BaseTableDataset): dataframe_type: pandas Example usage for the - `Python API `_: + `Python API `_: + .. code-block:: pycon + >>> from kedro_datasets.databricks import ExternalTableDataset >>> from pyspark.sql import SparkSession >>> from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
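
Taken together, the patches above add an ``ExternalTableDataset`` whose ``location`` and ``write_mode``/``format`` combination are validated when the dataset object is constructed. The sketch below shows how the finished dataset is exercised, based only on the docstring and tests in this series; it assumes a Spark session attached to a Databricks workspace, and the storage path, database and table names are placeholders.

.. code-block:: python

    from kedro.io.core import DatasetError

    from kedro_datasets_experimental.databricks import ExternalTableDataset

    # A parquet-backed external table. Because the table does not exist yet,
    # `location` must point inside an external location that has already been
    # created (placeholder path below).
    dataset = ExternalTableDataset(
        database="demo",
        table="names_and_ages",
        format="parquet",
        write_mode="overwrite",
        location="abfss://container@storageaccount.dfs.core.windows.net/demo/names_and_ages",
    )

    # Validation happens at construction time: upserts are only supported for
    # the delta format, so this raises a DatasetError, as the new test suite checks.
    try:
        ExternalTableDataset(table="names_and_ages", write_mode="upsert", format="parquet")
    except DatasetError:
        pass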