diff --git a/cratedb_toolkit/iac/__init__.py b/cratedb_toolkit/iac/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/iac/aws.py b/cratedb_toolkit/iac/aws.py new file mode 100644 index 00000000..904af12c --- /dev/null +++ b/cratedb_toolkit/iac/aws.py @@ -0,0 +1,9 @@ +from lorrystream.carabas.aws.function.model import LambdaFactory +from lorrystream.carabas.aws.function.oci import LambdaPythonImage +from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe + +__all__ = [ + "LambdaFactory", + "LambdaPythonImage", + "DynamoDBKinesisPipe", +] diff --git a/cratedb_toolkit/io/processor/__init__.py b/cratedb_toolkit/io/processor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py new file mode 100644 index 00000000..bc948f3a --- /dev/null +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -0,0 +1,96 @@ +# Copyright (c) 2021-2024, Crate.io Inc. +# Distributed under the terms of the Apache 2 license. +""" +Consume an AWS Kinesis Stream and relay into CrateDB. +https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html +https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html +https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html#with-kinesis-example-create-function + +In order to run, this module/program needs the following 3rd party +libraries, defined using inline script metadata. +""" + +# /// script +# requires-python = ">=3.9" +# dependencies = [ +# "commons-codec==0.0.2", +# "sqlalchemy-cratedb==0.38.0", +# ] +# /// +import base64 +import json +import logging +import os +import sys +import typing as t + +import sqlalchemy as sa +from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB + +logger = logging.getLogger(__name__) + +# TODO: Control using environment variable. +logger.setLevel("INFO") + +# TODO: Control using environment variables. +USE_BATCH_PROCESSING: bool = False +ON_ERROR: t.Literal["exit", "noop", "raise"] = "exit" + +# TODO: Control `echo` using environment variable. +engine = sa.create_engine(os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"), echo=True) + +# TODO: Automatically create destination table? How? +cdc = DynamoCDCTranslatorCrateDB(table_name=os.environ.get("CRATEDB_TABLE", "default")) + + +def handler(event, context): + """ + Implement partial batch response for Lambda functions that receive events from + a Kinesis stream. The function reports the batch item failures in the response, + signaling to Lambda to retry those messages later. + """ + + cur_record_sequence_number = "" + logger.info("context: %s", context) + + for record in event["Records"]: + try: + # Log and decode event. + # TODO: Remove log statements. + logger.info(f"Processed Kinesis Event - EventID: {record['eventID']}") + logger.info(f"Event Data: {record}") + record_data = json.loads(base64.b64decode(record["kinesis"]["data"]).decode("utf-8")) + logger.info(f"Record Data: {record_data}") + + # Process record. + sql = cdc.to_sql(record_data) + run_sql(sql) + + # Bookkeeping. + cur_record_sequence_number = record["kinesis"]["sequenceNumber"] + + except Exception as ex: + error_message = "An error occurred" + logger.exception(error_message) + if USE_BATCH_PROCESSING: + # Return failed record's sequence number. + return {"batchItemFailures": [{"itemIdentifier": cur_record_sequence_number}]} + if ON_ERROR == "exit": + sys.exit(6) + if ON_ERROR == "raise": + raise ex + + logger.info(f"Successfully processed {len(event['Records'])} records.") + if USE_BATCH_PROCESSING: + return {"batchItemFailures": []} + return None + + +def run_sql(sql: str): + """ + Execute an SQL statement. + + TODO: Optimize performance. + """ + with engine.connect() as connection: + connection.execute(sa.text(sql)) diff --git a/doc/io/dynamodb/cdc.md b/doc/io/dynamodb/cdc.md new file mode 100644 index 00000000..abfdf7e3 --- /dev/null +++ b/doc/io/dynamodb/cdc.md @@ -0,0 +1,255 @@ +# DynamoDB CDC Relay + + +## What's Inside +- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS]. +- Written in Python, using [AWS CloudFormation] stack deployments. To learn + what's behind, see also [How CloudFormation works]. +- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient + delta transfers, built-in versioning, and testing purposes. + + +## Details +- This specific document includes a few general guidelines, and a + a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`. +- That program defines a pipeline which looks like this: + + DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB Cloud + +For exercising an AWS pipeline, you need two components: The IaC description, +and a record processor implementation for the AWS Lambda. + +The IaC description will deploy a complete software stack for demonstration +purposes, including a DynamoDB Table, connected to a Kinesis Stream. + + +## Prerequisites + +### CrateDB +This walkthrough assumes a running CrateDB cluster, and focuses on CrateDB Cloud. +It does not provide relevant guidelines to set up a cluster, yet. + +### OCI image +In order to package code for AWS Lambda functions packages into OCI images, +and use them, you will need to publish them to the AWS ECR container image +registry. + +You will need to authenticate your local Docker environment, and create a +container image repository once for each project using a different runtime +image. + +Define your AWS ID, region label, and repository name, to be able to use +the templated commands 1:1. +```shell +aws_id=831394476016 +aws_region=eu-central-1 +repository_name=kinesis-cratedb-processor-lambda +``` +```shell +aws ecr get-login-password --region=${aws_region} | \ + docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com +``` + +(ecr-repository)= +### ECR Repository +Just once, before proceeding, create an image repository hosting the runtime +code for your Lambda function. +```shell +aws ecr create-repository --region=${aws_region} \ + --repository-name=${repository_name} --image-tag-mutability=MUTABLE +``` +In order to allow others to pull that image, you will need to define a +[repository policy] using the [set-repository-policy] subcommend of the AWS CLI. +In order to invoke that command, put the [](project:#ecr-repository-policy) +JSON definition into a file called `policy.json`. +```shell +aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json +``` + + +## Install +In order to exercise the example outlined below, you need to install +CrateDB Toolkit with the "kinesis" extension, because CDC data will be +relayed using AWS Kinesis. +```shell +pip install 'cratedb-toolkit[kinesis]' +``` + + +## Usage + +:::{rubric} Configure +::: +```shell +export CRATEDB_HTTP_URL='https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/' +export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' +``` + +:::{rubric} CrateDB Table +::: +The destination table name in CrateDB, where the CDC record +processor will re-materialize CDC events into. +```shell +pip install crash +crash --hosts "${CRATEDB_HTTP_URL}" -c 'CREATE TABLE "demo-sink" (data OBJECT(DYNAMIC));' +``` + +:::{rubric} Invoke pipeline +::: +Package the Lambda function, upload it, and deploy demo software stack. +```shell +python dynamodb_kinesis_lambda_oci_cratedb.py +``` +For example, choose those two variants: + +- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py] +- Record processor: [kinesis_lambda.py] + +Putting them next to each other into a directory, and adjusting +`LambdaPythonImage(entrypoint_file=...)` to point to the second, +should be enough to get you started. + + +:::{rubric} Trigger CDC events +::: +Inserting a document into the DynamoDB table, and updating it, will trigger two CDC events. +```shell +READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}" +READING_WHERE="\"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42'" + +aws dynamodb execute-statement --statement \ + "INSERT INTO \"demo-source\" VALUE ${READING_SQL};" + +aws dynamodb execute-statement --statement \ + "UPDATE \"demo-source\" SET temperature=43.59 WHERE ${READING_WHERE};" +``` + +:::{rubric} Query data in CrateDB +::: +When the stream delivered the CDC data to the processor, and everything worked well, +data should have materialized in the target table in CrateDB. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +:::{rubric} Shut down AWS stack +::: +In order to complete the experiment, you may want to shut down the AWS stack again. +```shell +aws cloudformation delete-stack --stack-name testdrive-dynamodb-dev +``` + + +## Appendix + +### Processor +Check status of Lambda function. +```shell +aws lambda get-function \ + --function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor +``` +Check status of stream mapping(s). +```shell +aws lambda list-event-source-mappings +``` +Check logs. +```shell +aws logs describe-log-groups +aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor +``` + +### Database + +There are a few utility commands that help you operate the stack, that have not +been absorbed yet. See also [Monitoring and troubleshooting Lambda functions]. + +Query records in CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +Truncate CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'DELETE FROM "demo-sink";' +``` + +Query documents in DynamoDB table. +```shell +aws dynamodb execute-statement --statement \ + "SELECT * FROM \"demo-source\";" +``` + +Truncate DynamoDB collection. +```shell +aws dynamodb execute-statement --statement \ + "DELETE FROM \"demo-source\" WHERE ${READING_WHERE};" +``` + +(ecr-repository-policy)= +### ECR Repository Policy +```json +{ + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "allow public pull", + "Effect": "Allow", + "Principal": "*", + "Action": [ + "ecr:BatchCheckLayerAvailability", + "ecr:BatchGetImage", + "ecr:GetDownloadUrlForLayer" + ] + } + ] +} +``` + +## Troubleshooting + +### ECR Repository +If you receive such an error message, your session has expired, and you need +to re-run the authentication step. +```text +denied: Your authorization token has expired. Reauthenticate and try again. +``` + +This error message indicates your ECR repository does not exist. The solution +is to create it, using the command shared above. +```text +name unknown: The repository with name 'kinesis-cratedb-processor-lambda' does +not exist in the registry with id '831394476016' +``` + +### AWS CloudFormation +If you receive such an error, ... +```text +botocore.exceptions.ClientError: An error occurred (ValidationError) when calling +the CreateChangeSet operation: Stack:arn:aws:cloudformation:eu-central-1:931394475905:stack/testdrive-dynamodb-dev/ea8c32e0-492c-11ef-b9b3-06b708ecd03f +is in UPDATE_ROLLBACK_FAILED state and can not be updated. +``` +because some detail when deploying or updating the CloudFormation recipe fails, +the CloudFormation stack is stuck, and you will need to [continue rolling back +an update] manually. +```shell +aws cloudformation continue-update-rollback --stack-name testdrive-dynamodb-dev +``` + + + +[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services +[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html +[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda +[continue rolling back an update]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-continueupdaterollback.html +[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/crate/cratedb-toolkit/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py +[example program]: https://github.com/crate/cratedb-toolkit/tree/main/examples/aws +[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html +[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code +[kinesis_lambda.py]: https://github.com/crate/cratedb-toolkit/blob/main/cratedb_toolkit/io/processor/kinesis_lambda.py +[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html +[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative +[repository policy]: https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions +[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html diff --git a/doc/io/dynamodb/index.md b/doc/io/dynamodb/index.md new file mode 100644 index 00000000..fb589920 --- /dev/null +++ b/doc/io/dynamodb/index.md @@ -0,0 +1,12 @@ +(dynamodb)= +# DynamoDB I/O Subsystem + +## About +Using the DynamoDB subsystem, you can transfer data from and to DynamoDB. + + +```{toctree} +:maxdepth: 1 + +cdc +``` diff --git a/doc/io/index.md b/doc/io/index.md index e3c6b64f..c85b5ed0 100644 --- a/doc/io/index.md +++ b/doc/io/index.md @@ -86,6 +86,7 @@ ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=json :maxdepth: 2 :hidden: +DynamoDB InfluxDB MongoDB ``` diff --git a/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py b/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py new file mode 100644 index 00000000..181d1b4d --- /dev/null +++ b/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py @@ -0,0 +1,69 @@ +import logging +import os +from pathlib import Path + +from lorrystream.util.common import setup_logging + +from cratedb_toolkit.iac.aws import DynamoDBKinesisPipe, LambdaFactory, LambdaPythonImage + +logger = logging.getLogger(__name__) + + +def main(): + """ + A recipe to deploy a data relay stack to Amazon AWS. + + Pipeline: + - DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB + + Ingredients: + - DynamoDB CDC to Kinesis + - Lambda function, shipped per OCI image + - CrateDB Cloud + + Prerequisites: Register an OCI repository. + """ + + # Build and publish OCI image that includes the AWS Lambda function. + python_image = LambdaPythonImage( + name="kinesis-cratedb-processor-lambda", + entrypoint_file=Path("./cratedb_toolkit/io/processor/kinesis_lambda.py"), + entrypoint_handler="kinesis_lambda.handler", + ) + python_image.publish() + + # Define an AWS CloudFormation software stack. + stack = DynamoDBKinesisPipe( + project="testdrive-dynamodb", + stage="dev", + region="eu-central-1", + description="DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB", + table_name="demo-source", + stream_name="dynamodb-cdc", + environment={ + "CRATEDB_SQLALCHEMY_URL": os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://"), + "CRATEDB_TABLE": "demo-sink", + }, + ) + + # Add components to the stack. + stack.table().processor( + LambdaFactory( + name="DynamoDBCrateDBProcessor", + oci_uri=python_image.uri, + handler=python_image.entrypoint_handler, + ) + ).connect() + + # Deploy stack. + stack.deploy() + logger.info(f"Deployed stack: {stack}") + + # Refresh the OCI image. + # TODO: Detect when changed. + stack.deploy_processor_image() + + +if __name__ == "__main__": + setup_logging() + main() diff --git a/pyproject.toml b/pyproject.toml index fbb508b7..89318200 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -149,6 +149,9 @@ io = [ "pandas<3,>=1", "sqlalchemy>=2", ] +kinesis = [ + "lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3", +] mongodb = [ "commons-codec[mongodb]==0.0.2", "cratedb-toolkit[io]",