Skip to content

Commit

Permalink
Add Debezium 2.x docs (#1149)
Browse files Browse the repository at this point in the history
Co-authored-by: rebekah-lawrence <[email protected]>
Co-authored-by: Oliver Howell <[email protected]>
(cherry picked from commit 50480ef)
  • Loading branch information
TomaszGaweda authored and GitHub Actions Bot committed Oct 8, 2024
1 parent 59a130e commit 61e85b5
Show file tree
Hide file tree
Showing 8 changed files with 809 additions and 273 deletions.
1 change: 1 addition & 0 deletions docs/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ include::tiered-storage:partial$nav.adoc[]
* xref:integrate:database-connectors.adoc[Database & CDC Connectors]
** xref:integrate:jdbc-connector.adoc[]
** xref:integrate:cdc-connectors.adoc[]
** xref:integrate:legacy-cdc-connectors.adoc[]
** xref:integrate:elasticsearch-connector.adoc[]
** xref:integrate:mongodb-connector.adoc[]
* File Connectors
Expand Down
234 changes: 200 additions & 34 deletions docs/modules/integrate/pages/cdc-connectors.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
= CDC Connector
[.enterprise]*Enterprise*

NOTE: This page refers to Hazelcast's {enterprise-product-name} CDC connectors. For more information on {open-source-product-name} CDC connectors, see xref:integrate:legacy-cdc-connectors.adoc[].

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
Expand All @@ -8,52 +11,164 @@ Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

Implementation of CDC in Hazelcast is based on
link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
but we're also striving to make CDC sources first class citizens in Hazelcast.
The ones for MySQL and PostgreSQL already are.
The implementation of CDC in Hazelcast {enterprise-product-name} is based on
link:https://debezium.io/[Debezium 2.x, window=_blank]. Hazelcast offers a generic Debezium source
which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank],
However, we're also striving to make CDC sources first class citizens in Hazelcast,
as we have done already for MySQL and PostgreSQL.

== Install the CDC connector

This connector is included in the full distribution of Hazelcast {enterprise-product-name}.

=== Maven
To use this connector in a Maven project, add the following entries to the `<dependency>` section of your `pom.xml` file:

Generic connector:

[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
<version>{full-version}</version>
</dependency>
----

MySQL-specific connector:

== Installing the Connector
[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
<version>{full-version}</version>
</dependency>
----
NOTE: Due to licensing, MySQL connector does not include the MySQL driver as a dependency. You have to manually add the `com.mysql:mysql-connector-j` dependency to the classpath.

This connector is included in the full and slim distributions of Hazelcast.
PostgreSQL-specific connector:

== CDC as a Source
[source,xml]
----
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
<version>{full-version}</version>
</dependency>
----

We have the following types of CDC sources:
== CDC as a source

* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but benefiting the full range of convenience Jet can
additionally provide)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but benefiting the full range of convenience Hazelcast can
additionally provide)
The Java API supports the following types of CDC source:

For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
a generic source for all databases supported by Debezium
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
a specific, first class Jet CDC source for MySQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast)
* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
a specific, first class CDC source for PostgreSQL databases (also based
on Debezium, but with the additional benefits provided by Hazelcast)

```java
To set up a CDC data streaming source, define it using the following configuration:

[tabs]
====
MySQL::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setDatabaseAddress("127.0.0.1", 3306)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
```
----
--
PostgreSQL::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
PostgresCdcSources.postgres("customers")
.setDatabaseAddress("127.0.0.1", 5432)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
----
--
MongoDB::
+
--
[source,java]
----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
DebeziumCdcSources.debezium("customers", MongoDbConnector.class)
.setProperty("mongodb.connection.string", "mongodb://localhost:27017")
.setDatabaseIncludeList("inventory")
.setProperty("collection.include.list", "customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
----
--
====

MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings with protection if, for example, mutually exclusive options are not used. If using a generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for the information about required or mutually exclusive fields.

Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database.

[NOTE]
====
Remember to ensure your database is up and running before a CDC job is started, including any additional required CDC agents (as required by DB2), for example.
====

=== Common source builder functions
[cols="m,a"]
|===
|Method name|Description

|changeRecord()
| Sets output type to `ChangeRecord` - a wrapper, which provides most of the fields in
a strongly-typed manner.

| json()
| Sets output type to `JSON` - in the result stage, the type will be set to `Map<String, String>`,
where the map entry's key is the key of `SourceRecord` in JSON format, and the value is the whole `SourceRecord`'s value in JSON format.

|customMapping(RecordMappingFunction<T>)
| Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using the function provided by the connector.

|withDefaultEngine()
|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
but also more widely used and tested. Use this engine for the most stable results (for example, no async offset restore). For MySQL and PostgreSQL especially this engine makes the most sense, as MySQL and PostgreSQL Debezium connectors are single-threaded only.

|withAsyncEngine()
|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but be aware of the async nature; for example, offset restore may occur asynchronously after the restart is done, leading to sometimes confusing results.

For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
|setProperty(String, String)
|Sets connector property to given value. There are multiple overloads, allowing to
set the value to `long`, `String` or `boolean`.

=== Fault Tolerance
|===

=== Fault tolerance

CDC sources offer at least-once processing guarantees. The source
periodically saves the database write ahead log offset for which it had
Expand All @@ -79,20 +194,71 @@ For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

```java
[source,java]
----
Pipeline p = Pipeline.create();
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).email));
```
----

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.
If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].
====

