Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(datasets): upgrade s3fs to newer calver #348

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
dcef71d
build(datasets): upgrade to newer, CalVered s3fs
MatthiasRoels Sep 21, 2023
5d87b28
fix(datasets): unit tests for matplotlib writer leverage moto server
MatthiasRoels Sep 26, 2023
dd5e433
fix(datasets): unit tests for dask pq dataset now leverage moto server
MatthiasRoels Sep 26, 2023
73a5ad2
build(datasets): bump moto version for server mode
MatthiasRoels Sep 27, 2023
f08fce6
fix(datasets): apply linting to test files
MatthiasRoels Sep 27, 2023
5109fb5
fix(datasets): unit tests for video dataset now leverage moto server
MatthiasRoels Sep 27, 2023
a5bfd4c
Fix(datasets): linting errors
MatthiasRoels Oct 2, 2023
51859ad
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
MatthiasRoels Oct 10, 2023
e578ad3
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
MatthiasRoels Oct 11, 2023
308fe19
fix(datasets): set correct CalVer for s3fs in setup.py
MatthiasRoels Oct 12, 2023
4fb1b02
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
MatthiasRoels Oct 12, 2023
3e43613
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
MatthiasRoels Oct 18, 2023
31510e8
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
noklam Nov 20, 2023
3bfbfdc
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
merelcht Nov 27, 2023
66931e8
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
merelcht Nov 29, 2023
619213c
Add patching for AWSResponse to make moto and aiobotocore work
merelcht Nov 30, 2023
cee9b80
Fix lint
merelcht Nov 30, 2023
dabd773
Add AWS mocking to lazy polars dataset test + temporarily ignore spar…
merelcht Nov 30, 2023
3bc77e0
Try use same moto for all python versions
merelcht Nov 30, 2023
e5b551d
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
merelcht Nov 30, 2023
cf32731
Undo polars changes
merelcht Nov 30, 2023
32e0e31
Merge branch 'build/datasets-upgrade-s3fs-to-newer-calver' of github.…
merelcht Nov 30, 2023
402439e
chore(datasets): fix accidental reference to NumPy (#450)
deepyaman Nov 30, 2023
1857887
Merge branch 'main' into build/datasets-upgrade-s3fs-to-newer-calver
merelcht Nov 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kedro-datasets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
PANDAS = "pandas>=1.3, <3.0"
SPARK = "pyspark>=2.2, <4.0"
HDFS = "hdfs>=2.5.8, <3.0"
S3FS = "s3fs>=0.3.0, <0.5"
S3FS = "s3fs>=2021.04, <2024.1" # Upper bound set arbitrarily, to be reassessed in early 2024
MatthiasRoels marked this conversation as resolved.
Show resolved Hide resolved
POLARS = "polars>=0.18.0"
DELTA = "delta-spark~=1.2.1"

Expand Down Expand Up @@ -175,8 +175,8 @@ def _collect_requirements(requires):
"matplotlib>=3.0.3, <3.4; python_version < '3.10'", # 3.4.0 breaks holoviews
"matplotlib>=3.5, <3.6; python_version >= '3.10'",
"memory_profiler>=0.50.0, <1.0",
"moto==1.3.7; python_version < '3.10'",
"moto==4.1.12; python_version >= '3.10'",
"moto[server]==3.1.0; python_version < '3.10'",
"moto[server]==4.2.4; python_version >= '3.10'",
"networkx~=2.4",
"opencv-python~=4.5.5.64",
"openpyxl>=3.0.3, <4.0",
Expand All @@ -201,7 +201,7 @@ def _collect_requirements(requires):
"redis~=4.1",
"requests-mock~=1.6",
"requests~=2.20",
"s3fs>=0.3.0, <0.5", # Needs to be at least 0.3.0 to make use of `cachable` attribute on S3FileSystem.
"s3fs>=2021.04, <2024.1",
"snowflake-snowpark-python~=1.0.0; python_version == '3.8'",
"scikit-learn>=1.0.2,<2",
"scipy>=1.7.3",
Expand Down
93 changes: 93 additions & 0 deletions kedro-datasets/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
discover them automatically. More info here:
https://docs.pytest.org/en/latest/fixture.html
"""
import json
import os

import requests
from kedro.io.core import generate_timestamp
from moto.moto_server.threaded_moto_server import ThreadedMotoServer
from pytest import fixture

BUCKET_NAME = "test_bucket"
IP_ADDRESS = "127.0.0.1"
PORT = 5555
ENDPOINT_URI = f"http://{IP_ADDRESS}:{PORT}/"


@fixture(params=[None])
def load_version(request):
Expand All @@ -32,3 +41,87 @@ def save_args(request):
@fixture(params=[None])
def fs_args(request):
return request.param


@fixture(params=[None])
def mock_fs_args(request):
fs_args = {
# NB: use moto server to mock S3
"client_kwargs": {"endpoint_url": ENDPOINT_URI}
}

if isinstance(request.param, dict):
fs_args.update(request.param)

return fs_args


@fixture
def credentials():
return {
"key": "fake_access_key",
"secret": "fake_secret_key",
}
Comment on lines +60 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there are 2 sets of credentials?

Do we need key and secret or the AWS_XXXXX?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is for credentials file, the other one are env vars. The env vars are required for bucket setup etc while the credentials are used in the Dataset classes.



@fixture(scope="session")
def moto_server():
# This fixture is module-scoped, meaning that we can re-use the MotoServer across all tests
server = ThreadedMotoServer(ip_address=IP_ADDRESS, port=PORT)
server.start()

if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "fake_access_key"
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "fake_secret_key"

yield

server.stop()


def _reset_moto_server():
# We reuse the MotoServer for all S3 related tests
# But we do want a clean state for every test
requests.post(f"{ENDPOINT_URI}/moto-api/reset")


def _get_boto3_client():
from botocore.session import Session

# NB: we use the sync botocore client for setup
session = Session()
return session.create_client(service_name="s3", endpoint_url=ENDPOINT_URI)


@fixture
def mocked_s3_bucket(moto_server):
"""Create a bucket for testing using moto."""
_reset_moto_server()
client = _get_boto3_client()
client.create_bucket(Bucket=BUCKET_NAME)
yield client


@fixture
def mocked_encrypted_s3_bucket(moto_server):
bucket_policy = {
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "DenyUnEncryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": f"arn:aws:s3:::{BUCKET_NAME}/*",
"Condition": {"Null": {"s3:x-amz-server-side-encryption": "aws:kms"}},
}
],
}
bucket_policy = json.dumps(bucket_policy)
_reset_moto_server()
client = _get_boto3_client()
client.create_bucket(Bucket=BUCKET_NAME)
client.put_bucket_policy(Bucket=BUCKET_NAME, Policy=bucket_policy)
yield client
96 changes: 29 additions & 67 deletions kedro-datasets/tests/dask/test_parquet_dataset.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,24 @@
import importlib
from io import BytesIO

import boto3
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from moto import mock_s3
from pandas.testing import assert_frame_equal
from s3fs import S3FileSystem

from kedro_datasets._io import DatasetError
from kedro_datasets.dask import ParquetDataset
from kedro_datasets.dask.parquet_dataset import _DEPRECATED_CLASSES

FILE_NAME = "test.parquet"
BUCKET_NAME = "test_bucket"
AWS_CREDENTIALS = {"key": "FAKE_ACCESS_KEY", "secret": "FAKE_SECRET_KEY"}

# Pathlib cannot be used since it strips out the second slash from "s3://"
S3_PATH = f"s3://{BUCKET_NAME}/{FILE_NAME}"


@pytest.fixture
def mocked_s3_bucket():
"""Create a bucket for testing using moto."""
with mock_s3():
conn = boto3.client(
"s3",
aws_access_key_id="fake_access_key",
aws_secret_access_key="fake_secret_key",
)
conn.create_bucket(Bucket=BUCKET_NAME)
yield conn


@pytest.fixture
def dummy_dd_dataframe() -> dd.DataFrame:
df = pd.DataFrame(
Expand All @@ -44,34 +28,24 @@ def dummy_dd_dataframe() -> dd.DataFrame:


@pytest.fixture
def mocked_s3_object(tmp_path, mocked_s3_bucket, dummy_dd_dataframe: dd.DataFrame):
"""Creates test data and adds it to mocked S3 bucket."""
pandas_df = dummy_dd_dataframe.compute()
table = pa.Table.from_pandas(pandas_df)
temporary_path = tmp_path / FILE_NAME
pq.write_table(table, str(temporary_path))

mocked_s3_bucket.put_object(
Bucket=BUCKET_NAME, Key=FILE_NAME, Body=temporary_path.read_bytes()
)
return mocked_s3_bucket


@pytest.fixture
def s3_dataset(load_args, save_args):
def s3_dataset(mocked_s3_bucket, credentials, mock_fs_args, save_args, load_args):
return ParquetDataset(
filepath=S3_PATH,
credentials=AWS_CREDENTIALS,
credentials=credentials,
fs_args=mock_fs_args,
load_args=load_args,
save_args=save_args,
)


@pytest.fixture()
def s3fs_cleanup():
# clear cache so we get a clean slate every time we instantiate a S3FileSystem
yield
S3FileSystem.cachable = False
@pytest.fixture
def mocked_parquet_in_s3(mocked_s3_bucket, dummy_dd_dataframe):
pandas_df = dummy_dd_dataframe.compute()
buffer = BytesIO()
pandas_df.to_parquet(buffer)
buffer.seek(0)
mocked_s3_bucket.put_object(Bucket=BUCKET_NAME, Key=FILE_NAME, Body=buffer)
return S3_PATH


@pytest.mark.parametrize(
Expand All @@ -83,7 +57,6 @@ def test_deprecation(module_name, class_name):
getattr(importlib.import_module(module_name), class_name)


@pytest.mark.usefixtures("s3fs_cleanup")
class TestParquetDataset:
def test_incorrect_credentials_load(self):
"""Test that incorrect credential keys won't instantiate dataset."""
Expand All @@ -103,22 +76,25 @@ def test_empty_credentials_load(self, bad_credentials):
with pytest.raises(DatasetError, match=pattern):
parquet_dataset.load().compute()

def test_pass_credentials(self, mocker):
"""Test that AWS credentials are passed successfully into boto3
client instantiation on creating S3 connection."""
client_mock = mocker.patch("botocore.session.Session.create_client")
s3_dataset = ParquetDataset(filepath=S3_PATH, credentials=AWS_CREDENTIALS)
pattern = r"Failed while loading data from data set ParquetDataset\(.+\)"
with pytest.raises(DatasetError, match=pattern):
s3_dataset.load().compute()
def test_exists(self, s3_dataset, dummy_dd_dataframe):
"""Test `exists` method invocation for both existing and
nonexistent data set."""
assert not s3_dataset.exists()
s3_dataset.save(dummy_dd_dataframe)
assert s3_dataset.exists()

assert client_mock.call_count == 1
args, kwargs = client_mock.call_args_list[0]
assert args == ("s3",)
assert kwargs["aws_access_key_id"] == AWS_CREDENTIALS["key"]
assert kwargs["aws_secret_access_key"] == AWS_CREDENTIALS["secret"]
def test_load_data(
self, mocked_parquet_in_s3, mock_fs_args, credentials, dummy_dd_dataframe
):
"""Test loading the data from S3."""
dataset = ParquetDataset(
filepath=mocked_parquet_in_s3,
credentials=credentials,
fs_args=mock_fs_args,
)
loaded_data = dataset.load()
assert_frame_equal(loaded_data.compute(), dummy_dd_dataframe.compute())

@pytest.mark.usefixtures("mocked_s3_bucket")
def test_save_data(self, s3_dataset):
"""Test saving the data to S3."""
pd_data = pd.DataFrame(
Expand All @@ -129,20 +105,6 @@ def test_save_data(self, s3_dataset):
loaded_data = s3_dataset.load()
assert_frame_equal(loaded_data.compute(), dd_data.compute())

@pytest.mark.usefixtures("mocked_s3_object")
def test_load_data(self, s3_dataset, dummy_dd_dataframe):
"""Test loading the data from S3."""
loaded_data = s3_dataset.load()
assert_frame_equal(loaded_data.compute(), dummy_dd_dataframe.compute())

@pytest.mark.usefixtures("mocked_s3_bucket")
def test_exists(self, s3_dataset, dummy_dd_dataframe):
"""Test `exists` method invocation for both existing and
nonexistent data set."""
assert not s3_dataset.exists()
s3_dataset.save(dummy_dd_dataframe)
assert s3_dataset.exists()

def test_save_load_locally(self, tmp_path, dummy_dd_dataframe):
"""Test loading the data locally."""
file_path = str(tmp_path / "some" / "dir" / FILE_NAME)
Expand Down
Loading