Skip to content

Commit

Permalink
DynamoDB: Add table loader for full-load operations
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 19, 2024
1 parent 31897f9 commit 12e7313
Show file tree
Hide file tree
Showing 21 changed files with 892 additions and 3 deletions.
94 changes: 94 additions & 0 deletions .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
---
name: "Tests: DynamoDB"

on:
pull_request:
branches: ~
paths:
- '.github/workflows/dynamodb.yml'
- 'cratedb_toolkit/io/dynamodb/**'
- 'pyproject.toml'
push:
branches: [ main ]
paths:
- '.github/workflows/dynamodb.yml'
- 'cratedb_toolkit/io/dynamodb/**'
- 'pyproject.toml'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job each second night after CrateDB nightly has been published.
# The reason about "why each second night", is because free capacity
# for Codecov uploads is limited.
schedule:
- cron: '0 3 */2 * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:

tests:

runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
# https://github.com/aio-libs/yarl/pull/942
python-version: ["3.8", "3.11"]
localstack-version: ["3.6"]

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
LOCALSTACK_VERSION: ${{ matrix.localstack-version }}
# Do not tear down Testcontainers
TC_KEEPALIVE: true

name: "
Python ${{ matrix.python-version }},
LocalStack ${{ matrix.localstack-version }},
OS ${{ matrix.os }}
"
steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

- name: Set up project
run: |
# `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]
- name: Run linter and software tests
run: |
pytest -m dynamodb
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: dynamodb
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: false
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- MongoDB: Add capability to give type hints and add transformations
- Dependencies: Adjust code for lorrystream version 0.0.3
- Dependencies: Update to lorrystream 0.0.4 and commons-codec 0.0.7
- DynamoDB: Add table loader for full-load operations

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
10 changes: 9 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
source_url = resource.url
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("influxdb"):
if source_url.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)

elif source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
Expand Down
Empty file.
26 changes: 26 additions & 0 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import boto3
from yarl import URL


class DynamoDBAdapter:
def __init__(self, dynamodb_url: URL, echo: bool = False):
self.session = boto3.Session(
aws_access_key_id=dynamodb_url.user,
aws_secret_access_key=dynamodb_url.password,
region_name=dynamodb_url.query.get("region"),
)
endpoint_url = None
if dynamodb_url.host and dynamodb_url.host.lower() != "aws":
endpoint_url = f"http://{dynamodb_url.host}:{dynamodb_url.port}"
self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

def scan(self, table_name: str):
"""
Return all items from DynamoDB table.
"""
return self.dynamodb_client.scan(TableName=table_name)

def count_records(self, table_name: str):
table = self.dynamodb_resource.Table(table_name)
return table.item_count
40 changes: 40 additions & 0 deletions cratedb_toolkit/io/dynamodb/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging

from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad

logger = logging.getLogger(__name__)


def dynamodb_copy(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/us-east-1/ProductCatalog
ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog
ctk load table dynamodb://arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog
arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo
ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1
Resources
---------
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html
Backlog
-------
Currently, it is not directly possible to address DynamoDB tables by ARN, i.e. for using a different AccountID.
- https://github.com/boto/boto3/issues/2658
- https://stackoverflow.com/questions/71019941/how-to-point-to-the-arn-of-a-dynamodb-table-instead-of-using-the-name-when-using
- https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/configure-cross-account-access-to-amazon-dynamodb.html
"""
logger.info("Invoking DynamoDBFullLoad")
ddb_full = DynamoDBFullLoad(
dynamodb_url=source_url,
cratedb_url=target_url,
progress=progress,
)
ddb_full.start()
return True
35 changes: 35 additions & 0 deletions cratedb_toolkit/io/dynamodb/backlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# DynamoDB Backlog

## Iteration +1
- Pagination / Batch Getting.
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination

- Use `batch_get_item`.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html

- Scan by query instead of full.


## Iteration +2

### Resumption on errors?
Another variant to scan the table, probably for resuming on errors?
```python
key = None
while True:
if key is None:
response = table.scan()
else:
response = table.scan(ExclusiveStartKey=key)
key = response.get("LastEvaluatedKey", None)
```

### Item transformations?
That's another item transformation idea picked up from an example program.
Please advise if this is sensible in all situations, or if it's just a
special case.

```python
if 'id' in item and not isinstance(item['id'], str):
item['id'] = str(item['id'])
```
97 changes: 97 additions & 0 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from tqdm import tqdm
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class DynamoDBFullLoad:
"""
Copy DynamoDB table into CrateDB table.
"""

def __init__(
self,
dynamodb_url: str,
cratedb_url: str,
progress: bool = False,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname

self.dynamodb_url = URL(dynamodb_url)
self.dynamodb_adapter = DynamoDBAdapter(self.dynamodb_url)
self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table)

self.progress = progress

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for sql in self.items_to_sql(result["Items"]):
if sql:
try:
connection.execute(sa.text(sql))
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
logger.warning("No data has been copied")

def items_to_sql(self, items):
"""
Convert data for record items to INSERT statements.
"""
for item in items:
yield self.translator.to_sql(item)


class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB):
@property
def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
values_clause = self.image_to_values(record)
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"
return sql

@staticmethod
def quote_table_name(name: str):
# TODO @ Upstream: Quoting table names should be the responsibility of the caller.
return name
2 changes: 2 additions & 0 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import logging
import os
import sys
import typing as t

import sqlalchemy as sa
from commons_codec.exception import UnknownOperationError
Expand Down Expand Up @@ -77,6 +78,7 @@

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB]
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
Expand Down
41 changes: 41 additions & 0 deletions cratedb_toolkit/testing/testcontainers/localstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os

from testcontainers.localstack import LocalStackContainer

from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer


class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer):
"""
A Testcontainer for LocalStack with improved configurability.
It honors the `TC_KEEPALIVE` and `LOCALSTACK_VERSION` environment variables.
Defining `TC_KEEPALIVE` will set a signal not to shut down the container
after running the test cases, in order to speed up subsequent invocations.
`LOCALSTACK_VERSION` will define the designated LocalStack version, which is
useful when used within a test matrix. Its default value is `latest`.
"""

LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest")

def __init__(
self,
image: str = f"localstack/localstack:{LOCALSTACK_VERSION}",
**kwargs,
) -> None:
super().__init__(image=image, **kwargs)
self.with_name("testcontainers-localstack")
6 changes: 4 additions & 2 deletions cratedb_toolkit/testing/testcontainers/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def start(self):
define the `CRATEDB_KEEPALIVE` or `TC_KEEPALIVE` environment variables.
"""

self._configure()
if hasattr(self, "_configure"):
self._configure()

if self._name is None:
raise ValueError(
Expand Down Expand Up @@ -110,7 +111,8 @@ def start(self):
logger.info(f"Starting container: {container_id} ({container_name})")
self._container.start()

self._connect()
if hasattr(self, "_connect"):
self._connect()
return self

def stop(self, **kwargs):
Expand Down
Loading

0 comments on commit 12e7313

Please sign in to comment.