From f63d5172bd37eb246604974de4cd669ca5e7fc02 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Date: Wed, 21 Aug 2024 11:02:34 +0100 Subject: [PATCH 01/24] ci: Fix `pandas.DeltaTableDataset` tests (#811) Fix deltatable tests Signed-off-by: Ankita Katiyar Signed-off-by: Harm Matthias Harms --- kedro-datasets/tests/pandas/test_deltatable_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/tests/pandas/test_deltatable_dataset.py b/kedro-datasets/tests/pandas/test_deltatable_dataset.py index 3b81ef421..666e24e39 100644 --- a/kedro-datasets/tests/pandas/test_deltatable_dataset.py +++ b/kedro-datasets/tests/pandas/test_deltatable_dataset.py @@ -48,7 +48,7 @@ def test_overwrite_with_diff_schema(self, deltatable_dataset_from_path, dummy_df """Test saving with the default overwrite mode with new data of diff schema.""" deltatable_dataset_from_path.save(dummy_df) new_df = pd.DataFrame({"new_col": [1, 2]}) - pattern = "Schema of data does not match table schema" + pattern = "Cannot cast schema, number of fields does not match: 1 vs 3" with pytest.raises(DatasetError, match=pattern): deltatable_dataset_from_path.save(new_df) @@ -167,4 +167,4 @@ def test_history(self, deltatable_dataset_from_path, dummy_df): deltatable_dataset_from_path.save(dummy_df) history = deltatable_dataset_from_path.history assert isinstance(history, list) - assert history[0]["operation"] == "CREATE TABLE" + assert history[0]["operation"] == "WRITE" From bb20cf8344f65623ba0b8f56b889bdd3414b879a Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 21 Aug 2024 15:33:09 +0200 Subject: [PATCH 02/24] feat(datasets): Add geopandas ParquetDataset Signed-off-by: Harm Matthias Harms --- .../kedro_datasets/geopandas/__init__.py | 6 +- .../geopandas/parquet_dataset.py | 162 +++++++++++++ kedro-datasets/pyproject.toml | 8 +- .../tests/geopandas/test_parquet_dataset.py | 229 ++++++++++++++++++ 4 files changed, 401 insertions(+), 4 deletions(-) create mode 100644 kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py create mode 100644 kedro-datasets/tests/geopandas/test_parquet_dataset.py diff --git a/kedro-datasets/kedro_datasets/geopandas/__init__.py b/kedro-datasets/kedro_datasets/geopandas/__init__.py index d4843aa68..a257e4121 100644 --- a/kedro-datasets/kedro_datasets/geopandas/__init__.py +++ b/kedro-datasets/kedro_datasets/geopandas/__init__.py @@ -8,5 +8,9 @@ GeoJSONDataset: Any __getattr__, __dir__, __all__ = lazy.attach( - __name__, submod_attrs={"geojson_dataset": ["GeoJSONDataset"]} + __name__, + submod_attrs={ + "geojson_dataset": ["GeoJSONDataset"], + "parquet_dataset": ["ParquetDataset"], + }, ) diff --git a/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py b/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py new file mode 100644 index 000000000..831bc4b3b --- /dev/null +++ b/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py @@ -0,0 +1,162 @@ +"""ParquetDataset loads and saves data to a local parquet file. The +underlying functionality is supported by geopandas, so it supports all +allowed geopandas (pandas) options for loading and saving geosjon files. +""" + +from __future__ import annotations + +import copy +from pathlib import PurePosixPath +from typing import Any, Union + +import fsspec +import geopandas as gpd +from kedro.io.core import ( + AbstractVersionedDataset, + DatasetError, + Version, + get_filepath_str, + get_protocol_and_path, +) + + +class ParquetDataset( + AbstractVersionedDataset[ + gpd.GeoDataFrame, Union[gpd.GeoDataFrame, dict[str, gpd.GeoDataFrame]] + ] +): + """``ParquetDataset`` loads/saves data to a parquet file using an underlying filesystem + (eg: local, S3, GCS). + The underlying functionality is supported by geopandas, so it supports all + allowed geopandas (pandas) options for loading and saving parquet files. + + Example: + + .. code-block:: pycon + + >>> import geopandas as gpd + >>> from kedro_datasets.geopandas import ParquetDataset + >>> from shapely.geometry import Point + >>> + >>> data = gpd.GeoDataFrame( + ... {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, + ... geometry=[Point(1, 1), Point(2, 4)], + ... ) + >>> dataset = ParquetDataset(filepath=tmp_path / "test.parquet", save_args=None) + >>> dataset.save(data) + >>> reloaded = dataset.load() + >>> + >>> assert data.equals(reloaded) + + """ + + DEFAULT_LOAD_ARGS: dict[str, Any] = {} + DEFAULT_SAVE_ARGS: dict[str, Any] = {} + + def __init__( # noqa: PLR0913 + self, + *, + filepath: str, + load_args: dict[str, Any] | None = None, + save_args: dict[str, Any] | None = None, + version: Version | None = None, + credentials: dict[str, Any] | None = None, + fs_args: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + """Creates a new instance of ``ParquetDataset`` pointing to a concrete parquet file + on a specific filesystem fsspec. + + Args: + + filepath: Filepath in POSIX format to a parquet file prefixed with a protocol like + `s3://`. If prefix is not provided `file` protocol (local filesystem) will be used. + The prefix should be any protocol supported by ``fsspec``. + Note: `http(s)` doesn't support versioning. + load_args: GeoPandas options for loading parquet files. + Here you can find all available arguments: + https://geopandas.org/en/stable/docs/reference/api/geopandas.read_parquet.html + save_args: GeoPandas options for saving parquet files. + Here you can find all available arguments: + https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.to_parquet.html + version: If specified, should be an instance of + ``kedro.io.core.Version``. If its ``load`` attribute is + None, the latest version will be loaded. If its ``save`` + credentials: credentials required to access the underlying filesystem. + Eg. for ``GCFileSystem`` it would look like `{'token': None}`. + fs_args: Extra arguments to pass into underlying filesystem class constructor + (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as + to pass to the filesystem's `open` method through nested keys + `open_args_load` and `open_args_save`. + Here you can find all available arguments for `open`: + https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open + All defaults are preserved, except `mode`, which is set to `wb` when saving. + metadata: Any arbitrary metadata. + This is ignored by Kedro, but may be consumed by users or external plugins. + """ + _fs_args = copy.deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) + _credentials = copy.deepcopy(credentials) or {} + protocol, path = get_protocol_and_path(filepath, version) + self._protocol = protocol + if protocol == "file": + _fs_args.setdefault("auto_mkdir", True) + + self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) + + self.metadata = metadata + + super().__init__( + filepath=PurePosixPath(path), + version=version, + exists_function=self._fs.exists, + glob_function=self._fs.glob, + ) + + self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) + if load_args is not None: + self._load_args.update(load_args) + + self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) + if save_args is not None: + self._save_args.update(save_args) + + _fs_open_args_save.setdefault("mode", "wb") + self._fs_open_args_load = _fs_open_args_load + self._fs_open_args_save = _fs_open_args_save + + def _load(self) -> gpd.GeoDataFrame | dict[str, gpd.GeoDataFrame]: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: + return gpd.read_parquet(fs_file, **self._load_args) + + def _save(self, data: gpd.GeoDataFrame) -> None: + save_path = get_filepath_str(self._get_save_path(), self._protocol) + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + data.to_parquet(fs_file, **self._save_args) + self.invalidate_cache() + + def _exists(self) -> bool: + try: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + except DatasetError: + return False + return self._fs.exists(load_path) + + def _describe(self) -> dict[str, Any]: + return { + "filepath": self._filepath, + "protocol": self._protocol, + "load_args": self._load_args, + "save_args": self._save_args, + "version": self._version, + } + + def _release(self) -> None: + self.invalidate_cache() + + def invalidate_cache(self) -> None: + """Invalidate underlying filesystem cache.""" + filepath = get_filepath_str(self._filepath, self._protocol) + self._fs.invalidate_cache(filepath) diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 270a2673c..f0388acbb 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -25,6 +25,7 @@ polars-base = ["polars>=0.18.0",] plotly-base = ["plotly>=4.8.0, <6.0"] delta-base = ["delta-spark~=1.2.1",] networkx-base = ["networkx~=2.4"] +geopandas-base = ["geopandas>=0.8.0, <1.0"] # Individual Datasets api-apidataset = ["requests~=2.20"] @@ -39,8 +40,9 @@ dask = ["kedro-datasets[dask-parquetdataset]"] databricks-managedtabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] databricks = ["kedro-datasets[databricks-managedtabledataset]"] -geopandas-geojsondataset = ["geopandas>=0.6.0, <1.0", "pyproj~=3.0"] -geopandas = ["kedro-datasets[geopandas-geojsondataset]"] +geopandas-geojsondataset = ["kedro-datasets[geopandas-base]", "pyproj~=3.0"] +geopandas-parquetdataset = ["kedro-datasets[geopandas-base]"] +geopandas = ["kedro-datasets[geopandas-geojsondataset,geopandas-parquetdataset]"] holoviews-holoviewswriter = ["holoviews~=1.13.0"] holoviews = ["kedro-datasets[holoviews-holoviewswriter]"] @@ -199,7 +201,7 @@ test = [ "dill~=0.3.1", "filelock>=3.4.0, <4.0", "gcsfs>=2023.1, <2023.3", - "geopandas>=0.6.0, <1.0", + "geopandas>=0.8.0, <1.0", "hdfs>=2.5.8, <3.0", "holoviews>=1.13.0", "ibis-framework[duckdb,examples]", diff --git a/kedro-datasets/tests/geopandas/test_parquet_dataset.py b/kedro-datasets/tests/geopandas/test_parquet_dataset.py new file mode 100644 index 000000000..f0e305cc8 --- /dev/null +++ b/kedro-datasets/tests/geopandas/test_parquet_dataset.py @@ -0,0 +1,229 @@ +from pathlib import Path, PurePosixPath + +import geopandas as gpd +import pytest +from fsspec.implementations.http import HTTPFileSystem +from fsspec.implementations.local import LocalFileSystem +from gcsfs import GCSFileSystem +from kedro.io.core import PROTOCOL_DELIMITER, DatasetError, Version, generate_timestamp +from pandas.testing import assert_frame_equal +from s3fs import S3FileSystem +from shapely.geometry import Point + +from kedro_datasets.geopandas import ParquetDataset + + +@pytest.fixture(params=[None]) +def load_version(request): + return request.param + + +@pytest.fixture(params=[None]) +def save_version(request): + return request.param or generate_timestamp() + + +@pytest.fixture +def filepath(tmp_path): + return (tmp_path / "test.parquet").as_posix() + + +@pytest.fixture(params=[None]) +def load_args(request): + return request.param + + +@pytest.fixture(params=[None]) +def save_args(request): + return request.param + + +@pytest.fixture +def dummy_dataframe(): + return gpd.GeoDataFrame( + {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, + geometry=[Point(1, 1), Point(2, 2)], + ) + + +@pytest.fixture +def parquet_dataset(filepath, load_args, save_args, fs_args): + return ParquetDataset( + filepath=filepath, load_args=load_args, save_args=save_args, fs_args=fs_args + ) + + +@pytest.fixture +def versioned_parquet_dataset(filepath, load_version, save_version): + return ParquetDataset( + filepath=filepath, version=Version(load_version, save_version) + ) + + +class TestParquetDataset: + def test_save_and_load(self, parquet_dataset, dummy_dataframe): + """Test that saved and reloaded data matches the original one.""" + parquet_dataset.save(dummy_dataframe) + reloaded_df = parquet_dataset.load() + assert_frame_equal(reloaded_df, dummy_dataframe) + assert parquet_dataset._fs_open_args_load == {} + assert parquet_dataset._fs_open_args_save == {"mode": "wb"} + + @pytest.mark.parametrize("parquet_dataset", [{"index": False}], indirect=True) + def test_load_missing_file(self, parquet_dataset): + """Check the error while trying to load from missing source.""" + pattern = r"Failed while loading data from data set ParquetDataSet" + with pytest.raises(DatasetError, match=pattern): + parquet_dataset.load() + + def test_exists(self, parquet_dataset, dummy_dataframe): + """Test `exists` method invocation for both cases.""" + assert not parquet_dataset.exists() + parquet_dataset.save(dummy_dataframe) + assert parquet_dataset.exists() + + @pytest.mark.parametrize("load_args", [{"crs": "init:4326"}, {"crs": "init:2154"}]) + def test_load_extra_params(self, parquet_dataset, load_args): + """Test overriding default save args""" + for k, v in load_args.items(): + assert parquet_dataset._load_args[k] == v + + @pytest.mark.parametrize( + "save_args", [{"driver": "ESRI Shapefile"}, {"driver": "GPKG"}] + ) + def test_save_extra_params(self, parquet_dataset, save_args): + """Test overriding default save args""" + for k, v in save_args.items(): + assert parquet_dataset._save_args[k] == v + + @pytest.mark.parametrize( + "fs_args", + [{"open_args_load": {"mode": "rb", "compression": "gzip"}}], + indirect=True, + ) + def test_open_extra_args(self, parquet_dataset, fs_args): + assert parquet_dataset._fs_open_args_load == fs_args["open_args_load"] + assert parquet_dataset._fs_open_args_save == {"mode": "wb"} + + @pytest.mark.parametrize( + "path,instance_type", + [ + ("s3://bucket/file.parquet", S3FileSystem), + ("/tmp/test.parquet", LocalFileSystem), + ("gcs://bucket/file.parquet", GCSFileSystem), + ("file:///tmp/file.parquet", LocalFileSystem), + ("https://example.com/file.parquet", HTTPFileSystem), + ], + ) + def test_protocol_usage(self, path, instance_type): + parquet_dataset = ParquetDataset(filepath=path) + assert isinstance(parquet_dataset._fs, instance_type) + + path = path.split(PROTOCOL_DELIMITER, 1)[-1] + + assert str(parquet_dataset._filepath) == path + assert isinstance(parquet_dataset._filepath, PurePosixPath) + + def test_catalog_release(self, mocker): + fs_mock = mocker.patch("fsspec.filesystem").return_value + filepath = "test.parquet" + parquet_dataset = ParquetDataset(filepath=filepath) + parquet_dataset.release() + fs_mock.invalidate_cache.assert_called_once_with(filepath) + + +class TestParquetDatasetVersioned: + def test_version_str_repr(self, load_version, save_version): + """Test that version is in string representation of the class instance + when applicable.""" + filepath = "test.parquet" + ds = ParquetDataset(filepath=filepath) + ds_versioned = ParquetDataset( + filepath=filepath, version=Version(load_version, save_version) + ) + assert filepath in str(ds) + assert "version" not in str(ds) + + assert filepath in str(ds_versioned) + ver_str = f"version=Version(load={load_version}, save='{save_version}')" + assert ver_str in str(ds_versioned) + assert "ParquetDataset" in str(ds_versioned) + assert "ParquetDataset" in str(ds) + assert "protocol" in str(ds_versioned) + assert "protocol" in str(ds) + + def test_save_and_load(self, versioned_parquet_dataset, dummy_dataframe): + """Test that saved and reloaded data matches the original one for + the versioned data set.""" + versioned_parquet_dataset.save(dummy_dataframe) + reloaded_df = versioned_parquet_dataset.load() + assert_frame_equal(reloaded_df, dummy_dataframe) + + def test_no_versions(self, versioned_parquet_dataset): + """Check the error if no versions are available for load.""" + pattern = r"Did not find any versions for ParquetDataset\(.+\)" + with pytest.raises(DatasetError, match=pattern): + versioned_parquet_dataset.load() + + def test_exists(self, versioned_parquet_dataset, dummy_dataframe): + """Test `exists` method invocation for versioned data set.""" + assert not versioned_parquet_dataset.exists() + versioned_parquet_dataset.save(dummy_dataframe) + assert versioned_parquet_dataset.exists() + + def test_prevent_override(self, versioned_parquet_dataset, dummy_dataframe): + """Check the error when attempt to override the same data set + version.""" + versioned_parquet_dataset.save(dummy_dataframe) + pattern = ( + r"Save path \'.+\' for ParquetDataset\(.+\) must not " + r"exist if versioning is enabled" + ) + with pytest.raises(DatasetError, match=pattern): + versioned_parquet_dataset.save(dummy_dataframe) + + @pytest.mark.parametrize( + "load_version", ["2019-01-01T23.59.59.999Z"], indirect=True + ) + @pytest.mark.parametrize( + "save_version", ["2019-01-02T00.00.00.000Z"], indirect=True + ) + def test_save_version_warning( + self, versioned_parquet_dataset, load_version, save_version, dummy_dataframe + ): + """Check the warning when saving to the path that differs from + the subsequent load path.""" + pattern = ( + rf"Save version '{save_version}' did not match load version " + rf"'{load_version}' for ParquetDataset\(.+\)" + ) + with pytest.warns(UserWarning, match=pattern): + versioned_parquet_dataset.save(dummy_dataframe) + + def test_http_filesystem_no_versioning(self): + pattern = "Versioning is not supported for HTTP protocols." + + with pytest.raises(DatasetError, match=pattern): + ParquetDataset( + filepath="https://example/file.parquet", version=Version(None, None) + ) + + def test_versioning_existing_dataset( + self, parquet_dataset, versioned_parquet_dataset, dummy_dataframe + ): + """Check the error when attempting to save a versioned dataset on top of an + already existing (non-versioned) dataset.""" + parquet_dataset.save(dummy_dataframe) + assert parquet_dataset.exists() + assert parquet_dataset._filepath == versioned_parquet_dataset._filepath + pattern = ( + f"(?=.*file with the same name already exists in the directory)" + f"(?=.*{versioned_parquet_dataset._filepath.parent.as_posix()})" + ) + with pytest.raises(DatasetError, match=pattern): + versioned_parquet_dataset.save(dummy_dataframe) + + # Remove non-versioned dataset and try again + Path(parquet_dataset._filepath.as_posix()).unlink() + versioned_parquet_dataset.save(dummy_dataframe) + assert versioned_parquet_dataset.exists() From 602674243eaa42f9c76415d3a09d4281002f78ab Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 21 Aug 2024 15:46:55 +0200 Subject: [PATCH 03/24] Add release notes Signed-off-by: Harm Matthias Harms --- kedro-datasets/RELEASE.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index a90176166..8cf006e61 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,6 +1,11 @@ # Upcoming Release ## Major features and improvements +- Added the following new datasets: + | Type | Description | Location | + | ---- | ----------- | -------- | + | `geopandas.ParquetDataset` | A dataset for loading and saving geopandas dataframe. | `kedro_datasets.geopandas` | + ## Bug fixes and other changes ## Breaking Changes ## Community contributions From d54c181476db78ecedfbcccd7e3016c921524447 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 21 Aug 2024 15:56:13 +0200 Subject: [PATCH 04/24] Add parquet dataset to docs Signed-off-by: Harm Matthias Harms --- kedro-datasets/docs/source/api/kedro_datasets.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/docs/source/api/kedro_datasets.rst b/kedro-datasets/docs/source/api/kedro_datasets.rst index 4a7868d38..c48d11871 100644 --- a/kedro-datasets/docs/source/api/kedro_datasets.rst +++ b/kedro-datasets/docs/source/api/kedro_datasets.rst @@ -18,6 +18,7 @@ kedro_datasets databricks.ManagedTableDataset email.EmailMessageDataset geopandas.GeoJSONDataset + geopandas.ParquetDataset holoviews.HoloviewsWriter huggingface.HFDataset huggingface.HFTransformerPipelineDataset From 3fc3af424a95b3a50b9dd86c93462e784cf61b43 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 21 Aug 2024 16:12:50 +0200 Subject: [PATCH 05/24] Fix typo in tests Signed-off-by: Harm Matthias Harms --- kedro-datasets/tests/geopandas/test_parquet_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/tests/geopandas/test_parquet_dataset.py b/kedro-datasets/tests/geopandas/test_parquet_dataset.py index f0e305cc8..50a118f9e 100644 --- a/kedro-datasets/tests/geopandas/test_parquet_dataset.py +++ b/kedro-datasets/tests/geopandas/test_parquet_dataset.py @@ -72,7 +72,7 @@ def test_save_and_load(self, parquet_dataset, dummy_dataframe): @pytest.mark.parametrize("parquet_dataset", [{"index": False}], indirect=True) def test_load_missing_file(self, parquet_dataset): """Check the error while trying to load from missing source.""" - pattern = r"Failed while loading data from data set ParquetDataSet" + pattern = r"Failed while loading data from data set ParquetDataset" with pytest.raises(DatasetError, match=pattern): parquet_dataset.load() From 94d02e70b0c2eb007c2c518926b3ecc99abdf286 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 21 Aug 2024 16:33:12 +0200 Subject: [PATCH 06/24] Fix pylint type Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/kedro_datasets/geopandas/__init__.py b/kedro-datasets/kedro_datasets/geopandas/__init__.py index a257e4121..a411c1e39 100644 --- a/kedro-datasets/kedro_datasets/geopandas/__init__.py +++ b/kedro-datasets/kedro_datasets/geopandas/__init__.py @@ -6,6 +6,7 @@ # https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901 GeoJSONDataset: Any +ParquetDataset: Any __getattr__, __dir__, __all__ = lazy.attach( __name__, From fe5c31dabb9f79bbaa2f3669748003c38b4b0434 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:01:03 +0200 Subject: [PATCH 07/24] Discard changes to kedro-datasets/docs/source/api/kedro_datasets.rst Signed-off-by: Harm Matthias Harms --- kedro-datasets/docs/source/api/kedro_datasets.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/kedro-datasets/docs/source/api/kedro_datasets.rst b/kedro-datasets/docs/source/api/kedro_datasets.rst index c48d11871..4a7868d38 100644 --- a/kedro-datasets/docs/source/api/kedro_datasets.rst +++ b/kedro-datasets/docs/source/api/kedro_datasets.rst @@ -18,7 +18,6 @@ kedro_datasets databricks.ManagedTableDataset email.EmailMessageDataset geopandas.GeoJSONDataset - geopandas.ParquetDataset holoviews.HoloviewsWriter huggingface.HFDataset huggingface.HFTransformerPipelineDataset From 032f25950624f34b7bc306c7f82fa75233406261 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:01:08 +0200 Subject: [PATCH 08/24] Discard changes to kedro-datasets/kedro_datasets/geopandas/__init__.py Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/__init__.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/__init__.py b/kedro-datasets/kedro_datasets/geopandas/__init__.py index a411c1e39..d4843aa68 100644 --- a/kedro-datasets/kedro_datasets/geopandas/__init__.py +++ b/kedro-datasets/kedro_datasets/geopandas/__init__.py @@ -6,12 +6,7 @@ # https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901 GeoJSONDataset: Any -ParquetDataset: Any __getattr__, __dir__, __all__ = lazy.attach( - __name__, - submod_attrs={ - "geojson_dataset": ["GeoJSONDataset"], - "parquet_dataset": ["ParquetDataset"], - }, + __name__, submod_attrs={"geojson_dataset": ["GeoJSONDataset"]} ) From 15fa4ba1117fcf63d3672a7fa90537fd57cfdc16 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:03:29 +0200 Subject: [PATCH 09/24] Extend geojson dataset to support more file types Signed-off-by: Harm Matthias Harms --- kedro-datasets/RELEASE.md | 5 +- .../geopandas/geojson_dataset.py | 56 ++++- .../geopandas/parquet_dataset.py | 162 ------------- .../tests/geopandas/test_geojson_dataset.py | 96 +++++++- .../tests/geopandas/test_parquet_dataset.py | 229 ------------------ 5 files changed, 140 insertions(+), 408 deletions(-) delete mode 100644 kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py delete mode 100644 kedro-datasets/tests/geopandas/test_parquet_dataset.py diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 8cf006e61..6e325c3fe 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,10 +1,7 @@ # Upcoming Release ## Major features and improvements -- Added the following new datasets: - | Type | Description | Location | - | ---- | ----------- | -------- | - | `geopandas.ParquetDataset` | A dataset for loading and saving geopandas dataframe. | `kedro_datasets.geopandas` | +- Add `file_format` to `geopandas.GeoJSONDataSet` to support parquet and feather file formats. ## Bug fixes and other changes ## Breaking Changes diff --git a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py index 42be606e3..00c037dd1 100644 --- a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py @@ -2,6 +2,7 @@ underlying functionality is supported by geopandas, so it supports all allowed geopandas (pandas) options for loading and saving geosjon files. """ + from __future__ import annotations import copy @@ -18,6 +19,8 @@ get_protocol_and_path, ) +NON_FILE_SYSTEM_TARGETS = ["postgis"] + class GeoJSONDataset( AbstractVersionedDataset[ @@ -41,7 +44,7 @@ class GeoJSONDataset( ... {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, ... geometry=[Point(1, 1), Point(2, 4)], ... ) - >>> dataset = GeoJSONDataset(filepath=tmp_path / "test.geojson", save_args=None) + >>> dataset = GeoJSONDataset(filepath=tmp_path / "test.geojson") >>> dataset.save(data) >>> reloaded = dataset.load() >>> @@ -50,12 +53,13 @@ class GeoJSONDataset( """ DEFAULT_LOAD_ARGS: dict[str, Any] = {} - DEFAULT_SAVE_ARGS = {"driver": "GeoJSON"} + DEFAULT_SAVE_ARGS: dict[str, Any] = {} def __init__( # noqa: PLR0913 self, *, filepath: str, + file_format: str = "file", load_args: dict[str, Any] | None = None, save_args: dict[str, Any] | None = None, version: Version | None = None, @@ -72,6 +76,11 @@ def __init__( # noqa: PLR0913 `s3://`. If prefix is not provided `file` protocol (local filesystem) will be used. The prefix should be any protocol supported by ``fsspec``. Note: `http(s)` doesn't support versioning. + file_format: String which is used to match the appropriate load/save method on a best + effort basis. For example if 'parquet' is passed in the `geopandas.read_parquet` and + `geopandas.DataFrame.to_parquet` will be identified. An error will be raised unless + at least one matching `read_{file_format}` or `to_{file_format}` method is + identified. Defaults to 'file'. load_args: GeoPandas options for loading GeoJSON files. Here you can find all available arguments: https://geopandas.org/en/stable/docs/reference/api/geopandas.read_file.html @@ -94,6 +103,9 @@ def __init__( # noqa: PLR0913 metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ + + self._file_format = file_format.lower() + _fs_args = copy.deepcopy(fs_args) or {} _fs_open_args_load = _fs_args.pop("open_args_load", {}) _fs_open_args_save = _fs_args.pop("open_args_save", {}) @@ -126,16 +138,45 @@ def __init__( # noqa: PLR0913 self._fs_open_args_load = _fs_open_args_load self._fs_open_args_save = _fs_open_args_save + def _ensure_file_system_target(self) -> None: + # Fail fast if provided a known non-filesystem target + if self._file_format in NON_FILE_SYSTEM_TARGETS: + raise DatasetError( + f"Cannot create a dataset of file_format '{self._file_format}' as it " + f"does not support a filepath target/source." + ) + def _load(self) -> gpd.GeoDataFrame | dict[str, gpd.GeoDataFrame]: + self._ensure_file_system_target() + load_path = get_filepath_str(self._get_load_path(), self._protocol) - with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: - return gpd.read_file(fs_file, **self._load_args) + load_method = getattr(gpd, f"read_{self._file_format}", None) + if load_method: + with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: + return load_method(fs_file, **self._load_args) + raise DatasetError( + f"Unable to retrieve 'geopandas.read_{self._file_format}' method, please ensure that your " + "'file_format' parameter has been defined correctly as per the GeoPandas API " + "https://geopandas.org/en/stable/docs/reference/io.html" + ) def _save(self, data: gpd.GeoDataFrame) -> None: + self._ensure_file_system_target() + save_path = get_filepath_str(self._get_save_path(), self._protocol) - with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: - data.to_file(fs_file, **self._save_args) - self.invalidate_cache() + save_method = getattr(data, f"to_{self._file_format}", None) + if save_method: + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + # KEY ASSUMPTION - first argument is path/buffer/io + save_method(fs_file, **self._save_args) + self.invalidate_cache() + else: + raise DatasetError( + f"Unable to retrieve 'geopandas.DataFrame.to_{self._file_format}' method, please " + "ensure that your 'file_format' parameter has been defined correctly as " + "per the GeoPandas API " + "https://geopandas.org/en/stable/docs/reference/io.html" + ) def _exists(self) -> bool: try: @@ -147,6 +188,7 @@ def _exists(self) -> bool: def _describe(self) -> dict[str, Any]: return { "filepath": self._filepath, + "file_format": self._file_format, "protocol": self._protocol, "load_args": self._load_args, "save_args": self._save_args, diff --git a/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py b/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py deleted file mode 100644 index 831bc4b3b..000000000 --- a/kedro-datasets/kedro_datasets/geopandas/parquet_dataset.py +++ /dev/null @@ -1,162 +0,0 @@ -"""ParquetDataset loads and saves data to a local parquet file. The -underlying functionality is supported by geopandas, so it supports all -allowed geopandas (pandas) options for loading and saving geosjon files. -""" - -from __future__ import annotations - -import copy -from pathlib import PurePosixPath -from typing import Any, Union - -import fsspec -import geopandas as gpd -from kedro.io.core import ( - AbstractVersionedDataset, - DatasetError, - Version, - get_filepath_str, - get_protocol_and_path, -) - - -class ParquetDataset( - AbstractVersionedDataset[ - gpd.GeoDataFrame, Union[gpd.GeoDataFrame, dict[str, gpd.GeoDataFrame]] - ] -): - """``ParquetDataset`` loads/saves data to a parquet file using an underlying filesystem - (eg: local, S3, GCS). - The underlying functionality is supported by geopandas, so it supports all - allowed geopandas (pandas) options for loading and saving parquet files. - - Example: - - .. code-block:: pycon - - >>> import geopandas as gpd - >>> from kedro_datasets.geopandas import ParquetDataset - >>> from shapely.geometry import Point - >>> - >>> data = gpd.GeoDataFrame( - ... {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, - ... geometry=[Point(1, 1), Point(2, 4)], - ... ) - >>> dataset = ParquetDataset(filepath=tmp_path / "test.parquet", save_args=None) - >>> dataset.save(data) - >>> reloaded = dataset.load() - >>> - >>> assert data.equals(reloaded) - - """ - - DEFAULT_LOAD_ARGS: dict[str, Any] = {} - DEFAULT_SAVE_ARGS: dict[str, Any] = {} - - def __init__( # noqa: PLR0913 - self, - *, - filepath: str, - load_args: dict[str, Any] | None = None, - save_args: dict[str, Any] | None = None, - version: Version | None = None, - credentials: dict[str, Any] | None = None, - fs_args: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - ) -> None: - """Creates a new instance of ``ParquetDataset`` pointing to a concrete parquet file - on a specific filesystem fsspec. - - Args: - - filepath: Filepath in POSIX format to a parquet file prefixed with a protocol like - `s3://`. If prefix is not provided `file` protocol (local filesystem) will be used. - The prefix should be any protocol supported by ``fsspec``. - Note: `http(s)` doesn't support versioning. - load_args: GeoPandas options for loading parquet files. - Here you can find all available arguments: - https://geopandas.org/en/stable/docs/reference/api/geopandas.read_parquet.html - save_args: GeoPandas options for saving parquet files. - Here you can find all available arguments: - https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.to_parquet.html - version: If specified, should be an instance of - ``kedro.io.core.Version``. If its ``load`` attribute is - None, the latest version will be loaded. If its ``save`` - credentials: credentials required to access the underlying filesystem. - Eg. for ``GCFileSystem`` it would look like `{'token': None}`. - fs_args: Extra arguments to pass into underlying filesystem class constructor - (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as - to pass to the filesystem's `open` method through nested keys - `open_args_load` and `open_args_save`. - Here you can find all available arguments for `open`: - https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open - All defaults are preserved, except `mode`, which is set to `wb` when saving. - metadata: Any arbitrary metadata. - This is ignored by Kedro, but may be consumed by users or external plugins. - """ - _fs_args = copy.deepcopy(fs_args) or {} - _fs_open_args_load = _fs_args.pop("open_args_load", {}) - _fs_open_args_save = _fs_args.pop("open_args_save", {}) - _credentials = copy.deepcopy(credentials) or {} - protocol, path = get_protocol_and_path(filepath, version) - self._protocol = protocol - if protocol == "file": - _fs_args.setdefault("auto_mkdir", True) - - self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) - - self.metadata = metadata - - super().__init__( - filepath=PurePosixPath(path), - version=version, - exists_function=self._fs.exists, - glob_function=self._fs.glob, - ) - - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - - self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save - - def _load(self) -> gpd.GeoDataFrame | dict[str, gpd.GeoDataFrame]: - load_path = get_filepath_str(self._get_load_path(), self._protocol) - with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: - return gpd.read_parquet(fs_file, **self._load_args) - - def _save(self, data: gpd.GeoDataFrame) -> None: - save_path = get_filepath_str(self._get_save_path(), self._protocol) - with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: - data.to_parquet(fs_file, **self._save_args) - self.invalidate_cache() - - def _exists(self) -> bool: - try: - load_path = get_filepath_str(self._get_load_path(), self._protocol) - except DatasetError: - return False - return self._fs.exists(load_path) - - def _describe(self) -> dict[str, Any]: - return { - "filepath": self._filepath, - "protocol": self._protocol, - "load_args": self._load_args, - "save_args": self._save_args, - "version": self._version, - } - - def _release(self) -> None: - self.invalidate_cache() - - def invalidate_cache(self) -> None: - """Invalidate underlying filesystem cache.""" - filepath = get_filepath_str(self._filepath, self._protocol) - self._fs.invalidate_cache(filepath) diff --git a/kedro-datasets/tests/geopandas/test_geojson_dataset.py b/kedro-datasets/tests/geopandas/test_geojson_dataset.py index d2779e5c2..bdcca1d62 100644 --- a/kedro-datasets/tests/geopandas/test_geojson_dataset.py +++ b/kedro-datasets/tests/geopandas/test_geojson_dataset.py @@ -24,16 +24,31 @@ def save_version(request): @pytest.fixture -def filepath(tmp_path): +def filepath_geojson(tmp_path): return (tmp_path / "test.geojson").as_posix() +@pytest.fixture +def filepath_parquet(tmp_path): + return (tmp_path / "test.parquet").as_posix() + + +@pytest.fixture +def filepath_feather(tmp_path): + return (tmp_path / "test.feather").as_posix() + + +@pytest.fixture +def filepath_postgis(tmp_path): + return (tmp_path / "test.sql").as_posix() + + @pytest.fixture(params=[None]) def load_args(request): return request.param -@pytest.fixture(params=[{"driver": "GeoJSON"}]) +@pytest.fixture(params=[None]) def save_args(request): return request.param @@ -47,16 +62,62 @@ def dummy_dataframe(): @pytest.fixture -def geojson_dataset(filepath, load_args, save_args, fs_args): +def geojson_dataset(filepath_geojson, load_args, save_args, fs_args): + return GeoJSONDataset( + filepath=filepath_geojson, + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + +@pytest.fixture +def parquet_dataset(filepath_parquet, load_args, save_args, fs_args): + return GeoJSONDataset( + filepath=filepath_parquet, + file_format="parquet", + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + +@pytest.fixture +def parquet_dataset_bad_config(filepath_parquet, load_args, save_args, fs_args): return GeoJSONDataset( - filepath=filepath, load_args=load_args, save_args=save_args, fs_args=fs_args + filepath=filepath_parquet, + load_args=load_args, + save_args=save_args, + fs_args=fs_args, ) @pytest.fixture -def versioned_geojson_dataset(filepath, load_version, save_version): +def feather_dataset(filepath_feather, load_args, save_args, fs_args): return GeoJSONDataset( - filepath=filepath, version=Version(load_version, save_version) + filepath=filepath_feather, + file_format="feather", + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + +@pytest.fixture +def postgis_dataset(filepath_postgis, load_args, save_args, fs_args): + return GeoJSONDataset( + filepath=filepath_postgis, + file_format="postgis", + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + +@pytest.fixture +def versioned_geojson_dataset(filepath_geojson, load_version, save_version): + return GeoJSONDataset( + filepath=filepath_geojson, version=Version(load_version, save_version) ) @@ -82,6 +143,29 @@ def test_exists(self, geojson_dataset, dummy_dataframe): geojson_dataset.save(dummy_dataframe) assert geojson_dataset.exists() + def test_load_parquet_dataset(self, parquet_dataset, dummy_dataframe): + parquet_dataset.save(dummy_dataframe) + reloaded_df = parquet_dataset.load() + assert_frame_equal(reloaded_df, dummy_dataframe) + + def test_load_feather_dataset(self, feather_dataset, dummy_dataframe): + feather_dataset.save(dummy_dataframe) + reloaded_df = feather_dataset.load() + assert_frame_equal(reloaded_df, dummy_dataframe) + + def test_bad_load( + self, parquet_dataset_bad_config, dummy_dataframe, filepath_parquet + ): + dummy_dataframe.to_parquet(filepath_parquet) + pattern = r"not recognized as a supported file format" + with pytest.raises(DatasetError, match=pattern): + parquet_dataset_bad_config.load() + + def test_none_file_system_target(self, postgis_dataset, dummy_dataframe): + pattern = "Cannot create a dataset of file_format 'postgis' as it does not support a filepath target/source." + with pytest.raises(DatasetError, match=pattern): + postgis_dataset.save(dummy_dataframe) + @pytest.mark.parametrize( "load_args", [{"crs": "init:4326"}, {"crs": "init:2154", "driver": "GeoJSON"}] ) diff --git a/kedro-datasets/tests/geopandas/test_parquet_dataset.py b/kedro-datasets/tests/geopandas/test_parquet_dataset.py deleted file mode 100644 index 50a118f9e..000000000 --- a/kedro-datasets/tests/geopandas/test_parquet_dataset.py +++ /dev/null @@ -1,229 +0,0 @@ -from pathlib import Path, PurePosixPath - -import geopandas as gpd -import pytest -from fsspec.implementations.http import HTTPFileSystem -from fsspec.implementations.local import LocalFileSystem -from gcsfs import GCSFileSystem -from kedro.io.core import PROTOCOL_DELIMITER, DatasetError, Version, generate_timestamp -from pandas.testing import assert_frame_equal -from s3fs import S3FileSystem -from shapely.geometry import Point - -from kedro_datasets.geopandas import ParquetDataset - - -@pytest.fixture(params=[None]) -def load_version(request): - return request.param - - -@pytest.fixture(params=[None]) -def save_version(request): - return request.param or generate_timestamp() - - -@pytest.fixture -def filepath(tmp_path): - return (tmp_path / "test.parquet").as_posix() - - -@pytest.fixture(params=[None]) -def load_args(request): - return request.param - - -@pytest.fixture(params=[None]) -def save_args(request): - return request.param - - -@pytest.fixture -def dummy_dataframe(): - return gpd.GeoDataFrame( - {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, - geometry=[Point(1, 1), Point(2, 2)], - ) - - -@pytest.fixture -def parquet_dataset(filepath, load_args, save_args, fs_args): - return ParquetDataset( - filepath=filepath, load_args=load_args, save_args=save_args, fs_args=fs_args - ) - - -@pytest.fixture -def versioned_parquet_dataset(filepath, load_version, save_version): - return ParquetDataset( - filepath=filepath, version=Version(load_version, save_version) - ) - - -class TestParquetDataset: - def test_save_and_load(self, parquet_dataset, dummy_dataframe): - """Test that saved and reloaded data matches the original one.""" - parquet_dataset.save(dummy_dataframe) - reloaded_df = parquet_dataset.load() - assert_frame_equal(reloaded_df, dummy_dataframe) - assert parquet_dataset._fs_open_args_load == {} - assert parquet_dataset._fs_open_args_save == {"mode": "wb"} - - @pytest.mark.parametrize("parquet_dataset", [{"index": False}], indirect=True) - def test_load_missing_file(self, parquet_dataset): - """Check the error while trying to load from missing source.""" - pattern = r"Failed while loading data from data set ParquetDataset" - with pytest.raises(DatasetError, match=pattern): - parquet_dataset.load() - - def test_exists(self, parquet_dataset, dummy_dataframe): - """Test `exists` method invocation for both cases.""" - assert not parquet_dataset.exists() - parquet_dataset.save(dummy_dataframe) - assert parquet_dataset.exists() - - @pytest.mark.parametrize("load_args", [{"crs": "init:4326"}, {"crs": "init:2154"}]) - def test_load_extra_params(self, parquet_dataset, load_args): - """Test overriding default save args""" - for k, v in load_args.items(): - assert parquet_dataset._load_args[k] == v - - @pytest.mark.parametrize( - "save_args", [{"driver": "ESRI Shapefile"}, {"driver": "GPKG"}] - ) - def test_save_extra_params(self, parquet_dataset, save_args): - """Test overriding default save args""" - for k, v in save_args.items(): - assert parquet_dataset._save_args[k] == v - - @pytest.mark.parametrize( - "fs_args", - [{"open_args_load": {"mode": "rb", "compression": "gzip"}}], - indirect=True, - ) - def test_open_extra_args(self, parquet_dataset, fs_args): - assert parquet_dataset._fs_open_args_load == fs_args["open_args_load"] - assert parquet_dataset._fs_open_args_save == {"mode": "wb"} - - @pytest.mark.parametrize( - "path,instance_type", - [ - ("s3://bucket/file.parquet", S3FileSystem), - ("/tmp/test.parquet", LocalFileSystem), - ("gcs://bucket/file.parquet", GCSFileSystem), - ("file:///tmp/file.parquet", LocalFileSystem), - ("https://example.com/file.parquet", HTTPFileSystem), - ], - ) - def test_protocol_usage(self, path, instance_type): - parquet_dataset = ParquetDataset(filepath=path) - assert isinstance(parquet_dataset._fs, instance_type) - - path = path.split(PROTOCOL_DELIMITER, 1)[-1] - - assert str(parquet_dataset._filepath) == path - assert isinstance(parquet_dataset._filepath, PurePosixPath) - - def test_catalog_release(self, mocker): - fs_mock = mocker.patch("fsspec.filesystem").return_value - filepath = "test.parquet" - parquet_dataset = ParquetDataset(filepath=filepath) - parquet_dataset.release() - fs_mock.invalidate_cache.assert_called_once_with(filepath) - - -class TestParquetDatasetVersioned: - def test_version_str_repr(self, load_version, save_version): - """Test that version is in string representation of the class instance - when applicable.""" - filepath = "test.parquet" - ds = ParquetDataset(filepath=filepath) - ds_versioned = ParquetDataset( - filepath=filepath, version=Version(load_version, save_version) - ) - assert filepath in str(ds) - assert "version" not in str(ds) - - assert filepath in str(ds_versioned) - ver_str = f"version=Version(load={load_version}, save='{save_version}')" - assert ver_str in str(ds_versioned) - assert "ParquetDataset" in str(ds_versioned) - assert "ParquetDataset" in str(ds) - assert "protocol" in str(ds_versioned) - assert "protocol" in str(ds) - - def test_save_and_load(self, versioned_parquet_dataset, dummy_dataframe): - """Test that saved and reloaded data matches the original one for - the versioned data set.""" - versioned_parquet_dataset.save(dummy_dataframe) - reloaded_df = versioned_parquet_dataset.load() - assert_frame_equal(reloaded_df, dummy_dataframe) - - def test_no_versions(self, versioned_parquet_dataset): - """Check the error if no versions are available for load.""" - pattern = r"Did not find any versions for ParquetDataset\(.+\)" - with pytest.raises(DatasetError, match=pattern): - versioned_parquet_dataset.load() - - def test_exists(self, versioned_parquet_dataset, dummy_dataframe): - """Test `exists` method invocation for versioned data set.""" - assert not versioned_parquet_dataset.exists() - versioned_parquet_dataset.save(dummy_dataframe) - assert versioned_parquet_dataset.exists() - - def test_prevent_override(self, versioned_parquet_dataset, dummy_dataframe): - """Check the error when attempt to override the same data set - version.""" - versioned_parquet_dataset.save(dummy_dataframe) - pattern = ( - r"Save path \'.+\' for ParquetDataset\(.+\) must not " - r"exist if versioning is enabled" - ) - with pytest.raises(DatasetError, match=pattern): - versioned_parquet_dataset.save(dummy_dataframe) - - @pytest.mark.parametrize( - "load_version", ["2019-01-01T23.59.59.999Z"], indirect=True - ) - @pytest.mark.parametrize( - "save_version", ["2019-01-02T00.00.00.000Z"], indirect=True - ) - def test_save_version_warning( - self, versioned_parquet_dataset, load_version, save_version, dummy_dataframe - ): - """Check the warning when saving to the path that differs from - the subsequent load path.""" - pattern = ( - rf"Save version '{save_version}' did not match load version " - rf"'{load_version}' for ParquetDataset\(.+\)" - ) - with pytest.warns(UserWarning, match=pattern): - versioned_parquet_dataset.save(dummy_dataframe) - - def test_http_filesystem_no_versioning(self): - pattern = "Versioning is not supported for HTTP protocols." - - with pytest.raises(DatasetError, match=pattern): - ParquetDataset( - filepath="https://example/file.parquet", version=Version(None, None) - ) - - def test_versioning_existing_dataset( - self, parquet_dataset, versioned_parquet_dataset, dummy_dataframe - ): - """Check the error when attempting to save a versioned dataset on top of an - already existing (non-versioned) dataset.""" - parquet_dataset.save(dummy_dataframe) - assert parquet_dataset.exists() - assert parquet_dataset._filepath == versioned_parquet_dataset._filepath - pattern = ( - f"(?=.*file with the same name already exists in the directory)" - f"(?=.*{versioned_parquet_dataset._filepath.parent.as_posix()})" - ) - with pytest.raises(DatasetError, match=pattern): - versioned_parquet_dataset.save(dummy_dataframe) - - # Remove non-versioned dataset and try again - Path(parquet_dataset._filepath.as_posix()).unlink() - versioned_parquet_dataset.save(dummy_dataframe) - assert versioned_parquet_dataset.exists() From 04fd6f8592cbb7ad294cc050d7b21d42e309398f Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:09:12 +0200 Subject: [PATCH 10/24] Discard changes to kedro-datasets/tests/pandas/test_deltatable_dataset.py Signed-off-by: Harm Matthias Harms --- kedro-datasets/tests/pandas/test_deltatable_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/tests/pandas/test_deltatable_dataset.py b/kedro-datasets/tests/pandas/test_deltatable_dataset.py index 666e24e39..3b81ef421 100644 --- a/kedro-datasets/tests/pandas/test_deltatable_dataset.py +++ b/kedro-datasets/tests/pandas/test_deltatable_dataset.py @@ -48,7 +48,7 @@ def test_overwrite_with_diff_schema(self, deltatable_dataset_from_path, dummy_df """Test saving with the default overwrite mode with new data of diff schema.""" deltatable_dataset_from_path.save(dummy_df) new_df = pd.DataFrame({"new_col": [1, 2]}) - pattern = "Cannot cast schema, number of fields does not match: 1 vs 3" + pattern = "Schema of data does not match table schema" with pytest.raises(DatasetError, match=pattern): deltatable_dataset_from_path.save(new_df) @@ -167,4 +167,4 @@ def test_history(self, deltatable_dataset_from_path, dummy_df): deltatable_dataset_from_path.save(dummy_df) history = deltatable_dataset_from_path.history assert isinstance(history, list) - assert history[0]["operation"] == "WRITE" + assert history[0]["operation"] == "CREATE TABLE" From 04ff6a24df2e9a8a0c6085d1860db33f52362cb8 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:10:07 +0200 Subject: [PATCH 11/24] Update RELEASE.md Signed-off-by: Harm Matthias Harms --- kedro-datasets/RELEASE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 6e325c3fe..20d9ce81e 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,7 +1,7 @@ # Upcoming Release ## Major features and improvements -- Add `file_format` to `geopandas.GeoJSONDataSet` to support parquet and feather file formats. +- Add `file_format` to `geopandas.GeoJSONDataset` to support parquet and feather file formats. ## Bug fixes and other changes ## Breaking Changes From a8dbb0cf4ad07b33de2db25bd9128a909a918ebf Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 22 Aug 2024 12:37:23 +0200 Subject: [PATCH 12/24] Add test for unsupported file format Signed-off-by: Harm Matthias Harms --- .../geopandas/geojson_dataset.py | 7 ++++- .../tests/geopandas/test_geojson_dataset.py | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py index 00c037dd1..13352d9ee 100644 --- a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py @@ -19,7 +19,12 @@ get_protocol_and_path, ) -NON_FILE_SYSTEM_TARGETS = ["postgis"] +from kedro_datasets.pandas.generic_dataset import ( + NON_FILE_SYSTEM_TARGETS as PANDAS_NON_FILE_SYSTEM_TARGETS, +) + +# Unofficially GeoPandas also supports pandas io-methods, such as read_csv. +NON_FILE_SYSTEM_TARGETS = ["postgis"] + PANDAS_NON_FILE_SYSTEM_TARGETS class GeoJSONDataset( diff --git a/kedro-datasets/tests/geopandas/test_geojson_dataset.py b/kedro-datasets/tests/geopandas/test_geojson_dataset.py index bdcca1d62..63818d665 100644 --- a/kedro-datasets/tests/geopandas/test_geojson_dataset.py +++ b/kedro-datasets/tests/geopandas/test_geojson_dataset.py @@ -43,6 +43,11 @@ def filepath_postgis(tmp_path): return (tmp_path / "test.sql").as_posix() +@pytest.fixture +def filepath_abc(tmp_path): + return tmp_path / "test.abc" + + @pytest.fixture(params=[None]) def load_args(request): return request.param @@ -114,6 +119,17 @@ def postgis_dataset(filepath_postgis, load_args, save_args, fs_args): ) +@pytest.fixture +def abc_dataset(filepath_abc, load_args, save_args, fs_args): + return GeoJSONDataset( + filepath=filepath_abc, + file_format="abc", + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + @pytest.fixture def versioned_geojson_dataset(filepath_geojson, load_version, save_version): return GeoJSONDataset( @@ -166,6 +182,16 @@ def test_none_file_system_target(self, postgis_dataset, dummy_dataframe): with pytest.raises(DatasetError, match=pattern): postgis_dataset.save(dummy_dataframe) + def test_unknown_file_format(self, abc_dataset, dummy_dataframe, filepath_abc): + pattern = "Unable to retrieve 'geopandas.DataFrame.to_abc' method" + with pytest.raises(DatasetError, match=pattern): + abc_dataset.save(dummy_dataframe) + + filepath_abc.write_bytes(b"") + pattern = "Unable to retrieve 'geopandas.read_abc' method" + with pytest.raises(DatasetError, match=pattern): + abc_dataset.load() + @pytest.mark.parametrize( "load_args", [{"crs": "init:4326"}, {"crs": "init:2154", "driver": "GeoJSON"}] ) From 710637661c0786a640335fa547bf76877997235e Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 28 Aug 2024 10:29:55 +0200 Subject: [PATCH 13/24] Cleanup GeoJSONDataset Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py index 13352d9ee..c0be06324 100644 --- a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py @@ -19,12 +19,8 @@ get_protocol_and_path, ) -from kedro_datasets.pandas.generic_dataset import ( - NON_FILE_SYSTEM_TARGETS as PANDAS_NON_FILE_SYSTEM_TARGETS, -) -# Unofficially GeoPandas also supports pandas io-methods, such as read_csv. -NON_FILE_SYSTEM_TARGETS = ["postgis"] + PANDAS_NON_FILE_SYSTEM_TARGETS +NON_FILE_SYSTEM_TARGETS = ["postgis"] class GeoJSONDataset( From e4d69e9f49535a5361ab98a401318808fa1c0cff Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 28 Aug 2024 10:36:51 +0200 Subject: [PATCH 14/24] Fix lint Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py index c0be06324..00c037dd1 100644 --- a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py @@ -19,7 +19,6 @@ get_protocol_and_path, ) - NON_FILE_SYSTEM_TARGETS = ["postgis"] From 1335db175a35187093808385b138ef74cee6933e Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 5 Sep 2024 13:39:28 +0200 Subject: [PATCH 15/24] Replace GeoJSONDataset by GenericDataset Signed-off-by: Harm Matthias Harms --- .../docs/source/api/kedro_datasets.rst | 2 +- .../kedro_datasets/geopandas/README.md | 31 -------------- .../kedro_datasets/geopandas/__init__.py | 6 +-- ...{geojson_dataset.py => generic_dataset.py} | 21 +++++----- kedro-datasets/pyproject.toml | 9 ++-- ...son_dataset.py => test_generic_dataset.py} | 42 +++++++++---------- 6 files changed, 38 insertions(+), 73 deletions(-) delete mode 100644 kedro-datasets/kedro_datasets/geopandas/README.md rename kedro-datasets/kedro_datasets/geopandas/{geojson_dataset.py => generic_dataset.py} (90%) rename kedro-datasets/tests/geopandas/{test_geojson_dataset.py => test_generic_dataset.py} (92%) diff --git a/kedro-datasets/docs/source/api/kedro_datasets.rst b/kedro-datasets/docs/source/api/kedro_datasets.rst index 669378b7b..45b275de5 100644 --- a/kedro-datasets/docs/source/api/kedro_datasets.rst +++ b/kedro-datasets/docs/source/api/kedro_datasets.rst @@ -17,7 +17,7 @@ kedro_datasets dask.ParquetDataset databricks.ManagedTableDataset email.EmailMessageDataset - geopandas.GeoJSONDataset + geopandas.GenericDataset holoviews.HoloviewsWriter huggingface.HFDataset huggingface.HFTransformerPipelineDataset diff --git a/kedro-datasets/kedro_datasets/geopandas/README.md b/kedro-datasets/kedro_datasets/geopandas/README.md deleted file mode 100644 index a7926a706..000000000 --- a/kedro-datasets/kedro_datasets/geopandas/README.md +++ /dev/null @@ -1,31 +0,0 @@ -# GeoJSON - -``GeoJSONDataset`` loads and saves data to a local yaml file using ``geopandas``. -See [geopandas.GeoDataFrame](http://geopandas.org/reference/geopandas.GeoDataFrame.html) for details. - -#### Example use: - -```python -import geopandas as gpd -from shapely.geometry import Point -from kedro_datasets.geopandas import GeoJSONDataset - -data = gpd.GeoDataFrame( - {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, - geometry=[Point(1, 1), Point(2, 4)], -) -dataset = GeoJSONDataset(filepath="test.geojson") -dataset.save(data) -reloaded = dataset.load() -assert data.equals(reloaded) -``` - -#### Example catalog.yml: - -```yaml -example_geojson_data: - type: geopandas.GeoJSONDataset - filepath: data/08_reporting/test.geojson -``` - -Contributed by (Luis Blanche)[https://github.com/lblanche]. diff --git a/kedro-datasets/kedro_datasets/geopandas/__init__.py b/kedro-datasets/kedro_datasets/geopandas/__init__.py index d4843aa68..1ae7701bd 100644 --- a/kedro-datasets/kedro_datasets/geopandas/__init__.py +++ b/kedro-datasets/kedro_datasets/geopandas/__init__.py @@ -1,12 +1,12 @@ -"""``GeoJSONDataset`` is an ``AbstractVersionedDataset`` to save and load GeoJSON files.""" +"""``AbstractDataset`` implementations that produce geopandas GeoDataFrames.""" from typing import Any import lazy_loader as lazy # https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901 -GeoJSONDataset: Any +GenericDataset: Any __getattr__, __dir__, __all__ = lazy.attach( - __name__, submod_attrs={"geojson_dataset": ["GeoJSONDataset"]} + __name__, submod_attrs={"generic_dataset": ["GenericDataset"]} ) diff --git a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py similarity index 90% rename from kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py rename to kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index 00c037dd1..a8b43cf48 100644 --- a/kedro-datasets/kedro_datasets/geopandas/geojson_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -1,4 +1,4 @@ -"""GeoJSONDataset loads and saves data to a local geojson file. The +"""GenericDataset loads and saves data to a local file. The underlying functionality is supported by geopandas, so it supports all allowed geopandas (pandas) options for loading and saving geosjon files. """ @@ -22,29 +22,29 @@ NON_FILE_SYSTEM_TARGETS = ["postgis"] -class GeoJSONDataset( +class GenericDataset( AbstractVersionedDataset[ gpd.GeoDataFrame, Union[gpd.GeoDataFrame, dict[str, gpd.GeoDataFrame]] ] ): - """``GeoJSONDataset`` loads/saves data to a GeoJSON file using an underlying filesystem + """``GenericDataset`` loads/saves data to a file using an underlying filesystem (eg: local, S3, GCS). The underlying functionality is supported by geopandas, so it supports all - allowed geopandas (pandas) options for loading and saving GeoJSON files. + allowed geopandas (pandas) options for loading and saving files. Example: .. code-block:: pycon >>> import geopandas as gpd - >>> from kedro_datasets.geopandas import GeoJSONDataset + >>> from kedro_datasets.geopandas import GenericDataset >>> from shapely.geometry import Point >>> >>> data = gpd.GeoDataFrame( ... {"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}, ... geometry=[Point(1, 1), Point(2, 4)], ... ) - >>> dataset = GeoJSONDataset(filepath=tmp_path / "test.geojson") + >>> dataset = GenericDataset(filepath=tmp_path / "test.geojson") >>> dataset.save(data) >>> reloaded = dataset.load() >>> @@ -67,12 +67,12 @@ def __init__( # noqa: PLR0913 fs_args: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, ) -> None: - """Creates a new instance of ``GeoJSONDataset`` pointing to a concrete GeoJSON file + """Creates a new instance of ``GenericDataset`` pointing to a concrete file on a specific filesystem fsspec. Args: - filepath: Filepath in POSIX format to a GeoJSON file prefixed with a protocol like + filepath: Filepath in POSIX format to a file prefixed with a protocol like `s3://`. If prefix is not provided `file` protocol (local filesystem) will be used. The prefix should be any protocol supported by ``fsspec``. Note: `http(s)` doesn't support versioning. @@ -81,13 +81,12 @@ def __init__( # noqa: PLR0913 `geopandas.DataFrame.to_parquet` will be identified. An error will be raised unless at least one matching `read_{file_format}` or `to_{file_format}` method is identified. Defaults to 'file'. - load_args: GeoPandas options for loading GeoJSON files. + load_args: GeoPandas options for loading files. Here you can find all available arguments: https://geopandas.org/en/stable/docs/reference/api/geopandas.read_file.html - save_args: GeoPandas options for saving geojson files. + save_args: GeoPandas options for saving files. Here you can find all available arguments: https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.to_file.html - The default_save_arg driver is 'GeoJSON', all others preserved. version: If specified, should be an instance of ``kedro.io.core.Version``. If its ``load`` attribute is None, the latest version will be loaded. If its ``save`` diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 32a99155d..a8b77d406 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -25,7 +25,6 @@ polars-base = ["polars>=0.18.0",] plotly-base = ["plotly>=4.8.0, <6.0"] delta-base = ["delta-spark>=1.0, <4.0",] networkx-base = ["networkx~=2.4"] -geopandas-base = ["geopandas>=0.8.0, <1.0"] # Individual Datasets api-apidataset = ["requests~=2.20"] @@ -40,9 +39,8 @@ dask = ["kedro-datasets[dask-parquetdataset]"] databricks-managedtabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] databricks = ["kedro-datasets[databricks-managedtabledataset]"] -geopandas-geojsondataset = ["kedro-datasets[geopandas-base]", "pyproj~=3.0"] -geopandas-parquetdataset = ["kedro-datasets[geopandas-base]"] -geopandas = ["kedro-datasets[geopandas-geojsondataset,geopandas-parquetdataset]"] +geopandas-genericdataset = ["geopandas>=0.8.0, <2.0"] +geopandas = ["kedro-datasets[geopandas-genericdataset]"] holoviews-holoviewswriter = ["holoviews~=1.13.0"] holoviews = ["kedro-datasets[holoviews-holoviewswriter]"] @@ -205,7 +203,6 @@ test = [ "dill~=0.3.1", "filelock>=3.4.0, <4.0", "gcsfs>=2023.1, <2023.3", - "geopandas>=0.8.0, <1.0", "hdfs>=2.5.8, <3.0", "holoviews>=1.13.0", "ibis-framework[duckdb,examples]", @@ -233,7 +230,6 @@ test = [ "pyarrow>=1.0; python_version < '3.11'", "pyarrow>=7.0; python_version >= '3.11'", # Adding to avoid numpy build errors "pyodbc~=5.0", - "pyproj~=3.0", "pyspark>=3.0; python_version < '3.11'", "pyspark>=3.4; python_version >= '3.11'", "pytest-cov~=3.0", @@ -270,6 +266,7 @@ test = [ "types-decorator", "types-six", "types-tabulate", + "geopandas>=0.8.0, <2.0", ] # Experimental dataset requirements diff --git a/kedro-datasets/tests/geopandas/test_geojson_dataset.py b/kedro-datasets/tests/geopandas/test_generic_dataset.py similarity index 92% rename from kedro-datasets/tests/geopandas/test_geojson_dataset.py rename to kedro-datasets/tests/geopandas/test_generic_dataset.py index 63818d665..722cc87cd 100644 --- a/kedro-datasets/tests/geopandas/test_geojson_dataset.py +++ b/kedro-datasets/tests/geopandas/test_generic_dataset.py @@ -10,7 +10,7 @@ from s3fs import S3FileSystem from shapely.geometry import Point -from kedro_datasets.geopandas import GeoJSONDataset +from kedro_datasets.geopandas import GenericDataset @pytest.fixture(params=[None]) @@ -68,7 +68,7 @@ def dummy_dataframe(): @pytest.fixture def geojson_dataset(filepath_geojson, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_geojson, load_args=load_args, save_args=save_args, @@ -78,7 +78,7 @@ def geojson_dataset(filepath_geojson, load_args, save_args, fs_args): @pytest.fixture def parquet_dataset(filepath_parquet, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_parquet, file_format="parquet", load_args=load_args, @@ -89,7 +89,7 @@ def parquet_dataset(filepath_parquet, load_args, save_args, fs_args): @pytest.fixture def parquet_dataset_bad_config(filepath_parquet, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_parquet, load_args=load_args, save_args=save_args, @@ -99,7 +99,7 @@ def parquet_dataset_bad_config(filepath_parquet, load_args, save_args, fs_args): @pytest.fixture def feather_dataset(filepath_feather, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_feather, file_format="feather", load_args=load_args, @@ -110,7 +110,7 @@ def feather_dataset(filepath_feather, load_args, save_args, fs_args): @pytest.fixture def postgis_dataset(filepath_postgis, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_postgis, file_format="postgis", load_args=load_args, @@ -121,7 +121,7 @@ def postgis_dataset(filepath_postgis, load_args, save_args, fs_args): @pytest.fixture def abc_dataset(filepath_abc, load_args, save_args, fs_args): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_abc, file_format="abc", load_args=load_args, @@ -132,12 +132,12 @@ def abc_dataset(filepath_abc, load_args, save_args, fs_args): @pytest.fixture def versioned_geojson_dataset(filepath_geojson, load_version, save_version): - return GeoJSONDataset( + return GenericDataset( filepath=filepath_geojson, version=Version(load_version, save_version) ) -class TestGeoJSONDataset: +class TestGenericDataset: def test_save_and_load(self, geojson_dataset, dummy_dataframe): """Test that saved and reloaded data matches the original one.""" geojson_dataset.save(dummy_dataframe) @@ -149,7 +149,7 @@ def test_save_and_load(self, geojson_dataset, dummy_dataframe): @pytest.mark.parametrize("geojson_dataset", [{"index": False}], indirect=True) def test_load_missing_file(self, geojson_dataset): """Check the error while trying to load from missing source.""" - pattern = r"Failed while loading data from data set GeoJSONDataset" + pattern = r"Failed while loading data from data set GenericDataset" with pytest.raises(DatasetError, match=pattern): geojson_dataset.load() @@ -228,7 +228,7 @@ def test_open_extra_args(self, geojson_dataset, fs_args): ], ) def test_protocol_usage(self, path, instance_type): - geojson_dataset = GeoJSONDataset(filepath=path) + geojson_dataset = GenericDataset(filepath=path) assert isinstance(geojson_dataset._fs, instance_type) path = path.split(PROTOCOL_DELIMITER, 1)[-1] @@ -239,18 +239,18 @@ def test_protocol_usage(self, path, instance_type): def test_catalog_release(self, mocker): fs_mock = mocker.patch("fsspec.filesystem").return_value filepath = "test.geojson" - geojson_dataset = GeoJSONDataset(filepath=filepath) + geojson_dataset = GenericDataset(filepath=filepath) geojson_dataset.release() fs_mock.invalidate_cache.assert_called_once_with(filepath) -class TestGeoJSONDatasetVersioned: +class TestGenericDatasetVersioned: def test_version_str_repr(self, load_version, save_version): """Test that version is in string representation of the class instance when applicable.""" filepath = "test.geojson" - ds = GeoJSONDataset(filepath=filepath) - ds_versioned = GeoJSONDataset( + ds = GenericDataset(filepath=filepath) + ds_versioned = GenericDataset( filepath=filepath, version=Version(load_version, save_version) ) assert filepath in str(ds) @@ -259,8 +259,8 @@ def test_version_str_repr(self, load_version, save_version): assert filepath in str(ds_versioned) ver_str = f"version=Version(load={load_version}, save='{save_version}')" assert ver_str in str(ds_versioned) - assert "GeoJSONDataset" in str(ds_versioned) - assert "GeoJSONDataset" in str(ds) + assert "GenericDataset" in str(ds_versioned) + assert "GenericDataset" in str(ds) assert "protocol" in str(ds_versioned) assert "protocol" in str(ds) @@ -273,7 +273,7 @@ def test_save_and_load(self, versioned_geojson_dataset, dummy_dataframe): def test_no_versions(self, versioned_geojson_dataset): """Check the error if no versions are available for load.""" - pattern = r"Did not find any versions for GeoJSONDataset\(.+\)" + pattern = r"Did not find any versions for GenericDataset\(.+\)" with pytest.raises(DatasetError, match=pattern): versioned_geojson_dataset.load() @@ -288,7 +288,7 @@ def test_prevent_override(self, versioned_geojson_dataset, dummy_dataframe): version.""" versioned_geojson_dataset.save(dummy_dataframe) pattern = ( - r"Save path \'.+\' for GeoJSONDataset\(.+\) must not " + r"Save path \'.+\' for GenericDataset\(.+\) must not " r"exist if versioning is enabled" ) with pytest.raises(DatasetError, match=pattern): @@ -307,7 +307,7 @@ def test_save_version_warning( the subsequent load path.""" pattern = ( rf"Save version '{save_version}' did not match load version " - rf"'{load_version}' for GeoJSONDataset\(.+\)" + rf"'{load_version}' for GenericDataset\(.+\)" ) with pytest.warns(UserWarning, match=pattern): versioned_geojson_dataset.save(dummy_dataframe) @@ -316,7 +316,7 @@ def test_http_filesystem_no_versioning(self): pattern = "Versioning is not supported for HTTP protocols." with pytest.raises(DatasetError, match=pattern): - GeoJSONDataset( + GenericDataset( filepath="https://example/file.geojson", version=Version(None, None) ) From b74384552cf55caffa56930997a59bb6dd88c7e0 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 5 Sep 2024 13:40:55 +0200 Subject: [PATCH 16/24] Update pyproject.toml Signed-off-by: Harm Matthias Harms --- kedro-datasets/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index a8b77d406..11724d00d 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -203,6 +203,7 @@ test = [ "dill~=0.3.1", "filelock>=3.4.0, <4.0", "gcsfs>=2023.1, <2023.3", + "geopandas>=0.8.0, <2.0", "hdfs>=2.5.8, <3.0", "holoviews>=1.13.0", "ibis-framework[duckdb,examples]", @@ -266,7 +267,6 @@ test = [ "types-decorator", "types-six", "types-tabulate", - "geopandas>=0.8.0, <2.0", ] # Experimental dataset requirements From 1e540f4c5891d676bf19cf268c7c46d286a47a36 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 5 Sep 2024 13:50:06 +0200 Subject: [PATCH 17/24] Update RELEASE.md Signed-off-by: Harm Matthias Harms --- kedro-datasets/RELEASE.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 07d54ee3a..7bc3cdd5d 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -12,16 +12,17 @@ |----------------------|------------------------------------------------|-------------------------| | `plotly.HTMLDataset` | A dataset for saving a `plotly` figure as HTML | `kedro_datasets.plotly` | -- Add `file_format` to `geopandas.GeoJSONDataset` to support parquet and feather file formats. - ## Bug fixes and other changes * Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args` and not have hardcoded values in the save methods. ## Breaking Changes +* Replaced the `geopandas.GeoJSONDataset` with `geopandas.GenericDataset` to support parquet and feather file formats. + ## Community contributions Many thanks to the following Kedroids for contributing PRs to this release: * [Brandon Meek](https://github.com/bpmeek) * [yury-fedotov](https://github.com/yury-fedotov) +* [harm-matthias-harms](https://github.com/harm-matthias-harms) # Release 4.1.0 From e84ac9389cc708600a881a5e9540561d8857d0e8 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Thu, 5 Sep 2024 14:21:50 +0200 Subject: [PATCH 18/24] Use new default fs args Signed-off-by: Harm Matthias Harms --- .../geopandas/generic_dataset.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index a8b43cf48..65dcaf35b 100644 --- a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -54,6 +54,7 @@ class GenericDataset( DEFAULT_LOAD_ARGS: dict[str, Any] = {} DEFAULT_SAVE_ARGS: dict[str, Any] = {} + DEFAULT_FS_ARGS: dict[str, Any] = {"open_args_save": {"mode": "wb"}} def __init__( # noqa: PLR0913 self, @@ -125,17 +126,17 @@ def __init__( # noqa: PLR0913 glob_function=self._fs.glob, ) - self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS) - if load_args is not None: - self._load_args.update(load_args) - - self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS) - if save_args is not None: - self._save_args.update(save_args) - - _fs_open_args_save.setdefault("mode", "wb") - self._fs_open_args_load = _fs_open_args_load - self._fs_open_args_save = _fs_open_args_save + # Handle default load and save and fs arguments + self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})} + self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})} + self._fs_open_args_load = { + **self.DEFAULT_FS_ARGS.get("open_args_load", {}), + **(_fs_open_args_load or {}), + } + self._fs_open_args_save = { + **self.DEFAULT_FS_ARGS.get("open_args_save", {}), + **(_fs_open_args_save or {}), + } def _ensure_file_system_target(self) -> None: # Fail fast if provided a known non-filesystem target From 0fd94c8d80c6e1577687df7dad05081e1b3f0ab2 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Wed, 11 Sep 2024 19:34:28 +0200 Subject: [PATCH 19/24] Fix pattern in test Signed-off-by: Harm Matthias Harms --- kedro-datasets/tests/geopandas/test_generic_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/tests/geopandas/test_generic_dataset.py b/kedro-datasets/tests/geopandas/test_generic_dataset.py index 722cc87cd..2f8658301 100644 --- a/kedro-datasets/tests/geopandas/test_generic_dataset.py +++ b/kedro-datasets/tests/geopandas/test_generic_dataset.py @@ -173,7 +173,7 @@ def test_bad_load( self, parquet_dataset_bad_config, dummy_dataframe, filepath_parquet ): dummy_dataframe.to_parquet(filepath_parquet) - pattern = r"not recognized as a supported file format" + pattern = r"Failed while loading data from data set GenericDataset(.*)" with pytest.raises(DatasetError, match=pattern): parquet_dataset_bad_config.load() From 6b2a2256ad0438fbd6e83ab2c53dfcf7c887de8d Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Fri, 13 Sep 2024 10:56:58 +0200 Subject: [PATCH 20/24] Use fiona for python < 3.11 Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/generic_dataset.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index 65dcaf35b..c31d7f030 100644 --- a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -6,6 +6,7 @@ from __future__ import annotations import copy +import sys from pathlib import PurePosixPath from typing import Any, Union @@ -21,6 +22,9 @@ NON_FILE_SYSTEM_TARGETS = ["postgis"] +if sys.version_info < (3, 11): + gpd.options.io_engine = "fiona" + class GenericDataset( AbstractVersionedDataset[ From db1b4465cdd9d953abf99dd1176e3d4e9fd8bbfd Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Fri, 13 Sep 2024 11:09:53 +0200 Subject: [PATCH 21/24] Install fiona dependency for python < 3.11 Signed-off-by: Harm Matthias Harms --- kedro-datasets/pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index ec517bf6f..76a5e828e 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -40,7 +40,7 @@ dask = ["kedro-datasets[dask-parquetdataset, dask-csvdataset]"] databricks-managedtabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] databricks = ["kedro-datasets[databricks-managedtabledataset]"] -geopandas-genericdataset = ["geopandas>=0.8.0, <2.0"] +geopandas-genericdataset = ["geopandas>=0.8.0, <2.0", "fiona~=1.10; python_version < '3.11'"] geopandas = ["kedro-datasets[geopandas-genericdataset]"] holoviews-holoviewswriter = ["holoviews~=1.13.0"] @@ -213,6 +213,7 @@ test = [ "deltalake>=0.10.0", "dill~=0.3.1", "filelock>=3.4.0, <4.0", + "fiona~=1.10; python_version < '3.11'", "gcsfs>=2023.1, <2023.3", "geopandas>=0.8.0, <2.0", "hdfs>=2.5.8, <3.0", From f1bda0e4e5ce05941df3f205587a77053d6242c2 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Fri, 13 Sep 2024 11:24:42 +0200 Subject: [PATCH 22/24] Revert fiona test Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/generic_dataset.py | 4 ---- kedro-datasets/pyproject.toml | 3 +-- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index c31d7f030..65dcaf35b 100644 --- a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -6,7 +6,6 @@ from __future__ import annotations import copy -import sys from pathlib import PurePosixPath from typing import Any, Union @@ -22,9 +21,6 @@ NON_FILE_SYSTEM_TARGETS = ["postgis"] -if sys.version_info < (3, 11): - gpd.options.io_engine = "fiona" - class GenericDataset( AbstractVersionedDataset[ diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 76a5e828e..ec517bf6f 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -40,7 +40,7 @@ dask = ["kedro-datasets[dask-parquetdataset, dask-csvdataset]"] databricks-managedtabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] databricks = ["kedro-datasets[databricks-managedtabledataset]"] -geopandas-genericdataset = ["geopandas>=0.8.0, <2.0", "fiona~=1.10; python_version < '3.11'"] +geopandas-genericdataset = ["geopandas>=0.8.0, <2.0"] geopandas = ["kedro-datasets[geopandas-genericdataset]"] holoviews-holoviewswriter = ["holoviews~=1.13.0"] @@ -213,7 +213,6 @@ test = [ "deltalake>=0.10.0", "dill~=0.3.1", "filelock>=3.4.0, <4.0", - "fiona~=1.10; python_version < '3.11'", "gcsfs>=2023.1, <2023.3", "geopandas>=0.8.0, <2.0", "hdfs>=2.5.8, <3.0", From 0dab99f8cbe2d8962e516d6b686a8ab188174fdd Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Fri, 13 Sep 2024 11:59:37 +0200 Subject: [PATCH 23/24] Use fiona because pyogrio doesnt support fsspec Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/generic_dataset.py | 3 +++ kedro-datasets/pyproject.toml | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index 65dcaf35b..c7ce7d308 100644 --- a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -21,6 +21,9 @@ NON_FILE_SYSTEM_TARGETS = ["postgis"] +# pyogrio currently supports no alternate file handlers https://github.com/geopandas/pyogrio/issues/430 +gpd.options.io_engine = "fiona" + class GenericDataset( AbstractVersionedDataset[ diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index ec517bf6f..838b77a33 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -40,7 +40,7 @@ dask = ["kedro-datasets[dask-parquetdataset, dask-csvdataset]"] databricks-managedtabledataset = ["kedro-datasets[spark-base,pandas-base,delta-base,hdfs-base,s3fs-base]"] databricks = ["kedro-datasets[databricks-managedtabledataset]"] -geopandas-genericdataset = ["geopandas>=0.8.0, <2.0"] +geopandas-genericdataset = ["geopandas>=0.8.0, <2.0", "fiona >=1.8, <2.0"] geopandas = ["kedro-datasets[geopandas-genericdataset]"] holoviews-holoviewswriter = ["holoviews~=1.13.0"] @@ -213,6 +213,7 @@ test = [ "deltalake>=0.10.0", "dill~=0.3.1", "filelock>=3.4.0, <4.0", + "fiona >=1.8, <2.0", "gcsfs>=2023.1, <2023.3", "geopandas>=0.8.0, <2.0", "hdfs>=2.5.8, <3.0", From 978ad6ce37257ca8375a71a0d6ce1cdada179e67 Mon Sep 17 00:00:00 2001 From: Harm Matthias Harms Date: Fri, 13 Sep 2024 12:14:25 +0200 Subject: [PATCH 24/24] Format file Signed-off-by: Harm Matthias Harms --- kedro-datasets/kedro_datasets/geopandas/generic_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py index c7ce7d308..d8f753e89 100644 --- a/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py +++ b/kedro-datasets/kedro_datasets/geopandas/generic_dataset.py @@ -19,11 +19,11 @@ get_protocol_and_path, ) -NON_FILE_SYSTEM_TARGETS = ["postgis"] - # pyogrio currently supports no alternate file handlers https://github.com/geopandas/pyogrio/issues/430 gpd.options.io_engine = "fiona" +NON_FILE_SYSTEM_TARGETS = ["postgis"] + class GenericDataset( AbstractVersionedDataset[