From 21dd6c80a2f939fd5fa00dd8e61b0c12d5e3b255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 27 Jun 2024 19:45:07 +0200 Subject: [PATCH 01/25] New Debezium docs --- .../integrate/pages/cdc-connectors.adoc | 4 +- .../integrate/pages/cdc-ee-connectors.adoc | 133 ++++++++++++++++++ docs/modules/integrate/pages/connectors.adoc | 13 +- 3 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 docs/modules/integrate/pages/cdc-ee-connectors.adoc diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 94a5a7623..ce77351a8 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -1,4 +1,4 @@ -= CDC Connector += Legacy CDC Connector Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other @@ -16,7 +16,7 @@ The ones for MySQL and PostgreSQL already are. == Installing the Connector -This connector is included in the full and slim distributions of Hazelcast. +This connector is included in the full distribution of Open Source Hazelcast. == CDC as a Source diff --git a/docs/modules/integrate/pages/cdc-ee-connectors.adoc b/docs/modules/integrate/pages/cdc-ee-connectors.adoc new file mode 100644 index 000000000..fa2256ba0 --- /dev/null +++ b/docs/modules/integrate/pages/cdc-ee-connectors.adoc @@ -0,0 +1,133 @@ += CDC Connector +[.enterprise]*Enterprise* + +Change Data Capture (CDC) refers to the process of observing changes +made to a database and extracting them in a form usable by other +systems, for the purposes of replication, analysis and many more. + +Change Data Capture is especially important to Hazelcast, because it allows +for the _streaming of changes from databases_, which can be efficiently +processed by the Jet engine. + +Implementation of CDC in Hazelcast Enterprise is based on +link:https://debezium.io/[Debezium 2.x]. 
Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first-class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.

== Installing the Connector

This connector is included in the full distribution of Hazelcast Enterprise.

=== Maven
To use this connector inside a Maven project, add the following entries to the `<dependencies>` section of your `pom.xml`:

Generic connector:
```xml
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
    <version>{full-version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
```

MySQL-specific connector:
```xml
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
    <version>{full-version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
```
Note: the MySQL connector does not include the MySQL driver as a dependency.

PostgreSQL-specific connector:
```xml
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
    <version>{full-version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
```

== CDC as a Source

We have the following types of CDC sources:

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
specific, first-class Jet CDC source for MySQL databases (also based
on Debezium, but benefiting from the full range of convenience Jet can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
specific, first-class CDC source for PostgreSQL databases (also based
on Debezium, but benefiting from the full range of convenience Hazelcast can
additionally provide)

Setting up a streaming source of CDC data is just a matter of pointing it at the
right database via configuration:

```java
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
        MySqlCdcSources.mysql("customers")
                .setDatabaseAddress("127.0.0.1", 3306)
                .setDatabaseCredentials("debezium", "dbz")
                .setClusterName("dbserver1")
                .setDatabaseIncludeList("inventory")
                .setTableIncludeList("inventory.customers")
                .build())
        .withNativeTimestamps(0)
        .writeTo(Sinks.logger());
```

The MySQL- and PostgreSQL-specific source builders contain methods for all
major configuration settings and guard against invalid combinations, for
example the use of mutually exclusive options. With the generic source
builder, users must rely on Debezium's documentation to provide all the
necessary options.

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].

=== Fault Tolerance

CDC sources offer at-least-once processing guarantees. The source
periodically saves the database write-ahead log offset up to which it has
dispatched events, and in case of a failure/restart it will replay all
events since the last successfully saved offset.

Unfortunately, however, there is no guarantee that the last saved offset
is still in the database changelog. Such logs are always finite and,
depending on the DB configuration, can be relatively short, so if the CDC
source has to replay data for a long period of inactivity, then there
can be data loss. With careful management, though, an at-least-once
guarantee can be provided in practice.

== CDC as a Sink

Change data capture is a source-side functionality in Jet, but we also
offer some specialized sinks that simplify applying CDC events to a map,
which gives you the ability to reconstruct the contents of the original
database table. The sinks expect to receive `ChangeRecord` objects and
apply the custom functions you provide to extract the key and the value
that will be applied to the target map.
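To make the key and value extraction easier to picture, here is an illustrative sketch of the data such a `ChangeRecord` might carry for the sample `customers` table. The field names follow Debezium's relational change event format and are an assumption for illustration, not captured output:

```json
{
  "key":   {"id": 1004},
  "value": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
}
```

The key function typically extracts the primary key from `key()`, while the value function maps `value()` to a domain object, or to one of its fields.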
+ +For example, a sink mapping CDC data to a `Customer` class and +maintaining a map view of latest known email addresses per customer +(identified by ID) would look like this: + +```java +Pipeline p = Pipeline.create(); +p.readFrom(source) + .withoutTimestamps() + .writeTo(CdcSinks.map("customers", + r -> r.key().toMap().get("id"), + r -> r.value().toObject(Customer.class).email)); +``` + +[NOTE] +==== +The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly. + +If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. +==== \ No newline at end of file diff --git a/docs/modules/integrate/pages/connectors.adoc b/docs/modules/integrate/pages/connectors.adoc index ea7a9ee89..bf7ea52d4 100644 --- a/docs/modules/integrate/pages/connectors.adoc +++ b/docs/modules/integrate/pages/connectors.adoc @@ -115,11 +115,16 @@ The Jet API supports more connectors than SQL. |batch |N/A -|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] +|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy) |hazelcast-jet-cdc-debezium |streaming |at-least-once +|xref:integrate:cdc-ee-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*) +|hazelcast-enterprise-cdc-debezium +|streaming +|at-least-once + |xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic] |hazelcast-jet-elasticsearch-6 |batch @@ -263,7 +268,11 @@ The Jet API supports more connectors than SQL. 
|N/A |xref:integrate:cdc-connectors.adoc[CdcSinks.map] -|hazelcast-jet-cdc-debezium +|hazelcast-jet-cdc-debezium (legacy, OS) + +or + +hazelcast-enterprise-cdc-debezium (EE) |streaming |at-least-once From 15951644c2b9e8e35e3c3ab7f9b63cd00641a726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 27 Jun 2024 20:23:18 +0200 Subject: [PATCH 02/25] Example changed --- docs/modules/pipelines/pages/cdc.adoc | 168 +++++++++++++++----------- 1 file changed, 95 insertions(+), 73 deletions(-) diff --git a/docs/modules/pipelines/pages/cdc.adoc b/docs/modules/pipelines/pages/cdc.adoc index e55f3da96..33680b796 100644 --- a/docs/modules/pipelines/pages/cdc.adoc +++ b/docs/modules/pipelines/pages/cdc.adoc @@ -26,12 +26,12 @@ inventory database: ```bash docker run -it --rm --name mysql -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser \ - -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.2 + -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.6 ``` -This runs a new container using version `1.2` of the +This runs a new container using version `2.6` of the link:https://hub.docker.com/r/debezium/example-mysql[debezium/example-mysql] -image (based on link:https://hub.docker.com/_/mysql[mysql:5.7]. It defines +image (based on link:https://hub.docker.com/_/mysql[mysql 8.2]. It defines and populates a sample "inventory" database and creates a `debezium` user with password `dbz` that has the minimum privileges required by Debezium’s MySQL connector. @@ -84,7 +84,8 @@ specifies the correct options so that it can connect properly. The container should output lines similar to the following: -``` +[source] +---- mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 4 @@ -99,25 +100,28 @@ owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. 
mysql> -``` +---- Unlike the other containers, this container runs a process that produces a prompt. We’ll use the prompt to interact with the database. First, switch to the "inventory" database: -```sql +[source,sql] +---- mysql> use inventory; -``` +---- and then list the tables in the database: -```sql +[source,sql] +---- mysql> show tables; -``` +---- which should then display: -``` +[source] +---- +---------------------+ | Tables_in_inventory | +---------------------+ @@ -129,14 +133,15 @@ which should then display: | products_on_hand | +---------------------+ 6 rows in set (0.01 sec) -``` +---- Use the MySQL command line client to explore the database and view the preloaded data. For example: -```sql +[source,sql] +---- mysql> SELECT * FROM customers; -``` +---- == Step 4. Start Hazelcast @@ -145,33 +150,35 @@ mysql> SELECT * FROM customers; If you already have Hazelcast and you skipped the above steps, make sure to follow from here on. -. Make sure the MySQL CDC plugin is in `lib/` directory. +. Make sure the MySQL CDC plugin is in `lib/` directory. You need to manually download the MySQL CDC plugin from Hazelcast's Maven https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository] and then copy it to the `lib/` directory. + -```bash +[source,bash] +---- ls lib/ -``` +---- + You should see the following jars: + -* hazelcast-jet-cdc-debezium-{full-version}.jar -* hazelcast-jet-cdc-mysql-{full-version}.jar -* hazelcast-jet-cdc-postgres-{full-version}.jar -+ -WARNING: If you have Hazelcast Enterprise Edition, you need to manually download the MySQL CDC plugin from Hazelcast's Maven https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-jet-cdc-mysql/{full-version}/hazelcast-jet-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository] and then copy it to the `lib/` directory. 
+* hazelcast-enterprise-cdc-debezium-{full-version}-jar-with-dependencies.jar +* hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar +* hazelcast-enterprise-cdc-postgres-{full-version}-jar-with-dependencies.jar + . Start Hazelcast. + -```bash +[source,bash] +---- bin/hz-start -``` +---- . When you see output like this, Hazelcast is up: + -``` +[source] +---- Members {size:1, ver:1} [ Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this ] -``` +---- == Step 5. Create a New Java Project @@ -197,8 +204,8 @@ repositories.mavenCentral() dependencies { implementation 'com.hazelcast:hazelcast:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-debezium:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-mysql:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-debezium:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-mysql:{full-version}' implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.0' } @@ -220,8 +227,7 @@ Maven:: 1.0-SNAPSHOT - 1.8 - 1.8 + 17 @@ -232,12 +238,12 @@ Maven:: com.hazelcast.jet - hazelcast-jet-cdc-debezium + hazelcast-enterprise-cdc-debezium {full-version} com.hazelcast.jet - hazelcast-jet-cdc-mysql + hazelcast-enterprise-cdc-mysql {full-version} @@ -280,7 +286,8 @@ customer with a specific ID. 
This is how the code doing this looks like: -```java +[source,java] +---- package org.example; import com.hazelcast.core.Hazelcast; @@ -296,13 +303,11 @@ public class JetJob { public static void main(String[] args) { StreamSource source = MySqlCdcSources.mysql("source") - .setDatabaseAddress("127.0.0.1") - .setDatabasePort(3306) - .setDatabaseUser("debezium") - .setDatabasePassword("dbz") + .setDatabaseAddress("127.0.0.1", 3306) + .setDatabaseCredentials("debezium", "dbz") .setClusterName("dbserver1") - .setDatabaseWhitelist("inventory") - .setTableWhitelist("inventory.customers") + .setDatabaseIncludeList("inventory") + .setTableIncludeList("inventory.customers") .build(); Pipeline pipeline = Pipeline.create(); @@ -319,11 +324,12 @@ public class JetJob { } } -``` +---- The `Customer` class we map change events to is quite simple too: -```java +[source,java] +---- package org.example; import com.fasterxml.jackson.annotation.JsonProperty; @@ -381,14 +387,15 @@ public class Customer implements Serializable { return "Customer {id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + '}'; } } -``` +---- To make it evident that our pipeline serves the purpose of building an up-to-date cache of customers, which can be interrogated at any time let's add one more class. This code can be executed at any time in your IDE and will print the current content of the cache. -```java +[source,java] +---- package org.example; import com.hazelcast.client.HazelcastClient; @@ -406,7 +413,7 @@ public class CacheRead { } } -``` +---- == Step 7. Package the Pipeline into a JAR @@ -422,9 +429,10 @@ need to do is to run the build command: Gradle:: + -- -```bash +[source,bash] +---- gradle build -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `build/libs` directory of our project. 
--
Maven::
+
--

-```bash
+[source,bash]
+----
mvn package
-```
+----

This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar`
in the `target` directory of our project.

@@ -452,16 +461,18 @@ issue is following command:
Gradle::
+
--
-```bash
+[source,bash]
+----
bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
Maven::
+
--
-```bash
+[source,bash]
+----
bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
====

The output in the Hazelcast member's log should look something like
this (we also log what we put in the `IMap` sink thanks to the
`peek()` stage we inserted):

-```
+[source]
+----
... Completed snapshot in 00:00:01.519
... Output to ordinal 0: key:{{"id":1001}}, value:{{"id":1001,"first_name":"Sally","last_name":"Thomas",...
... Output to ordinal 0: key:{{"id":1002}}, value:{{"id":1002,"first_name":"George","last_name":"Bailey",...
... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker",...
... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne","last_name":"Kretchmar",...
... Transitioning from the snapshot reader to the binlog reader
-```
+----

== Step 9. Track Updates

Let's see what our cache looks like at this time. If we execute the
`CacheRead` code <<6-define-jet-job, defined above>>, we'll get:

-```text
+[source,text]
+----
Currently there are following customers in the cache:
Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}
Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}
Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}
Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}
-```
+----

Let's do some updates in our database.
Go to the MySQL CLI <> and run following update statement: -``` +[source,bash] +---- mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 -``` +---- In the log of the Hazelcast member we should immediately see the effect: -``` +[source] +---- ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar",... -``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- One more: -``` +[source,bash] +---- mysql> UPDATE customers SET email='edward.walker@walker.com' WHERE id=1003; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 -``` +---- -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=edward.walker@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- == Step 10. Clean up . Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel mysql-monitor -``` +---- + Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . 
Use Docker to stop the running container (this will kill the
command-line client too, since it's running in the same container):
+
-```bash
+[source,bash]
+----
docker stop mysql
-```
+----
+
Since we've used the `--rm` flag when starting the connectors,
Docker should remove them right after we stop them.
We can verify that all processes are stopped and removed with the following
command:
+
-```bash
+[source,bash]
+----
docker ps -a
-```
+----

From 0c5e07bede32fcf2e6abc792fea7677e169ec2d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?=
Date: Thu, 27 Jun 2024 20:50:32 +0200
Subject: [PATCH 03/25] Function overview

---
 .../integrate/pages/cdc-ee-connectors.adoc | 32 +++++++++++++++++++
 docs/modules/pipelines/pages/cdc.adoc      | 15 +++++----
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/docs/modules/integrate/pages/cdc-ee-connectors.adoc b/docs/modules/integrate/pages/cdc-ee-connectors.adoc
index fa2256ba0..cd822be4d 100644
--- a/docs/modules/integrate/pages/cdc-ee-connectors.adoc
+++ b/docs/modules/integrate/pages/cdc-ee-connectors.adoc
@@ -90,6 +90,38 @@ to provide all necessary options.

 For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].

+=== Common source builder functions
+[cols="m,a"]
+|===
+|Method name|Description
+
+|changeRecord()
+|Sets the output type to `ChangeRecord` - a wrapper providing most of the fields in
+a strongly-typed manner.
+
+|json()
+|Sets the output type to JSON - the result stage will have `Map` as its type,
+where the key is the SourceRecord's key in JSON format and the value is the whole SourceRecord's value as a JSON string.
+
+|customMapping(RecordMappingFunction)
+|Sets the output type to an arbitrary user type `T`. Mapping from `SourceRecord` to `T` is
+done using the provided function.
+
+|withDefaultEngine
+|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
+but also older and more tested. For most stable results (e.g.
no async offset restore) this engine should be preferred. For MySQL and PostgreSQL especially this engine makes the most sense, as the MySQL and PostgreSQL Debezium connectors are single-threaded only.
+
+|withAsyncEngine
+|Sets the preferred engine to the async one. This engine is multithreaded (if the connector supports
+it), but you must be aware of its async nature, e.g. offset restore after restart is done
+asynchronously as well, leading to sometimes confusing results.
+
+|setProperty
+|Sets a connector property to the given value. There are multiple overloads, allowing you to
+set `long`, `String` or `boolean` values.
+
+|===
+
 === Fault Tolerance

 CDC sources offer at least-once processing guarantees. The source
diff --git a/docs/modules/pipelines/pages/cdc.adoc b/docs/modules/pipelines/pages/cdc.adoc
index 33680b796..cad258521 100644
--- a/docs/modules/pipelines/pages/cdc.adoc
+++ b/docs/modules/pipelines/pages/cdc.adoc
@@ -23,11 +23,12 @@ Open a terminal, and run following command. It will start a new
container that runs a MySQL database server preconfigured with an
inventory database:

-```bash
+[source,bash]
+----
 docker run -it --rm --name mysql -p 3306:3306 \
   -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser \
   -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.6
-```
+----

This runs a new container using version `2.6` of the
link:https://hub.docker.com/r/debezium/example-mysql[debezium/example-mysql]
image (based on link:https://hub.docker.com/_/mysql[mysql 8.2]). It defines
@@ -52,11 +53,12 @@ variables to specific values.

You should see in your terminal something like the following:

-```text
+[source,text]
+----
...
2020-03-09T09:48:24.579480Z 0 [Note] mysqld: ready for connections.
Version: '5.7.29-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
-```
+----

Notice that the MySQL server starts and stops a few times as the
configuration is modified.
The last line listed above reports that the
@@ -68,10 +70,11 @@

Open a new terminal, and use it to start a new container for the MySQL
command line client and connect it to the MySQL server running in the
`mysql` container:

-```bash
+[source,bash]
+----
 docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh \
   -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
-```
+----

Here we start the container using the `mysql:5.7` image, name the
container `mysqlterm` and link it to the mysql container where the

From bb480f1b5b70d507d2dbfa6bd89a126fbcb4925a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?=
Date: Thu, 27 Jun 2024 20:51:13 +0200
Subject: [PATCH 04/25] More info

---
 docs/modules/integrate/pages/cdc-ee-connectors.adoc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/modules/integrate/pages/cdc-ee-connectors.adoc b/docs/modules/integrate/pages/cdc-ee-connectors.adoc
index cd822be4d..ae6ea421a 100644
--- a/docs/modules/integrate/pages/cdc-ee-connectors.adoc
+++ b/docs/modules/integrate/pages/cdc-ee-connectors.adoc
@@ -107,16 +107,16 @@ where key is SourceRecord's key in json format and value is whole SourceRecord's

 |Sets the output type to an arbitrary user type `T`. Mapping from `SourceRecord` to `T` is
 done using the provided function.

-|withDefaultEngine
+|withDefaultEngine()
 |Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
 but also older and more tested. For most stable results (e.g.
no async offset restore) this engine should be preferred. For MySQL and PostgreSQL especially this engine makes the most sense, as the MySQL and PostgreSQL Debezium connectors are single-threaded only.

-|withAsyncEngine
+|withAsyncEngine()
 |Sets the preferred engine to the async one. This engine is multithreaded (if the connector supports
 it), but you must be aware of its async nature, e.g.
offset restore after restart is done
asynchronously as well, leading to sometimes confusing results.

-|setProperty
+|setProperty(String, String)
 |Sets a connector property to the given value. There are multiple overloads, allowing you to
 set `long`, `String` or `boolean` values.

From 17f6ebba454b1ea01f5a3c88410292f97f08fa7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?=
Date: Thu, 27 Jun 2024 20:59:29 +0200
Subject: [PATCH 05/25] CDC-join

---
 .../pipelines/pages/cdc-database-setup.adoc |  27 +--
 docs/modules/pipelines/pages/cdc-join.adoc  | 169 ++++++++++--------
 2 files changed, 113 insertions(+), 83 deletions(-)

diff --git a/docs/modules/pipelines/pages/cdc-database-setup.adoc b/docs/modules/pipelines/pages/cdc-database-setup.adoc
index e667e495f..18c99ed15 100644
--- a/docs/modules/pipelines/pages/cdc-database-setup.adoc
+++ b/docs/modules/pipelines/pages/cdc-database-setup.adoc
@@ -39,13 +39,14 @@ config file. See MySQL Reference Manual on how to do that
link:https://dev.mysql.com/doc/refman/8.0/en/option-files.html[8.0]).
For example:

-```
+[source]
+----
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
-```
+----

The semantics of these options are as follows:
@@ -62,9 +63,10 @@ link:https://dev.mysql.com/doc/refman/8.0/en/show-variables.html[8.0]).
It's worth pointing out that the names of the options sometimes differ
from the names of the MySQL system variables they set. For example:

-```
+[source]
+----
SHOW VARIABLES LIKE 'server_id';
-```
+----

=== Configure Session Timeouts

@@ -121,7 +123,8 @@ configuration options to be set accordingly.
This can be done either by
The important properties to set are:

-```properties
+[source,properties]
+----
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'

# REPLICATION
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
-```
+----

`shared_preload_libraries` contains a comma-separated list of installed
output plug-ins. `wal_level` is used to tell the server to use logical
@@ -152,9 +155,10 @@ permissions. The permissions needed are `REPLICATION` and `LOGIN`.
For setting up database users/roles, see the
link:https://www.postgresql.org/docs/9.6/user-manag.html[PostgreSQL documentation],
but basically the essential command is:

-```
+[source,sql]
+----
CREATE ROLE name REPLICATION LOGIN;
-```
+----

Note: database super-users already have all the permissions needed for
replication too.

@@ -166,11 +170,12 @@ PostgreSQL server needs to allow access from the host the CDC connector
is running on. To specify such
link:https://www.postgresql.org/docs/9.6/auth-pg-hba-conf.html[client authentication]
options, add the following lines to the end of the `pg_hba.conf` file:

-```
+[source]
+----
local replication user trust
host replication user 127.0.0.1/32 trust
host replication user ::1/128 trust
-```
+----

This example tells the server to allow replication for the specified
user locally or on `localhost`, using IPv4 or IPv6.

@@ -253,7 +258,8 @@ shouldn't affect normal operations too severely.

==== Failure Tolerance

-PostgreSQL failure tolerance associated with replication slots is
+PostgreSQL's failure tolerance associated with replication slots is
somewhat lacking in certain aspects. The CDC connector can quite nicely
deal with its own restart or connection loss to the primary database,
but only as long as replication slots remain intact.
Replication diff --git a/docs/modules/pipelines/pages/cdc-join.adoc b/docs/modules/pipelines/pages/cdc-join.adoc index 448ad895f..4c14bd10b 100644 --- a/docs/modules/pipelines/pages/cdc-join.adoc +++ b/docs/modules/pipelines/pages/cdc-join.adoc @@ -45,7 +45,8 @@ They differ slightly depending on which database you use: Let's write the code for the processing we want to accomplish: -```java +[source,java] +---- package org.example; import com.hazelcast.core.Hazelcast; @@ -64,13 +65,11 @@ private static final int MAX_CONCURRENT_OPERATIONS = 1; public static void main(String[] args) { StreamSource source = MySqlCdcSources.mysql("source") - .setDatabaseAddress("127.0.0.1") - .setDatabasePort(3306) - .setDatabaseUser("debezium") - .setDatabasePassword("dbz") + .setDatabaseAddress("127.0.0.1", 3306) + .setDatabaseCredentials("debezium", "dbz") .setClusterName("dbserver1") - .setDatabaseWhitelist("inventory") - .setTableWhitelist("inventory.customers", "inventory.orders") + .setDatabaseIncludeList("inventory") + .setTableIncludeList("inventory.customers", "inventory.orders") .build(); Pipeline pipeline = Pipeline.create(); @@ -98,25 +97,25 @@ public static void main(String[] args) { hz.getJet().newJob(pipeline, cfg); } } -``` +---- If using Postgres, only the source would need to change, like this: -```java +[source,java] +---- StreamSource source = PostgresCdcSources.postgres("source") - .setDatabaseAddress("127.0.0.1") - .setDatabasePort(5432) - .setDatabaseUser("postgres") - .setDatabasePassword("postgres") + .setDatabaseAddress("127.0.0.1", 5432) + .setDatabaseCredentials("postgres", "postgres") .setDatabaseName("postgres") - .setTableWhitelist("inventory.customers", "inventory.orders") + .setTableIncludeList("inventory.customers", "inventory.orders") .build(); -``` +---- As we can see from the pipeline code, our `Sink` is `EntryProcessor` based. 
The two `EntryProcessors` we use are: -```java +[source,java] +---- package org.example; import com.hazelcast.jet.cdc.ChangeRecord; @@ -158,9 +157,10 @@ public class CustomerEntryProcessor implements EntryProcessor/build/libs/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- Maven:: + -- -```yaml +[source,yaml] +---- user-code-deployment: enabled: true jarPaths: - /target/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- ==== + @@ -558,17 +570,19 @@ you created the project for this tutorial. . Start Hazelcast. + -```bash +[source,bash] +---- bin/hz-start -``` +---- . When you see output like this, Hazelcast is up: + -``` +[source] +---- Members {size:1, ver:1} [ Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this ] -``` +---- == Step 8. Submit the Job for Execution @@ -581,77 +595,85 @@ following command: Gradle:: + -- -```bash +[source,bash] +---- bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- Maven:: + -- -```bash +[source,bash] +---- bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- ==== The output in the Hazelcast member's log should look something like this (we see these lines due to the `peek()` stages we've inserted): -``` +[source,text] +---- ........ ... Output to ordinal 0: key:{{"order_number":10002}}, value:{{"order_number":10002,"order_date":16817,"purchaser":1002,"quantity":2,"product_id":105,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174) ... Output to ordinal 0: key:{{"order_number":10003}}, value:{{"order_number":10003,"order_date":16850,"purchaser":1002,"quantity":2,"product_id":106,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174) ... 
Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__op":"c","__db":"inventory","__table":"customers","__ts_ms":1593681751161,"__deleted":"false"}} (eventTime=12:22:31.161) ........ -``` +---- == Step 9. Track Updates Let's see how our cache looks like at this time. If we execute the `CacheRead` code <<5-define-jet-job, defined above>>, we'll get: -``` +[source] +---- Currently there are following customers in the cache: Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10002=Order {orderNumber=10002, orderDate=Sun Jan 17 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=105}, 10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}} Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}} Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {} Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}} -``` +---- Let's do some updates in our database. 
Go to the database CLI <<3-start-command-line-client, we've started earlier>> and run following commands: -```bash +[source,sql] +---- INSERT INTO inventory.customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org'); DELETE FROM inventory.orders WHERE order_number=10002; -``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer: Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}, Orders: {} Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}} Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}} Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {} Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}} -``` +---- == Step 10. Clean up . Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel postgres-monitor -``` +---- Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . 
Use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): @@ -663,9 +685,10 @@ MySQL:: -- You can use Docker to stop all running containers: -```bash +[source,bash] +---- docker stop mysqlterm mysql -``` +---- -- Postgres:: + @@ -674,9 +697,10 @@ Postgres:: You can use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): -```bash +[source,bash] +---- docker stop postgres -``` +---- -- ==== + @@ -685,6 +709,7 @@ Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with following command: -```bash +[source,bash] +---- docker ps -a -``` +---- From 5e18cce7e8bb53609551dcda8dee5cb769b155c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Fri, 28 Jun 2024 14:42:20 +0200 Subject: [PATCH 06/25] Fix links --- .../integrate/pages/cdc-connectors.adoc | 95 ++++++++++++++++--- docs/modules/integrate/pages/connectors.adoc | 34 ++++--- ...ectors.adoc => legacy-cdc-connectors.adoc} | 95 +++---------------- docs/modules/integrate/partials/nav.adoc | 1 + 4 files changed, 118 insertions(+), 107 deletions(-) rename docs/modules/integrate/pages/{cdc-ee-connectors.adoc => legacy-cdc-connectors.adoc} (54%) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 6ad73e628..ae6ea421a 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -1,4 +1,5 @@ -= Legacy CDC Connector += CDC Connector +[.enterprise]*Enterprise* Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other @@ -8,15 +9,49 @@ Change Data Capture is especially important to Hazelcast, because it allows for the _streaming of changes from databases_, which can be efficiently processed by the Jet engine. 
-Implementation of CDC in Hazelcast is based on
-link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
-which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
+Implementation of CDC in Hazelcast Enterprise is based on
+link:https://debezium.io/[Debezium 2.x]. Hazelcast offers a generic Debezium source
+which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium],
 but we're also striving to make CDC sources first class citizens in Hazelcast.
 The ones for MySQL and PostgreSQL already are.
 
 == Installing the Connector
 
-This connector is included in the full distribution of Open Source Hazelcast.
+This connector is included in the full distribution of Hazelcast Enterprise.
+
+=== Maven
+To use this connector in a Maven project, add the following entries to the `<dependencies>` section of your `pom.xml`:
+
+Generic connector:
+```xml
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
+    <version>{full-version}</version>
+    <classifier>jar-with-dependencies</classifier>
+</dependency>
+```
+
+MySQL-specific connector:
+```xml
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
+    <version>{full-version}</version>
+    <classifier>jar-with-dependencies</classifier>
+</dependency>
+```
+Note: The MySQL connector does not include the MySQL driver as a dependency.
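Because the driver is not bundled, a project using the MySQL-specific connector also needs the MySQL JDBC driver on its classpath. A sketch of the extra `pom.xml` entry; the coordinates are the driver's usual ones, and the version shown is only illustrative, so align it with your MySQL server:

```xml
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <!-- illustrative version: pick the driver release matching your MySQL server -->
    <version>8.0.33</version>
</dependency>
```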
+
+PostgreSQL-specific connector:
+```xml
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
+    <version>{full-version}</version>
+    <classifier>jar-with-dependencies</classifier>
+</dependency>
+```
 
 == CDC as a Source
 
@@ -39,20 +74,54 @@ For the setting up a streaming source of CDC data is just the matter of pointing
 Pipeline pipeline = Pipeline.create();
 pipeline.readFrom(
     MySqlCdcSources.mysql("customers")
-        .setDatabaseAddress("127.0.0.1")
-        .setDatabasePort(3306)
-        .setDatabaseUser("debezium")
-        .setDatabasePassword("dbz")
+        .setDatabaseAddress("127.0.0.1", 3306)
+        .setDatabaseCredentials("debezium", "dbz")
         .setClusterName("dbserver1")
-        .setDatabaseWhitelist("inventory")
-        .setTableWhitelist("inventory.customers")
+        .setDatabaseIncludeList("inventory")
+        .setTableIncludeList("inventory.customers")
         .build())
     .withNativeTimestamps(0)
     .writeTo(Sinks.logger());
 ```
 
+The MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings and
+guard against invalid combinations, e.g. the use of mutually exclusive options. For the generic source builder, you must rely on Debezium's documentation
+to provide all the necessary options.
+
 For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
+
+=== Common source builder functions
+[cols="m,a"]
+|===
+|Method name|Description
+
+|changeRecord()
+| Sets the output type to `ChangeRecord`, a wrapper providing most of the fields in
+a strongly-typed manner.
+
+| json()
+| Sets the output type to JSON: the resulting stage will have `Map<String, String>` as its type,
+where the key is the `SourceRecord`'s key in JSON format and the value is the whole `SourceRecord`'s value as a JSON string.
+
+|customMapping(RecordMappingFunction<T>)
+| Sets the output type to an arbitrary user type `T`. Mapping from `SourceRecord` to `T` will
+be done using the provided function.
+
+|withDefaultEngine()
+|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
+but also older and more tested. For most stable results (e.g. no async offset restore) this engine should be preferred. 
For MySQL and PostgreSQL especially this engine makes the most sense, as their Debezium connectors are single-threaded only.
+
+|withAsyncEngine()
+|Sets the preferred engine to the async one. This engine is multithreaded (if the connector supports
+it), but you must be aware of its async nature, e.g. offset restore after restart is done
+asynchronously as well, leading to sometimes confusing results.
+
+|setProperty(String, String)
+|Sets connector property to given value. There are multiple overloads, allowing to
+sets value to some `long`, `String` or `boolean`.
+
+|===
+
 === Fault Tolerance
 
 CDC sources offer at least-once processing guarantees. The source
@@ -60,7 +129,7 @@ periodically saves the database write ahead log offset for which it had
 dispatched events and in case of a failure/restart it will replay all
 events since the last successfully saved offset.
 
-Unfortunately, however, there is no guarantee that the last saved offset
+Unfortunately, however, there is no guaran`tee that the last saved offset
 is still in the database changelog. Such logs are always finite and
 depending on the DB configuration can be relatively short, so if the CDC
 source has to replay data for a long period of inactivity, then there
@@ -93,6 +162,4 @@ p.readFrom(source)
 
 The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly. If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].
-
-Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines.
For now, continue to use the User Code Deployment solution in this scenario. ==== \ No newline at end of file diff --git a/docs/modules/integrate/pages/connectors.adoc b/docs/modules/integrate/pages/connectors.adoc index 1c47de94a..c6b834ae3 100644 --- a/docs/modules/integrate/pages/connectors.adoc +++ b/docs/modules/integrate/pages/connectors.adoc @@ -115,16 +115,36 @@ The Jet API supports more connectors than SQL. |batch |N/A -|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy) +|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy) |hazelcast-jet-cdc-debezium |streaming |at-least-once -|xref:integrate:cdc-ee-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*) +|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy) +|hazelcast-jet-cdc-mysql +|streaming +|exactly-once + +|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy) +|hazelcast-jet-cdc-postgres +|streaming +|exactly-once + +|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*) |hazelcast-enterprise-cdc-debezium |streaming |at-least-once +|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql] +|hazelcast-enterprise-cdc-mysql +|streaming +|exactly-once + +|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres] +|hazelcast-enterprise-cdc-postgres +|streaming +|exactly-once + |xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic] |hazelcast-jet-elasticsearch-7 |batch @@ -155,16 +175,6 @@ The Jet API supports more connectors than SQL. 
|streaming |exactly-once -|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql] -|hazelcast-jet-cdc-mysql -|streaming -|exactly-once - -|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres] -|hazelcast-jet-cdc-postgres -|streaming -|exactly-once - |xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer] |hazelcast-jet-contrib-pulsar |streaming diff --git a/docs/modules/integrate/pages/cdc-ee-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc similarity index 54% rename from docs/modules/integrate/pages/cdc-ee-connectors.adoc rename to docs/modules/integrate/pages/legacy-cdc-connectors.adoc index ae6ea421a..6ad73e628 100644 --- a/docs/modules/integrate/pages/cdc-ee-connectors.adoc +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -1,5 +1,4 @@ -= CDC Connector -[.enterprise]*Enterprise* += Legacy CDC Connector Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other @@ -9,49 +8,15 @@ Change Data Capture is especially important to Hazelcast, because it allows for the _streaming of changes from databases_, which can be efficiently processed by the Jet engine. -Implementation of CDC in Hazelcast Enterprise is based on -link:https://debezium.io/[Debezium 2.x]. Hazelcast offers a generic Debezium source -which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium], +Implementation of CDC in Hazelcast is based on +link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source +which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium], but we're also striving to make CDC sources first class citizens in Hazelcast. The ones for MySQL and PostgreSQL already are. 
== Installing the Connector
 
-This connector is included in the full distribution of Hazelcast Enterprise.
-
-=== Maven
-For using this connector inside Maven project you can add following entries into `pom.xml`'s `<dependencies>` section:
-
-Generic connector:
-```xml
-<dependency>
-    <groupId>com.hazelcast.jet</groupId>
-    <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
-    <version>{full-version}</version>
-    <classifier>jar-with-dependencies</classifier>
-</dependency>
-```
-
-MySQL-specific connector:
-```xml
-<dependency>
-    <groupId>com.hazelcast.jet</groupId>
-    <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
-    <version>{full-version}</version>
-    <classifier>jar-with-dependencies</classifier>
-</dependency>
-```
-Note: MySQL connector does not include MySQL driver as a dependency.
-
-PostgreSQL-specific connector:
-```xml
-<dependency>
-    <groupId>com.hazelcast.jet</groupId>
-    <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
-    <version>{full-version}</version>
-    <classifier>jar-with-dependencies</classifier>
-</dependency>
-```
+This connector is included in the full distribution of Open Source Hazelcast.
 
 == CDC as a Source
 
@@ -74,54 +39,20 @@ For the setting up a streaming source of CDC data is just the matter of pointing
 Pipeline pipeline = Pipeline.create();
 pipeline.readFrom(
     MySqlCdcSources.mysql("customers")
-        .setDatabaseAddress("127.0.0.1", 3306)
-        .setDatabaseCredentials("debezium", "dbz")
+        .setDatabaseAddress("127.0.0.1")
+        .setDatabasePort(3306)
+        .setDatabaseUser("debezium")
+        .setDatabasePassword("dbz")
         .setClusterName("dbserver1")
-        .setDatabaseIncludeList("inventory")
-        .setTableIncludeList("inventory.customers")
+        .setDatabaseWhitelist("inventory")
+        .setTableWhitelist("inventory.customers")
         .build())
     .withNativeTimestamps(0)
     .writeTo(Sinks.logger());
 ```
 
-MySQL- and PostgreSQL-specific source builders contain methods for all major configuration setting and it guards if
-e.g. mutually exclusive options are not used. For generic source builder user must rely on Debezium's documentation
-to provide all necessary options.
-
 For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
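The change events these sources produce carry their key and value as JSON documents. In a pipeline you would normally access them through `record.key().toMap()` or `record.value().toObject(...)`, as the examples in this guide do; for quickly inspecting logged events outside a pipeline, though, a JDK-only field extractor can be enough. The class below is a hypothetical helper, not part of the connector API:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Pull one string-valued field (such as the __op or __table metadata fields)
// out of a flat CDC event JSON value, using only the JDK. Deliberately not a
// general JSON parser: good enough for inspecting log output, nothing more.
public class CdcJsonPeek {

    public static String stringField(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String value = "{\"id\":1004,\"first_name\":\"Anne\",\"__op\":\"c\",\"__table\":\"customers\"}";
        System.out.println(stringField(value, "__op"));    // c
        System.out.println(stringField(value, "__table")); // customers
    }
}
```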
-=== Common source builder functions -[cols="m,a"] -|=== -|Method name|Description - -|changeRecord() -| Sets output type to ChangeRecord - a wrapper, providing most of the fields in -strongly-typed manner. - -| json() -| Sets output type to JSON - result stage will have `Map` as it's type, -where key is SourceRecord's key in json format and value is whole SourceRecord's value in json string. - -|customMapping(RecordMappingFunction) -| Sets output type to some arbitrary user type `T`. Mapping from `SourceRecord` to `T` will -be done using provided function. - -|withDefaultEngine() -|Sets preferred engine to default (non-async) one. This engine is single-threaded, -but also older and more tested. For most stable results (e.g. no async offset restore) this engine should be preferred. For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only. - -|withAsyncEngine() -|Sets preferred engine to async one. This engine is multithreaded (if connector supports -it), but you must be aware of it's async nature, e.g. offset restore after restart is done -asynchronously as well, leading to sometimes confusing results. - -|setProperty(String, String) -|Sets connector property to given value. There are multiple overloads, allowing to -sets value to some `long`, `String` or `boolean`. - -|=== - === Fault Tolerance CDC sources offer at least-once processing guarantees. The source @@ -129,7 +60,7 @@ periodically saves the database write ahead log offset for which it had dispatched events and in case of a failure/restart it will replay all events since the last successfully saved offset. -Unfortunately, however, there is no guaran`tee that the last saved offset +Unfortunately, however, there is no guarantee that the last saved offset is still in the database changelog. 
Such logs are always finite and depending on the DB configuration can be relatively short, so if the CDC source has to replay data for a long period of inactivity, then there @@ -162,4 +93,6 @@ p.readFrom(source) The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly. If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. + +Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario. ==== \ No newline at end of file diff --git a/docs/modules/integrate/partials/nav.adoc b/docs/modules/integrate/partials/nav.adoc index ddee698cc..f4b644c66 100644 --- a/docs/modules/integrate/partials/nav.adoc +++ b/docs/modules/integrate/partials/nav.adoc @@ -22,6 +22,7 @@ *** xref:integrate:database-connectors.adoc[Overview] *** xref:integrate:jdbc-connector.adoc[] *** xref:integrate:cdc-connectors.adoc[] +*** xref:integrate:legacy-cdc-connectors.adoc[] *** xref:integrate:elasticsearch-connector.adoc[] *** xref:integrate:mongodb-connector.adoc[] *** xref:integrate:influxdb-connector.adoc[] From 4e0825fcaff0ed994d6b8f3f8ded4ab00024573e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Mon, 8 Jul 2024 21:54:39 +0200 Subject: [PATCH 07/25] Apply suggestions from code review Co-authored-by: rebekah-lawrence <142301480+rebekah-lawrence@users.noreply.github.com> --- docs/modules/integrate/pages/cdc-connectors.adoc | 4 ++-- docs/modules/integrate/pages/connectors.adoc | 2 +- 
docs/modules/pipelines/pages/cdc.adoc | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index ae6ea421a..78ee1032a 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -118,7 +118,7 @@ asynchronously as well, leading to sometimes confusing results. |setProperty(String, String) |Sets connector property to given value. There are multiple overloads, allowing to -sets value to some `long`, `String` or `boolean`. +set the value to `long`, `String` or `boolean`. |=== @@ -129,7 +129,7 @@ periodically saves the database write ahead log offset for which it had dispatched events and in case of a failure/restart it will replay all events since the last successfully saved offset. -Unfortunately, however, there is no guaran`tee that the last saved offset +Unfortunately, however, there is no guarantee that the last saved offset is still in the database changelog. Such logs are always finite and depending on the DB configuration can be relatively short, so if the CDC source has to replay data for a long period of inactivity, then there diff --git a/docs/modules/integrate/pages/connectors.adoc b/docs/modules/integrate/pages/connectors.adoc index c6b834ae3..7a40101e1 100644 --- a/docs/modules/integrate/pages/connectors.adoc +++ b/docs/modules/integrate/pages/connectors.adoc @@ -273,7 +273,7 @@ The Jet API supports more connectors than SQL. 
|N/A |xref:integrate:cdc-connectors.adoc[CdcSinks.map] -|hazelcast-jet-cdc-debezium (legacy, OS) +|hazelcast-jet-cdc-debezium (legacy, {open-source-product-name}) or diff --git a/docs/modules/pipelines/pages/cdc.adoc b/docs/modules/pipelines/pages/cdc.adoc index 438011f31..19cc91bf2 100644 --- a/docs/modules/pipelines/pages/cdc.adoc +++ b/docs/modules/pipelines/pages/cdc.adoc @@ -153,7 +153,7 @@ mysql> SELECT * FROM customers; If you already have Hazelcast and you skipped the above steps, make sure to follow from here on. -. Make sure the MySQL CDC plugin is in `lib/` directory. You need to manually download the MySQL CDC plugin from Hazelcast's Maven https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository] and then copy it to the `lib/` directory. +. Make sure the MySQL CDC plugin is in the `lib/` directory. You must manually download the MySQL CDC plugin from Hazelcast's Maven link:https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository, window=_blank] and then copy it to the `lib/` directory. + [source,bash] ---- From 4fd775f2e7d6003a4500091e0db1615d4784dd51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Mon, 8 Jul 2024 21:56:37 +0200 Subject: [PATCH 08/25] Apply suggestions from code review Co-authored-by: rebekah-lawrence <142301480+rebekah-lawrence@users.noreply.github.com> --- docs/modules/integrate/pages/connectors.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/integrate/pages/connectors.adoc b/docs/modules/integrate/pages/connectors.adoc index 7a40101e1..86a028c37 100644 --- a/docs/modules/integrate/pages/connectors.adoc +++ b/docs/modules/integrate/pages/connectors.adoc @@ -277,7 +277,7 @@ The Jet API supports more connectors than SQL. 
or
 
-hazelcast-enterprise-cdc-debezium (EE)
+hazelcast-enterprise-cdc-debezium ({enterprise-product-name})
 
 |streaming
 |at-least-once

From a25adae0ef13f4769a4ae54a5245bd9525997659 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?=
Date: Tue, 9 Jul 2024 19:16:48 +0200
Subject: [PATCH 09/25] Review comments

---
 docs/modules/pipelines/pages/cdc-join.adoc    |  97 ++++++++
 .../modules/pipelines/pages/cdc-postgres.adoc | 222 +++++++++++++-----
 2 files changed, 256 insertions(+), 63 deletions(-)

diff --git a/docs/modules/pipelines/pages/cdc-join.adoc b/docs/modules/pipelines/pages/cdc-join.adoc
index 04c1043df..2d17b38ad 100644
--- a/docs/modules/pipelines/pages/cdc-join.adoc
+++ b/docs/modules/pipelines/pages/cdc-join.adoc
@@ -45,6 +45,11 @@ They differ slightly depending on which database you use:
 
 Let's write the code for the processing we want to accomplish:
 
+[tabs]
+====
+{enterprise-product-name}::
++
+--
 [source,java]
 ----
 package org.example;
@@ -98,9 +103,72 @@ public static void main(String[] args) {
     }
 }
 ----
+--
+{open-source-product-name}::
++
+--
+[source,java]
+----
+package org.example;
+
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.jet.cdc.ChangeRecord;
+import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
+import com.hazelcast.jet.config.JobConfig;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+
+public class JetJob {
+
+    private static final int MAX_CONCURRENT_OPERATIONS = 1;
+
+    public static void main(String[] args) {
+        StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
+                .setDatabaseAddress("127.0.0.1")
+                .setDatabasePort(3306)
+                .setDatabaseUser("debezium")
+                .setDatabasePassword("dbz")
+                .setClusterName("dbserver1")
+                .setDatabaseWhitelist("inventory")
+                .setTableWhitelist("inventory.customers", "inventory.orders")
+                .build();
+
+        Pipeline pipeline = Pipeline.create();
+        
StreamStage<ChangeRecord> allRecords = pipeline.readFrom(source)
+                .withNativeTimestamps(0);
+
+        allRecords.filter(r -> r.table().equals("customers"))
+                .apply(Ordering::fix)
+                .peek()
+                .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
+                        record -> (Integer) record.key().toMap().get("id"),
+                        CustomerEntryProcessor::new
+                ));
+
+        allRecords.filter(r -> r.table().equals("orders"))
+                .apply(Ordering::fix)
+                .peek()
+                .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
+                        record -> (Integer) record.value().toMap().get("purchaser"),
+                        OrderEntryProcessor::new
+                ));
+
+        JobConfig cfg = new JobConfig().setName("monitor");
+        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
+        hz.getJet().newJob(pipeline, cfg);
+    }
+}
+----
+--
+====
 
 If using Postgres, only the source would need to change, like this:
 
+[tabs]
+====
+{enterprise-product-name}::
++
 [source,java]
 ----
 StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
@@ -110,6 +178,20 @@ StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
     .setTableIncludeList("inventory.customers", "inventory.orders")
     .build();
 ----
+{open-source-product-name}::
++
+[source,java]
+----
+StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
+    .setDatabaseAddress("127.0.0.1")
+    .setDatabasePort(5432)
+    .setDatabaseUser("postgres")
+    .setDatabasePassword("postgres")
+    .setDatabaseName("postgres")
+    .setTableIncludeList("inventory.customers", "inventory.orders")
+    .build();
+----
+====
 
 As we can see from the pipeline code, our `Sink` is `EntryProcessor` based.
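Conceptually, each such `EntryProcessor` performs an atomic read-modify-write of one `IMap` entry on the member that owns it, which is what lets the two independent sink stages update the same `cache` map safely. The JDK-only sketch below illustrates just that update shape with `Map.compute`; the class and method names are ours, and this is not Hazelcast code:

```java
import java.util.HashMap;
import java.util.Map;

// A JDK-only sketch of the read-modify-write an EntryProcessor applies to a
// single entry: create the per-customer state if absent, otherwise merge the
// incoming order into it.
public class EntryProcessorSketch {

    public static void applyOrder(Map<Integer, String> cache, int customerId, int orderNumber) {
        cache.compute(customerId, (id, orders) ->
                orders == null ? String.valueOf(orderNumber)
                               : orders + "," + orderNumber);
    }

    public static void main(String[] args) {
        Map<Integer, String> cache = new HashMap<>();
        applyOrder(cache, 1002, 10002);
        applyOrder(cache, 1002, 10003);
        System.out.println(cache.get(1002)); // 10002,10003
    }
}
```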
The two `EntryProcessors` we use are: @@ -516,10 +598,25 @@ ls lib/ ---- + You should see the following jars: + +[tabs] +==== +{enterprise-product-name}:: + +-- * hazelcast-enterprise-cdc-debezium-{full-version}.jar * hazelcast-enterprise-cdc-mysql-{full-version}.jar (for MySQL) * hazelcast-enterprise-cdc-postgres-{full-version}.jar (for Postgres) +-- +{open-source-product-name}:: ++ +-- +* hazelcast-jet-cdc-debezium-{full-version}.jar +* hazelcast-jet-cdc-mysql-{full-version}.jar (for MySQL) +* hazelcast-jet-cdc-postgres-{full-version}.jar (for Postgres) +-- +==== + . Enable user code deployment: + diff --git a/docs/modules/pipelines/pages/cdc-postgres.adoc b/docs/modules/pipelines/pages/cdc-postgres.adoc index ca77ddbc0..171a62300 100644 --- a/docs/modules/pipelines/pages/cdc-postgres.adoc +++ b/docs/modules/pipelines/pages/cdc-postgres.adoc @@ -23,11 +23,12 @@ Open a terminal, and run following command. It will start a new container that runs a PostgreSQL database server preconfigured with an inventory database: -```bash +[source,bash] +---- docker run -it --rm --name postgres -p 5432:5432 \ -e POSTGRES_DB=postgres -e POSTGRES_USER=postgres \ -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.2 -``` +---- This runs a new container using version `1.2` of the link:https://hub.docker.com/r/debezium/example-postgres[debezium/example-postgres] @@ -50,7 +51,8 @@ container to the same port on the Docker host so that software outside In your terminal you should see something like the following: -``` +[source] +---- ... PostgreSQL init process complete; ready for start up. @@ -58,7 +60,7 @@ PostgreSQL init process complete; ready for start up. 2020-06-02 11:36:19.581 GMT [1] LOG: listening on IPv6 address "::", port 5432 2020-06-02 11:36:19.585 GMT [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432" 2020-06-02 11:36:19.618 GMT [1] LOG: database system is ready to accept connections -``` +---- The PostgreSQL server is running and ready for use. 
@@ -67,35 +69,40 @@ The PostgreSQL server is running and ready for use. Open a new terminal, and use it to run `psql` (PostgreSQL interactive terminal) inside the already running `postgres` container: -```bash +[source,bash] +---- docker exec -it postgres psql -U postgres -``` +---- You should end up with a prompt similar to this: -``` +[source] +---- psql (11.8 (Debian 11.8-1.pgdg90+1)) Type "help" for help. postgres=# -``` +---- We’ll use the prompt to interact with the database. First, switch to the "inventory" schema: -```bash +[source,bash] +---- SET search_path TO inventory; -``` +---- and then list the tables in the database: -```bash +[source,bash] +---- \dt; -``` +---- This should display the following: -``` +[source] +------------ List of relations Schema | Name | Type | Owner -----------+------------------+-------+---------- @@ -106,14 +113,15 @@ This should display the following: inventory | products_on_hand | table | postgres inventory | spatial_ref_sys | table | postgres (6 rows) -``` +------------ Feel free to explore the database and view the preloaded data. For example: -```bash +[source,bash] +---- SELECT * FROM customers; -``` +---- == Step 4. Start Hazelcast @@ -124,28 +132,44 @@ follow from here on. . Make sure the PostgreSQL CDC plugin is in the `lib/` directory. + -```bash +[source,bash] +---- ls lib/ -``` +---- + You should see the following jars: + +[tabs] +==== +{enterprise-product-name}:: ++ +-- +* hazelcast-enterprise-cdc-debezium-{full-version}.jar +* hazelcast-enterprise-cdc-postgres-{full-version}.jar (for Postgres) +-- +{open-source-product-name}:: ++ +-- * hazelcast-jet-cdc-debezium-{full-version}.jar -* hazelcast-jet-cdc-postgres-{full-version}.jar +* hazelcast-jet-cdc-postgres-{full-version}.jar (for Postgres) +-- +==== . Start Hazelcast + -```bash +[source,bash] +---- bin/hz-start -``` +---- . 
When you see output like this, Hazelcast is up: + -``` +[source] +---- Members {size:1, ver:1} [ Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this ] -``` +---- == Step 5. Create a New Java Project @@ -171,8 +195,8 @@ repositories.mavenCentral() dependencies { implementation 'com.hazelcast:hazelcast:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-debezium:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-postgres:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-debezium:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-postgres:{full-version}' implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.0' } @@ -206,12 +230,12 @@ Maven:: com.hazelcast.jet - hazelcast-jet-cdc-debezium + hazelcast-enterprise-cdc-debezium {full-version} com.hazelcast.jet - hazelcast-jet-cdc-postgres + hazelcast-enterprise-cdc-postgres {full-version} @@ -241,6 +265,9 @@ Maven:: -- ==== +If you are using {open-source-product-name}, you have to replace `hazelcast-enterprise-cdc-debezium` +with `hazelcast-jet-cdc-debezium` and `hazelcast-enterprise-cdc-postgres` with `hazelcast-jet-cdc-postgres`. + == Step 6. Define Data Pipeline Let's write the code that will monitor the database and do something @@ -254,7 +281,57 @@ customer with a specific ID. 
This is how the code doing this looks like: -```java +[tabs] +==== +{enterprise-product-name}:: ++ +-- + + +[source,java] +---- +package org.example; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.jet.cdc.CdcSinks; +import com.hazelcast.jet.cdc.ChangeRecord; +import com.hazelcast.jet.cdc.postgres.PostgresCdcSources; +import com.hazelcast.jet.config.JobConfig; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.StreamSource; + +public class JetJob { + + public static void main(String[] args) { + StreamSource source = PostgresCdcSources.postgres("source") + .setDatabaseAddress("127.0.0.1", 5432) + .setDatabaseCredentials("postgres", "postgres") + .setDatabaseName("postgres") + .setTableIncludeList("inventory.customers") + .build(); + + Pipeline pipeline = Pipeline.create(); + pipeline.readFrom(source) + .withoutTimestamps() + .peek() + .writeTo(CdcSinks.map("customers", + r -> r.key().toMap().get("id"), + r -> r.value().toObject(Customer.class).toString())); + + JobConfig cfg = new JobConfig().setName("postgres-monitor"); + HazelcastInstance hz = Hazelcast.bootstrappedInstance(); + hz.getJet().newJob(pipeline, cfg); + } + +} +---- +-- +{open-source-product-name}:: ++ +-- +[source,java] +---- package org.example; import com.hazelcast.core.Hazelcast; @@ -292,11 +369,14 @@ public class JetJob { } } -``` +---- +-- +==== The `Customer` class we map change events to is quite simple too: -```java +[source,java] +---- package org.example; import com.fasterxml.jackson.annotation.JsonProperty; @@ -354,14 +434,15 @@ public class Customer implements Serializable { return "Customer {id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + '}'; } } -``` +---- To make it evident that our pipeline serves the purpose of building an up-to-date cache of customers, which can be interrogated at any time let's add one more class. 
This code can be executed at any time in your IDE and will print the current content of the cache. -```java +[source,java] +---- package org.example; import com.hazelcast.client.HazelcastClient; @@ -379,7 +460,7 @@ public class CacheRead { } } -``` +---- == Step 7. Package the Pipeline into a JAR @@ -395,9 +476,10 @@ need to do is to run the build command: Gradle:: + -- -```bash +[source,bash] +---- gradle build -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `build/libs` directory of our project. @@ -405,9 +487,10 @@ in the `build/libs` directory of our project. Maven:: + -- -```bash +[source,bash] +---- mvn package -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `target` directory or our project. @@ -424,16 +507,18 @@ issue is following command: Gradle:: + -- -```bash +[source,bash] +---- bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- Maven:: + -- -```bash +[source,bash] +---- bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- ==== @@ -441,7 +526,8 @@ The output in the Hazelcast member's log should look something like this (we also log what we put in the `IMap` sink thanks to the `peek()` stage we inserted): -``` +[source] +---- ... Snapshot ended with SnapshotResult [...] ... Obtained valid replication slot ReplicationSlot [...] ... REPLICA IDENTITY for 'inventory.customers' is 'FULL'; UPDATE AND DELETE events will contain the previous values of all the columns @@ -450,85 +536,95 @@ we inserted): ... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker",... ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne","last_name":"Kretchmar",... ... Transitioning from the snapshot reader to the binlog reader -``` +---- == Step 9. Track Updates Let's see how our cache looks like at this time. 
If we execute the `CacheRead` code <<6-define-jet-job, defined above>>, we'll get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- Let's do some updates in our database. Go to the PostgreSQL CLI <<3-start-postgresql-command-line-client, we've started earlier>> and run following update statement: -```bash +[source,bash] +---- UPDATE customers SET first_name='Anne Marie' WHERE id=1004; -``` +---- In the log of the Hazelcast member we should immediately see the effect: -``` +[source] +---- ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar",... -``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- One more: -```bash +[source,bash] +---- UPDATE customers SET email='edward.walker@walker.com' WHERE id=1003; -``` +---- -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=edward.walker@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- == Step 10. Clean Up . 
Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel postgres-monitor -``` +---- Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . Use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): + -```bash +[source,bash] +---- docker stop postgres -``` +---- + Since we've used the `--rm` flag when starting the connectors, Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with following command: -```bash +[source,bash] +---- docker ps -a -``` +---- From 3642046c2da30edccd63d493594ca8a8b00367b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Tue, 9 Jul 2024 19:33:02 +0200 Subject: [PATCH 10/25] CRs --- .../integrate/pages/cdc-connectors.adoc | 89 ++++++++++--------- .../pages/legacy-cdc-connectors.adoc | 40 ++++----- 2 files changed, 65 insertions(+), 64 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 78ee1032a..f5bbacb66 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -9,68 +9,73 @@ Change Data Capture is especially important to Hazelcast, because it allows for the _streaming of changes from databases_, which can be efficiently processed by the Jet engine. -Implementation of CDC in Hazelcast Enterprise is based on -link:https://debezium.io/[Debezium 2.x]. Hazelcast offers a generic Debezium source -which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium], -but we're also striving to make CDC sources first class citizens in Hazelcast. -The ones for MySQL and PostgreSQL already are. +Implementation of CDC in Hazelcast {enterprise-product-name} is based on +link:https://debezium.io/[Debezium 2.x, window=_blank]. 
Hazelcast offers a generic Debezium source
+which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank].
+However, we're also striving to make CDC sources first class citizens in Hazelcast,
+as we have done already for MySQL and PostgreSQL already are.
 
 == Installing the Connector
 
-This connector is included in the full distribution of Hazelcast Enterprise.
+This connector is included in the full distribution of Hazelcast {enterprise-product-name}.
 
 === Maven
-For using this connector inside Maven project you can add following entries into `pom.xml`'s `<dependencies>` section:
+To use this connector in a Maven project, add the following entries to the `<dependencies>` section of your `pom.xml` file:
 
 Generic connector:
-```xml
+
+[source,xml]
+----
 <dependency>
   <groupId>com.hazelcast.jet</groupId>
   <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
   <version>{full-version}</version>
   <classifier>jar-with-dependencies</classifier>
 </dependency>
-```
+----
 
 MySQL-specific connector:
-```xml
+
+[source,xml]
+----
 <dependency>
   <groupId>com.hazelcast.jet</groupId>
   <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
   <version>{full-version}</version>
   <classifier>jar-with-dependencies</classifier>
 </dependency>
-```
-Note: MySQL connector does not include MySQL driver as a dependency.
+----
+NOTE: MySQL connector does not include the MySQL driver as a dependency.
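+
+For example, with Gradle the driver can be declared next to the connector. This is only a sketch: the Connector/J version shown is an example, so align it with your MySQL server:
+
+[source,groovy]
+----
+dependencies {
+    implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-mysql:{full-version}'
+    // MySQL Connector/J driver (example version - pick the release matching your server)
+    runtimeOnly 'com.mysql:mysql-connector-j:8.4.0'
+}
+----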
PostgreSQL-specific connector:
-```xml
+
+[source,xml]
+----
 <dependency>
   <groupId>com.hazelcast.jet</groupId>
   <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
   <version>{full-version}</version>
   <classifier>jar-with-dependencies</classifier>
 </dependency>
-```
+----
 
 == CDC as a Source
 
-We have the following types of CDC sources:
+The Java API supports the following types of CDC source:
 
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
-  generic source for all databases supported by Debezium
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
-  specific, first class Jet CDC source for MySQL databases (also based
-  on Debezium, but benefiting the full range of convenience Jet can
-  additionally provide)
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
-  specific, first class CDC source for PostgreSQL databases (also based
-  on Debezium, but benefiting the full range of convenience Hazelcast can
-  additionally provide)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
+  a generic source for all databases supported by Debezium
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
+  a specific, first class Jet CDC source for MySQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
+  a specific, first class CDC source for PostgreSQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
 
-For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
+To set up a 
streaming source of CDC data, define it using the following configuration: -```java +[source,java] +---- Pipeline pipeline = Pipeline.create(); pipeline.readFrom( MySqlCdcSources.mysql("customers") @@ -82,13 +87,11 @@ pipeline.readFrom( .build()) .withNativeTimestamps(0) .writeTo(Sinks.logger()); -``` +---- -MySQL- and PostgreSQL-specific source builders contain methods for all major configuration setting and it guards if -e.g. mutually exclusive options are not used. For generic source builder user must rely on Debezium's documentation -to provide all necessary options. +MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation -For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial]. +Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database. === Common source builder functions [cols="m,a"] @@ -96,25 +99,22 @@ For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial]. |Method name|Description |changeRecord() -| Sets output type to ChangeRecord - a wrapper, providing most of the fields in +| Sets output type to `ChangeRecord` - a wrapper, which provides most of the fields in strongly-typed manner. | json() -| Sets output type to JSON - result stage will have `Map` as it's type, -where key is SourceRecord's key in json format and value is whole SourceRecord's value in json string. +| Sets output type to `JSON` - in the result stage, the type will be set to `Map`, +where map entry's key is the key of `SourceRecord` in JSON format and value is whole `SourceRecord`'s value in JSON format. |customMapping(RecordMappingFunction) -| Sets output type to some arbitrary user type `T`. 
Mapping from `SourceRecord` to `T` will -be done using provided function. +| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using provided function by the connector. |withDefaultEngine() -|Sets preferred engine to default (non-async) one. This engine is single-threaded, -but also older and more tested. For most stable results (e.g. no async offset restore) this engine should be preferred. For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only. +|Sets the preferred engine to the default (non-async) one. This engine is single-threaded, +but also older and more tested. Use this engine for most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only. |withAsyncEngine() -|Sets preferred engine to async one. This engine is multithreaded (if connector supports -it), but you must be aware of it's async nature, e.g. offset restore after restart is done -asynchronously as well, leading to sometimes confusing results. +|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but you must be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results. |setProperty(String, String) |Sets connector property to given value. 
There are multiple overloads, allowing to @@ -148,14 +148,15 @@ For example, a sink mapping CDC data to a `Customer` class and maintaining a map view of latest known email addresses per customer (identified by ID) would look like this: -```java +[source,java] +---- Pipeline p = Pipeline.create(); p.readFrom(source) .withoutTimestamps() .writeTo(CdcSinks.map("customers", r -> r.key().toMap().get("id"), r -> r.value().toObject(Customer.class).email)); -``` +---- [NOTE] ==== diff --git a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc index 6ad73e628..a0aa9afc3 100644 --- a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -8,11 +8,11 @@ Change Data Capture is especially important to Hazelcast, because it allows for the _streaming of changes from databases_, which can be efficiently processed by the Jet engine. -Implementation of CDC in Hazelcast is based on -link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source -which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium], -but we're also striving to make CDC sources first class citizens in Hazelcast. -The ones for MySQL and PostgreSQL already are. +Implementation of CDC in Hazelcast {open-source-product-name} is based on +link:https://debezium.io/[Debezium, window=_blank]. Hazelcast offers a generic Debezium source +which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank]. +However, we're also striving to make CDC sources first class citizens in Hazelcast, +as we have done already for MySQL and PostgreSQL already are. == Installing the Connector @@ -22,20 +22,19 @@ This connector is included in the full distribution of Open Source Hazelcast. 
We have the following types of CDC sources:
 
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
-  generic source for all databases supported by Debezium
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
-  specific, first class Jet CDC source for MySQL databases (also based
-  on Debezium, but benefiting the full range of convenience Jet can
-  additionally provide)
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
-  specific, first class CDC source for PostgreSQL databases (also based
-  on Debezium, but benefiting the full range of convenience Hazelcast can
-  additionally provide)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
+  a generic source for all databases supported by Debezium
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
+  a specific, first class Jet CDC source for MySQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
+  a specific, first class CDC source for PostgreSQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
 
-For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
+To set up a streaming source of CDC data, define it using the following configuration:
 
-```java
+[source,java]
+----
 Pipeline pipeline = Pipeline.create();
 pipeline.readFrom(
     MySqlCdcSources.mysql("customers")
@@ -49,7 +48,7 @@ pipeline.readFrom(
.build()) .withNativeTimestamps(0) .writeTo(Sinks.logger()); -``` +---- For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial]. @@ -79,14 +78,15 @@ For example, a sink mapping CDC data to a `Customer` class and maintaining a map view of latest known email addresses per customer (identified by ID) would look like this: -```java +[source,java] +---- Pipeline p = Pipeline.create(); p.readFrom(source) .withoutTimestamps() .writeTo(CdcSinks.map("customers", r -> r.key().toMap().get("id"), r -> r.value().toObject(Customer.class).email)); -``` +---- [NOTE] ==== From 9d68f3ce7cebe56fc7ae119779621c20fe23ea30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Tue, 9 Jul 2024 19:36:28 +0200 Subject: [PATCH 11/25] Post-merge --- docs/modules/ROOT/nav.adoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index fef9a3738..1b3863611 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -195,6 +195,7 @@ include::wan:partial$nav.adoc[] ** xref:integrate:database-connectors.adoc[Overview] ** xref:integrate:jdbc-connector.adoc[] ** xref:integrate:cdc-connectors.adoc[] +** xref:integrate:legacy-cdc-connectors.adoc[] ** xref:integrate:elasticsearch-connector.adoc[] ** xref:integrate:mongodb-connector.adoc[] ** xref:integrate:influxdb-connector.adoc[] From 28df78e42650d78e9701476a3b37ca5c41c8e2ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Wed, 17 Jul 2024 13:27:37 +0200 Subject: [PATCH 12/25] Remove jar-with-dependencies --- docs/modules/integrate/pages/cdc-connectors.adoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index f5bbacb66..fbfb0559f 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -30,7 +30,6 @@ Generic connector: com.hazelcast.jet 
hazelcast-enterprise-cdc-debezium {full-version} - jar-with-dependencies ---- @@ -42,7 +41,6 @@ MySQL-specific connector: com.hazelcast.jet hazelcast-enterprise-cdc-mysql {full-version} - jar-with-dependencies ---- NOTE: MySQL connector does not include the MySQL driver as a dependency. @@ -55,7 +53,6 @@ PostgreSQL-specific connector: com.hazelcast.jet hazelcast-enterprise-cdc-postgres {full-version} - jar-with-dependencies ---- From 13132395302d0ce5a952a050381a8f6875f887eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Fri, 19 Jul 2024 17:52:19 +0200 Subject: [PATCH 13/25] Changed package name --- .../integrate/pages/cdc-connectors.adoc | 6 ++--- docs/modules/pipelines/pages/cdc-join.adoc | 22 ++++++++++--------- .../modules/pipelines/pages/cdc-postgres.adoc | 8 +++---- docs/modules/pipelines/pages/cdc.adoc | 8 ++++--- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index fbfb0559f..6654258a0 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -60,12 +60,12 @@ PostgreSQL-specific connector: The Java API supports the following types of CDC source: -* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]: +* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]: a generic source for all databases supported by Debezium -* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]: +* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]: a specific, first class Jet CDC source for MySQL databases (also 
based on Debezium, but with the additional benefits provided by Hazelcast -* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]: +* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]: a specific, first class CDC source for PostgreSQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast diff --git a/docs/modules/pipelines/pages/cdc-join.adoc b/docs/modules/pipelines/pages/cdc-join.adoc index 2d17b38ad..c287bbb5b 100644 --- a/docs/modules/pipelines/pages/cdc-join.adoc +++ b/docs/modules/pipelines/pages/cdc-join.adoc @@ -3,6 +3,8 @@ In this tutorial, you will learn how to make a map hold enriched data, combined (joined) from multiple database tables. +NOTE: If you are using Hazelcast {open-source-product-name}, you have to change the package from `com.hazelcast.enterprise.jet...` to `com.hazelcast.jet...`. + == Step 1. Install Docker This tutorial uses link:https://www.docker.com/[Docker] to simplify the @@ -56,8 +58,8 @@ package org.example; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.jet.cdc.ChangeRecord; -import com.hazelcast.jet.cdc.mysql.MySqlCdcSources; +import com.hazelcast.enterprise.jet.cdc.ChangeRecord; +import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; @@ -200,9 +202,9 @@ based. 
The two `EntryProcessors` we use are: ---- package org.example; -import com.hazelcast.jet.cdc.ChangeRecord; -import com.hazelcast.jet.cdc.Operation; -import com.hazelcast.jet.cdc.ParsingException; +import com.hazelcast.enterprise.jet.cdc.ChangeRecord; +import com.hazelcast.enterprise.jet.cdc.Operation; +import com.hazelcast.enterprise.jet.cdc.ParsingException; import com.hazelcast.map.EntryProcessor; import java.util.Map; @@ -245,9 +247,9 @@ public class CustomerEntryProcessor implements EntryProcessor Date: Tue, 20 Aug 2024 18:33:21 +0200 Subject: [PATCH 14/25] CR --- .../integrate/pages/cdc-connectors.adoc | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 6654258a0..c2176c8ba 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -43,7 +43,7 @@ MySQL-specific connector: {full-version} ---- -NOTE: MySQL connector does not include the MySQL driver as a dependency. +NOTE: Due to licensing, MySQL connector does not include the MySQL driver as a dependency. You have to manually add dependency `com.mysql:mysql-connector-j` to the classpath. 
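+
+A minimal sketch of the extra Maven dependency this note calls for (the version shown is only an example; use the release matching your MySQL server):
+
+[source,xml]
+----
+<dependency>
+    <groupId>com.mysql</groupId>
+    <artifactId>mysql-connector-j</artifactId>
+    <!-- example version; align with your MySQL server -->
+    <version>8.4.0</version>
+</dependency>
+----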
PostgreSQL-specific connector: @@ -71,6 +71,11 @@ on Debezium, but with the additional benefits provided by Hazelcast To set up a streaming source of CDC data, define it using the following configuration: +[tabs] +==== +MySQL:: ++ +-- [source,java] ---- Pipeline pipeline = Pipeline.create(); @@ -85,8 +90,45 @@ pipeline.readFrom( .withNativeTimestamps(0) .writeTo(Sinks.logger()); ---- +-- +PostgreSQL:: ++ +-- +[source,java] +---- +Pipeline pipeline = Pipeline.create(); +pipeline.readFrom( + PostgresCdcSources.postgres("customers") + .setDatabaseAddress("127.0.0.1", 5432) + .setDatabaseCredentials("debezium", "dbz") + .setClusterName("dbserver1") + .setDatabaseIncludeList("inventory") + .setTableIncludeList("inventory.customers") + .build()) + .withNativeTimestamps(0) + .writeTo(Sinks.logger()); +---- +-- +MongoDB:: ++ +-- +[source,java] +---- +Pipeline pipeline = Pipeline.create(); +pipeline.readFrom( + DebeziumCdcSources.debezium("customers", MongoDbConnector.class) + .setProperty("mongodb.connection.string", "mongodb://localhost:27017") + .setDatabaseIncludeList("inventory") + .setProperty("collection.include.list", "customers") + .build()) + .withNativeTimestamps(0) + .writeTo(Sinks.logger()); +---- +-- + +==== -MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation +MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for the information about possible required or mutually exclusive fields. 
Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database. From d03fbb37e9baebb6a3c6e917f923315b8d849c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 22 Aug 2024 17:17:41 +0200 Subject: [PATCH 15/25] Added migration guide --- docs/modules/integrate/pages/cdc-connectors.adoc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index c2176c8ba..9f8f5a34a 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -202,4 +202,13 @@ p.readFrom(source) The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly. If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. -==== \ No newline at end of file +==== + +== Migration Guide + +Hazelcast {open-source-product-name} has Debezium CDC connector, however it is based on older version of Debezium. +While migrating to the new connector should be very simple, there are some things users should remember: + + * Package was changed from `com.hazelcast.jet.cdc` to `com.hazelcast.enterprise.jet.cdc` + * Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`). 
+ * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, Hazelcast for consistency followed the same naming structure and instead of e.g. `setTableWhitelist` you need to use `setTableIncludeList` method.
\ No newline at end of file

From e5627949d2999c8819029e4dc13e5cfd2fcc8b53 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= 
Date: Thu, 5 Sep 2024 18:44:01 +0200
Subject: [PATCH 16/25] CR Suggestions

Co-authored-by: Oliver Howell 

---
 .../integrate/pages/cdc-connectors.adoc       | 22 +++++++++----------
 .../pages/legacy-cdc-connectors.adoc          | 22 +++++++++----------
 2 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc
index 9f8f5a34a..8943417f6 100644
--- a/docs/modules/integrate/pages/cdc-connectors.adoc
+++ b/docs/modules/integrate/pages/cdc-connectors.adoc
@@ -9,11 +9,11 @@ Change Data Capture is especially important to Hazelcast, because it allows
 for the _streaming of changes from databases_, which can be efficiently
 processed by the Jet engine.
 
-Implementation of CDC in Hazelcast {enterprise-product-name} is based on
+The implementation of CDC in Hazelcast {enterprise-product-name} is based on
 link:https://debezium.io/[Debezium 2.x, window=_blank]. Hazelcast offers a generic Debezium source
 which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank].
 However, we're also striving to make CDC sources first class citizens in Hazelcast,
-as we have done already for MySQL and PostgreSQL already are.
+as we have done already for MySQL and PostgreSQL.
 
 == Installing the Connector
 
@@ -43,7 +43,7 @@ MySQL-specific connector:
 {full-version}
 
 ----
-NOTE: Due to licensing, MySQL connector does not include the MySQL driver as a dependency. You have to manually add dependency `com.mysql:mysql-connector-j` to the classpath.
+NOTE: Due to licensing, MySQL connector does not include the MySQL driver as a dependency. You have to manually add the `com.mysql:mysql-connector-j` dependency to the classpath. PostgreSQL-specific connector: @@ -69,7 +69,7 @@ The Java API supports the following types of CDC source: a specific, first class CDC source for PostgreSQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast -To set up a streaming source of CDC data, define it using the following configuration: +To set up a CDC data streaming source, define it using the following configuration: [tabs] ==== @@ -128,7 +128,7 @@ pipeline.readFrom( ==== -MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for the information about possible required or mutually exclusive fields. +MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for the information about required or mutually exclusive fields. Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database. @@ -139,18 +139,18 @@ Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes |changeRecord() | Sets output type to `ChangeRecord` - a wrapper, which provides most of the fields in -strongly-typed manner. +a strongly-typed manner. 
| json()
| Sets output type to `JSON` - in the result stage, the type will be set to `Map<String, String>`,
-where map entry's key is the key of `SourceRecord` in JSON format and value is whole `SourceRecord`'s value in JSON format.
+where the map entry's key is the key of `SourceRecord` in JSON format, and the value is the whole `SourceRecord`'s value in JSON format.
 
 |customMapping(RecordMappingFunction)
 | Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done by the connector using the provided function.
 
 |withDefaultEngine()
 |Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
-but also older and more tested. Use this engine for most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only.
+but also more widely used and tested. Use this engine for the most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only.
 
 |withAsyncEngine()
 |Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but you must be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results.
 
@@ -206,9 +206,9 @@
 
 == Migration Guide
 
-Hazelcast {open-source-product-name} has Debezium CDC connector, however it is based on older version of Debezium.
-While migrating to the new connector should be very simple, there are some things users should remember:
+Hazelcast {open-source-product-name} has a Debezium CDC connector but it's based on an older version of Debezium. 
+Migration to the new connector is straightforward but be aware of the following changes: * Package was changed from `com.hazelcast.jet.cdc` to `com.hazelcast.enterprise.jet.cdc` * Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`). - * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, Hazelcast for consistency followed the same naming structure and instead of e.g. `setTableWhitelist` you need to use `setTableIncludeList` method. \ No newline at end of file + * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. \ No newline at end of file diff --git a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc index a0aa9afc3..6a27f4d6f 100644 --- a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -12,11 +12,11 @@ Implementation of CDC in Hazelcast {open-source-product-name} is based on link:https://debezium.io/[Debezium, window=_blank]. Hazelcast offers a generic Debezium source which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank]. However, we're also striving to make CDC sources first class citizens in Hazelcast, -as we have done already for MySQL and PostgreSQL already are. +as we have done already for MySQL and PostgreSQL. == Installing the Connector -This connector is included in the full distribution of Open Source Hazelcast. +This connector is included in the full distribution of Hazelcast {open-source-product-name}. 
== CDC as a Source @@ -54,23 +54,21 @@ For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial]. === Fault Tolerance -CDC sources offer at least-once processing guarantees. The source -periodically saves the database write ahead log offset for which it had +CDC sources offer _at least once_ processing guarantees. The source +periodically saves the database write ahead log offset for which it has dispatched events and in case of a failure/restart it will replay all events since the last successfully saved offset. -Unfortunately, however, there is no guarantee that the last saved offset -is still in the database changelog. Such logs are always finite and -depending on the DB configuration can be relatively short, so if the CDC +Unfortunately, there is no guarantee that the last saved offset +is still in the database changelog. Such logs are always finite and, +depending on the DB configuration, can be relatively short, so if the CDC source has to replay data for a long period of inactivity, then there -can be a data loss. With careful management though we can say that -at-least once guarantee can practically be provided. +can be data loss. With careful management, the _at least once_ guarantee can be practically implemented. == CDC as a Sink -Change data capture is a source-side functionality in Jet, but we also -offer some specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the -original database table. The sinks expect to receive `ChangeRecord` +Change data capture is a source-side functionality in Jet, but Hazelcast also +offers specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the original database table. The sinks expect to receive `ChangeRecord` objects and apply your custom functions to them that extract the key and the value that will be applied to the target map. 
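The _at least once_ behaviour described in the fault-tolerance hunk above can be illustrated with a small, self-contained sketch. This is plain Java, not the Hazelcast or Debezium API, and the event names and offsets are invented for the example: the offset is saved only periodically, so a restart replays every event after the last saved offset, which can produce duplicates but never gaps.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of at-least-once replay, NOT connector code:
// delivery runs ahead of the periodically saved offset, so a restart
// re-delivers everything after the last checkpoint.
public class Main {
    public static void main(String[] args) {
        List<String> changelog = List.of("e1", "e2", "e3", "e4", "e5");
        List<String> delivered = new ArrayList<>();

        int savedOffset = 0;
        for (int i = 0; i < 4; i++) {      // first run delivers e1..e4 ...
            delivered.add(changelog.get(i));
            if (i == 1) {
                savedOffset = i + 1;       // ... but the offset was last saved after e2
            }
        }

        // crash + restart: resume from the saved offset, not from what was delivered
        for (int i = savedOffset; i < changelog.size(); i++) {
            delivered.add(changelog.get(i));
        }

        System.out.println(delivered);     // [e1, e2, e3, e4, e3, e4, e5]
    }
}
```

`e3` and `e4` arrive twice, which is why downstream stages (for example, the map sinks described above) should apply change events idempotently.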
From 14ba5e936b7c01712b1ec9b2f0ba804899c0800a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 5 Sep 2024 18:49:22 +0200 Subject: [PATCH 17/25] CR --- docs/modules/integrate/pages/cdc-connectors.adoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 8943417f6..c5645b865 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -204,11 +204,11 @@ The key and value functions have certain limitations. They can be used to map on If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. ==== -== Migration Guide +== Migration Tips -Hazelcast {open-source-product-name} has a Debezium CDC connector but it's based on an older version of Debezium. +Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium. Migration to the new connector is straightforward but be aware of the following changes: * Package was changed from `com.hazelcast.jet.cdc` to `com.hazelcast.enterprise.jet.cdc` * Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`). - * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. \ No newline at end of file + * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. 
If you are not sure what are the new names the Debezium is using, you can check out their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation. \ No newline at end of file From 9daaafb4dd753271a92b4eb91a923d32832c5201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Mon, 9 Sep 2024 14:30:54 +0200 Subject: [PATCH 18/25] Update docs/modules/integrate/pages/cdc-connectors.adoc Co-authored-by: Oliver Howell --- docs/modules/integrate/pages/cdc-connectors.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index c5645b865..2a0535393 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -209,6 +209,6 @@ If user code has to be used, then the problem can be solved with the help of the Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium. Migration to the new connector is straightforward but be aware of the following changes: - * Package was changed from `com.hazelcast.jet.cdc` to `com.hazelcast.enterprise.jet.cdc` + * You should use the `com.hazelcast.enterprise.jet.cdc` package instead of `com.hazelcast.jet.cdc`. * Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`). * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. 
If you are not sure what are the new names the Debezium is using, you can check out their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation. \ No newline at end of file From 8870d99098c52b61a8492f972a7e8f13fb8a3e8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Mon, 9 Sep 2024 14:31:05 +0200 Subject: [PATCH 19/25] Update docs/modules/integrate/pages/cdc-connectors.adoc Co-authored-by: Oliver Howell --- docs/modules/integrate/pages/cdc-connectors.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 2a0535393..83f219584 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -146,7 +146,7 @@ a strongly-typed manner. where the map entry's key is the key of `SourceRecord` in JSON format, and the value is the whole `SourceRecord`'s value in JSON format. |customMapping(RecordMappingFunction) -| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using provided function by the connector. +| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using the function provided by the connector. |withDefaultEngine() |Sets the preferred engine to the default (non-async) one. 
This engine is single-threaded, From 31448f4d458a5f4f168905e3da7e11829af877a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 12 Sep 2024 14:39:17 +0200 Subject: [PATCH 20/25] Few changes --- .../integrate/pages/cdc-connectors.adoc | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index c5645b865..1b3bba432 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -132,6 +132,11 @@ MySQL- and PostgreSQL-specific source builders contain methods for all major con Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database. +[NOTE] +==== +Remember you have to have database up and running before CDC job is started, including e.g. additional CDC agents required (like DB2 does require). +==== + === Common source builder functions [cols="m,a"] |=== @@ -204,6 +209,50 @@ The key and value functions have certain limitations. They can be used to map on If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. ==== +== Data types + +Hazelcast relies on Debezium, which in turn uses Kafka Connect API such as `Struct` objects. Hazelcast makes conversion to `Map` and `POJO` s easier by providing abstractions such as `RecordPart`. Despite that, it's worth knowing how some database types can or will be mapped to Java types. + +[NOTE] +==== +Each database type has it's own database type-to-struct type mappings. 
For specific mappings of this type, please
+check out Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2], etc..
+====
+
+=== Common data type mappings
+[cols="m,a,a"]
+|===
+|Struct type|Semantic type|Java type
+
+.3+|INT32
+|-|int/Integer
+|io.debezium.time.Date|java.time.LocalDate / java.util.Date
+|io.debezium.time.Time|java.time.Duration
+
+.6+|INT64
+|-|long/Long
+|io.debezium.time.Timestamp|java.time.Instant
+|io.debezium.time.MicroTimestamp|java.time.Instant
+|io.debezium.time.MicroTime|java.time.Duration
+|io.debezium.time.NanoTimestamp|java.time.Instant
+|io.debezium.time.NanoTime|java.time.Duration
+
+|FLOAT32|-|float/Float
+|FLOAT64|-|double/Double
+|BOOLEAN|-|boolean/Boolean
+|STRING|-|String
+|===
+
+The `RecordPart#value` field contains Debezium's message in a JSON format. This JSON format uses string as date representation,
+instead of ints, which are standard in Debezium, but harder to deal with.
+
+[NOTE]
+====
+We strongly recommend using `time.precision.mode=adaptive` (default).
+Using `time.precision.mode=connect` uses `java.util.Date` to represent dates, time, etc. and is less precise.
+====
+
== Migration Tips

Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium.
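The temporal rows of the data type mapping table added above are epoch-based integers on the wire. Here is a stdlib-only sketch of what those encodings decode to; this is not the connector's conversion code, and the literal values are invented for the example:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Decoding Debezium-style epoch-based temporal encodings with java.time only.
public class Main {
    public static void main(String[] args) {
        // io.debezium.time.Date: INT32 = days since the epoch
        LocalDate date = LocalDate.ofEpochDay(19723);

        // io.debezium.time.MicroTimestamp: INT64 = microseconds since the epoch
        Instant ts = Instant.EPOCH.plus(1_700_000_000_000_000L, ChronoUnit.MICROS);

        // io.debezium.time.Time: INT32 = milliseconds past midnight
        Duration time = Duration.ofMillis(45_296_789L);

        System.out.println(date);  // 2024-01-01
        System.out.println(ts);    // 2023-11-14T22:13:20Z
        System.out.println(time);  // PT12H34M56.789S
    }
}
```

Note that `Duration#toString()` yields the ISO-8601 `PnDTnHnMn.nS` form, which is also how durations appear when the record value is rendered as a JSON string.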
From a7c06913ad03699b4c27f79bf3d8ecfc588e76ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 12 Sep 2024 14:41:29 +0200 Subject: [PATCH 21/25] Everything can be string after all --- .../integrate/pages/cdc-connectors.adoc | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 48df34990..9740208ec 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -226,20 +226,20 @@ check out Debezium documentation, for example: link:https://debezium.io/document .3+|INT32 |-|int/Integer -|io.debezium.time.Date|java.time.LocalDate / java.util.Date -|io.debezium.time.Time|java.time.Duration +|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String +|io.debezium.time.Time|java.time.Duration / String .5+|INT64 |-|long/Long -|io.debezium.time.Timestamp|java.time.Instant -|io.debezium.time.MicroTimestamp|java.time.Instant -|io.debezium.time.MicroTime|java.time.Duration -|io.debezium.time.NanoTimestamp|java.time.Instant -|io.debezium.time.NanoTime|java.time.Duration - -|FLOAT32|-|float/Float -|FLOAT64|-|double/Double -|BOOLEAN|-|boolean/Boolean +|io.debezium.time.Timestamp|java.time.Instant / String +|io.debezium.time.MicroTimestamp|java.time.Instant / String +|io.debezium.time.MicroTime|java.time.Duration / String +|io.debezium.time.NanoTimestamp|java.time.Instant / String +|io.debezium.time.NanoTime|java.time.Duration / String + +|FLOAT32|-|float/Float / String +|FLOAT64|-|double/Double / String +|BOOLEAN|-|boolean/Boolean / String |STRING|-|String The `RecordPart#value` field contains Debezium's message in a JSON format. 
This JSON format uses string as date representation, From 5c1f26975fd378b41a50945b8ab9199c69f91f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Thu, 12 Sep 2024 14:44:01 +0200 Subject: [PATCH 22/25] Added formatting --- docs/modules/integrate/pages/cdc-connectors.adoc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 9740208ec..d6eafe093 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -226,16 +226,16 @@ check out Debezium documentation, for example: link:https://debezium.io/document .3+|INT32 |-|int/Integer -|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String -|io.debezium.time.Time|java.time.Duration / String +|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String `yyyy-MM-dd` +|io.debezium.time.Time|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS` .5+|INT64 |-|long/Long -|io.debezium.time.Timestamp|java.time.Instant / String -|io.debezium.time.MicroTimestamp|java.time.Instant / String -|io.debezium.time.MicroTime|java.time.Duration / String -|io.debezium.time.NanoTimestamp|java.time.Instant / String -|io.debezium.time.NanoTime|java.time.Duration / String +|io.debezium.time.Timestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS` +|io.debezium.time.MicroTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS` +|io.debezium.time.MicroTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS` +|io.debezium.time.NanoTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS` +|io.debezium.time.NanoTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS` |FLOAT32|-|float/Float / String |FLOAT64|-|double/Double / String From c2afb098a793a3869b7fa037825f1789bf2beab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Mon, 7 Oct 2024 18:37:33 +0200 Subject: [PATCH 23/25] Apply 
suggestions from code review Co-authored-by: Oliver Howell --- docs/modules/integrate/pages/cdc-connectors.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index d6eafe093..0f71ab95c 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -64,10 +64,10 @@ The Java API supports the following types of CDC source: a generic source for all databases supported by Debezium * link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]: a specific, first class Jet CDC source for MySQL databases (also based - on Debezium, but with the additional benefits provided by Hazelcast + on Debezium, but with the additional benefits provided by Hazelcast) * link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]: a specific, first class CDC source for PostgreSQL databases (also based -on Debezium, but with the additional benefits provided by Hazelcast +on Debezium, but with the additional benefits provided by Hazelcast) To set up a CDC data streaming source, define it using the following configuration: From f85e53cce7d1c5cd36a278d3b7041f6543e864ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Tue, 8 Oct 2024 13:12:41 +0200 Subject: [PATCH 24/25] Apply suggestions from code review Co-authored-by: Oliver Howell --- .../integrate/pages/cdc-connectors.adoc | 19 ++++++++++--------- .../pages/legacy-cdc-connectors.adoc | 19 +++++++++++-------- docs/modules/pipelines/pages/cdc.adoc | 4 ++-- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 0f71ab95c..1a9c89bd8 100644 
--- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -1,6 +1,8 @@ = CDC Connector [.enterprise]*Enterprise* +NOTE: This page refers to Hazelcast's {enterprise-product-name} CDC connectors. For more information on {open-source-product-name} CDC connectors, see xref:integrate:legacy-cdc-connectors.adoc[]. + Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other systems, for the purposes of replication, analysis and many more. @@ -15,7 +17,7 @@ which can handle CDC events from link:https://debezium.io/documentation/referenc However, we're also striving to make CDC sources first class citizens in Hazelcast, as we have done already for MySQL and PostgreSQL. -== Installing the Connector +== Install the CDC connector This connector is included in the full distribution of Hazelcast {enterprise-product-name}. @@ -134,7 +136,7 @@ Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes [NOTE] ==== -Remember you have to have database up and running before CDC job is started, including e.g. additional CDC agents required (like DB2 does require). +Remember to ensure your database is up and running before a CDC job is started, including any additional required CDC agents (as required by DB2), for example. ==== === Common source builder functions @@ -158,7 +160,7 @@ where the map entry's key is the key of `SourceRecord` in JSON format, and the v but also more widely used and tested. Use this engine for the most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only. |withAsyncEngine() -|Sets the preferred engine to the async one. 
This engine is multithreaded (if supported by the connector), but you must be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results. +|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results. |setProperty(String, String) |Sets connector property to given value. There are multiple overloads, allowing to @@ -166,7 +168,7 @@ set the value to `long`, `String` or `boolean`. |=== -=== Fault Tolerance +=== Fault tolerance CDC sources offer at least-once processing guarantees. The source periodically saves the database write ahead log offset for which it had @@ -211,12 +213,11 @@ If user code has to be used, then the problem can be solved with the help of the == Data types -Hazelcast relies on Debezium, which in turn uses Kafka Connect API such as `Struct` objects. Hazelcast makes conversion to `Map` and `POJO` s easier by providing abstractions such as `RecordPart`. Despite that, it's worth knowing how some database types can or will be mapped to Java types. +Hazelcast relies on Debezium, which in turn uses the Kafka Connect API, including `Struct` objects for example. Hazelcast makes conversion to `Map` and `POJO`s easier by providing abstractions such as `RecordPart`. Despite this, it's worth knowing how some database types can or will be mapped to Java types. [NOTE] ==== -Each database type has it's own database type-to-struct type mappings. 
For specific mappings of this type, please
-check out Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2], etc..
+Each database type has its own database type-to-struct type mappings. For specific mappings of this type, see the Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], and link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2].
====

=== Common data type mappings
@@ -253,11 +254,11 @@ Using `time.precision.mode=connect` uses `java.util.Date` to represent dates, ti

-== Migration Tips
+== Migration tips

Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium.
Migration to the new connector is straightforward but be aware of the following changes:

* You should use the `com.hazelcast.enterprise.jet.cdc` package instead of `com.hazelcast.jet.cdc`.
* Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`).
- * Debezium replaced all `whitelist`s with `include list`s and `blacklist`s with `exclude list`s, which we have replicated in our naming; so, for example, use `setTableIncludeList` instead of `setTableWhitelist`. 
If you are not sure what are the new names the Debezium is using, you can check out their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation. \ No newline at end of file + * Debezium renamed certain terms, which we have also replicated in our code. For example, `include list` replaces `whitelist`, `exclude list` replaces `blacklist`. This means, for example, you need to use `setTableIncludeList` instead of `setTableWhitelist`. For more detail on new Debezium names, see their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation. \ No newline at end of file diff --git a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc index 6a27f4d6f..d97b75533 100644 --- a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -1,5 +1,8 @@ = Legacy CDC Connector + +NOTE: This page refers to Hazelcast's {open-source-product-name} CDC connectors, also known as legacy CDC connectors. For more information on {enterprise-product-name} CDC connectors, see xref:integrate:cdc-connectors.adoc[]. + Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other systems, for the purposes of replication, analysis and many more. @@ -8,17 +11,17 @@ Change Data Capture is especially important to Hazelcast, because it allows for the _streaming of changes from databases_, which can be efficiently processed by the Jet engine. 
-Implementation of CDC in Hazelcast {open-source-product-name} is based on +The implementation of CDC in Hazelcast {open-source-product-name} is based on link:https://debezium.io/[Debezium, window=_blank]. Hazelcast offers a generic Debezium source -which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank]. +that can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank]. However, we're also striving to make CDC sources first class citizens in Hazelcast, as we have done already for MySQL and PostgreSQL. -== Installing the Connector +== Install the CDC connector This connector is included in the full distribution of Hazelcast {open-source-product-name}. -== CDC as a Source +== CDC as a source We have the following types of CDC sources: @@ -26,10 +29,10 @@ We have the following types of CDC sources: a generic source for all databases supported by Debezium * link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]: a specific, first class Jet CDC source for MySQL databases (also based - on Debezium, but with the additional benefits provided by Hazelcast + on Debezium, but with the additional benefits provided by Hazelcast) * link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]: a specific, first class CDC source for PostgreSQL databases (also based - on Debezium, but with the additional benefits provided by Hazelcast + on Debezium, but with the additional benefits provided by Hazelcast) To set up a streaming source of CDC data, define it using the following configuration: @@ -50,9 +53,9 @@ pipeline.readFrom( .writeTo(Sinks.logger()); ---- -For an example of how to use CDC data see 
xref:pipelines:cdc.adoc[our tutorial]. +For an example of how to use CDC data, see the xref:pipelines:cdc.adoc[] tutorial. -=== Fault Tolerance +=== Fault tolerance CDC sources offer _at least once_ processing guarantees. The source periodically saves the database write ahead log offset for which it has diff --git a/docs/modules/pipelines/pages/cdc.adoc b/docs/modules/pipelines/pages/cdc.adoc index 2c165e2d3..3645ca029 100644 --- a/docs/modules/pipelines/pages/cdc.adoc +++ b/docs/modules/pipelines/pages/cdc.adoc @@ -153,7 +153,7 @@ mysql> SELECT * FROM customers; If you already have Hazelcast and you skipped the above steps, make sure to follow from here on. -. Make sure the MySQL CDC plugin is in the `lib/` directory. You must manually download the MySQL CDC plugin from Hazelcast's Maven link:https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository, window=_blank] and then copy it to the `lib/` directory. +. Make sure the MySQL CDC plugin is in the `lib/` directory. You must manually download the MySQL CDC plugin from link:https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[Hazelcast's Maven repository, window=_blank] and then copy it to the `lib/` directory. + [source,bash] ---- @@ -166,7 +166,7 @@ You should see the following jars: * hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar * hazelcast-enterprise-cdc-postgres-{full-version}-jar-with-dependencies.jar + -WARNING: If you have Hazelcast {enterprise-product-name} Edition, you need to manually download the MySQL CDC plugin from Hazelcast's Maven https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-jet-cdc-mysql/{full-version}/hazelcast-jet-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository] and then copy it to the `lib/` directory. 
+WARNING: If you have Hazelcast {enterprise-product-name}, you need to manually download the MySQL CDC plugin from https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-jet-cdc-mysql/{full-version}/hazelcast-jet-cdc-mysql-{full-version}-jar-with-dependencies.jar[Hazelcast's Maven repository] and then copy it to the `lib/` directory. . Start Hazelcast. + From 2cfdd2d78e0d93903e8b104e58e7bf13758f9d71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Gaw=C4=99da?= Date: Tue, 8 Oct 2024 13:13:16 +0200 Subject: [PATCH 25/25] Apply suggestions from code review Co-authored-by: Oliver Howell --- docs/modules/integrate/pages/cdc-connectors.adoc | 4 ++-- docs/modules/integrate/pages/legacy-cdc-connectors.adoc | 2 +- docs/modules/pipelines/pages/cdc-database-setup.adoc | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc index 1a9c89bd8..4f42298f4 100644 --- a/docs/modules/integrate/pages/cdc-connectors.adoc +++ b/docs/modules/integrate/pages/cdc-connectors.adoc @@ -58,7 +58,7 @@ PostgreSQL-specific connector: ---- -== CDC as a Source +== CDC as a source The Java API supports the following types of CDC source: @@ -244,7 +244,7 @@ Each database type has its own database type-to-struct type mappings. For specif |STRING|-|String The `RecordPart#value` field contains Debezium's message in a JSON format. This JSON format uses string as date representation, -instead of ints, which are standard in Debezium, but harder to deal with. +instead of ints, which are standard in Debezium but harder to work with. 
[NOTE] ==== diff --git a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc index d97b75533..662789fcf 100644 --- a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -68,7 +68,7 @@ depending on the DB configuration, can be relatively short, so if the CDC source has to replay data for a long period of inactivity, then there can be data loss. With careful management, the _at least once_ guarantee can be practically implemented. -== CDC as a Sink +== CDC as a sink Change data capture is a source-side functionality in Jet, but Hazelcast also offers specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the original database table. The sinks expect to receive `ChangeRecord` diff --git a/docs/modules/pipelines/pages/cdc-database-setup.adoc b/docs/modules/pipelines/pages/cdc-database-setup.adoc index af1a0a8b2..69e0c75e0 100644 --- a/docs/modules/pipelines/pages/cdc-database-setup.adoc +++ b/docs/modules/pipelines/pages/cdc-database-setup.adoc @@ -255,7 +255,7 @@ In our tests we didn't manage to make it output much more than 20,000 records/second, so on a powerful server running the database it shouldn't affect normal operations too severely. -==== Failure Tolerance +==== Failure tolerance PostgreSQL's failure tolerance associated with replication slots is somewhat lacking in certain aspects. The CDC connector can quite nicely