diff --git a/CHANGES.md b/CHANGES.md index f8a0d44f..e4681e7f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- `ctk load table`: Added support for MongoDB Change Streams ## 2024/07/08 v0.0.15 - IO: Added the `if-exists` query parameter by updating to influxio 0.4.0. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 84a84243..5ea2e5fe 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): logger.error(msg) raise OperationFailed(msg) elif source_url.startswith("mongodb"): - from cratedb_toolkit.io.mongodb.api import mongodb_copy + if "+cdc" in source_url: + source_url = source_url.replace("+cdc", "") + from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - if not mongodb_copy(source_url, target_url, progress=True): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + mongodb_relay_cdc(source_url, target_url, progress=True) + else: + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if not mongodb_copy(source_url, target_url, progress=True): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) else: raise NotImplementedError("Importing resource not implemented yet") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 46e163f0..a8f73bfb 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,6 +1,7 @@ import argparse import logging +from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.core import export, extract, translate from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False): cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) return True + + +def mongodb_relay_cdc(source_url, target_url, progress: bool = False): + """ + Synopsis + -------- + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc + ctk load table mongodb+cdc://localhost:27017/testdrive/demo + + Backlog + ------- + TODO: Run on multiple collections. + TODO: Run on the whole database. + TODO: Accept parameters like `if_exists="append,replace"`. + TODO: Propagate parameters like `scan="full"`. + """ + logger.info("Running MongoDB CDC relay") + + # Decode database URL. + mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + + cratedb_address = DatabaseAddress.from_string(target_url) + cratedb_uri, cratedb_table_address = cratedb_address.decode() + + # Configure machinery. + relay = MongoDBCDCRelayCrateDB( + mongodb_url=str(mongodb_uri), + mongodb_database=mongodb_database, + mongodb_collection=mongodb_collection, + cratedb_sqlalchemy_url=str(cratedb_uri), + cratedb_table=cratedb_table_address.fullname, + ) + + # Invoke machinery. + relay.start() diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py new file mode 100644 index 00000000..60d82138 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -0,0 +1,61 @@ +""" +Basic relaying of a MongoDB Change Stream into CrateDB table. + +Documentation: +- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md +- https://www.mongodb.com/docs/manual/changeStreams/ +- https://www.mongodb.com/developer/languages/python/python-change-streams/ +""" + +import logging + +import pymongo +import sqlalchemy as sa +from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB + +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class MongoDBCDCRelayCrateDB: + """ + Relay MongoDB Change Stream into CrateDB table. + """ + + def __init__( + self, + mongodb_url: str, + mongodb_database: str, + mongodb_collection: str, + cratedb_sqlalchemy_url: str, + cratedb_table: str, + ): + self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True) + self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url) + self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] + self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table) + self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + + def start(self): + """ + Subscribe to change stream events, convert to SQL, and submit to CrateDB. + """ + # FIXME: Note that the function does not perform any sensible error handling yet. + with self.cratedb_adapter.engine.connect() as connection: + connection.execute(sa.text(self.cdc.sql_ddl)) + for sql in self.cdc_to_sql(): + if sql: + connection.execute(sa.text(sql)) + + def cdc_to_sql(self): + """ + Subscribe to change stream events, and emit corresponding SQL statements. + """ + # Note that `.watch()` will block until events are ready for consumption, so + # this is not a busy loop. + # FIXME: Note that the function does not perform any sensible error handling yet. + while True: + with self.mongodb_collection.watch(full_document="updateLookup") as change_stream: + for change in change_stream: + yield self.cdc.to_sql(change) diff --git a/doc/io/mongodb/cdc.md b/doc/io/mongodb/cdc.md new file mode 100644 index 00000000..8010efb5 --- /dev/null +++ b/doc/io/mongodb/cdc.md @@ -0,0 +1,156 @@ +(mongodb-cdc-relay)= +# MongoDB CDC Relay + +## About +Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command +`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas. + +You can use it in order to facilitate convenient data transfers to be used +within data pipelines or ad hoc operations. It can be used as a CLI interface, +and as a library. + + +## Install +```shell +pip install --upgrade 'cratedb-toolkit[mongodb]' +``` + +:::{tip} +The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash` +will be installed with CrateDB Toolkit, but `mongosh` and `atlas` must be +installed by other means. If you are using Docker anyway, please use those +command aliases to provide them to your environment without actually needing +to install them. + +```shell +alias mongosh='docker run -i --rm --network=host mongo:7 mongosh' +``` + +The `atlas` program needs to store authentication information between invocations, +therefore you need to supply a storage volume. +```shell +mkdir atlas-config +alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas' +``` +::: + + +## Usage + +(mongodb-cdc-workstation)= +### Workstation +The guidelines assume that both services, CrateDB and MongoDB, are listening on +`localhost`. +Please find guidelines how to provide them on your workstation using +Docker or Podman in the {ref}`mongodb-services-standalone` section. +```shell +export MONGODB_URL=mongodb://localhost/testdrive +export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc +ctk load table "${MONGODB_URL_CTK}" +``` + +Insert document into MongoDB collection, and update it. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +Query data in CrateDB. +```shell +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +Invoke a delete operation, and check data in CrateDB once more. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})' +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +(mongodb-cdc-cloud)= +### Cloud +The guidelines assume usage of cloud variants for both services, CrateDB Cloud +and MongoDB Atlas. +Please find guidelines how to provision relevant cloud resources in the +{ref}`mongodb-services-cloud` section. You will need authentication credentials +from this step for the next one. + +:::{rubric} Invoke pipeline +::: +A canonical invocation for ingesting MongoDB Atlas Change Streams into +CrateDB Cloud. + +```shell +export MONGODB_URL=mongodb+srv://user:password@testdrive.jaxmmfp.mongodb.net/testdrive +export MONGODB_URL_CTK=mongodb+srv+cdc://user:password@testdrive.jaxmmfp.mongodb.net/testdrive/demo +export CRATEDB_HTTP_URL="https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/" +export CRATEDB_SQLALCHEMY_URL="crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/testdrive/demo-cdc?ssl=true" +``` +```shell +ctk load table "${MONGODB_URL_CTK}" +``` + +:::{note} +Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the +`ssl=true` query parameter. Both are needed to establish connectivity with +MongoDB Atlas and CrateDB. +::: + +:::{rubric} Trigger CDC events +::: +Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +:::{rubric} Query data in CrateDB +::: +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + + +## Appendix +A few operations that are handy when exploring this exercise. + +### Database Services +Provide CrateDB and MongoDB services. +- See {ref}`mongodb-services`. + +### Database Operations + +Query records in CrateDB table. +```shell +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +Truncate CrateDB table. +```shell +crash --command 'DELETE FROM "testdrive"."demo-cdc";' +``` + +Query documents in MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.find()' +``` + +Truncate MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.drop()' +``` + + +## Backlog +:::{todo} +- Improve general CLI UX/DX, for example by using `ctk shell`. +- Provide [SDK and CLI for CrateDB Cloud Cluster APIs], for improving Cloud DX. +::: + + +[commons-codec]: https://pypi.org/project/commons-codec/ +[CrateDB]: https://cratedb.com/docs/guide/home/ +[CrateDB Cloud]: https://cratedb.com/docs/cloud/ +[MongoDB Atlas]: https://www.mongodb.com/atlas +[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/ +[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81 diff --git a/doc/io/mongodb/index.md b/doc/io/mongodb/index.md index deef1bf8..67d23079 100644 --- a/doc/io/mongodb/index.md +++ b/doc/io/mongodb/index.md @@ -9,5 +9,14 @@ Using the MongoDB subsystem, you can transfer data from and to MongoDB. :maxdepth: 1 loader -migr8 +cdc ``` + +:::{note} +The MongoDB Table Loader is an improvement of the traditional {doc}`migr8`. +::: + +:::{tip} +In order to learn how to provide CrateDB and MongoDB services optimally, to +evaluate this subsystem, see {ref}`mongodb-services`. +::: diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index ecbc48f8..48ab352d 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -1,5 +1,9 @@ +--- +orphan: true +--- + (migr8)= -# migr8 +# migr8 migration utility ## About diff --git a/doc/io/mongodb/services.md b/doc/io/mongodb/services.md new file mode 100644 index 00000000..b68fcc87 --- /dev/null +++ b/doc/io/mongodb/services.md @@ -0,0 +1,140 @@ +--- +orphan: true +--- + +(mongodb-services)= +# Services CrateDB and MongoDB + +In order to exercise this subsystem, you will need one running instance of +each CrateDB and MongoDB. You can either deploy them on your workstation, +or by using their managed service infrastructure. + + +(mongodb-services-standalone)= +## Standalone Services +Quickly start CrateDB and MongoDB using Docker or Podman. + +### CrateDB +Start CrateDB. +```shell +docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \ + crate:5.7 -Cdiscovery.type=single-node +``` + +### MongoDB +Start MongoDB. +Please note that change streams are only available for replica sets and +sharded clusters, so let's define a replica set by using the +`--replSet rs-testdrive` option when starting the MongoDB server. +```shell +docker run -it --rm --name=mongodb --publish=27017:27017 \ + mongo:7 mongod --replSet rs-testdrive +``` + +Now, initialize the replica set, by using the `mongosh` command to invoke +the `rs.initiate()` operation. +```shell +export MONGODB_URL="mongodb://localhost/" +docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <`. + +When shutting down your workbench, you may want to clean up any cloud resources +you just used. +```shell +atlas clusters delete testdrive +``` + + +:::{todo} +- Evaluate and describe how to perform the "Database Access" step using the Atlas CLI + instead of using the Atlas Web Console. +::: + + +[Atlas CLI]: https://www.mongodb.com/docs/atlas/cli/ +[CrateDB Cloud]: https://cratedb.com/docs/cloud/ +[croud CLI]: https://cratedb.com/docs/cloud/en/latest/tutorials/deploy/croud.html +[MongoDB Atlas]: https://www.mongodb.com/atlas +[CrateDB Cloud Web Console]: https://cratedb.com/docs/cloud/en/latest/tutorials/quick-start.html#deploy-cluster diff --git a/doc/sandbox.md b/doc/sandbox.md index 8886a065..675c73f6 100644 --- a/doc/sandbox.md +++ b/doc/sandbox.md @@ -41,3 +41,14 @@ Format code. ```shell poe format ``` + + +## Troubleshooting +Docker is needed to provide service instances to the test suite. If your Docker +daemon is not running or available, you will receive an error message like that: +```python +docker.errors.DockerException: Error while fetching server API version: + ('Connection aborted.', FileNotFoundError(2, 'No such file or directory')) +AttributeError: 'CrateDBContainer' object has no attribute '_container' +``` +In order to fix the problem, just start your Docker daemon. diff --git a/pyproject.toml b/pyproject.toml index 847176f9..4c25c1bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ io = [ "sqlalchemy>=2", ] mongodb = [ + "commons-codec[mongodb]==0.0.2", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",