tests(datalake): use minio
1. Use MinIO instead of moto for mimicking S3 behavior (see the sketch just before the file diffs below).
2. Removed the moto dependency, as it is not compatible with aiobotocore (getmoto/moto#7070 (comment)).
sushi30 committed Sep 11, 2024
1 parent 4f57f4e commit 082d6de
Showing 3 changed files with 51 additions and 55 deletions.
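For orientation, a minimal standalone sketch (not part of this commit) of the approach the tests move to: start a real MinIO server with testcontainers and talk to it through the minio client, instead of patching boto3 in-process with moto's mock_aws. Bucket and file names here are illustrative only, and the repository wraps the container setup in its own `containers` helper module.

# Hypothetical sketch, assuming the testcontainers MinioContainer API.
from testcontainers.minio import MinioContainer

with MinioContainer() as minio:
    client = minio.get_client()  # a `minio.Minio` client wired to the container
    if not client.bucket_exists("my-bucket"):
        client.make_bucket("my-bucket")
    # Upload a local fixture file (illustrative path) as an S3-style object.
    client.fput_object("my-bucket", "users.csv", "resources/users.csv")
    # boto3/s3fs consumers can target the same server through an endpoint URL:
    endpoint = f"http://localhost:{minio.get_exposed_port(minio.port)}"
    print(endpoint)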
8 changes: 2 additions & 6 deletions ingestion/setup.py
@@ -207,11 +207,8 @@
*COMMONS["datalake"],
},
"datalake-s3": {
# requires aiobotocore
# https://github.com/fsspec/s3fs/blob/9bf99f763edaf7026318e150c4bd3a8d18bb3a00/requirements.txt#L1
# however, the latest version of `s3fs` conflicts its `aiobotocore` dep with `boto3`'s dep on `botocore`.
# Leaving this marked to the automatic resolution to speed up installation.
"s3fs",
# vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
"s3fs[boto3]",
*COMMONS["datalake"],
},
"deltalake": {"delta-spark<=2.3.0", "deltalake~=0.17"},
@@ -343,7 +340,6 @@
"coverage",
# Install GE because it's not in the `all` plugin
VERSIONS["great-expectations"],
"moto~=5.0",
"basedpyright~=1.14",
"pytest==7.0.0",
"pytest-cov",
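Side note on the `s3fs[boto3]` change above: the extra lets pip resolve boto3/botocore together with s3fs's aiobotocore pin rather than independently, which is what previously caused conflicts. A tiny sanity check, illustrative only and assuming all three packages are installed:

# Illustrative check, not part of the commit: these imports should resolve to
# mutually compatible versions when installed via the "s3fs[boto3]" extra.
import aiobotocore
import boto3
import s3fs

print("s3fs:", s3fs.__version__)
print("boto3:", boto3.__version__)
print("aiobotocore:", aiobotocore.__version__)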
91 changes: 45 additions & 46 deletions ingestion/tests/integration/datalake/conftest.py
@@ -14,16 +14,16 @@
import os
from copy import deepcopy

import boto3
import pytest
from moto import mock_aws

from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow

BUCKET_NAME = "MyBucket"
from ..containers import MinioContainerConfigs, get_minio_container

BUCKET_NAME = "my-bucket"

INGESTION_CONFIG = {
"source": {
@@ -77,7 +77,7 @@
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."users.csv"',
"entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
}
},
},
@@ -128,31 +128,19 @@
}


@pytest.fixture(scope="module", autouse=True)
def aws():
with mock_aws():
yield boto3.client("s3", region_name="us-east-1")
@pytest.fixture(scope="session")
def minio_container():
with get_minio_container(MinioContainerConfigs()) as container:
yield container


@pytest.fixture(scope="class", autouse=True)
def setup_s3(request) -> None:
def setup_s3(minio_container) -> None:
# Mock our S3 bucket and ingest a file
boto3.DEFAULT_SESSION = None
request.cls.s3_client = boto3.client(
"s3",
region_name="us-west-1",
)
s3 = boto3.resource(
"s3",
region_name="us-west-1",
aws_access_key_id="fake_access_key",
aws_secret_access_key="fake_secret_key",
)
request.cls.s3_client.create_bucket(
Bucket=BUCKET_NAME,
CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
)
s3.meta.client.head_bucket(Bucket=BUCKET_NAME)
client = minio_container.get_client()
if client.bucket_exists(BUCKET_NAME):
return
client.make_bucket(BUCKET_NAME)
current_dir = os.path.dirname(__file__)
resources_dir = os.path.join(current_dir, "resources")

@@ -161,23 +149,31 @@ def setup_s3(request) -> None:
for path, _, files in os.walk(resources_dir)
for filename in files
]

request.cls.s3_keys = []

for path in resources_paths:
key = os.path.relpath(path, resources_dir)
request.cls.s3_keys.append(key)
request.cls.s3_client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)
yield
bucket = s3.Bucket(BUCKET_NAME)
for key in bucket.objects.all():
key.delete()
bucket.delete()
client.fput_object(BUCKET_NAME, key, path)
return


@pytest.fixture(scope="class")
def run_ingestion(metadata):
ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
def ingestion_config(minio_container):
ingestion_config = deepcopy(INGESTION_CONFIG)
ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update(
{
"securityConfig": {
"awsAccessKeyId": minio_container.access_key,
"awsSecretAccessKey": minio_container.secret_key,
"awsRegion": "us-west-1",
"endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}",
}
}
)
return ingestion_config


@pytest.fixture(scope="class")
def run_ingestion(metadata, ingestion_config):
ingestion_workflow = MetadataWorkflow.create(ingestion_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()
@@ -189,27 +185,30 @@ def run_ingestion(metadata):


@pytest.fixture
def run_test_suite_workflow(run_ingestion):
ingestion_workflow = TestSuiteWorkflow.create(DATA_QUALITY_CONFIG)
def run_test_suite_workflow(run_ingestion, ingestion_config):
workflow_config = deepcopy(DATA_QUALITY_CONFIG)
workflow_config["source"]["serviceConnection"] = ingestion_config["source"][
"serviceConnection"
]
ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()


@pytest.fixture(scope="session")
def profiler_workflow_config(workflow_config):
config = deepcopy(INGESTION_CONFIG)
config["source"]["sourceConfig"]["config"].update(
@pytest.fixture(scope="class")
def profiler_workflow_config(ingestion_config, workflow_config):
ingestion_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
config["processor"] = {
ingestion_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
config["workflowConfig"] = workflow_config
return config
ingestion_config["workflowConfig"] = workflow_config
return ingestion_config


@pytest.fixture()
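The `MinioContainerConfigs` and `get_minio_container` helpers imported from `..containers` are not part of this diff. A minimal sketch of what such a helper could look like, assuming it wraps testcontainers' MinioContainer; the real module (presumably ingestion/tests/integration/containers.py) may differ:

# Hypothetical sketch of the shared `containers` helper referenced by conftest.py.
from dataclasses import dataclass

from testcontainers.minio import MinioContainer


@dataclass
class MinioContainerConfigs:
    access_key: str = "minio"
    secret_key: str = "password"
    port: int = 9000


def get_minio_container(configs: MinioContainerConfigs) -> MinioContainer:
    # Returns an unstarted container; callers enter it as a context manager,
    # exactly as the `minio_container` fixture above does.
    return MinioContainer(
        access_key=configs.access_key,
        secret_key=configs.secret_key,
        port=configs.port,
    )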
7 changes: 4 additions & 3 deletions ingestion/tests/integration/datalake/test_ingestion.py
@@ -13,6 +13,7 @@

import pytest

from ingestion.tests.integration.datalake.conftest import BUCKET_NAME
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -53,7 +54,7 @@ def test_profiler(self, run_profiler):
"""Also excluding the test for parquet files until the above is fixed"""
csv_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."users.csv"',
fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
fields=["tableProfilerConfig"],
)
# parquet_ = self.metadata.get_by_name(
@@ -63,13 +64,13 @@
# )
json_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."names.json"',
fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.json"',
fields=["tableProfilerConfig"],
)

jsonl_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."names.jsonl"',
fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.jsonl"',
fields=["tableProfilerConfig"],
)

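As a usage note (not part of the commit), the same client exposed by the `minio_container` fixture could be used to assert that the fixture files actually landed in the bucket before ingestion runs. A hedged sketch, with object names assumed to mirror the files under resources/:

# Illustrative test sketch only; the object names are assumptions.
def test_fixture_files_uploaded(minio_container):
    client = minio_container.get_client()
    objects = {o.object_name for o in client.list_objects("my-bucket", recursive=True)}
    assert {"users.csv", "names.json", "names.jsonl"} <= objects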
