From 12e7313c1a8301372ff6306e089f84894697a657 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 19 Aug 2024 03:58:17 +0200 Subject: [PATCH] DynamoDB: Add table loader for full-load operations --- .github/workflows/dynamodb.yml | 94 ++++++ CHANGES.md | 1 + cratedb_toolkit/api/main.py | 10 +- cratedb_toolkit/io/dynamodb/__init__.py | 0 cratedb_toolkit/io/dynamodb/adapter.py | 26 ++ cratedb_toolkit/io/dynamodb/api.py | 40 +++ cratedb_toolkit/io/dynamodb/backlog.md | 35 ++ cratedb_toolkit/io/dynamodb/copy.py | 97 ++++++ .../io/processor/kinesis_lambda.py | 2 + .../testing/testcontainers/localstack.py | 41 +++ .../testing/testcontainers/util.py | 6 +- doc/io/dynamodb/index.md | 1 + doc/io/dynamodb/loader.md | 68 ++++ pyproject.toml | 6 + tests/io/dynamodb/README.md | 7 + tests/io/dynamodb/__init__.py | 3 + tests/io/dynamodb/conftest.py | 74 +++++ tests/io/dynamodb/manager.py | 39 +++ tests/io/dynamodb/productcatalog.json | 306 ++++++++++++++++++ tests/io/dynamodb/test_cli.py | 31 ++ tests/io/dynamodb/test_import.py | 8 + 21 files changed, 892 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/dynamodb.yml create mode 100644 cratedb_toolkit/io/dynamodb/__init__.py create mode 100644 cratedb_toolkit/io/dynamodb/adapter.py create mode 100644 cratedb_toolkit/io/dynamodb/api.py create mode 100644 cratedb_toolkit/io/dynamodb/backlog.md create mode 100644 cratedb_toolkit/io/dynamodb/copy.py create mode 100644 cratedb_toolkit/testing/testcontainers/localstack.py create mode 100644 doc/io/dynamodb/loader.md create mode 100644 tests/io/dynamodb/README.md create mode 100644 tests/io/dynamodb/__init__.py create mode 100644 tests/io/dynamodb/conftest.py create mode 100644 tests/io/dynamodb/manager.py create mode 100644 tests/io/dynamodb/productcatalog.json create mode 100644 tests/io/dynamodb/test_cli.py create mode 100644 tests/io/dynamodb/test_import.py diff --git a/.github/workflows/dynamodb.yml b/.github/workflows/dynamodb.yml new file mode 100644 index 00000000..d01b2225 --- /dev/null +++ b/.github/workflows/dynamodb.yml @@ -0,0 +1,94 @@ +--- +name: "Tests: DynamoDB" + +on: + pull_request: + branches: ~ + paths: + - '.github/workflows/dynamodb.yml' + - 'cratedb_toolkit/io/dynamodb/**' + - 'pyproject.toml' + push: + branches: [ main ] + paths: + - '.github/workflows/dynamodb.yml' + - 'cratedb_toolkit/io/dynamodb/**' + - 'pyproject.toml' + + # Allow job to be triggered manually. + workflow_dispatch: + + # Run job each second night after CrateDB nightly has been published. + # The reason about "why each second night", is because free capacity + # for Codecov uploads is limited. + schedule: + - cron: '0 3 */2 * *' + +# Cancel in-progress jobs when pushing to the same branch. +concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.ref }} + +jobs: + + tests: + + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + # TODO: yarl, dependency of influxio, is currently not available on Python 3.12. + # https://github.com/aio-libs/yarl/pull/942 + python-version: ["3.8", "3.11"] + localstack-version: ["3.6"] + + env: + OS: ${{ matrix.os }} + PYTHON: ${{ matrix.python-version }} + LOCALSTACK_VERSION: ${{ matrix.localstack-version }} + # Do not tear down Testcontainers + TC_KEEPALIVE: true + + name: " + Python ${{ matrix.python-version }}, + LocalStack ${{ matrix.localstack-version }}, + OS ${{ matrix.os }} + " + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: 'pyproject.toml' + + - name: Set up project + run: | + + # `setuptools 0.64.0` adds support for editable install hooks (PEP 660). + # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400 + pip install "setuptools>=64" --upgrade + + # Install package in editable mode. + pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop] + + - name: Run linter and software tests + run: | + pytest -m dynamodb + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + with: + files: ./coverage.xml + flags: dynamodb + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: false diff --git a/CHANGES.md b/CHANGES.md index 5c19ec0f..8575212c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,6 +17,7 @@ - MongoDB: Add capability to give type hints and add transformations - Dependencies: Adjust code for lorrystream version 0.0.3 - Dependencies: Update to lorrystream 0.0.4 and commons-codec 0.0.7 +- DynamoDB: Add table loader for full-load operations ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 93a355b3..01a3d029 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -113,7 +113,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf source_url = resource.url target_url = self.address.dburi source_url_obj = URL(source_url) - if source_url.startswith("influxdb"): + if source_url.startswith("dynamodb"): + from cratedb_toolkit.io.dynamodb.api import dynamodb_copy + + if not dynamodb_copy(source_url, target_url, progress=True): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) + + elif source_url.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy http_scheme = "http://" diff --git a/cratedb_toolkit/io/dynamodb/__init__.py b/cratedb_toolkit/io/dynamodb/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/dynamodb/adapter.py b/cratedb_toolkit/io/dynamodb/adapter.py new file mode 100644 index 00000000..4980da86 --- /dev/null +++ b/cratedb_toolkit/io/dynamodb/adapter.py @@ -0,0 +1,26 @@ +import boto3 +from yarl import URL + + +class DynamoDBAdapter: + def __init__(self, dynamodb_url: URL, echo: bool = False): + self.session = boto3.Session( + aws_access_key_id=dynamodb_url.user, + aws_secret_access_key=dynamodb_url.password, + region_name=dynamodb_url.query.get("region"), + ) + endpoint_url = None + if dynamodb_url.host and dynamodb_url.host.lower() != "aws": + endpoint_url = f"http://{dynamodb_url.host}:{dynamodb_url.port}" + self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url) + self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url) + + def scan(self, table_name: str): + """ + Return all items from DynamoDB table. + """ + return self.dynamodb_client.scan(TableName=table_name) + + def count_records(self, table_name: str): + table = self.dynamodb_resource.Table(table_name) + return table.item_count diff --git a/cratedb_toolkit/io/dynamodb/api.py b/cratedb_toolkit/io/dynamodb/api.py new file mode 100644 index 00000000..538b2c9d --- /dev/null +++ b/cratedb_toolkit/io/dynamodb/api.py @@ -0,0 +1,40 @@ +import logging + +from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad + +logger = logging.getLogger(__name__) + + +def dynamodb_copy(source_url, target_url, progress: bool = False): + """ + + Synopsis + -------- + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo + ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/us-east-1/ProductCatalog + ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog + + ctk load table dynamodb://arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog + arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo + + ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1 + + Resources + --------- + https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html + + Backlog + ------- + Currently, it is not directly possible to address DynamoDB tables by ARN, i.e. for using a different AccountID. + - https://github.com/boto/boto3/issues/2658 + - https://stackoverflow.com/questions/71019941/how-to-point-to-the-arn-of-a-dynamodb-table-instead-of-using-the-name-when-using + - https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/configure-cross-account-access-to-amazon-dynamodb.html + """ + logger.info("Invoking DynamoDBFullLoad") + ddb_full = DynamoDBFullLoad( + dynamodb_url=source_url, + cratedb_url=target_url, + progress=progress, + ) + ddb_full.start() + return True diff --git a/cratedb_toolkit/io/dynamodb/backlog.md b/cratedb_toolkit/io/dynamodb/backlog.md new file mode 100644 index 00000000..1361357d --- /dev/null +++ b/cratedb_toolkit/io/dynamodb/backlog.md @@ -0,0 +1,35 @@ +# DynamoDB Backlog + +## Iteration +1 +- Pagination / Batch Getting. + https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination + +- Use `batch_get_item`. + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html + +- Scan by query instead of full. + + +## Iteration +2 + +### Resumption on errors? +Another variant to scan the table, probably for resuming on errors? +```python +key = None +while True: + if key is None: + response = table.scan() + else: + response = table.scan(ExclusiveStartKey=key) + key = response.get("LastEvaluatedKey", None) +``` + +### Item transformations? +That's another item transformation idea picked up from an example program. +Please advise if this is sensible in all situations, or if it's just a +special case. + +```python +if 'id' in item and not isinstance(item['id'], str): + item['id'] = str(item['id']) +``` diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py new file mode 100644 index 00000000..eafff70d --- /dev/null +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -0,0 +1,97 @@ +# ruff: noqa: S608 +import logging +import typing as t + +import sqlalchemy as sa +from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB +from tqdm import tqdm +from yarl import URL + +from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class DynamoDBFullLoad: + """ + Copy DynamoDB table into CrateDB table. + """ + + def __init__( + self, + dynamodb_url: str, + cratedb_url: str, + progress: bool = False, + ): + cratedb_address = DatabaseAddress.from_string(cratedb_url) + cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() + cratedb_table = cratedb_table_address.fullname + + self.dynamodb_url = URL(dynamodb_url) + self.dynamodb_adapter = DynamoDBAdapter(self.dynamodb_url) + self.dynamodb_table = self.dynamodb_url.path.lstrip("/") + self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) + self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) + self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table) + + self.progress = progress + + def start(self): + """ + Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. + """ + records_in = self.dynamodb_adapter.count_records(self.dynamodb_table) + logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}") + with self.cratedb_adapter.engine.connect() as connection: + if not self.cratedb_adapter.table_exists(self.cratedb_table): + connection.execute(sa.text(self.translator.sql_ddl)) + connection.commit() + records_target = self.cratedb_adapter.count_records(self.cratedb_table) + logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") + progress_bar = tqdm(total=records_in) + result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table) + records_out = 0 + for sql in self.items_to_sql(result["Items"]): + if sql: + try: + connection.execute(sa.text(sql)) + records_out += 1 + except sa.exc.ProgrammingError as ex: + logger.warning(f"Running query failed: {ex}") + progress_bar.update() + progress_bar.close() + connection.commit() + logger.info(f"Number of records written: {records_out}") + if records_out < records_in: + logger.warning("No data has been copied") + + def items_to_sql(self, items): + """ + Convert data for record items to INSERT statements. + """ + for item in items: + yield self.translator.to_sql(item) + + +class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB): + @property + def sql_ddl(self): + """` + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. + """ + values_clause = self.image_to_values(record) + sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" + return sql + + @staticmethod + def quote_table_name(name: str): + # TODO @ Upstream: Quoting table names should be the responsibility of the caller. + return name diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py index 80eb6dc9..1340cd81 100644 --- a/cratedb_toolkit/io/processor/kinesis_lambda.py +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -34,6 +34,7 @@ import logging import os import sys +import typing as t import sqlalchemy as sa from commons_codec.exception import UnknownOperationError @@ -77,6 +78,7 @@ # TODO: Automatically create destination table. # TODO: Propagate mapping definitions and other settings. +cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB] if MESSAGE_FORMAT == "dms": cdc = DMSTranslatorCrateDB(column_types=column_types) elif MESSAGE_FORMAT == "dynamodb": diff --git a/cratedb_toolkit/testing/testcontainers/localstack.py b/cratedb_toolkit/testing/testcontainers/localstack.py new file mode 100644 index 00000000..2a340c00 --- /dev/null +++ b/cratedb_toolkit/testing/testcontainers/localstack.py @@ -0,0 +1,41 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os + +from testcontainers.localstack import LocalStackContainer + +from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer + + +class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer): + """ + A Testcontainer for LocalStack with improved configurability. + + It honors the `TC_KEEPALIVE` and `LOCALSTACK_VERSION` environment variables. + + Defining `TC_KEEPALIVE` will set a signal not to shut down the container + after running the test cases, in order to speed up subsequent invocations. + + `LOCALSTACK_VERSION` will define the designated LocalStack version, which is + useful when used within a test matrix. Its default value is `latest`. + """ + + LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest") + + def __init__( + self, + image: str = f"localstack/localstack:{LOCALSTACK_VERSION}", + **kwargs, + ) -> None: + super().__init__(image=image, **kwargs) + self.with_name("testcontainers-localstack") diff --git a/cratedb_toolkit/testing/testcontainers/util.py b/cratedb_toolkit/testing/testcontainers/util.py index 766c8233..afa38aac 100644 --- a/cratedb_toolkit/testing/testcontainers/util.py +++ b/cratedb_toolkit/testing/testcontainers/util.py @@ -72,7 +72,8 @@ def start(self): define the `CRATEDB_KEEPALIVE` or `TC_KEEPALIVE` environment variables. """ - self._configure() + if hasattr(self, "_configure"): + self._configure() if self._name is None: raise ValueError( @@ -110,7 +111,8 @@ def start(self): logger.info(f"Starting container: {container_id} ({container_name})") self._container.start() - self._connect() + if hasattr(self, "_connect"): + self._connect() return self def stop(self, **kwargs): diff --git a/doc/io/dynamodb/index.md b/doc/io/dynamodb/index.md index fb589920..9b37c671 100644 --- a/doc/io/dynamodb/index.md +++ b/doc/io/dynamodb/index.md @@ -8,5 +8,6 @@ Using the DynamoDB subsystem, you can transfer data from and to DynamoDB. ```{toctree} :maxdepth: 1 +loader cdc ``` diff --git a/doc/io/dynamodb/loader.md b/doc/io/dynamodb/loader.md new file mode 100644 index 00000000..8d561e0e --- /dev/null +++ b/doc/io/dynamodb/loader.md @@ -0,0 +1,68 @@ +(dynamodb-loader)= +# DynamoDB Table Loader + +## About +Load data from DynamoDB into CrateDB using a one-stop command +`ctk load table dynamodb://...`, in order to facilitate convenient +data transfers to be used within data pipelines or ad hoc operations. + +## Install +```shell +pip install --upgrade 'cratedb-toolkit[dynamodb]' +``` + +## Usage +Transfer data from DynamoDB table into CrateDB schema/table. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/ProductCatalog?region=us-east-1 +``` + +Query data in CrateDB. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk shell --command "SELECT * FROM testdrive.demo;" +ctk show table "testdrive.demo" +``` + +## Variants + +### CrateDB Cloud +When aiming to transfer data to CrateDB Cloud, the shape of the target URL +looks like that. +```shell +export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' +``` + +### LocalStack +In order to exercise data transfers exclusively on your workstation, you can +use LocalStack to run a DynamoDB service surrogate locally. See also the +[Get started with DynamoDB on LocalStack] tutorial. + +For addressing a DynamoDB database on LocalStack, use a command of that shape. +See [Credentials for accessing LocalStack AWS API] for further information. +```shell +ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=us-east-1 +``` + +:::{tip} +LocalStack is a cloud service emulator that runs in a single container on your +laptop or in your CI environment. With LocalStack, you can run your AWS +applications or Lambdas entirely on your local machine without connecting to +a remote cloud provider. + +In order to invoke LocalStack on your workstation, you can use this Docker +command. +```shell +docker run \ + --rm -it \ + -p 127.0.0.1:4566:4566 \ + -p 127.0.0.1:4510-4559:4510-4559 \ + -v /var/run/docker.sock:/var/run/docker.sock \ + localstack/localstack:latest +``` +::: + + +[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ +[Get started with DynamoDB on LocalStack]: https://docs.localstack.cloud/user-guide/aws/dynamodb/ diff --git a/pyproject.toml b/pyproject.toml index c16de8d4..0c0cbac5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -137,6 +137,10 @@ docs = [ "sphinxcontrib-mermaid<1", "sphinxext-opengraph<1", ] +dynamodb = [ + "boto3", + "commons-codec", +] full = [ "cratedb-toolkit[cfr,cloud,datasets,io,service]", ] @@ -188,6 +192,7 @@ test = [ "pytest-mock<4", "responses<0.26", "testcontainers-azurite==0.0.1rc1", + "testcontainers-localstack==0.0.1rc1", "testcontainers-minio==0.0.1rc1", ] test-mongodb = [ @@ -254,6 +259,7 @@ testpaths = [ xfail_strict = true markers = [ "examples", + "dynamodb", "influxdb", "kinesis", "mongodb", diff --git a/tests/io/dynamodb/README.md b/tests/io/dynamodb/README.md new file mode 100644 index 00000000..deb36d5b --- /dev/null +++ b/tests/io/dynamodb/README.md @@ -0,0 +1,7 @@ +# Testing DynamoDB + +## About +This unit-/integration test subsystem uses LocalStack's DynamoDB. + +## Resources +- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html diff --git a/tests/io/dynamodb/__init__.py b/tests/io/dynamodb/__init__.py new file mode 100644 index 00000000..933fb4a5 --- /dev/null +++ b/tests/io/dynamodb/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("boto3", reason="Skipping DynamoDB tests because 'boto3' package is not installed") diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py new file mode 100644 index 00000000..8806c508 --- /dev/null +++ b/tests/io/dynamodb/conftest.py @@ -0,0 +1,74 @@ +import logging + +import pytest +from yarl import URL + +from tests.io.dynamodb.manager import DynamoDBTestManager + +logger = logging.getLogger(__name__) + + +# Define databases to be deleted before running each test case. +RESET_TABLES = [ + "ProductCatalog", +] + + +class DynamoDBFixture: + """ + A little helper wrapping Testcontainer's `LocalStackContainer`. + """ + + def __init__(self): + self.container = None + self.url = None + self.setup() + + def setup(self): + # TODO: Make image name configurable. + from cratedb_toolkit.testing.testcontainers.localstack import LocalStackContainerWithKeepalive + + self.container = LocalStackContainerWithKeepalive() + self.container.with_services("dynamodb") + self.container.start() + + def finalize(self): + self.container.stop() + + def reset(self): + """ + Drop all databases used for testing. + """ + # FIXME + return + for database_name in RESET_TABLES: + self.client.drop_database(database_name) + + def get_connection_url(self): + url = URL(self.container.get_url()) + return f"dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + + +@pytest.fixture(scope="session") +def dynamodb_service(): + """ + Provide a DynamoDB service instance to the test suite. + """ + db = DynamoDBFixture() + db.reset() + yield db + db.finalize() + + +@pytest.fixture(scope="function") +def dynamodb(dynamodb_service): + """ + Provide a fresh canvas to each test case invocation, by resetting database content. + """ + dynamodb_service.reset() + yield dynamodb_service + + +@pytest.fixture(scope="session") +def dynamodb_test_manager(dynamodb_service): + return DynamoDBTestManager(dynamodb_service.get_connection_url()) diff --git a/tests/io/dynamodb/manager.py b/tests/io/dynamodb/manager.py new file mode 100644 index 00000000..ce62c125 --- /dev/null +++ b/tests/io/dynamodb/manager.py @@ -0,0 +1,39 @@ +import json +from pathlib import Path + +from yarl import URL + +from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter + + +class DynamoDBTestManager: + def __init__(self, url: str): + self.adapter = DynamoDBAdapter(URL(url).with_query({"region": "us-east-1"})) + + def load_product_catalog(self): + table = self.adapter.dynamodb_resource.Table("ProductCatalog") + try: + table.delete() + except Exception: # noqa: S110 + pass + + table = self.adapter.dynamodb_resource.create_table( + TableName="ProductCatalog", + KeySchema=[ + {"AttributeName": "Id", "KeyType": "HASH"}, + ], + AttributeDefinitions=[ + {"AttributeName": "Id", "AttributeType": "N"}, + ], + ProvisionedThroughput={ + "ReadCapacityUnits": 1, + "WriteCapacityUnits": 1, + }, + TableClass="STANDARD", + ) + table.wait_until_exists() + + data = json.loads(Path("tests/io/dynamodb/productcatalog.json").read_text()) + self.adapter.dynamodb_client.batch_write_item(RequestItems=data) + table.load() + return table diff --git a/tests/io/dynamodb/productcatalog.json b/tests/io/dynamodb/productcatalog.json new file mode 100644 index 00000000..4c42189b --- /dev/null +++ b/tests/io/dynamodb/productcatalog.json @@ -0,0 +1,306 @@ +{ + "ProductCatalog": [ + { + "PutRequest": { + "Item": { + "Id": { + "N": "101" + }, + "Title": { + "S": "Book 101 Title" + }, + "ISBN": { + "S": "111-1111111111" + }, + "Authors": { + "L": [ + { + "S": "Author1" + } + ] + }, + "Price": { + "N": "2" + }, + "Dimensions": { + "S": "8.5 x 11.0 x 0.5" + }, + "PageCount": { + "N": "500" + }, + "InPublication": { + "BOOL": true + }, + "ProductCategory": { + "S": "Book" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "102" + }, + "Title": { + "S": "Book 102 Title" + }, + "ISBN": { + "S": "222-2222222222" + }, + "Authors": { + "L": [ + { + "S": "Author1" + }, + { + "S": "Author2" + } + ] + }, + "Price": { + "N": "20" + }, + "Dimensions": { + "S": "8.5 x 11.0 x 0.8" + }, + "PageCount": { + "N": "600" + }, + "InPublication": { + "BOOL": true + }, + "ProductCategory": { + "S": "Book" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "103" + }, + "Title": { + "S": "Book 103 Title" + }, + "ISBN": { + "S": "333-3333333333" + }, + "Authors": { + "L": [ + { + "S": "Author1" + }, + { + "S": "Author2" + } + ] + }, + "Price": { + "N": "2000" + }, + "Dimensions": { + "S": "8.5 x 11.0 x 1.5" + }, + "PageCount": { + "N": "600" + }, + "InPublication": { + "BOOL": false + }, + "ProductCategory": { + "S": "Book" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "201" + }, + "Title": { + "S": "18-Bike-201" + }, + "Description": { + "S": "201 Description" + }, + "BicycleType": { + "S": "Road" + }, + "Brand": { + "S": "Mountain A" + }, + "Price": { + "N": "100" + }, + "Color": { + "L": [ + { + "S": "Red" + }, + { + "S": "Black" + } + ] + }, + "ProductCategory": { + "S": "Bicycle" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "202" + }, + "Title": { + "S": "21-Bike-202" + }, + "Description": { + "S": "202 Description" + }, + "BicycleType": { + "S": "Road" + }, + "Brand": { + "S": "Brand-Company A" + }, + "Price": { + "N": "200" + }, + "Color": { + "L": [ + { + "S": "Green" + }, + { + "S": "Black" + } + ] + }, + "ProductCategory": { + "S": "Bicycle" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "203" + }, + "Title": { + "S": "19-Bike-203" + }, + "Description": { + "S": "203 Description" + }, + "BicycleType": { + "S": "Road" + }, + "Brand": { + "S": "Brand-Company B" + }, + "Price": { + "N": "300" + }, + "Color": { + "L": [ + { + "S": "Red" + }, + { + "S": "Green" + }, + { + "S": "Black" + } + ] + }, + "ProductCategory": { + "S": "Bicycle" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "204" + }, + "Title": { + "S": "18-Bike-204" + }, + "Description": { + "S": "204 Description" + }, + "BicycleType": { + "S": "Mountain" + }, + "Brand": { + "S": "Brand-Company B" + }, + "Price": { + "N": "400" + }, + "Color": { + "L": [ + { + "S": "Red" + } + ] + }, + "ProductCategory": { + "S": "Bicycle" + } + } + } + }, + { + "PutRequest": { + "Item": { + "Id": { + "N": "205" + }, + "Title": { + "S": "18-Bike-204" + }, + "Description": { + "S": "205 Description" + }, + "BicycleType": { + "S": "Hybrid" + }, + "Brand": { + "S": "Brand-Company C" + }, + "Price": { + "N": "500" + }, + "Color": { + "L": [ + { + "S": "Red" + }, + { + "S": "Black" + } + ] + }, + "ProductCategory": { + "S": "Bicycle" + } + } + } + } + ] +} diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py new file mode 100644 index 00000000..80d87d63 --- /dev/null +++ b/tests/io/dynamodb/test_cli.py @@ -0,0 +1,31 @@ +import pytest +from click.testing import CliRunner + +from cratedb_toolkit.cli import cli + +pytestmark = pytest.mark.dynamodb + + +def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): + """ + CLI test: Invoke `ctk load table` for DynamoDB. + """ + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" + + # Populate source database with sample dataset. + dynamodb_test_manager.load_product_catalog() + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {dynamodb_url}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 8 diff --git a/tests/io/dynamodb/test_import.py b/tests/io/dynamodb/test_import.py new file mode 100644 index 00000000..78914d47 --- /dev/null +++ b/tests/io/dynamodb/test_import.py @@ -0,0 +1,8 @@ +import pytest + +pytestmark = pytest.mark.dynamodb + + +def test_dynamodb_import(dynamodb_test_manager): + product_catalog = dynamodb_test_manager.load_product_catalog() + assert product_catalog.item_count == 8