== Data types

Hazelcast relies on Debezium, which in turn uses the Kafka Connect API, including `Struct` objects for example. Hazelcast makes conversion to `Map` and `POJO`s easier by providing abstractions such as `RecordPart`. Despite this, it's worth knowing how some database types can or will be mapped to Java types.

[NOTE]
====
Each database type has its own database type-to-struct type mappings. For specific mappings of this type, see the Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2], etc..
====

=== Common datatypes mapping.
[cols="m,a,a"]
|===
|Struct type|Semantic type|Java type

.3+|INT32
|-|int/Integer
|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String `yyyy-MM-dd`
|io.debezium.time.Time|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`

.5+|INT64
|-|long/Long
|io.debezium.time.Timestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.MicroTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.MicroTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`
|io.debezium.time.NanoTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
|io.debezium.time.NanoTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`

|FLOAT32|-|float/Float / String
|FLOAT64|-|double/Double / String
|BOOLEAN|-|boolean/Boolean / String
|STRING|-|String

The `RecordPart#value` field contains Debezium's message in a JSON format. This JSON format uses string as date representation,
instead of ints, which are standard in Debezium but harder to work with.

[NOTE]
====
We strongly recommend using `time.precision.mode=adaptive` (default).
Using `time.precision.mode=connect` uses `java.util.Date` to represent dates, time, etc. and is less precise.
====

|===

== Migration tips

Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium.
Migration to the new connector is straightforward but be aware of the following changes:

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
====
* You should use the `com.hazelcast.enterprise.jet.cdc` package instead of `com.hazelcast.jet.cdc`.
* Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`).
* Debezium renamed certain terms, which we have also replicated in our code. For example, `include list` replaces `whitelist`, `exclude list` replaces `blacklist`. This means, for example, you need to use `setTableIncludeList` instead of `setTableWhitelist`. For more detail on new Debezium names, see their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation.
43 changes: 31 additions & 12 deletions docs/modules/integrate/pages/connectors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,36 @@ The Jet API supports more connectors than SQL.
|batch
|N/A

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium]
|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy)
|hazelcast-jet-cdc-debezium
|streaming
|at-least-once

|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy)
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy)
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*)
|hazelcast-enterprise-cdc-debezium
|streaming
|at-least-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-enterprise-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-enterprise-cdc-postgres
|streaming
|exactly-once

|xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic]
|hazelcast-jet-elasticsearch-7
|batch
Expand Down Expand Up @@ -150,16 +175,6 @@ The Jet API supports more connectors than SQL.
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql]
|hazelcast-jet-cdc-mysql
|streaming
|exactly-once

|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres]
|hazelcast-jet-cdc-postgres
|streaming
|exactly-once

|xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer]
|hazelcast-jet-contrib-pulsar
|streaming
Expand Down Expand Up @@ -270,7 +285,11 @@ The Jet API supports more connectors than SQL.
|N/A

|xref:integrate:cdc-connectors.adoc[CdcSinks.map]
|hazelcast-jet-cdc-debezium
|hazelcast-jet-cdc-debezium (legacy, {open-source-product-name})

or

hazelcast-enterprise-cdc-debezium ({enterprise-product-name})
|streaming
|at-least-once

Expand Down
Loading

0 comments on commit 61e85b5

Please sign in to comment.