Skip to content

Commit

Permalink
MongoDB CDC: Add support for MongoDB Change Streams to ctk load table
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 23, 2024
1 parent 6325b92 commit 8c1acd4
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- `ctk load table`: Added support for MongoDB Change Streams

## 2024/07/08 v0.0.15
- IO: Added the `if-exists` query parameter by updating to influxio 0.4.0.
Expand Down
16 changes: 11 additions & 5 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
from cratedb_toolkit.io.mongodb.api import mongodb_copy
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
mongodb_relay_cdc(source_url, target_url, progress=True)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
else:
raise NotImplementedError("Importing resource not implemented yet")
39 changes: 39 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import logging

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
Expand Down Expand Up @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
    """
    Relay the change stream of one MongoDB collection into one CrateDB table.

    Both URLs are decoded through `DatabaseAddress`. The MongoDB URL supplies
    the server URI plus the database/collection pair, the CrateDB URL supplies
    the SQLAlchemy URI plus the target table. The decoded values are handed to
    `MongoDBCDCRelayCrateDB`, and its `start()` method is invoked.

    :param source_url: MongoDB URL including database and collection, with
        the `+cdc` scheme suffix already removed.
    :param target_url: CrateDB SQLAlchemy URL including schema and table.
    :param progress: Accepted for signature parity with `mongodb_copy`;
        this function does not read it.
    :return: None.

    Synopsis
    --------
    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc
    ctk load table mongodb+cdc://localhost:27017/testdrive/demo

    Backlog
    -------
    TODO: Run on multiple collections.
    TODO: Run on the whole database.
    TODO: Accept parameters like `if_exists="append,replace"`.
    TODO: Propagate parameters like `scan="full"`.
    """
    logger.info("Running MongoDB CDC relay")

    # Source side: split the MongoDB URL into server URI and collection address.
    source_uri, source_collection = DatabaseAddress.from_string(source_url).decode()
    database_name = source_collection.schema
    collection_name = source_collection.table

    # Target side: split the CrateDB URL into SQLAlchemy URI and table address.
    target_uri, target_table = DatabaseAddress.from_string(target_url).decode()

    # Assemble the relay and run it right away.
    MongoDBCDCRelayCrateDB(
        mongodb_url=str(source_uri),
        mongodb_database=database_name,
        mongodb_collection=collection_name,
        cratedb_sqlalchemy_url=str(target_uri),
        cratedb_table=target_table.fullname,
    ).start()
61 changes: 61 additions & 0 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Basic relaying of a MongoDB Change Stream into a CrateDB table.
Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

import logging

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB

from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class MongoDBCDCRelayCrateDB:
    """
    Forward events of a MongoDB Change Stream to a CrateDB table.

    The constructor sets up the CrateDB adapter, the MongoDB collection handle,
    and the CDC-to-SQL translator. `start()` runs the relay loop on top of the
    generator provided by `cdc_to_sql()`.
    """

    def __init__(
        self,
        mongodb_url: str,
        mongodb_database: str,
        mongodb_collection: str,
        cratedb_sqlalchemy_url: str,
        cratedb_table: str,
    ):
        # Target side: the adapter is created first, with `echo=True` passed along.
        adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
        self.cratedb_adapter = adapter

        # Source side: client handle, then the collection to watch.
        client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
        self.mongodb_client = client
        self.mongodb_collection = client[mongodb_database][mongodb_collection]

        # The translator receives the relation name as quoted by the adapter.
        quoted_name = adapter.quote_relation_name(cratedb_table)
        self.table_name = quoted_name
        self.cdc = MongoDBCDCTranslatorCrateDB(table_name=quoted_name)

    def start(self):
        """
        Run the relay: execute the translator's DDL statement once, then
        execute every non-empty SQL statement produced by `cdc_to_sql()`.
        """
        # FIXME: There is no sensible error handling in here yet.
        engine = self.cratedb_adapter.engine
        with engine.connect() as connection:
            connection.execute(sa.text(self.cdc.sql_ddl))
            # `filter(None, ...)` skips falsy items, i.e. events which did not
            # translate into an SQL statement.
            for statement in filter(None, self.cdc_to_sql()):
                connection.execute(sa.text(statement))

    def cdc_to_sql(self):
        """
        Generator: watch the MongoDB collection, and yield the translation of
        each change event, as returned by the translator's `to_sql()`.
        """
        # Consuming the change stream blocks until events are available, so the
        # endless outer loop does not spin. Whenever a stream ends, a new one
        # is opened.
        # NOTE(review): No resume token is passed when re-opening, so events
        # arriving between two streams are presumably not replayed -- confirm.
        # FIXME: There is no sensible error handling in here yet.
        while True:
            stream_context = self.mongodb_collection.watch(full_document="updateLookup")
            with stream_context as change_stream:
                for event in change_stream:
                    yield self.cdc.to_sql(event)
