diff --git a/ingestion/setup.py b/ingestion/setup.py
index aa5bd5aa27cf..84e2f6025274 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -207,11 +207,8 @@
         *COMMONS["datalake"],
     },
     "datalake-s3": {
-        # requires aiobotocore
-        # https://github.com/fsspec/s3fs/blob/9bf99f763edaf7026318e150c4bd3a8d18bb3a00/requirements.txt#L1
-        # however, the latest version of `s3fs` conflicts its `aiobotocore` dep with `boto3`'s dep on `botocore`.
-        # Leaving this marked to the automatic resolution to speed up installation.
-        "s3fs",
+        # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
+        "s3fs[boto3]",
         *COMMONS["datalake"],
     },
     "deltalake": {"delta-spark<=2.3.0", "deltalake~=0.17"},
@@ -343,7 +340,6 @@
     "coverage",
     # Install GE because it's not in the `all` plugin
     VERSIONS["great-expectations"],
-    "moto~=5.0",
     "basedpyright~=1.14",
     "pytest==7.0.0",
     "pytest-cov",
diff --git a/ingestion/tests/integration/datalake/conftest.py b/ingestion/tests/integration/datalake/conftest.py
index 1ed88fa8ffb6..3270579e96c8 100644
--- a/ingestion/tests/integration/datalake/conftest.py
+++ b/ingestion/tests/integration/datalake/conftest.py
@@ -14,16 +14,16 @@
 import os
 from copy import deepcopy

-import boto3
 import pytest
-from moto import mock_aws

 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.workflow.data_quality import TestSuiteWorkflow
 from metadata.workflow.metadata import MetadataWorkflow
 from metadata.workflow.profiler import ProfilerWorkflow

-BUCKET_NAME = "MyBucket"
+from ..containers import MinioContainerConfigs, get_minio_container
+
+BUCKET_NAME = "my-bucket"

 INGESTION_CONFIG = {
     "source": {
@@ -77,7 +77,7 @@
         "sourceConfig": {
             "config": {
                 "type": "TestSuite",
-                "entityFullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."users.csv"',
+                "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
             }
         },
     },
@@ -128,31 +128,19 @@
 }


-@pytest.fixture(scope="module", autouse=True)
-def aws():
-    with mock_aws():
-        yield boto3.client("s3", region_name="us-east-1")
+@pytest.fixture(scope="session")
+def minio_container():
+    with get_minio_container(MinioContainerConfigs()) as container:
+        yield container


 @pytest.fixture(scope="class", autouse=True)
-def setup_s3(request) -> None:
+def setup_s3(minio_container) -> None:
     # Mock our S3 bucket and ingest a file
-    boto3.DEFAULT_SESSION = None
-    request.cls.s3_client = boto3.client(
-        "s3",
-        region_name="us-west-1",
-    )
-    s3 = boto3.resource(
-        "s3",
-        region_name="us-west-1",
-        aws_access_key_id="fake_access_key",
-        aws_secret_access_key="fake_secret_key",
-    )
-    request.cls.s3_client.create_bucket(
-        Bucket=BUCKET_NAME,
-        CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
-    )
-    s3.meta.client.head_bucket(Bucket=BUCKET_NAME)
+    client = minio_container.get_client()
+    if client.bucket_exists(BUCKET_NAME):
+        return
+    client.make_bucket(BUCKET_NAME)

     current_dir = os.path.dirname(__file__)
     resources_dir = os.path.join(current_dir, "resources")
@@ -161,23 +149,31 @@ def setup_s3(request) -> None:
         for path, _, files in os.walk(resources_dir)
         for filename in files
     ]
-
-    request.cls.s3_keys = []
-
     for path in resources_paths:
         key = os.path.relpath(path, resources_dir)
-        request.cls.s3_keys.append(key)
-        request.cls.s3_client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)
-    yield
-    bucket = s3.Bucket(BUCKET_NAME)
-    for key in bucket.objects.all():
-        key.delete()
-    bucket.delete()
+        client.fput_object(BUCKET_NAME, key, path)
+    return


 @pytest.fixture(scope="class")
-def run_ingestion(metadata):
-    ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
+def ingestion_config(minio_container):
+    ingestion_config = deepcopy(INGESTION_CONFIG)
+    ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update(
+        {
+            "securityConfig": {
+                "awsAccessKeyId": minio_container.access_key,
+                "awsSecretAccessKey": minio_container.secret_key,
+                "awsRegion": "us-west-1",
+                "endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}",
+            }
+        }
+    )
+    return ingestion_config
+
+
+@pytest.fixture(scope="class")
+def run_ingestion(metadata, ingestion_config):
+    ingestion_workflow = MetadataWorkflow.create(ingestion_config)
     ingestion_workflow.execute()
     ingestion_workflow.raise_from_status()
     ingestion_workflow.stop()
@@ -189,27 +185,30 @@ def run_ingestion(metadata):


 @pytest.fixture
-def run_test_suite_workflow(run_ingestion):
-    ingestion_workflow = TestSuiteWorkflow.create(DATA_QUALITY_CONFIG)
+def run_test_suite_workflow(run_ingestion, ingestion_config):
+    workflow_config = deepcopy(DATA_QUALITY_CONFIG)
+    workflow_config["source"]["serviceConnection"] = ingestion_config["source"][
+        "serviceConnection"
+    ]
+    ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
     ingestion_workflow.execute()
     ingestion_workflow.raise_from_status()
     ingestion_workflow.stop()


-@pytest.fixture(scope="session")
-def profiler_workflow_config(workflow_config):
-    config = deepcopy(INGESTION_CONFIG)
-    config["source"]["sourceConfig"]["config"].update(
+@pytest.fixture(scope="class")
+def profiler_workflow_config(ingestion_config, workflow_config):
+    ingestion_config["source"]["sourceConfig"]["config"].update(
         {
             "type": "Profiler",
         }
     )
-    config["processor"] = {
+    ingestion_config["processor"] = {
         "type": "orm-profiler",
         "config": {},
     }
-    config["workflowConfig"] = workflow_config
-    return config
+    ingestion_config["workflowConfig"] = workflow_config
+    return ingestion_config


 @pytest.fixture()
diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py
index 58c1847fee07..b3666729e34c 100644
--- a/ingestion/tests/integration/datalake/test_ingestion.py
+++ b/ingestion/tests/integration/datalake/test_ingestion.py
@@ -13,6 +13,7 @@

 import pytest

+from ingestion.tests.integration.datalake.conftest import BUCKET_NAME
 from metadata.generated.schema.entity.data.table import DataType, Table
 from metadata.ingestion.ometa.models import EntityList
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -53,7 +54,7 @@ def test_profiler(self, run_profiler):
         """Also excluding the test for parquet files until the above is fixed"""
         csv_ = self.metadata.get_by_name(
             entity=Table,
-            fqn='datalake_for_integration_tests.default.MyBucket."users.csv"',
+            fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
             fields=["tableProfilerConfig"],
         )
         # parquet_ = self.metadata.get_by_name(
@@ -63,13 +64,13 @@
         #     entity=Table,
         #     fqn='datalake_for_integration_tests.default.MyBucket."users.parquet"',
         #     fields=["tableProfilerConfig"],
         # )
         json_ = self.metadata.get_by_name(
             entity=Table,
-            fqn='datalake_for_integration_tests.default.MyBucket."names.json"',
+            fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.json"',
             fields=["tableProfilerConfig"],
         )
         jsonl_ = self.metadata.get_by_name(
             entity=Table,
-            fqn='datalake_for_integration_tests.default.MyBucket."names.jsonl"',
+            fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.jsonl"',
             fields=["tableProfilerConfig"],
         )
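
Reviewer note: conftest.py now imports MinioContainerConfigs and get_minio_container from a sibling containers module under ingestion/tests/integration/ that is not part of this diff. The snippet below is only a rough sketch of what that helper could look like, assuming it wraps the testcontainers MinIO module; the names the fixtures above rely on (access_key, secret_key, port, get_client(), get_exposed_port()) are taken from the diff, while the defaults and the exact shape of MinioContainerConfigs are assumptions, not the actual file.

# Hypothetical sketch of the containers helper (not in this diff).
# Assumes testcontainers' MinIO module; the real helper may differ.
from dataclasses import dataclass

from testcontainers.minio import MinioContainer


@dataclass
class MinioContainerConfigs:
    """Connection settings the datalake fixtures read back from the running container."""

    access_key: str = "minioadmin"  # placeholder default, not taken from the PR
    secret_key: str = "minioadmin"  # placeholder default, not taken from the PR
    port: int = 9000


def get_minio_container(config: MinioContainerConfigs) -> MinioContainer:
    # MinioContainer is a context manager: entering it starts MinIO and exposes
    # access_key, secret_key, port, get_client() and get_exposed_port(), which is
    # exactly the surface the session-scoped minio_container fixture consumes.
    return MinioContainer(
        access_key=config.access_key,
        secret_key=config.secret_key,
        port=config.port,
    )

With a helper of roughly this shape, the session-scoped minio_container fixture spins up a single MinIO instance for the whole test run, and the ingestion_config fixture only needs to point endPointURL at the mapped port, instead of relying on moto's in-process mock of boto3.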