From ebe01652446fb7554df9da9f728e3e69d2e96d8e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 24 Aug 2024 15:34:14 +0200 Subject: [PATCH] DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values - `SQLOperation` bundles data about an SQL operation, including statement and parameters. - Also, refactor `DynamoDBFullLoadTranslator` to `commons-codec`. --- CHANGES.md | 1 + cratedb_toolkit/io/dynamodb/copy.py | 44 ++---- cratedb_toolkit/io/mongodb/cdc.py | 6 +- .../io/processor/kinesis_lambda.py | 17 ++- pyproject.toml | 5 +- tests/io/mongodb/test_transformation.py | 1 + tests/io/test_processor.py | 136 +++++++++++++++++- 7 files changed, 163 insertions(+), 47 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 5f407ef5..a4f476ec 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - MongoDB: Fix and verify Zyp transformations +- DMS/DynamoDB/MongoDB I/O: Use SQL with parameters instead of inlining values ## 2024/08/21 v0.0.18 - Dependencies: Unpin commons-codec, to always use the latest version diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py index eafff70d..d6af7f51 100644 --- a/cratedb_toolkit/io/dynamodb/copy.py +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -1,9 +1,8 @@ # ruff: noqa: S608 import logging -import typing as t import sqlalchemy as sa -from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB +from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator from tqdm import tqdm from yarl import URL @@ -34,7 +33,7 @@ def __init__( self.dynamodb_table = self.dynamodb_url.path.lstrip("/") self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table) + self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table) self.progress = progress @@ -53,45 +52,22 @@ def start(self): progress_bar = tqdm(total=records_in) result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table) records_out = 0 - for sql in self.items_to_sql(result["Items"]): - if sql: - try: - connection.execute(sa.text(sql)) - records_out += 1 - except sa.exc.ProgrammingError as ex: - logger.warning(f"Running query failed: {ex}") - progress_bar.update() + for operation in self.items_to_operations(result["Items"]): + try: + connection.execute(sa.text(operation.statement), operation.parameters) + records_out += 1 + except sa.exc.ProgrammingError as ex: + logger.warning(f"Running query failed: {ex}") + progress_bar.update() progress_bar.close() connection.commit() logger.info(f"Number of records written: {records_out}") if records_out < records_in: logger.warning("No data has been copied") - def items_to_sql(self, items): + def items_to_operations(self, items): """ Convert data for record items to INSERT statements. """ for item in items: yield self.translator.to_sql(item) - - -class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB): - @property - def sql_ddl(self): - """` - Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. - """ - return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" - - def to_sql(self, record: t.Dict[str, t.Any]) -> str: - """ - Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. - """ - values_clause = self.image_to_values(record) - sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" - return sql - - @staticmethod - def quote_table_name(name: str): - # TODO @ Upstream: Quoting table names should be the responsibility of the caller. - return name diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py index 60d82138..67be74e1 100644 --- a/cratedb_toolkit/io/mongodb/cdc.py +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -44,9 +44,9 @@ def start(self): # FIXME: Note that the function does not perform any sensible error handling yet. with self.cratedb_adapter.engine.connect() as connection: connection.execute(sa.text(self.cdc.sql_ddl)) - for sql in self.cdc_to_sql(): - if sql: - connection.execute(sa.text(sql)) + for operation in self.cdc_to_sql(): + if operation: + connection.execute(sa.text(operation.statement), operation.parameters) def cdc_to_sql(self): """ diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py index d6a9f0bf..4d661d34 100644 --- a/cratedb_toolkit/io/processor/kinesis_lambda.py +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -25,7 +25,7 @@ # /// script # requires-python = ">=3.9" # dependencies = [ -# "commons-codec", +# "commons-codec>=0.0.12", # "sqlalchemy-cratedb==0.38.0", # ] # /// @@ -40,7 +40,7 @@ from commons_codec.exception import UnknownOperationError from commons_codec.model import ColumnTypeMapStore from commons_codec.transform.aws_dms import DMSTranslatorCrateDB -from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator from sqlalchemy.util import asbool LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO") @@ -78,11 +78,11 @@ # TODO: Automatically create destination table. # TODO: Propagate mapping definitions and other settings. -cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB] +cdc: t.Union[DMSTranslatorCrateDB, DynamoDBCDCTranslator] if MESSAGE_FORMAT == "dms": cdc = DMSTranslatorCrateDB(column_types=column_types) elif MESSAGE_FORMAT == "dynamodb": - cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE) + cdc = DynamoDBCDCTranslator(table_name=CRATEDB_TABLE) # Create the database connection outside the handler to allow # connections to be re-used by subsequent function invocations. @@ -123,8 +123,13 @@ def handler(event, context): logger.debug(f"Record Data: {record_data}") # Process record. - sql = cdc.to_sql(record_data) - connection.execute(sa.text(sql)) + operation = cdc.to_sql(record_data) + connection.execute(sa.text(operation.statement), parameters=operation.parameters) + + # Processing alternating CDC events requires write synchronization. + # FIXME: Needs proper table name quoting. + connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}")) + connection.commit() # Bookkeeping. diff --git a/pyproject.toml b/pyproject.toml index 7cce4298..2c9ae342 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,7 +139,7 @@ docs = [ ] dynamodb = [ "boto3", - "commons-codec", + "commons-codec>=0.0.12", ] full = [ "cratedb-toolkit[cfr,cloud,datasets,io,service]", @@ -155,10 +155,11 @@ io = [ "sqlalchemy>=2", ] kinesis = [ + "commons-codec>=0.0.12", "lorrystream[carabas]", ] mongodb = [ - "commons-codec[mongodb,zyp]>=0.0.4", + "commons-codec[mongodb,zyp]>=0.0.12", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1", diff --git a/tests/io/mongodb/test_transformation.py b/tests/io/mongodb/test_transformation.py index d360c7dd..1ef416cb 100644 --- a/tests/io/mongodb/test_transformation.py +++ b/tests/io/mongodb/test_transformation.py @@ -8,6 +8,7 @@ pytestmark = pytest.mark.mongodb pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") +pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") pytest.importorskip("rich", reason="Skipping tests because rich is not installed") from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402 diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index 49e5bc37..6de1bad0 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -1,3 +1,5 @@ +import base64 +import json import os import sys @@ -7,6 +9,80 @@ pytest.importorskip("commons_codec", reason="Only works with commons-codec installed") +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 + +DYNAMODB_CDC_INSERT_NESTED = { + "awsRegion": "us-east-1", + "eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740", + "eventName": "INSERT", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "table-testdrive-nested", + "dynamodb": { + "ApproximateCreationDateTime": 1720800199717446, + "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}, + "NewImage": { + "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}, + "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}}, + "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}}, + "string_set": {"SS": ["location_1"]}, + "number_set": {"NS": [1, 2, 3, 0.34]}, + "binary_set": {"BS": ["U3Vubnk="]}, + "somemap": { + "M": { + "test": {"N": 1}, + "test2": {"N": 2}, + } + }, + }, + "SizeBytes": 156, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} + +DYNAMODB_CDC_MODIFY_NESTED = { + "awsRegion": "us-east-1", + "eventID": "24757579-ebfd-480a-956d-a1287d2ef707", + "eventName": "MODIFY", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "foo", + "dynamodb": { + "ApproximateCreationDateTime": 1720742302233719, + "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}, + "NewImage": { + "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}, + "device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}}, + "tags": {"L": [{"S": "foo"}, {"S": "bar"}]}, + "empty_map": {"M": {}}, + "empty_list": {"L": []}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + "string_set": {"SS": ["location_1"]}, + "number_set": {"NS": [1, 2, 3, 0.34]}, + "binary_set": {"BS": ["U3Vubnk="]}, + "somemap": { + "M": { + "test": {"N": 1}, + "test2": {"N": 2}, + } + }, + "list_of_objects": {"L": [{"M": {"foo": {"S": "bar"}}}, {"M": {"baz": {"S": "qux"}}}]}, + }, + "OldImage": { + "NOTE": "This event does not match the INSERT record", + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "location": {"S": "Sydney"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + "device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}}, + }, + "SizeBytes": 161, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} + @pytest.fixture def reset_handler(): @@ -16,9 +92,9 @@ def reset_handler(): pass -def test_processor_invoke_no_records(reset_handler, mocker, caplog): +def test_processor_kinesis_dms_no_records(reset_handler, mocker, caplog): """ - Roughly verify that the unified Lambda handler works. + Roughly verify that the unified Lambda handler works with AWS DMS. """ # Configure environment variables. @@ -33,3 +109,59 @@ def test_processor_invoke_no_records(reset_handler, mocker, caplog): handler(event, None) assert "Successfully processed 0 records" in caplog.messages + + +def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker, caplog): + """ + Roughly verify that the unified Lambda handler works with AWS DynamoDB. + """ + + # Define target table name. + table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Configure Lambda processor per environment variables. + handler_environment = { + "CRATEDB_SQLALCHEMY_URL": cratedb.get_connection_url(), + "MESSAGE_FORMAT": "dynamodb", + "CRATEDB_TABLE": table_name, + } + mocker.patch.dict(os.environ, handler_environment) + + from cratedb_toolkit.io.processor.kinesis_lambda import handler + + # Define two CDC events: INSERT and UPDATE. + # They have to be conveyed separately because CrateDB needs a + # `REFRESH TABLE` operation between them. + event = { + "Records": [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + } + + # Run transfer command. + handler(event, None) + + # Verify outcome of processor, per validating log output. + assert "Successfully processed 2 records" in caplog.messages + + # Verify data in target database. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] + + +def wrap_kinesis(data): + """ + Wrap a CDC event into a Kinesis message, to satisfy the interface of the Lambda processor. + """ + return { + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "kinesis": { + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": base64.b64encode(json.dumps(data).encode("utf-8")), + }, + }