diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index 758fba3e1..12ba9ae4b 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -194,6 +194,7 @@ include::wan:partial$nav.adoc[]
* xref:integrate:database-connectors.adoc[Database & CDC Connectors]
** xref:integrate:jdbc-connector.adoc[]
** xref:integrate:cdc-connectors.adoc[]
+** xref:integrate:legacy-cdc-connectors.adoc[]
** xref:integrate:elasticsearch-connector.adoc[]
** xref:integrate:mongodb-connector.adoc[]
* File Connectors
diff --git a/docs/modules/integrate/pages/cdc-connectors.adoc b/docs/modules/integrate/pages/cdc-connectors.adoc
index dce0f675b..4f42298f4 100644
--- a/docs/modules/integrate/pages/cdc-connectors.adoc
+++ b/docs/modules/integrate/pages/cdc-connectors.adoc
@@ -1,4 +1,7 @@
= CDC Connector
+[.enterprise]*Enterprise*
+
+NOTE: This page refers to Hazelcast's {enterprise-product-name} CDC connectors. For more information on {open-source-product-name} CDC connectors, see xref:integrate:legacy-cdc-connectors.adoc[].

Change Data Capture (CDC) refers to the process of observing changes
made to a database and extracting them in a form usable by other
@@ -8,52 +11,164 @@ Change Data Capture is especially important to Hazelcast, because it allows
for the _streaming of changes from databases_, which can be efficiently
processed by the Jet engine.

-Implementation of CDC in Hazelcast is based on
-link:https://debezium.io/[Debezium]. Hazelcast offers a generic Debezium source
-which can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium],
-but we're also striving to make CDC sources first class citizens in Hazelcast.
-The ones for MySQL and PostgreSQL already are.
+The implementation of CDC in Hazelcast {enterprise-product-name} is based on
+link:https://debezium.io/[Debezium 2.x, window=_blank]. Hazelcast offers a generic Debezium source
+which can handle CDC events from link:https://debezium.io/documentation/reference/2.7/connectors/index.html[any database supported by Debezium, window=_blank].
+However, we're also striving to make CDC sources first-class citizens in Hazelcast,
+as we have done already for MySQL and PostgreSQL.
+
+== Install the CDC connector
+
+This connector is included in the full distribution of Hazelcast {enterprise-product-name}.
+
+=== Maven
+To use this connector in a Maven project, add the following entries to the `<dependencies>` section of your `pom.xml` file:
+
+Generic connector:
+
+[source,xml]
+----
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
+    <version>{full-version}</version>
+</dependency>
+----
+
+MySQL-specific connector:

-== Installing the Connector
+[source,xml]
+----
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
+    <version>{full-version}</version>
+</dependency>
+----
+NOTE: Due to licensing, the MySQL connector does not include the MySQL driver as a dependency. You have to manually add the `com.mysql:mysql-connector-j` dependency to the classpath.

-This connector is included in the full and slim distributions of Hazelcast.
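+
+A minimal sketch of that extra dependency (the version shown is illustrative; use one that matches your MySQL server):
+
+[source,xml]
+----
+<!-- MySQL JDBC driver, required on the classpath by the MySQL CDC connector -->
+<dependency>
+    <groupId>com.mysql</groupId>
+    <artifactId>mysql-connector-j</artifactId>
+    <version>8.3.0</version>
+</dependency>
+----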
+PostgreSQL-specific connector:

-== CDC as a Source
+[source,xml]
+----
+<dependency>
+    <groupId>com.hazelcast.jet</groupId>
+    <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
+    <version>{full-version}</version>
+</dependency>
+----

-We have the following types of CDC sources:
+== CDC as a source

-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources]:
-  generic source for all databases supported by Debezium
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources]:
-  specific, first class Jet CDC source for MySQL databases (also based
-  on Debezium, but benefiting the full range of convenience Jet can
-  additionally provide)
-* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources]:
-  specific, first class CDC source for PostgreSQL databases (also based
-  on Debezium, but benefiting the full range of convenience Hazelcast can
-  additionally provide)
+The Java API supports the following types of CDC source:

-For the setting up a streaming source of CDC data is just the matter of pointing it at the right database via configuration:
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
+  a generic source for all databases supported by Debezium
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
+  a specific, first-class Jet CDC source for MySQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/enterprise/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
+  a specific, first-class CDC source for PostgreSQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)

-```java
+To set up a streaming source of CDC data, point it at the right database via configuration:
+
+[tabs]
+====
+MySQL::
++
+--
+[source,java]
+----
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    MySqlCdcSources.mysql("customers")
-        .setDatabaseAddress("127.0.0.1")
-        .setDatabasePort(3306)
-        .setDatabaseUser("debezium")
-        .setDatabasePassword("dbz")
+        .setDatabaseAddress("127.0.0.1", 3306)
+        .setDatabaseCredentials("debezium", "dbz")
        .setClusterName("dbserver1")
-        .setDatabaseWhitelist("inventory")
-        .setTableWhitelist("inventory.customers")
+        .setDatabaseIncludeList("inventory")
+        .setTableIncludeList("inventory.customers")
        .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());
-```
+----
+--
+PostgreSQL::
++
+--
+[source,java]
+----
+Pipeline pipeline = Pipeline.create();
+pipeline.readFrom(
+    PostgresCdcSources.postgres("customers")
+        .setDatabaseAddress("127.0.0.1", 5432)
+        .setDatabaseCredentials("debezium", "dbz")
+        .setClusterName("dbserver1")
+        .setDatabaseIncludeList("inventory")
+        .setTableIncludeList("inventory.customers")
+        .build())
+    .withNativeTimestamps(0)
+    .writeTo(Sinks.logger());
+----
+--
+MongoDB::
++
+--
+[source,java]
+----
+Pipeline pipeline = Pipeline.create();
+pipeline.readFrom(
+    DebeziumCdcSources.debezium("customers", MongoDbConnector.class)
+        .setProperty("mongodb.connection.string", "mongodb://localhost:27017")
+        .setDatabaseIncludeList("inventory")
+        .setProperty("collection.include.list", "customers")
+        .build())
+    .withNativeTimestamps(0)
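+    // Sinks.logger() below just prints each change event; swap it for
+    // CdcSinks.map(...) (see "CDC as a sink") to maintain a map view instead.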
+    .writeTo(Sinks.logger());
+----
+--
+
+====
+
+MySQL- and PostgreSQL-specific source builders provide methods for all major configuration settings, and offer protection against misconfiguration, for example when mutually exclusive options are used together. If you use the generic source builder, refer to the link:https://debezium.io/documentation/reference/stable/index.html[Debezium, window=_blank] documentation for information about required or mutually exclusive fields.
+
+Follow the provided xref:pipelines:cdc.adoc[] tutorial to see how CDC processes change events from a MySQL database.
+
+[NOTE]
+====
+Make sure that your database is up and running before a CDC job is started. This includes any additional CDC agents that your database requires (DB2, for example, requires one).
+====
+
+=== Common source builder functions
+[cols="m,a"]
+|===
+|Method name|Description
+
+|changeRecord()
+|Sets the output type to `ChangeRecord`, a wrapper that provides most of the fields
+in a strongly typed manner.
+
+|json()
+|Sets the output type to JSON. In the resulting stage the type is `Map<String, String>`,
+where the map entry's key is the key of the `SourceRecord` in JSON format, and the value is the whole `SourceRecord` value in JSON format.
+
+|customMapping(RecordMappingFunction<T>)
+|Sets the output type to an arbitrary user type, `T`. Mapping from `SourceRecord` to `T` is done using the provided function.
+
+|withDefaultEngine()
+|Sets the preferred engine to the default (non-async) one. This engine is single-threaded,
+but also more widely used and tested; use it for the most stable results (for example, no async offset restore). It makes the most sense for MySQL and PostgreSQL, whose Debezium connectors are single-threaded anyway.
+
+|withAsyncEngine()
+|Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but be aware of its async nature; for example, an offset restore may complete asynchronously after a restart, which can lead to confusing results.

-For an example of how to use CDC data see xref:pipelines:cdc.adoc[our tutorial].
+|setProperty(String, String)
+|Sets a connector property to the given value. There are multiple overloads that allow you
+to set `long`, `String`, or `boolean` values.

-=== Fault Tolerance
+|===
+
+=== Fault tolerance

CDC sources offer at least-once processing guarantees. The source
periodically saves the database write ahead log offset for which it had
@@ -79,20 +194,71 @@ For example, a sink mapping CDC data to a `Customer` class and
maintaining a map view of latest known email addresses per customer
(identified by ID) would look like this:

-```java
+[source,java]
+----
Pipeline p = Pipeline.create();
p.readFrom(source)
 .withoutTimestamps()
 .writeTo(CdcSinks.map("customers",
     r -> r.key().toMap().get("id"),
     r -> r.value().toObject(Customer.class).email));
-```
+----

[NOTE]
====
The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as a part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial].
+====
+
+== Data types
+
+Hazelcast relies on Debezium, which in turn uses the Kafka Connect API, including, for example, its `Struct` objects. Hazelcast makes conversion to `Map` objects and POJOs easier by providing abstractions such as `RecordPart`. Despite this, it's worth knowing how some database types can or will be mapped to Java types.
+
+[NOTE]
+====
+Each database has its own mappings from database types to struct types. For the specific mappings, see the Debezium documentation, for example: link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types[MySQL], link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types[PostgreSQL], or link:https://debezium.io/documentation/reference/stable/connectors/db2.html#db2-data-types[DB2].
+====
+
+=== Common data type mappings
+[cols="m,a,a"]
+|===
+|Struct type|Semantic type|Java type
+
+.3+|INT32
+|-|int/Integer
+|io.debezium.time.Date|java.time.LocalDate / java.util.Date / String `yyyy-MM-dd`
+|io.debezium.time.Time|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`
+
+.6+|INT64
+|-|long/Long
+|io.debezium.time.Timestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
+|io.debezium.time.MicroTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
+|io.debezium.time.MicroTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`
+|io.debezium.time.NanoTimestamp|java.time.Instant / String `yyyy-MM-dd HH:mm:ss.SSS`
+|io.debezium.time.NanoTime|java.time.Duration / String ISO-8601 `PnDTnHnMn.nS`
+
+|FLOAT32|-|float/Float / String
+|FLOAT64|-|double/Double / String
+|BOOLEAN|-|boolean/Boolean / String
+|STRING|-|String
+|===
+
+The `RecordPart#value` field contains Debezium's message in JSON format. This JSON format represents dates as strings
+instead of the ints that are standard in Debezium but harder to work with.
+
+[NOTE]
+====
+We strongly recommend using `time.precision.mode=adaptive` (the default).
+`time.precision.mode=connect` uses the less precise `java.util.Date` to represent dates and times.
+====
+
+== Migration tips
+
+Hazelcast {open-source-product-name} has a Debezium CDC connector, but it's based on an older version of Debezium.
+Migration to the new connector is straightforward, but be aware of the following changes:

-Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
-====
\ No newline at end of file
+* You should use the `com.hazelcast.enterprise.jet.cdc` package instead of `com.hazelcast.jet.cdc`.
+* Artifact names are now `hazelcast-enterprise-cdc-debezium`, `hazelcast-enterprise-cdc-mysql` and `hazelcast-enterprise-cdc-postgres` (instead of `hazelcast-jet-...`).
+* Debezium renamed certain terms, and we have mirrored the new names in our code. For example, `include list` replaces `whitelist` and `exclude list` replaces `blacklist`, which means you need to use `setTableIncludeList` instead of `setTableWhitelist`. For more detail on the new Debezium names, see their link:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties[MySQL] and link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties[PostgreSQL] documentation.
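+
+To see these renames in context, here is a minimal sketch of a migrated source definition (the connection values are the samples used throughout this page; the legacy calls are noted in comments):
+
+[source,java]
+----
+// note the new package; the legacy connector lived under com.hazelcast.jet.cdc
+import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
+import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources;
+import com.hazelcast.jet.pipeline.StreamSource;
+
+public class MigratedSource {
+    public static StreamSource<ChangeRecord> customers() {
+        return MySqlCdcSources.mysql("customers")
+                // replaces setDatabaseAddress("127.0.0.1") + setDatabasePort(3306)
+                .setDatabaseAddress("127.0.0.1", 3306)
+                // replaces setDatabaseUser("debezium") + setDatabasePassword("dbz")
+                .setDatabaseCredentials("debezium", "dbz")
+                .setClusterName("dbserver1")
+                // replaces setDatabaseWhitelist(...) and setTableWhitelist(...)
+                .setDatabaseIncludeList("inventory")
+                .setTableIncludeList("inventory.customers")
+                .build();
+    }
+}
+----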
\ No newline at end of file diff --git a/docs/modules/integrate/pages/connectors.adoc b/docs/modules/integrate/pages/connectors.adoc index 28ffb43d9..875ef8b1e 100644 --- a/docs/modules/integrate/pages/connectors.adoc +++ b/docs/modules/integrate/pages/connectors.adoc @@ -115,11 +115,36 @@ The Jet API supports more connectors than SQL. |batch |N/A -|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] +|xref:integrate:legacy-cdc-connectors.adoc[DebeziumCdcSources.debezium] (Legacy) |hazelcast-jet-cdc-debezium |streaming |at-least-once +|xref:integrate:legacy-cdc-connectors.adoc[MySqlCdcSources.mysql] (Legacy) +|hazelcast-jet-cdc-mysql +|streaming +|exactly-once + +|xref:integrate:legacy-cdc-connectors.adoc[PostgresCdcSources.postgres] (Legacy) +|hazelcast-jet-cdc-postgres +|streaming +|exactly-once + +|xref:integrate:cdc-connectors.adoc[DebeziumCdcSources.debezium] ([.enterprise]*Enterprise*) +|hazelcast-enterprise-cdc-debezium +|streaming +|at-least-once + +|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql] +|hazelcast-enterprise-cdc-mysql +|streaming +|exactly-once + +|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres] +|hazelcast-enterprise-cdc-postgres +|streaming +|exactly-once + |xref:integrate:elasticsearch-connector.adoc[ElasticSources.elastic] |hazelcast-jet-elasticsearch-7 |batch @@ -150,16 +175,6 @@ The Jet API supports more connectors than SQL. |streaming |exactly-once -|xref:integrate:cdc-connectors.adoc[MySqlCdcSources.mysql] -|hazelcast-jet-cdc-mysql -|streaming -|exactly-once - -|xref:integrate:cdc-connectors.adoc[PostgresCdcSources.postgres] -|hazelcast-jet-cdc-postgres -|streaming -|exactly-once - |xref:integrate:pulsar-connector.adoc[PulsarSources.pulsarConsumer] |hazelcast-jet-contrib-pulsar |streaming @@ -270,7 +285,11 @@ The Jet API supports more connectors than SQL. |N/A |xref:integrate:cdc-connectors.adoc[CdcSinks.map] -|hazelcast-jet-cdc-debezium +|hazelcast-jet-cdc-debezium (legacy, {open-source-product-name}) + +or + +hazelcast-enterprise-cdc-debezium ({enterprise-product-name}) |streaming |at-least-once diff --git a/docs/modules/integrate/pages/legacy-cdc-connectors.adoc b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc new file mode 100644 index 000000000..662789fcf --- /dev/null +++ b/docs/modules/integrate/pages/legacy-cdc-connectors.adoc @@ -0,0 +1,99 @@ += Legacy CDC Connector + + +NOTE: This page refers to Hazelcast's {open-source-product-name} CDC connectors, also known as legacy CDC connectors. For more information on {enterprise-product-name} CDC connectors, see xref:integrate:cdc-connectors.adoc[]. + +Change Data Capture (CDC) refers to the process of observing changes +made to a database and extracting them in a form usable by other +systems, for the purposes of replication, analysis and many more. + +Change Data Capture is especially important to Hazelcast, because it allows +for the _streaming of changes from databases_, which can be efficiently +processed by the Jet engine. + +The implementation of CDC in Hazelcast {open-source-product-name} is based on +link:https://debezium.io/[Debezium, window=_blank]. Hazelcast offers a generic Debezium source +that can handle CDC events from link:https://debezium.io/documentation/reference/stable/connectors/index.html[any database supported by Debezium, window=_blank]. +However, we're also striving to make CDC sources first class citizens in Hazelcast, +as we have done already for MySQL and PostgreSQL. 
+
+== Install the CDC connector
+
+This connector is included in the full distribution of Hazelcast {open-source-product-name}.
+
+== CDC as a source
+
+We have the following types of CDC sources:
+
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/DebeziumCdcSources.html[DebeziumCdcSources, window=_blank]:
+  a generic source for all databases supported by Debezium
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/mysql/MySqlCdcSources.html[MySqlCdcSources, window=_blank]:
+  a specific, first-class Jet CDC source for MySQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
+* link:https://docs.hazelcast.org/docs/{full-version}/javadoc/com/hazelcast/jet/cdc/postgres/PostgresCdcSources.html[PostgresCdcSources, window=_blank]:
+  a specific, first-class CDC source for PostgreSQL databases (also based
+  on Debezium, but with the additional benefits provided by Hazelcast)
+
+To set up a streaming source of CDC data, define it using the following configuration:
+
+[source,java]
+----
+Pipeline pipeline = Pipeline.create();
+pipeline.readFrom(
+    MySqlCdcSources.mysql("customers")
+        .setDatabaseAddress("127.0.0.1")
+        .setDatabasePort(3306)
+        .setDatabaseUser("debezium")
+        .setDatabasePassword("dbz")
+        .setClusterName("dbserver1")
+        .setDatabaseWhitelist("inventory")
+        .setTableWhitelist("inventory.customers")
+        .build())
+    .withNativeTimestamps(0)
+    .writeTo(Sinks.logger());
+----
+
+For an example of how to use CDC data, see the xref:pipelines:cdc.adoc[] tutorial.
+
+=== Fault tolerance
+
+CDC sources offer _at-least-once_ processing guarantees. The source
+periodically saves the database write-ahead log offset for which it has
+dispatched events, and in case of a failure or restart it replays all
+events since the last successfully saved offset.
+
+Unfortunately, there is no guarantee that the last saved offset
+is still in the database changelog. Such logs are always finite and,
+depending on the DB configuration, can be relatively short, so if the CDC
+source has to replay data for a long period of inactivity, then there
+can be data loss. With careful management of the changelog retention,
+however, the _at-least-once_ guarantee can be upheld in practice.
+
+== CDC as a sink
+
+Change data capture is a source-side functionality in Jet, but Hazelcast also
+offers specialized sinks that simplify applying CDC events to a map, giving you
+the ability to reconstruct the contents of the original database table. The sinks
+expect to receive `ChangeRecord` objects and apply custom functions you provide
+to extract the key and the value that are then applied to the target map.
+
+For example, a sink mapping CDC data to a `Customer` class and
+maintaining a map view of the latest known email addresses per customer
+(identified by ID) would look like this:
+
+[source,java]
+----
+Pipeline p = Pipeline.create();
+p.readFrom(source)
+ .withoutTimestamps()
+ .writeTo(CdcSinks.map("customers",
+     r -> r.key().toMap().get("id"),
+     r -> r.value().toObject(Customer.class).email));
+----
+
+[NOTE]
+====
+The key and value functions have certain limitations. They can be used to map only to objects that the Hazelcast member can deserialize, which unfortunately doesn't include user code submitted as part of the job. So in the above example it's OK to have `String` email values, but we wouldn't be able to use `Customer` directly.
+ +If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our xref:pipelines:cdc-join.adoc#7-start-hazelcast-jet[CDC Join tutorial]. + +Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario. +==== \ No newline at end of file diff --git a/docs/modules/pipelines/pages/cdc-database-setup.adoc b/docs/modules/pipelines/pages/cdc-database-setup.adoc index 22877d4c8..69e0c75e0 100644 --- a/docs/modules/pipelines/pages/cdc-database-setup.adoc +++ b/docs/modules/pipelines/pages/cdc-database-setup.adoc @@ -39,13 +39,14 @@ config file. See MySQL Reference Manual on how to do that link:https://dev.mysql.com/doc/refman/8.0/en/option-files.html[8.0]). For example: -``` +[source] +---- server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10 -``` +---- The semantics of these options are as follows: @@ -62,9 +63,10 @@ link:https://dev.mysql.com/doc/refman/8.0/en/show-variables.html[8.0]). It's worth pointing out that the names of the options sometimes differ from the names of the MySQL system variables they set. For example: -``` +[source] +---- SHOW VARIABLES LIKE 'server_id'; -``` +---- === Configure Session Timeouts @@ -121,7 +123,8 @@ configuration options to be set accordingly. This can be done either by The important properties to set are: -```properties +[source,properties] +---- # MODULES shared_preload_libraries = 'decoderbufs,wal2json' @@ -129,7 +132,7 @@ shared_preload_libraries = 'decoderbufs,wal2json' wal_level = logical max_wal_senders = 1 max_replication_slots = 1 -``` +---- `shared_preload_libraries` contains a comma separated list of installed output plug-ins. `wal_levels` is used to tell the server to use logical @@ -152,9 +155,10 @@ permissions. The permissions needed are `REPLICATION` and `LOGIN`. For setting up database users/roles see the link:https://www.postgresql.org/docs/9.6/user-manag.html[PostgreSQL documentation], but basically the essential command is: -``` +[source,sql] +---- CREATE ROLE name REPLICATION LOGIN; -``` +---- Note: database super-users already have all the permissions needed by replication too. @@ -166,11 +170,12 @@ PostgreSQL server needs to allow access from the host the CDC connector is running on. To specify such link:https://www.postgresql.org/docs/9.6/auth-pg-hba-conf.html[client authentication] options add following lines to the end of the `pg_hba.conf` file: -``` +[source] +---- local replication user trust host replication user 127.0.0.1/32 trust host replication user ::1/128 trust -``` +---- This example tells the server to allow replication for the specified user locally or on `localhost`, using IPv4 or IPv6. @@ -250,9 +255,9 @@ In our tests we didn't manage to make it output much more than 20,000 records/second, so on a powerful server running the database it shouldn't affect normal operations too severely. -==== Failure Tolerance +==== Failure tolerance -PostgreSQL failure tolerance associated with replication slots is +PostgreSQL's failure tolerance associated with replication slots is somewhat lacking in certain aspects. The CDC connector can quite nicely deal with its own restart or connection loss to the primary database, but only as long as replication slots remain intact. 
Replication
diff --git a/docs/modules/pipelines/pages/cdc-join.adoc b/docs/modules/pipelines/pages/cdc-join.adoc
index 448ad895f..d5fca7f64 100644
--- a/docs/modules/pipelines/pages/cdc-join.adoc
+++ b/docs/modules/pipelines/pages/cdc-join.adoc
@@ -3,6 +3,8 @@
In this tutorial, you will learn how to make a map hold enriched data,
combined (joined) from multiple database tables.

+NOTE: If you are using Hazelcast {open-source-product-name}, you have to change the package from `com.hazelcast.enterprise.jet...` to `com.hazelcast.jet...`.
+
== Step 1. Install Docker

This tutorial uses link:https://www.docker.com/[Docker] to simplify the
@@ -45,7 +47,70 @@ They differ slightly depending on which database you use:

Let's write the code for the processing we want to accomplish:

-```java
+[tabs]
+====
+{enterprise-product-name}::
++
+--
+[source,java]
+----
+package org.example;
+
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
+import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources;
+import com.hazelcast.jet.config.JobConfig;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+
+public class JetJob {
+
+    private static final int MAX_CONCURRENT_OPERATIONS = 1;
+
+    public static void main(String[] args) {
+        StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
+                .setDatabaseAddress("127.0.0.1", 3306)
+                .setDatabaseCredentials("debezium", "dbz")
+                .setClusterName("dbserver1")
+                .setDatabaseIncludeList("inventory")
+                .setTableIncludeList("inventory.customers", "inventory.orders")
+                .build();
+
+        Pipeline pipeline = Pipeline.create();
+        StreamStage<ChangeRecord> allRecords = pipeline.readFrom(source)
+                .withNativeTimestamps(0);
+
+        allRecords.filter(r -> r.table().equals("customers"))
+                .apply(Ordering::fix)
+                .peek()
+                .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
+                        record -> (Integer) record.key().toMap().get("id"),
+                        CustomerEntryProcessor::new
+                ));
+
+        allRecords.filter(r -> r.table().equals("orders"))
+                .apply(Ordering::fix)
+                .peek()
+                .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
+                        record -> (Integer) record.value().toMap().get("purchaser"),
+                        OrderEntryProcessor::new
+                ));
+
+        JobConfig cfg = new JobConfig().setName("monitor");
+        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
+        hz.getJet().newJob(pipeline, cfg);
+    }
+}
+----
+--
+{open-source-product-name}::
++
+--
+[source,java]
+----
package org.example;

import com.hazelcast.core.Hazelcast;
@@ -64,10 +129,8 @@ private static final int MAX_CONCURRENT_OPERATIONS = 1;

public static void main(String[] args) {
    StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
-            .setDatabaseAddress("127.0.0.1")
-            .setDatabasePort(3306)
-            .setDatabaseUser("debezium")
-            .setDatabasePassword("dbz")
+            .setDatabaseAddress("127.0.0.1", 3306)
+            .setDatabaseCredentials("debezium", "dbz")
            .setClusterName("dbserver1")
            .setDatabaseWhitelist("inventory")
            .setTableWhitelist("inventory.customers", "inventory.orders")
@@ -98,30 +161,50 @@ public static void main(String[] args) {
        hz.getJet().newJob(pipeline, cfg);
    }
}
-```
+----
+--
+====

If using Postgres, only the source would need to change, like this:

-```java
+[tabs]
+====
+{enterprise-product-name}::
++
+[source,java]
+----
+StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
+        .setDatabaseAddress("127.0.0.1", 5432)
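+        // setDatabaseAddress(host, port) and setDatabaseCredentials(user, password)
+        // are single calls replacing the legacy four-setter configuration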
+        .setDatabaseCredentials("postgres", "postgres")
+        .setDatabaseName("postgres")
+        .setTableIncludeList("inventory.customers", "inventory.orders")
+        .build();
+----
+{open-source-product-name}::
++
+[source,java]
+----
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
        .setDatabaseAddress("127.0.0.1")
        .setDatabasePort(5432)
        .setDatabaseUser("postgres")
        .setDatabasePassword("postgres")
        .setDatabaseName("postgres")
-        .setTableWhitelist("inventory.customers", "inventory.orders")
+        .setTableIncludeList("inventory.customers", "inventory.orders")
        .build();
-```
+----
+====

As we can see from the pipeline code, our `Sink` is `EntryProcessor` based.
The two `EntryProcessors` we use are:

-```java
+[source,java]
+----
package org.example;

-import com.hazelcast.jet.cdc.ChangeRecord;
-import com.hazelcast.jet.cdc.Operation;
-import com.hazelcast.jet.cdc.ParsingException;
+import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
+import com.hazelcast.enterprise.jet.cdc.Operation;
+import com.hazelcast.enterprise.jet.cdc.ParsingException;
import com.hazelcast.map.EntryProcessor;

import java.util.Map;
@@ -158,14 +241,15 @@ public class CustomerEntryProcessor implements EntryProcessor/build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
Maven::
+
--
-```yaml
+[source,yaml]
+----
user-code-deployment:
  enabled: true
  jarPaths:
    - /target/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
====
+
@@ -558,17 +669,19 @@ you created the project for this tutorial.

. Start Hazelcast.
+
-```bash
+[source,bash]
+----
bin/hz-start
-```
+----

. When you see output like this, Hazelcast is up:
+
-```
+[source]
+----
Members {size:1, ver:1} [
	Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this
]
-```
+----

== Step 8. Submit the Job for Execution

@@ -581,77 +694,85 @@ following command:

Gradle::
+
--
-```bash
+[source,bash]
+----
bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
Maven::
+
--
-```bash
+[source,bash]
+----
bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar
-```
+----
--
====

The output in the Hazelcast member's log should look something like this
(we see these lines due to the `peek()` stages we've inserted):

-```
+[source,text]
+----
........
... Output to ordinal 0: key:{{"order_number":10002}}, value:{{"order_number":10002,"order_date":16817,"purchaser":1002,"quantity":2,"product_id":105,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"order_number":10003}}, value:{{"order_number":10003,"order_date":16850,"purchaser":1002,"quantity":2,"product_id":106,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__op":"c","__db":"inventory","__table":"customers","__ts_ms":1593681751161,"__deleted":"false"}} (eventTime=12:22:31.161)
........
-```
+----

== Step 9. Track Updates

Let's see how our cache looks like at this time.
If we execute the `CacheRead` code <<5-define-jet-job, defined above>>, we'll get: -``` +[source] +---- Currently there are following customers in the cache: Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10002=Order {orderNumber=10002, orderDate=Sun Jan 17 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=105}, 10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}} Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}} Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {} Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}} -``` +---- Let's do some updates in our database. Go to the database CLI <<3-start-command-line-client, we've started earlier>> and run following commands: -```bash +[source,sql] +---- INSERT INTO inventory.customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org'); DELETE FROM inventory.orders WHERE order_number=10002; -``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer: Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}, Orders: {} Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}} Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}} Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {} Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}} -``` +---- == Step 10. Clean up . Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel postgres-monitor -``` +---- Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . Use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): @@ -663,9 +784,10 @@ MySQL:: -- You can use Docker to stop all running containers: -```bash +[source,bash] +---- docker stop mysqlterm mysql -``` +---- -- Postgres:: + @@ -674,9 +796,10 @@ Postgres:: You can use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): -```bash +[source,bash] +---- docker stop postgres -``` +---- -- ==== + @@ -685,6 +808,7 @@ Docker should remove them right after we stop them. 
We can verify that all processes are stopped and removed with following command: -```bash +[source,bash] +---- docker ps -a -``` +---- diff --git a/docs/modules/pipelines/pages/cdc-postgres.adoc b/docs/modules/pipelines/pages/cdc-postgres.adoc index ca77ddbc0..ed9e6fe1f 100644 --- a/docs/modules/pipelines/pages/cdc-postgres.adoc +++ b/docs/modules/pipelines/pages/cdc-postgres.adoc @@ -23,11 +23,12 @@ Open a terminal, and run following command. It will start a new container that runs a PostgreSQL database server preconfigured with an inventory database: -```bash +[source,bash] +---- docker run -it --rm --name postgres -p 5432:5432 \ -e POSTGRES_DB=postgres -e POSTGRES_USER=postgres \ -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.2 -``` +---- This runs a new container using version `1.2` of the link:https://hub.docker.com/r/debezium/example-postgres[debezium/example-postgres] @@ -50,7 +51,8 @@ container to the same port on the Docker host so that software outside In your terminal you should see something like the following: -``` +[source] +---- ... PostgreSQL init process complete; ready for start up. @@ -58,7 +60,7 @@ PostgreSQL init process complete; ready for start up. 2020-06-02 11:36:19.581 GMT [1] LOG: listening on IPv6 address "::", port 5432 2020-06-02 11:36:19.585 GMT [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432" 2020-06-02 11:36:19.618 GMT [1] LOG: database system is ready to accept connections -``` +---- The PostgreSQL server is running and ready for use. @@ -67,35 +69,40 @@ The PostgreSQL server is running and ready for use. Open a new terminal, and use it to run `psql` (PostgreSQL interactive terminal) inside the already running `postgres` container: -```bash +[source,bash] +---- docker exec -it postgres psql -U postgres -``` +---- You should end up with a prompt similar to this: -``` +[source] +---- psql (11.8 (Debian 11.8-1.pgdg90+1)) Type "help" for help. postgres=# -``` +---- We’ll use the prompt to interact with the database. First, switch to the "inventory" schema: -```bash +[source,bash] +---- SET search_path TO inventory; -``` +---- and then list the tables in the database: -```bash +[source,bash] +---- \dt; -``` +---- This should display the following: -``` +[source] +------------ List of relations Schema | Name | Type | Owner -----------+------------------+-------+---------- @@ -106,14 +113,15 @@ This should display the following: inventory | products_on_hand | table | postgres inventory | spatial_ref_sys | table | postgres (6 rows) -``` +------------ Feel free to explore the database and view the preloaded data. For example: -```bash +[source,bash] +---- SELECT * FROM customers; -``` +---- == Step 4. Start Hazelcast @@ -124,28 +132,44 @@ follow from here on. . Make sure the PostgreSQL CDC plugin is in the `lib/` directory. + -```bash +[source,bash] +---- ls lib/ -``` +---- + You should see the following jars: + +[tabs] +==== +{enterprise-product-name}:: ++ +-- +* hazelcast-enterprise-cdc-debezium-{full-version}.jar +* hazelcast-enterprise-cdc-postgres-{full-version}.jar (for Postgres) +-- +{open-source-product-name}:: ++ +-- * hazelcast-jet-cdc-debezium-{full-version}.jar -* hazelcast-jet-cdc-postgres-{full-version}.jar +* hazelcast-jet-cdc-postgres-{full-version}.jar (for Postgres) +-- +==== . Start Hazelcast + -```bash +[source,bash] +---- bin/hz-start -``` +---- . 
When you see output like this, Hazelcast is up: + -``` +[source] +---- Members {size:1, ver:1} [ Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this ] -``` +---- == Step 5. Create a New Java Project @@ -171,8 +195,8 @@ repositories.mavenCentral() dependencies { implementation 'com.hazelcast:hazelcast:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-debezium:{full-version}' - implementation 'com.hazelcast.jet:hazelcast-jet-cdc-postgres:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-debezium:{full-version}' + implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-postgres:{full-version}' implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.0' } @@ -206,12 +230,12 @@ Maven:: com.hazelcast.jet - hazelcast-jet-cdc-debezium + hazelcast-enterprise-cdc-debezium {full-version} com.hazelcast.jet - hazelcast-jet-cdc-postgres + hazelcast-enterprise-cdc-postgres {full-version} @@ -241,6 +265,9 @@ Maven:: -- ==== +If you are using {open-source-product-name}, you have to replace `hazelcast-enterprise-cdc-debezium` +with `hazelcast-jet-cdc-debezium` and `hazelcast-enterprise-cdc-postgres` with `hazelcast-jet-cdc-postgres`. + == Step 6. Define Data Pipeline Let's write the code that will monitor the database and do something @@ -254,7 +281,55 @@ customer with a specific ID. This is how the code doing this looks like: -```java +[tabs] +==== +{enterprise-product-name}:: ++ +-- +[source,java] +---- +package org.example; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.enterprise.jet.cdc.CdcSinks; +import com.hazelcast.enterprise.jet.cdc.ChangeRecord; +import com.hazelcast.enterprise.jet.cdc.postgres.PostgresCdcSources; +import com.hazelcast.jet.config.JobConfig; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.StreamSource; + +public class JetJob { + + public static void main(String[] args) { + StreamSource source = PostgresCdcSources.postgres("source") + .setDatabaseAddress("127.0.0.1", 5432) + .setDatabaseCredentials("postgres", "postgres") + .setDatabaseName("postgres") + .setTableIncludeList("inventory.customers") + .build(); + + Pipeline pipeline = Pipeline.create(); + pipeline.readFrom(source) + .withoutTimestamps() + .peek() + .writeTo(CdcSinks.map("customers", + r -> r.key().toMap().get("id"), + r -> r.value().toObject(Customer.class).toString())); + + JobConfig cfg = new JobConfig().setName("postgres-monitor"); + HazelcastInstance hz = Hazelcast.bootstrappedInstance(); + hz.getJet().newJob(pipeline, cfg); + } + +} +---- +-- +{open-source-product-name}:: ++ +-- +[source,java] +---- package org.example; import com.hazelcast.core.Hazelcast; @@ -292,11 +367,14 @@ public class JetJob { } } -``` +---- +-- +==== The `Customer` class we map change events to is quite simple too: -```java +[source,java] +---- package org.example; import com.fasterxml.jackson.annotation.JsonProperty; @@ -354,14 +432,15 @@ public class Customer implements Serializable { return "Customer {id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + '}'; } } -``` +---- To make it evident that our pipeline serves the purpose of building an up-to-date cache of customers, which can be interrogated at any time let's add one more class. This code can be executed at any time in your IDE and will print the current content of the cache. 
-```java +[source,java] +---- package org.example; import com.hazelcast.client.HazelcastClient; @@ -379,7 +458,7 @@ public class CacheRead { } } -``` +---- == Step 7. Package the Pipeline into a JAR @@ -395,9 +474,10 @@ need to do is to run the build command: Gradle:: + -- -```bash +[source,bash] +---- gradle build -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `build/libs` directory of our project. @@ -405,9 +485,10 @@ in the `build/libs` directory of our project. Maven:: + -- -```bash +[source,bash] +---- mvn package -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `target` directory or our project. @@ -424,16 +505,18 @@ issue is following command: Gradle:: + -- -```bash +[source,bash] +---- bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- Maven:: + -- -```bash +[source,bash] +---- bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- ==== @@ -441,7 +524,8 @@ The output in the Hazelcast member's log should look something like this (we also log what we put in the `IMap` sink thanks to the `peek()` stage we inserted): -``` +[source] +---- ... Snapshot ended with SnapshotResult [...] ... Obtained valid replication slot ReplicationSlot [...] ... REPLICA IDENTITY for 'inventory.customers' is 'FULL'; UPDATE AND DELETE events will contain the previous values of all the columns @@ -450,85 +534,95 @@ we inserted): ... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker",... ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne","last_name":"Kretchmar",... ... Transitioning from the snapshot reader to the binlog reader -``` +---- == Step 9. Track Updates Let's see how our cache looks like at this time. If we execute the `CacheRead` code <<6-define-jet-job, defined above>>, we'll get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- Let's do some updates in our database. Go to the PostgreSQL CLI <<3-start-postgresql-command-line-client, we've started earlier>> and run following update statement: -```bash +[source,bash] +---- UPDATE customers SET first_name='Anne Marie' WHERE id=1004; -``` +---- In the log of the Hazelcast member we should immediately see the effect: -``` +[source] +---- ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar",... 
-``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- One more: -```bash +[source,bash] +---- UPDATE customers SET email='edward.walker@walker.com' WHERE id=1003; -``` +---- -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=edward.walker@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- == Step 10. Clean Up . Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel postgres-monitor -``` +---- Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . Use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): + -```bash +[source,bash] +---- docker stop postgres -``` +---- + Since we've used the `--rm` flag when starting the connectors, Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with following command: -```bash +[source,bash] +---- docker ps -a -``` +---- diff --git a/docs/modules/pipelines/pages/cdc.adoc b/docs/modules/pipelines/pages/cdc.adoc index a30649095..3645ca029 100644 --- a/docs/modules/pipelines/pages/cdc.adoc +++ b/docs/modules/pipelines/pages/cdc.adoc @@ -23,15 +23,16 @@ Open a terminal, and run following command. It will start a new container that runs a MySQL database server preconfigured with an inventory database: -```bash +[source,bash] +---- docker run -it --rm --name mysql -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser \ - -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.2 -``` + -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.6 +---- -This runs a new container using version `1.2` of the +This runs a new container using version `2.6` of the link:https://hub.docker.com/r/debezium/example-mysql[debezium/example-mysql] -image (based on link:https://hub.docker.com/_/mysql[mysql:5.7]. It defines +image (based on link:https://hub.docker.com/_/mysql[mysql 8.2]. It defines and populates a sample "inventory" database and creates a `debezium` user with password `dbz` that has the minimum privileges required by Debezium’s MySQL connector. @@ -52,11 +53,12 @@ variables to specific values. You should see in your terminal something like the following: -```text +[source,text] +---- ... 2020-03-09T09:48:24.579480Z 0 [Note] mysqld: ready for connections. Version: '5.7.29-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL) -``` +---- Notice that the MySQL server starts and stops a few times as the configuration is modified. 
The last line listed above reports that the @@ -68,10 +70,11 @@ Open a new terminal, and use it to start a new container for the MySQL command line client and connect it to the MySQL server running in the `mysql` container: -```bash +[source,bash] +---- docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh \ -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"' -``` +---- Here we start the container using the `mysql:5.7` image, name the container `mysqlterm` and link it to the mysql container where the @@ -84,7 +87,8 @@ specifies the correct options so that it can connect properly. The container should output lines similar to the following: -``` +[source] +---- mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 4 @@ -99,25 +103,28 @@ owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> -``` +---- Unlike the other containers, this container runs a process that produces a prompt. We’ll use the prompt to interact with the database. First, switch to the "inventory" database: -```sql +[source,sql] +---- mysql> use inventory; -``` +---- and then list the tables in the database: -```sql +[source,sql] +---- mysql> show tables; -``` +---- which should then display: -``` +[source] +---- +---------------------+ | Tables_in_inventory | +---------------------+ @@ -129,14 +136,15 @@ which should then display: | products_on_hand | +---------------------+ 6 rows in set (0.01 sec) -``` +---- Use the MySQL command line client to explore the database and view the preloaded data. For example: -```sql +[source,sql] +---- mysql> SELECT * FROM customers; -``` +---- == Step 4. Start Hazelcast @@ -145,33 +153,36 @@ mysql> SELECT * FROM customers; If you already have Hazelcast and you skipped the above steps, make sure to follow from here on. -. Make sure the MySQL CDC plugin is in `lib/` directory. +. Make sure the MySQL CDC plugin is in the `lib/` directory. You must manually download the MySQL CDC plugin from link:https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar[Hazelcast's Maven repository, window=_blank] and then copy it to the `lib/` directory. + -```bash +[source,bash] +---- ls lib/ -``` +---- + You should see the following jars: + -* hazelcast-jet-cdc-debezium-{full-version}.jar -* hazelcast-jet-cdc-mysql-{full-version}.jar -* hazelcast-jet-cdc-postgres-{full-version}.jar +* hazelcast-enterprise-cdc-debezium-{full-version}-jar-with-dependencies.jar +* hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar +* hazelcast-enterprise-cdc-postgres-{full-version}-jar-with-dependencies.jar + -WARNING: If you have Hazelcast {enterprise-product-name}, you need to manually download the MySQL CDC plugin from Hazelcast's Maven https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-jet-cdc-mysql/{full-version}/hazelcast-jet-cdc-mysql-{full-version}-jar-with-dependencies.jar[repository] and then copy it to the `lib/` directory. 
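+
+For example, a sketch of fetching the plugin with `curl` from the Hazelcast home directory (any download method works):
+
+[source,bash]
+----
+# download the MySQL CDC plugin (with bundled dependencies) into lib/
+curl -o lib/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar \
+  https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-enterprise-cdc-mysql/{full-version}/hazelcast-enterprise-cdc-mysql-{full-version}-jar-with-dependencies.jar
+----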
+WARNING: If you have Hazelcast {open-source-product-name}, you need to manually download the legacy MySQL CDC plugin from https://repo1.maven.org/maven2/com/hazelcast/jet/hazelcast-jet-cdc-mysql/{full-version}/hazelcast-jet-cdc-mysql-{full-version}-jar-with-dependencies.jar[Hazelcast's Maven repository] and then copy it to the `lib/` directory.

. Start Hazelcast.
+
-```bash
+[source,bash]
+----
bin/hz-start
-```
+----

. When you see output like this, Hazelcast is up:
+
-```
+[source]
+----
Members {size:1, ver:1} [
	Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this
]
-```
+----

== Step 5. Create a New Java Project

@@ -197,8 +208,8 @@ repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:{full-version}'
-    implementation 'com.hazelcast.jet:hazelcast-jet-cdc-debezium:{full-version}'
-    implementation 'com.hazelcast.jet:hazelcast-jet-cdc-mysql:{full-version}'
+    implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-debezium:{full-version}'
+    implementation 'com.hazelcast.jet:hazelcast-enterprise-cdc-mysql:{full-version}'
    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.0'
}

@@ -220,8 +231,7 @@ Maven::
    <version>1.0-SNAPSHOT</version>

    <properties>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
+        <maven.compiler.release>17</maven.compiler.release>
    </properties>

@@ -232,12 +242,12 @@ Maven::

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
-            <artifactId>hazelcast-jet-cdc-debezium</artifactId>
+            <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
            <version>{full-version}</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
-            <artifactId>hazelcast-jet-cdc-mysql</artifactId>
+            <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
            <version>{full-version}</version>
        </dependency>

@@ -280,14 +290,15 @@ customer with a specific ID.

This is how the code doing this looks like:

-```java
+[source,java]
+----
package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.jet.cdc.CdcSinks;
-import com.hazelcast.jet.cdc.ChangeRecord;
-import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
+import com.hazelcast.enterprise.jet.cdc.CdcSinks;
+import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
+import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
@@ -296,13 +307,11 @@ public class JetJob {

    public static void main(String[] args) {
        StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
-                .setDatabaseAddress("127.0.0.1")
-                .setDatabasePort(3306)
-                .setDatabaseUser("debezium")
-                .setDatabasePassword("dbz")
+                .setDatabaseAddress("127.0.0.1", 3306)
+                .setDatabaseCredentials("debezium", "dbz")
                .setClusterName("dbserver1")
-                .setDatabaseWhitelist("inventory")
-                .setTableWhitelist("inventory.customers")
+                .setDatabaseIncludeList("inventory")
+                .setTableIncludeList("inventory.customers")
                .build();

        Pipeline pipeline = Pipeline.create();
@@ -319,11 +328,14 @@ public class JetJob {
    }
}
-```
+----
+
+NOTE: If you are using Hazelcast {open-source-product-name}, you have to change the package from `com.hazelcast.enterprise.jet...` to `com.hazelcast.jet...`.

The `Customer` class we map change events to is quite simple too:

-```java
+[source,java]
+----
package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;
@@ -381,14 +393,15 @@ public class Customer implements Serializable {
        return "Customer {id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + '}';
    }
}
-```
+----

To make it evident that our pipeline serves the purpose of building
an up-to-date cache of customers, which can be interrogated at any
time let's add one more class. This code can be executed at any time
in your IDE and will print the current content of the cache.
-```java +[source,java] +---- package org.example; import com.hazelcast.client.HazelcastClient; @@ -406,7 +419,7 @@ public class CacheRead { } } -``` +---- == Step 7. Package the Pipeline into a JAR @@ -422,9 +435,10 @@ need to do is to run the build command: Gradle:: + -- -```bash +[source,bash] +---- gradle build -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `build/libs` directory of our project. @@ -433,9 +447,10 @@ Maven:: + -- -```bash +[source,bash] +---- mvn package -``` +---- This will produce a JAR file called `cdc-tutorial-1.0-SNAPSHOT.jar` in the `target` directory or our project. @@ -452,16 +467,18 @@ issue is following command: Gradle:: + -- -```bash +[source,bash] +---- bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- Maven:: + -- -```bash +[source,bash] +---- bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar -``` +---- -- ==== @@ -469,95 +486,106 @@ The output in the Hazelcast member's log should look something like this (we also log what we put in the `IMap` sink thanks to the `peek()` stage we inserted): -``` +[source] +---- ... Completed snapshot in 00:00:01.519 ... Output to ordinal 0: key:{{"id":1001}}, value:{{"id":1001,"first_name":"Sally","last_name":"Thomas",... ... Output to ordinal 0: key:{{"id":1002}}, value:{{"id":1002,"first_name":"George","last_name":"Bailey",... ... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker",... ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne","last_name":"Kretchmar",... ... Transitioning from the snapshot reader to the binlog reader -``` +---- == Step 9. Track Updates Let's see how our cache looks like at this time. If we execute the `CacheRead` code <<6-define-jet-job, defined above>>, we'll get: -```text +[source,text] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- Let's do some updates in our database. Go to the MySQL CLI <> and run following update statement: -``` +[source,bash] +---- mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 -``` +---- In the log of the Hazelcast member we should immediately see the effect: -``` +[source] +---- ... Output to ordinal 0: key:{{"id":1004}}, value:{{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar",... 
-``` +---- If we check the cache with `CacheRead` we get: -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- One more: -``` +[source,bash] +---- mysql> UPDATE customers SET email='edward.walker@walker.com' WHERE id=1003; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 -``` +---- -``` +[source] +---- Currently there are following customers in the cache: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com} Customer {id=1003, firstName=Edward, lastName=Walker, email=edward.walker@walker.com} Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org} Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com} -``` +---- == Step 10. Clean up . Cancel the job. + -```bash +[source,bash] +---- bin/hz-cli cancel mysql-monitor -``` +---- + Shut down the Hazelcast cluster. + -```bash +[source,bash] +---- bin/hz-stop -``` +---- . Use Docker to stop the running container (this will kill the command-line client too, since it's running in the same container): + -```bash +[source,bash] +---- docker stop mysql -``` +---- + Since we've used the `--rm` flag when starting the connectors, Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with following command: + -```bash +[source,bash] +---- docker ps -a -``` +----