156 changes: 156 additions & 0 deletions doc/io/mongodb/cdc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
(mongodb-cdc-relay)=
# MongoDB CDC Relay

## About
Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command
`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas.

You can use it to facilitate convenient data transfers within data pipelines
or ad hoc operations. It can be used both as a command-line program and as a
library.


## Install
```shell
pip install --upgrade 'cratedb-toolkit[mongodb]'
```

:::{tip}
The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash`
will be installed with CrateDB Toolkit, but `mongosh` and `atlas` must be
installed by other means. If you are using Docker anyway, you can use the
following command aliases to provide them to your environment without needing
to install them.

```shell
alias mongosh='docker run -i --rm --network=host mongo:7 mongosh'
```

The `atlas` program needs to store authentication information between invocations,
therefore you need to supply a storage volume.
```shell
mkdir atlas-config
alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas'
```
:::


## Usage

(mongodb-cdc-workstation)=
### Workstation
The guidelines assume that both services, CrateDB and MongoDB, are listening on
`localhost`.
Please find guidelines on how to provide them on your workstation using
Docker or Podman in the {ref}`mongodb-services-standalone` section.
```shell
export MONGODB_URL=mongodb://localhost/testdrive
export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc
ctk load table "${MONGODB_URL_CTK}"
```

Insert document into MongoDB collection, and update it.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

Query data in CrateDB.
```shell
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

Invoke a delete operation, and check data in CrateDB once more.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})'
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

(mongodb-cdc-cloud)=
### Cloud
The guidelines assume usage of cloud variants for both services, CrateDB Cloud
and MongoDB Atlas.
Please find guidelines on how to provision the relevant cloud resources in the
{ref}`mongodb-services-cloud` section. You will need the authentication
credentials from that step for the next one.

:::{rubric} Invoke pipeline
:::
A canonical invocation for ingesting MongoDB Atlas Change Streams into
CrateDB Cloud.

```shell
export MONGODB_URL=mongodb+srv://user:password@testdrive.example.mongodb.net/testdrive
export MONGODB_URL_CTK=mongodb+srv+cdc://user:password@testdrive.example.mongodb.net/testdrive/demo
export CRATEDB_HTTP_URL="https://admin:password@testdrive.example.cratedb.net:4200/"
export CRATEDB_SQLALCHEMY_URL="crate://admin:password@testdrive.example.cratedb.net:4200/testdrive/demo-cdc?ssl=true"
```
```shell
ctk load table "${MONGODB_URL_CTK}"
```

:::{note}
Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the
`ssl=true` query parameter. Both are needed to establish connectivity with
MongoDB Atlas and CrateDB.
:::

:::{rubric} Trigger CDC events
:::
Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

:::{rubric} Query data in CrateDB
:::
```shell
crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";'
```


## Appendix
A few operations that are handy when exploring this exercise.

### Database Services
Provide CrateDB and MongoDB services.
- See {ref}`mongodb-services`.

### Database Operations

Query records in CrateDB table.
```shell
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

Truncate CrateDB table.
```shell
crash --command 'DELETE FROM "testdrive"."demo-cdc";'
```

Query documents in MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.find()'
```

Drop MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.drop()'
```


## Backlog
:::{todo}
- Improve general CLI UX/DX, for example by using `ctk shell`.
- Provide [SDK and CLI for CrateDB Cloud Cluster APIs], for improving Cloud DX.
:::


[commons-codec]: https://pypi.org/project/commons-codec/
[CrateDB]: https://cratedb.com/docs/guide/home/
[CrateDB Cloud]: https://cratedb.com/docs/cloud/
[MongoDB Atlas]: https://www.mongodb.com/atlas
[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/
[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81
11 changes: 10 additions & 1 deletion doc/io/mongodb/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,14 @@ Using the MongoDB subsystem, you can transfer data from and to MongoDB.
:maxdepth: 1
loader
migr8
cdc
```

:::{note}
The MongoDB Table Loader is an improvement of the traditional {doc}`migr8`.
:::

:::{tip}
To learn how to best provide CrateDB and MongoDB services for evaluating
this subsystem, see {ref}`mongodb-services`.
:::
6 changes: 5 additions & 1 deletion doc/io/mongodb/migr8.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
orphan: true
---

(migr8)=
# migr8
# migr8 migration utility

## About

Expand Down
Loading

0 comments on commit 8c1acd4

Please sign in to comment.