From 462db47e3d1ecf1a4a30f483aae3a555375ce988 Mon Sep 17 00:00:00 2001
From: Stefan Miklosovic
Date: Thu, 1 Apr 2021 11:23:36 +0200
Subject: [PATCH 01/13] schema refreshing

---
 REPORT.adoc                                   | 374 ++++++++++++++++++
 pom.xml                                       |   1 +
 .../connector/cassandra/CassandraClient.java  |   4 +
 .../cassandra/CassandraConnectorContext.java  |   2 +-
 .../cassandra/CommitLogProcessor.java         |   1 +
 .../cassandra/NoOpSchemaChangeListener.java   | 117 ++++++
 .../connector/cassandra/SchemaHolder.java     |  71 +---
 .../connector/cassandra/SchemaProcessor.java  | 105 ++++-
 .../cassandra/CommitLogProcessorTest.java     |   6 +-
 .../cassandra/SchemaProcessorTest.java        |  13 +-
 .../cassandra/SnapshotProcessorTest.java      |  12 +-
 11 files changed, 629 insertions(+), 77 deletions(-)
 create mode 100644 REPORT.adoc
 create mode 100644 src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java

diff --git a/REPORT.adoc b/REPORT.adoc
new file mode 100644
index 0000000..f82c15a
--- /dev/null
+++ b/REPORT.adoc
@@ -0,0 +1,374 @@
+== Doordash / Debezium integration report
+
+_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_
+
+=== Introduction
+
+This report delivers the solution to a problem Doordash has with the Debezium integration in their
+Cassandra clusters. In this document, we dissect the problem, find the solution, implement it and
+deliver it to the customer.
+
+=== Nature of the problem
+
+The problem itself is rather straightforward; however, a reader / Cassandra user can be confused by the
+seemingly almost random behaviour of Debezium, which makes it even harder to understand what is going on.
+
+All observed problems are variations of the following scenario; reducing everything to this scenario and
+solving it resolves all the erroneous behaviour.
+
+* I have `cdc_enabled: true` in `cassandra.yaml`.
+* I have a table `k1.t1`.
+* I have cdc functionality enabled on `k1.t1` by specifying `cdc = true` in the `CREATE` statement,
+or by `ALTER`-ing the table afterwards if that was not the case.
+
+The important point to make here is that Debezium is required to be running _before_ all of this
+happens. In other words:
+
+* I start Cassandra without `k1.t1`.
+* I start Debezium.
+* Only after the points above do I create the table as described.
+
+The result of this procedure is that the cdc-enabled table will indeed place commit logs into the cdc-raw
+directory to be picked up by Debezium, but Debezium will not react - it just deletes these logs and nothing
+is ever sent to the respective Kafka topic.
+
+What is even worse is that these commit logs in the cdc directory are lost forever, because Debezium deletes
+them (by default; there is a way to _not_ delete them by implementing one's own `CommitLogTransfer`, but the
+default commit log transfer implementation will just delete these logs for good).
+
+What is even more strange is that another table on which `cdc` was already enabled keeps pushing its
+messages to a topic just fine, so the fact that the new table does not produce them is mind-boggling.
+
+To fully understand why this is happening, we need to explain what the write and read paths look like.
+
+=== On Mutation and CommitLog
+
+The building block of any change in Cassandra is a `Mutation`. A mutation contains `PartitionUpdate`-s.
+If a `Mutation` happens to consist of only one "change", then that `Mutation` contains
+just one `PartitionUpdate`.
+
+Cassandra appends `Mutation` objects, as they come, to the end of a _commit log segment_. A commit log
+segment is an individual file in the commitlog directory.
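+
+For orientation, the directories and the cdc switch involved here are configured in `cassandra.yaml`;
+a minimal sketch (the property names are standard Cassandra configuration, the paths are illustrative only):
+
+[source,yaml]
+----
+# global cdc switch from the scenario above
+cdc_enabled: true
+
+# where active commit log segments live
+commitlog_directory: /var/lib/cassandra/commitlog
+
+# where "full" segments are copied for cdc consumers such as Debezium
+cdc_raw_directory: /var/lib/cassandra/cdc_raw
+----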
+
+If CDC is enabled, once a commit log is "full" (for lack of a better word, and for brevity), it is
+simply copied over to the `cdc` dir.
+
+Debezium has a watcher on the cdc dir, and as soon as a commit log is copied there, Debezium detects it,
+reads the whole log, reconstructs all `Mutation`-s, creates messages and sends them to Kafka.
+
+Hence we clearly see that this process has two parts:
+
+1. *serialize* `Mutation`-s from Java objects into a binary blob and store them in a commit log segment
+2. parse a commit log segment, *deserialize* all `Mutation`-s back into Java objects and create Kafka messages
+
+=== Write path and serialisation
+
+A mutation is serialized by its `MutationSerializer`. It is not too important where this
+serializer is called from.
+
+[source,java]
+----
+    public static class MutationSerializer implements IVersionedSerializer
+    {
+        public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
+        {
+            /// other code hidden for brevity
+
+            int size = mutation.modifications.size();
+
+            if (version < MessagingService.VERSION_30)
+            {
+                ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out);
+                out.writeInt(size);
+            }
+            else
+            {
+                out.writeUnsignedVInt(size);
+            }
+
+            assert size > 0;
+            for (Map.Entry entry : mutation.modifications.entrySet())
+                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+        }
+----
+
+Here we see that in order to serialize a `Mutation`, we need to iterate over all its _modifications_,
+a.k.a. `PartitionUpdate`-s, and serialise each of them. The serialisation of a `Mutation`
+is equal to the serialisation of all its modifications.
+
+Let's take a look at the `PartitionUpdate` serialisation:
+
+[source,java]
+----
+    public static class PartitionUpdateSerializer
+    {
+        public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
+        {
+            try (UnfilteredRowIterator iter = update.unfilteredIterator())
+            {
+                // other code hidden for brevity
+                // serialisation of metadata
+                CFMetaData.serializer.serialize(update.metadata(), out, version);
+                // serialisation of the partition itself
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter,
+                                                                     null, out, version, update.rowCount());
+            }
+        }
+----
+
+`update.metadata()` above returns `CFMetaData`. `CFMetaData` has a field `params` of
+type `TableParams`, and this object has a flag called `cdc` - a boolean saying
+whether that particular `PartitionUpdate` belongs to a table which is _cdc-enabled_.
+
+From the above we see that `PartitionUpdateSerializer` serializes not only the partition itself
+but also its metadata, via `CFMetaData.serializer`. Let's take a look at it:
+
+[source,java]
+----
+    public static class Serializer
+    {
+        public void serialize(CFMetaData metadata,
+                              DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(metadata.cfId, out, version);
+        }
+----
+
+Here, the very important fact is that serializing `CFMetaData`, in Cassandra terms,
+means that _it takes the UUID of the table the PartitionUpdate belongs to and serializes that_.
+*Just that UUID*.
+
+=== Read path and deserialisation
+
+Having covered the write path, let's look at how the read path is handled. The read path is a different
+problem - we parse a binary commit log and try to reconstruct all `Mutation`-s from it.
+
+*It is important to realise that this happens in the Debezium context, not in Cassandra.*
+
+Debezium uses Cassandra's `CommitLogReader` class to do the actual commit log reading, so
+Debezium is not using anything custom here, only built-in Cassandra functionality.
+
+Cassandra's `CommitLogReader` is used in Debezium's `CommitLogProcessor`. It scans for new commit logs
+as they are copied to the cdc dir and uses `CommitLogReader` in its `process` method.
+
+`CommitLogReader` reads commit logs via the method `readCommitLogSegment`, which accepts a `CommitLogReadHandler`.
+The commit log read handler is Debezium's custom implementation - the hook where Debezium plugs in its own
+processing of mutations as they come out of a commit log segment.
+
+For completeness, the chain of method calls down to the place where the handler is ultimately invoked is:
+
+1. `CommitLogReader#readCommitLogSegment`
+2. the method from 1) calls the private `CommitLogReader#readSection`; a commit log is not read all at once but in chunks - _sections_
+3. from 2) the handler is passed on to `CommitLogReader#readMutation`
+
+At the beginning of 3), the buffer is *deserialized* into a mutation.
+
+[source,java]
+----
+    try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+    {
+        mutation = Mutation.serializer.deserialize(bufIn,
+                                                   desc.getMessagingVersion(),
+                                                   SerializationHelper.Flag.LOCAL);
+----
+
+Finally, at the very bottom, the freshly deserialized `Mutation` is handed over to our
+custom handler.
+
+[source,java]
+----
+handler.handleMutation(mutation, size, entryLocation, desc);
+----
+
+The implementation of this handler in Debezium looks like this:
+
+[source,java]
+----
+    @Override
+    public void handleMutation(Mutation mutation,
+                               int size,
+                               int entryLocation,
+                               CommitLogDescriptor descriptor) {
+        if (!mutation.trackedByCDC()) {
+            return;
+        }
+
+        // other code
+        // here the Mutation is eventually transformed into a Kafka message and sent
+----
+
+This is crucial. The problem is that *the mutation is not tracked by cdc* (empirically verified by adding
+heavy logging in all the relevant places).
+
+In other words: we have verified that Cassandra serializes the data as it is supposed to, but for some reason
+a mutation which was previously marked as _cdc-enabled_ is not deserialized in such a way that `trackedByCDC`
+would be `true`; the flag ends up `false`, the handler returns immediately, and hence nothing is sent to Kafka.
+
+Let's look at the logic behind the `Mutation#trackedByCDC` method:
+
+[source,java]
+----
+    public boolean trackedByCDC()
+    {
+        return cdcEnabled;
+    }
+----
+
+It is just a getter. The flag is, however, set on the _read path_ by
+`MutationSerializer#deserialize`. At the end, that method returns:
+
+[source,java]
+----
+return new Mutation(update.metadata().ksName, dk, modifications);
+----
+
+And finally, in its constructor we find:
+
+[source,java]
+----
+    protected Mutation(... params for constructor)
+    {
+        this.keyspaceName = keyspaceName;
+        this.key = key;
+        this.modifications = modifications;
+        for (PartitionUpdate pu : modifications.values())
+            cdcEnabled |= pu.metadata().params.cdc;
+    }
+----
+
+Here we see that the `cdcEnabled` flag will be `true` if _any_ `PartitionUpdate` has `cdc` set to `true`
+in its metadata params.
+
+`PartitionUpdate#metadata` returns `CFMetaData` on deserialization; nothing wrong with that.
+
+Yet we clearly see that after everything is fully deserialized, that flag is still `false` ...
+
+=== The core of the problem
+
+There are two problems.
+The first problem is that the serialized form of a `Mutation`
+does not contain its `TableParams` - or, to put it better, the `PartitionUpdate` of a
+`Mutation` is not serialized in such a way that it would also contain the `cdc` flag.
+We saw that it contains only the `cfId` (a UUID) and nothing else.
+
+However, it is understandable that it is done like that, because on a closer look this information is
+not necessary. If we revisit the content of `MutationSerializer#deserialize`, it contains
+
+[source,java]
+----
+PartitionUpdate.serializer.deserialize(in, version, flag, key);
+----
+
+which in turn contains
+
+[source,java]
+----
+CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+----
+
+which finally calls:
+
+[source,java]
+----
+UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
+CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
+----
+
+Hence we see that all it takes to populate a `PartitionUpdate` with `CFMetaData`
+is to look up which `cfId` was serialized and, based on that id, retrieve the
+metadata from `Schema`.
+
+The conclusion is rather clear - we have a running node which serializes just fine,
+but the deserialized mutations end up with retrieved `CFMetaData` whose
+`cdc` flag is `false`, so their processing is skipped.
+
+The reason this happens is that when Debezium starts, it reads the Cassandra schema by doing this in the
+`CassandraConnectorContext` constructor:
+
+[source,java]
+----
+Schema.instance.loadDdlFromDisk(this.config.cassandraConfig());
+----
+
+which translates to
+
+[source,java]
+----
+    public void loadDdlFromDisk(String yamlConfig) {
+        // other stuff ...
+        DatabaseDescriptor.toolInitialization();
+        Schema.instance.loadFromDisk(false);
+    }
+----
+
+This is done *once, when Debezium starts*, and it is *never* refreshed. So
+if you create a table after Debezium starts, Debezium just does not see it. The same happens when the table
+already exists but you alter it with `cdc = true` *after Debezium has started*.
+
+Debezium's internals use Cassandra's code, but since Debezium is a different JVM / process from Cassandra,
+whatever happens in Cassandra after Debezium has started is simply not visible to Debezium; if you enable
+cdc in Cassandra, Debezium just does not know about it.
+
+However, if you restart Debezium while `cdc` is already enabled,
+*it will read Cassandra's system keyspaces after Cassandra has persisted these changes to the system
+SSTables on disk*, so messages for that table are then sent to Kafka just fine.
+
+=== Proposed solution
+
+The only solution I see is to _reload the Cassandra schema in Debezium_
+before *each* commit log segment in the `cdc` dir is scanned / parsed.
+
+The reason it should be done before processing each log is that you cannot possibly know from the outside,
+before actually reading it, whether that log contains mutations with partition updates coming from a table
+on which cdc was only recently enabled, so you have to refresh in every case. This schema refresh is not
+performance sensitive; it only takes milliseconds / sub-second time.
+
+If we do that, then on the deserialization path the deserialization of a `PartitionUpdate` will populate it
+with `CFMetaData` reflecting its true state, because Cassandra keeps it in its system tables.
+
+The reloading of the schema can be done on demand. As said above, Debezium has a watcher on the cdc dir,
+and it does this:
+
+[source,java]
+----
+void handleEvent(WatchEvent event, Path path) {
+    if (isRunning()) {
+        // this would be added
+        SchemaProcessor.SchemaReinitializer.reinitialize();
+
+        // this stays as is
+        processCommitLog(path.toFile());
+    }
+}
+----
+
+The implementation of the reinitializer would look like this (working example):
+
+[source,java]
+----
+public static final class SchemaReinitializer {
+    public static synchronized void reinitialize() {
+        try {
+            // give it some time to flush the system tables to disk so we can read them again
+            Thread.sleep(5000);
+            clearWithoutAnnouncing();
+            Schema.instance.loadFromDisk(false);
+        }
+        catch (final Throwable ex) {
+            logger.info("Error in reinitialisation method", ex);
+        }
+    }
+
+    public static synchronized void clearWithoutAnnouncing() {
+        for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) {
+            org.apache.cassandra.schema.KeyspaceMetadata ksm =
+                Schema.instance.getKSMetaData(keyspaceName);
+            ksm.tables.forEach(view -> Schema.instance.unload(view));
+            // this method does not exist in Cassandra
+            // ksm.views.forEach(view -> Schema.instance.unload(view));
+            Schema.instance.clearKeyspaceMetadata(ksm);
+        }
+        logger.info("clearing without announce done");
+    }
+}
+----
+
+There is also `Schema.instance.clear()`, but it additionally announces schema changes and invokes parts of
+the Cassandra codebase which throw exceptions when called outside of Cassandra - remember, we use Cassandra
+code but we do not actually run Cassandra in Debezium, so it would call code we do not want.
+
+Fortunately, all the methods we need are public, so we just copy `Schema#clear` but remove the
+last line, which announces the new schema version and is the problematic part.
+
+Once this is done, if we alter a table in Cassandra, the `cdc` flag will be parsed correctly, the `Mutation`
+will not be skipped from processing, and Kafka messages will be sent over.
+
+The other solution would be to create a JVM agent instead of a standalone process. By doing so, we would run
+Debezium in the same JVM as Cassandra, so if Cassandra updates the cdc flag on some metadata, Debezium sees
+it instantly. This path is yet to be implemented if we choose to go that way; I am not sure whether it is
+possible, but in principle it should be.
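+
+For illustration only, here is a minimal sketch of what such an agent entry point could look like, assuming
+the standard `java.lang.instrument` mechanism (the package, class name and the bootstrap step are
+hypothetical; the connector's standalone entry point would have to be refactored so it can be started
+in-process):
+
+[source,java]
+----
+package io.debezium.connector.cassandra.agent; // hypothetical package
+
+import java.lang.instrument.Instrumentation;
+
+public final class DebeziumCassandraAgent {
+
+    // invoked by the JVM when Cassandra is started with -javaagent:/path/to/agent.jar
+    public static void premain(String agentArgs, Instrumentation instrumentation) {
+        Thread connectorThread = new Thread(() -> {
+            // bootstrap the Debezium Cassandra connector here (hypothetical call),
+            // e.g. build a CassandraConnectorContext from agentArgs and start its processors
+        }, "debezium-cassandra-connector");
+        // daemon thread, so the agent never prevents the Cassandra JVM from shutting down
+        connectorThread.setDaemon(true);
+        connectorThread.start();
+    }
+}
+----
+
+Such an agent would be wired in through Cassandra's JVM options (for example
+`-javaagent:/path/to/debezium-cassandra-agent.jar`), which is the main operational difference compared to
+running the connector as a separate process.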
\ No newline at end of file diff --git a/pom.xml b/pom.xml index d62cf6d..b6f5599 100644 --- a/pom.xml +++ b/pom.xml @@ -119,6 +119,7 @@ org.apache.cassandra cassandra-all + 3.11.4 ch.qos.logback diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java index b199924..aa398b8 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java @@ -95,6 +95,10 @@ public String getClusterName() { return cluster.getMetadata().getClusterName(); } + public Cluster getCluster() { + return cluster; + } + public boolean isQueryable() { return !cluster.isClosed() && !session.isClosed(); } diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java index 5d96bbb..0077eb5 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java @@ -53,7 +53,7 @@ public CassandraConnectorContext(CassandraConnectorConfig config) throws Excepti .build(); // Setting up schema holder ... - this.schemaHolder = new SchemaHolder(this.cassandraClient, this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); + this.schemaHolder = new SchemaHolder(this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); // Setting up a file-based offset manager ... this.offsetWriter = new FileOffsetWriter(this.config.offsetBackingStoreDir()); diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java b/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java index 17c0461..ad6ab8b 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java @@ -64,6 +64,7 @@ public CommitLogProcessor(CassandraConnectorContext context) throws IOException @Override void handleEvent(WatchEvent event, Path path) { if (isRunning()) { + SchemaProcessor.SchemaReinitializer.reinitialize(); processCommitLog(path.toFile()); } } diff --git a/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java new file mode 100644 index 0000000..1b45db4 --- /dev/null +++ b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java @@ -0,0 +1,117 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.cassandra; + +import com.datastax.driver.core.AggregateMetadata; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.FunctionMetadata; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.MaterializedViewMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.UserType; + +public class NoOpSchemaChangeListener implements SchemaChangeListener { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + + } + + @Override + public void onTableAdded(final TableMetadata table) { + + } + + @Override + public void onTableRemoved(final TableMetadata table) { + + } + + @Override + public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + + } + + @Override + public void onUserTypeAdded(final UserType type) { + + } + + @Override + public void onUserTypeRemoved(final UserType type) { + + } + + @Override + public void onUserTypeChanged(final UserType current, final UserType previous) { + + } + + @Override + public void onFunctionAdded(final FunctionMetadata function) { + + } + + @Override + public void onFunctionRemoved(final FunctionMetadata function) { + + } + + @Override + public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) { + + } + + @Override + public void onAggregateAdded(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateRemoved(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) { + + } + + @Override + public void onMaterializedViewAdded(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewRemoved(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) { + + } + + @Override + public void onRegister(final Cluster cluster) { + + } + + @Override + public void onUnregister(final Cluster cluster) { + + } +} diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 839e6db..89de8b1 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -5,8 +5,6 @@ */ package io.debezium.connector.cassandra; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -31,35 +29,20 @@ * by {@link SchemaProcessor} periodically. 
*/ public class SchemaHolder { - private static final String NAMESPACE = "io.debezium.connector.cassandra"; private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class); - private final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); - private final CassandraClient cassandraClient; - private final String kafkaTopicPrefix; - private final SourceInfoStructMaker sourceInfoStructMaker; + public final String kafkaTopicPrefix; + public final SourceInfoStructMaker sourceInfoStructMaker; - public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { - this.cassandraClient = cassandraClient; + public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { this.kafkaTopicPrefix = kafkaTopicPrefix; this.sourceInfoStructMaker = sourceInfoStructMaker; - refreshSchemas(); - } - - public void refreshSchemas() { - LOGGER.info("Refreshing schemas..."); - Map latest = getLatestTableMetadatas(); - removeDeletedTableSchemas(latest); - createOrUpdateNewTableSchemas(latest); - LOGGER.info("Schemas are refreshed"); } public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { - if (!tableToKVSchemaMap.containsKey(kt)) { - refreshSchema(kt); - } return tableToKVSchemaMap.getOrDefault(kt, null); } @@ -73,7 +56,7 @@ public Set getCdcEnabledTableMetadataSet() { /** * Get the schema of an inner field based on the field name * @param fieldName the name of the field in the schema - * @param schema the schema where the field resides in + * @param schema the schema where the field resides in * @return Schema */ public static Schema getFieldSchema(String fieldName, Schema schema) { @@ -83,48 +66,6 @@ public static Schema getFieldSchema(String fieldName, Schema schema) { throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type()); } - private void refreshSchema(KeyspaceTable keyspaceTable) { - LOGGER.debug("Refreshing schema for {}", keyspaceTable); - TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null; - TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table); - if (existing != latest) { - if (existing == null) { - tableToKVSchemaMap.put(keyspaceTable, new KeyValueSchema(kafkaTopicPrefix, latest, sourceInfoStructMaker)); - LOGGER.debug("Updated schema for {}", keyspaceTable); - } - if (latest == null) { - tableToKVSchemaMap.remove(keyspaceTable); - LOGGER.debug("Removed schema for {}", keyspaceTable); - } - } - } - - private Map getLatestTableMetadatas() { - Map latest = new HashMap<>(); - for (TableMetadata tm : cassandraClient.getCdcEnabledTableMetadataList()) { - latest.put(new KeyspaceTable(tm), tm); - } - return latest; - } - - private void removeDeletedTableSchemas(Map latestTableMetadataMap) { - Set existingTables = new HashSet<>(tableToKVSchemaMap.keySet()); - Set latestTables = latestTableMetadataMap.keySet(); - existingTables.removeAll(latestTables); - tableToKVSchemaMap.keySet().removeAll(existingTables); - } - - private void createOrUpdateNewTableSchemas(Map latestTableMetadataMap) { - latestTableMetadataMap.forEach((table, metadata) -> { - TableMetadata existingTableMetadata = tableToKVSchemaMap.containsKey(table) ? 
tableToKVSchemaMap.get(table).tableMetadata() : null; - if (existingTableMetadata == null || !existingTableMetadata.equals(metadata)) { - KeyValueSchema keyValueSchema = new KeyValueSchema(kafkaTopicPrefix, metadata, sourceInfoStructMaker); - tableToKVSchemaMap.put(table, keyValueSchema); - LOGGER.info("Updated schema for {}.", table); - } - }); - } - public static class KeyValueSchema { private final TableMetadata tableMetadata; private final Schema keySchema; @@ -183,4 +124,4 @@ private static String getValueName(String kafkaTopicPrefix, TableMetadata tm) { return kafkaTopicPrefix + "." + tm.getKeyspace().getName() + "." + tm.getName() + ".Value"; } } -} +} \ No newline at end of file diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 260e73e..c1879c3 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -5,6 +5,24 @@ */ package io.debezium.connector.cassandra; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toMap; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.cassandra.config.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; + +import io.debezium.connector.SourceInfoStructMaker; + /** * The schema processor is responsible for periodically * refreshing the table schemas in Cassandra. Cassandra @@ -13,16 +31,101 @@ */ public class SchemaProcessor extends AbstractProcessor { + private static final Logger logger = LoggerFactory.getLogger(SchemaProcessor.class); + private static final String NAME = "Schema Processor"; private final SchemaHolder schemaHolder; + private final CassandraClient cassandraClient; + private final String kafkaTopicPrefix; + private final SourceInfoStructMaker sourceInfoStructMaker; + private final SchemaChangeListener schemaChangeListener; public SchemaProcessor(CassandraConnectorContext context) { super(NAME, context.getCassandraConnectorConfig().schemaPollInterval()); schemaHolder = context.getSchemaHolder(); + this.cassandraClient = context.getCassandraClient(); + this.kafkaTopicPrefix = schemaHolder.kafkaTopicPrefix; + this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; + + schemaChangeListener = new NoOpSchemaChangeListener() { + @Override + public void onTableAdded(final TableMetadata table) { + logger.info(String.format("Table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); + schemaHolder.tableToKVSchemaMap.put(new KeyspaceTable(table), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + // } + } + + @Override + public void onTableRemoved(final TableMetadata table) { + logger.info(String.format("Table %s.%s detected to be removed!", table.getKeyspace().getName(), table.getName())); + schemaHolder.tableToKVSchemaMap.remove(new KeyspaceTable(table)); + } + + @Override + public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + logger.info(String.format("Detected change in %s.%s", current.getKeyspace().getName(), current.getName())); + schemaHolder.tableToKVSchemaMap.put(new KeyspaceTable(current), + new 
SchemaHolder.KeyValueSchema(kafkaTopicPrefix, current, sourceInfoStructMaker)); + } + }; + } + + private List getCdcEnabledTableMetadataList() { + return cassandraClient.getCluster().getMetadata().getKeyspaces().stream() + .map(KeyspaceMetadata::getTables) + .flatMap(Collection::stream) + .filter(tm -> tm.getOptions().isCDC()) + .collect(Collectors.toList()); + } + + @Override + public void initialize() { + final Map cdcTables = getCdcEnabledTableMetadataList() + .stream() + .collect(toMap(KeyspaceTable::new, + tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); + + logger.info("Adding cdc-enabled tables {}", cdcTables.keySet().stream().map(KeyspaceTable::toString).collect(joining(","))); + schemaHolder.tableToKVSchemaMap.putAll(cdcTables); + + logger.info("Registering schema change listener ..."); + cassandraClient.getCluster().register(schemaChangeListener); } @Override public void process() { - schemaHolder.refreshSchemas(); + } + + @Override + public void destroy() { + logger.info("Unregistering schema change listener ..."); + cassandraClient.getCluster().unregister(schemaChangeListener); + logger.info("Clearing cdc keyspace / table map ... "); + schemaHolder.tableToKVSchemaMap.clear(); + } + + public static final class SchemaReinitializer { + public static synchronized void reinitialize() { + try { + // give it some time to flush system table to disk so we can read them again + Thread.sleep(5000); + clearWithoutAnnouncing(); + Schema.instance.loadFromDisk(false); + } + catch (final Throwable ex) { + logger.info("Error in reinitialization method", ex); + } + } + + public static synchronized void clearWithoutAnnouncing() { + for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { + org.apache.cassandra.schema.KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspaceName); + ksm.tables.forEach(view -> Schema.instance.unload(view)); + // this method does not exist + // ksm.views.forEach(view -> Schema.instance.unload(view)); + Schema.instance.clearKeyspaceMetadata(ksm); + } + } } } diff --git a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java index 21276e3..60241d9 100644 --- a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java @@ -29,11 +29,14 @@ public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase { private CassandraConnectorContext context; private CommitLogProcessor commitLogProcessor; + private SchemaProcessor schemaProcessor; @Before public void setUp() throws Exception { context = generateTaskContext(); commitLogProcessor = new CommitLogProcessor(context); + schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); commitLogProcessor.initialize(); } @@ -41,6 +44,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { deleteTestOffsets(context); commitLogProcessor.destroy(); + schemaProcessor.destroy(); context.cleanUp(); } @@ -48,7 +52,7 @@ public void tearDown() throws Exception { public void testProcessCommitLogs() throws Exception { int commitLogRowSize = 10; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); + // context.getSchemaHolder().refreshSchemas(); // programmatically add 
insertion and deletion events into commit log, this is because running an 'INSERT' or 'DELETE' // cql against the embedded Cassandra does not modify the commit log file on disk. diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index 0d61a38..b5285c8 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import org.junit.Test; @@ -19,6 +18,7 @@ public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testProcess() throws Exception { CassandraConnectorContext context = generateTaskContext(); SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); SchemaHolder.KeyValueSchema keyValueSchema; String namespacePrefix = "io.debezium.connector.cassandra" + "." + EmbeddedCassandraConnectorTestBase.TEST_KAFKA_TOPIC_PREFIX + "." @@ -29,9 +29,13 @@ public void testProcess() throws Exception { assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table1") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); + + Thread.sleep(5000); + schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + + assertNotNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); @@ -42,11 +46,10 @@ public void testProcess() throws Exception { assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); schemaProcessor.process(); - assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); + assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; @@ -60,7 +63,6 @@ public void testProcess() throws Exception { assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table2" + "." 
+ "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table2") + " ADD c text"); schemaProcessor.process(); assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); @@ -70,7 +72,6 @@ public void testProcess() throws Exception { TableMetadata tm2 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); assertEquals(expectedTm1, tm1); assertEquals(expectedTm2, tm2); - deleteTestKeyspaceTables(); context.cleanUp(); } diff --git a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index de95acc..1a83c1d 100644 --- a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -31,12 +31,14 @@ public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testSnapshotTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); + // context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); @@ -75,11 +77,13 @@ public void testSnapshotTable() throws Exception { public void testSnapshotSkipsNonCdcEnabledTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("non_cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); - context.getSchemaHolder().refreshSchemas(); + // context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("non_cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); } @@ -99,10 +103,12 @@ public void testSnapshotEmptyTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); AtomicBoolean globalTaskState = new AtomicBoolean(true); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); + // context.getSchemaHolder().refreshSchemas(); 
ChangeEventQueue queue = context.getQueue(); assertEquals(queue.totalCapacity(), queue.remainingCapacity()); From f287be28494e973609b9790e053e044dfbe24d5e Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Thu, 8 Apr 2021 05:59:58 +0200 Subject: [PATCH 02/13] removed sleep in reinitializer --- .../java/io/debezium/connector/cassandra/SchemaProcessor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index c1879c3..e6a2765 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -108,8 +108,6 @@ public void destroy() { public static final class SchemaReinitializer { public static synchronized void reinitialize() { try { - // give it some time to flush system table to disk so we can read them again - Thread.sleep(5000); clearWithoutAnnouncing(); Schema.instance.loadFromDisk(false); } From 8b4bb4150d997426467dae3421bb1d18cc695d5f Mon Sep 17 00:00:00 2001 From: Debezium Builder Date: Thu, 8 Apr 2021 04:13:32 -0400 Subject: [PATCH 03/13] [release] Stable parent 1.5.0.Final for release --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d62cf6d..2b8f5e3 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.debezium debezium-parent - 1.5.0-SNAPSHOT + 1.5.0.Final 4.0.0 From 800701515c1d3312f1b3dc1c0be7cd64b98daff0 Mon Sep 17 00:00:00 2001 From: Debezium Builder Date: Thu, 8 Apr 2021 04:14:19 -0400 Subject: [PATCH 04/13] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index a1f6b9b..5e48a21 100644 --- a/pom.xml +++ b/pom.xml @@ -10,14 +10,14 @@ 4.0.0 debezium-connector-cassandra Debezium Connector for Cassandra - 1.5.0.Final + 1.6.0-SNAPSHOT jar scm:git:git@github.com:debezium/debezium-connector-cassandra.git scm:git:git@github.com:debezium/debezium-connector-cassandra.git https://github.com/debezium/debezium-connector-cassandra - v1.5.0.Final + HEAD From e39c738f31dcd20d9ee3239550ed3ca399aa0e3e Mon Sep 17 00:00:00 2001 From: Debezium Builder Date: Thu, 8 Apr 2021 04:14:19 -0400 Subject: [PATCH 05/13] [maven-release-plugin] prepare release v1.5.0.Final --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 2b8f5e3..a1f6b9b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,14 +10,14 @@ 4.0.0 debezium-connector-cassandra Debezium Connector for Cassandra - 1.5.0-SNAPSHOT + 1.5.0.Final jar scm:git:git@github.com:debezium/debezium-connector-cassandra.git scm:git:git@github.com:debezium/debezium-connector-cassandra.git https://github.com/debezium/debezium-connector-cassandra - HEAD + v1.5.0.Final From 47e4316d64ea7c9bb0d78907e485d54a10cf80e7 Mon Sep 17 00:00:00 2001 From: Debezium Builder Date: Thu, 8 Apr 2021 04:15:42 -0400 Subject: [PATCH 06/13] [release] New parent 1.6.0-SNAPSHOT for development --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5e48a21..eb17f9c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.debezium debezium-parent - 1.5.0.Final + 1.6.0-SNAPSHOT 4.0.0 From 844f4c779d3aba4ab4ab7b63bb8e6c93bf0952c6 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Fri, 9 Apr 2021 12:16:12 +0200 Subject: [PATCH 07/13] rewritten cdc tracking without schema refreshment --- 
.../cassandra/CommitLogProcessor.java | 1 - .../cassandra/CommitLogReadHandlerImpl.java | 51 ++++++++------ .../connector/cassandra/SchemaHolder.java | 23 +++++- .../connector/cassandra/SchemaProcessor.java | 70 +++++++++---------- .../cassandra/SchemaProcessorTest.java | 4 +- 5 files changed, 88 insertions(+), 61 deletions(-) diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java b/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java index ad6ab8b..17c0461 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java @@ -64,7 +64,6 @@ public CommitLogProcessor(CassandraConnectorContext context) throws IOException @Override void handleEvent(WatchEvent event, Path path) { if (isRunning()) { - SchemaProcessor.SchemaReinitializer.reinitialize(); processCommitLog(path.toFile()); } } diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index f08c964..a6b2ca4 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -48,7 +48,7 @@ /** * Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code. - * + *

* This handler implementation processes each {@link Mutation} and invokes one of the registered partition handler * for each {@link PartitionUpdate} in the {@link Mutation} (a mutation could have multiple partitions if it is a batch update), * which in turn makes one or more record via the {@link RecordMaker} and enqueue the record into the {@link ChangeEventQueue}. @@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler { } /** - * A PartitionType represents the type of a PartitionUpdate. + * A PartitionType represents the type of a PartitionUpdate. */ enum PartitionType { /** @@ -86,7 +86,7 @@ enum PartitionType { PARTITION_KEY_ROW_DELETION, /** - * a partition-level deletion where partition key + clustering key = primary key + * a partition-level deletion where partition key + clustering key = primary key */ PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, @@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) { } /** - * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. + * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. */ enum RowType { /** @@ -214,9 +214,18 @@ public static boolean isUpdate(Row row) { } } + private boolean isTrackedByCDC(Mutation mutation) { + boolean trackedByCdc = false; + for (PartitionUpdate pu : mutation.getPartitionUpdates()) { + trackedByCdc |= schemaHolder.contains(pu.metadata().ksName, pu.metadata().cfName); + } + LOGGER.debug("Tracked by CDC: " + trackedByCdc); + return trackedByCdc; + } + @Override public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor descriptor) { - if (!mutation.trackedByCDC()) { + if (!isTrackedByCDC(mutation)) { return; } @@ -303,14 +312,14 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace * Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation * of this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid deletion * event means a partition only has a single row, this implies there are no clustering keys. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { @@ -349,16 +358,16 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo * Handle a valid event resulted from a row-level modification by converting Cassandra representation of * this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid event * implies this must be an insert, update, or delete. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate clustering columns - * c. populate regular columns - * d. for deletions, populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate clustering columns + * c. populate regular columns + * d. for deletions, populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 89de8b1..3e0940a 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -18,6 +18,9 @@ import com.datastax.driver.core.ColumnMetadata; import com.datastax.driver.core.TableMetadata; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException; @@ -33,6 +36,7 @@ public class SchemaHolder { private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class); public final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Multimap keyspaceTableMap = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); public final String kafkaTopicPrefix; public final SourceInfoStructMaker sourceInfoStructMaker; @@ -42,7 +46,8 @@ public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStr this.sourceInfoStructMaker = sourceInfoStructMaker; } - public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { + public synchronized KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { + LOGGER.debug("content of tableToKVSchemaMap {}", tableToKVSchemaMap); return tableToKVSchemaMap.getOrDefault(kt, null); } @@ -53,6 +58,20 @@ public Set getCdcEnabledTableMetadataSet() { .collect(Collectors.toSet()); } + public synchronized void remove(KeyspaceTable kst) { + tableToKVSchemaMap.remove(kst); + keyspaceTableMap.remove(kst.keyspace, kst.table); + } + + public synchronized void add(KeyspaceTable kst, KeyValueSchema kvs) { + tableToKVSchemaMap.put(kst, kvs); + keyspaceTableMap.put(kst.keyspace, kst.table); + } + + public synchronized boolean contains(String keyspace, String table) { + return keyspaceTableMap.containsEntry(keyspace, table); + } + /** * Get the schema of an inner field based on the field name * @param fieldName the name of the field in the schema @@ -124,4 +143,4 @@ private static String getValueName(String kafkaTopicPrefix, TableMetadata tm) { return kafkaTopicPrefix + "." + tm.getKeyspace().getName() + "." 
+ tm.getName() + ".Value"; } } -} \ No newline at end of file +} diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index e6a2765..8530172 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -13,15 +13,12 @@ import java.util.Map; import java.util.stream.Collectors; -import org.apache.cassandra.config.Schema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.SchemaChangeListener; import com.datastax.driver.core.TableMetadata; - import io.debezium.connector.SourceInfoStructMaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The schema processor is responsible for periodically @@ -48,25 +45,50 @@ public SchemaProcessor(CassandraConnectorContext context) { this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; schemaChangeListener = new NoOpSchemaChangeListener() { + /** + * We add only tables which are cdc enabled + * + * @param table table to be added + */ @Override public void onTableAdded(final TableMetadata table) { - logger.info(String.format("Table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); - schemaHolder.tableToKVSchemaMap.put(new KeyspaceTable(table), - new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); - // } + if (table.getOptions().isCDC()) { + logger.info(String.format("CDC-enabled table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); + schemaHolder.add(new KeyspaceTable(table), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + } } @Override public void onTableRemoved(final TableMetadata table) { logger.info(String.format("Table %s.%s detected to be removed!", table.getKeyspace().getName(), table.getName())); - schemaHolder.tableToKVSchemaMap.remove(new KeyspaceTable(table)); + schemaHolder.remove(new KeyspaceTable(table)); } + /** + * If a table is changed, it does not mean it's cdc state have to change, there might be just changes in columns ... + * + * However if cdc is not enabled in the current table, we just need to remove it completely + * If cdc is enabled, we just add it (or effectively replace, if it is already there) + * + * @param current current view of a table schema + * @param previous previos view of a table schema + */ @Override public void onTableChanged(final TableMetadata current, final TableMetadata previous) { - logger.info(String.format("Detected change in %s.%s", current.getKeyspace().getName(), current.getName())); - schemaHolder.tableToKVSchemaMap.put(new KeyspaceTable(current), - new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, current, sourceInfoStructMaker)); + logger.info(String.format("Detected alternation in schema of %s.%s (previous cdc = %s, current cdc = %s)", + current.getKeyspace().getName(), + current.getName(), + previous.getOptions().isCDC(), + current.getOptions().isCDC())); + + if (current.getOptions().isCDC()) { + schemaHolder.add(new KeyspaceTable(current), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, current, sourceInfoStructMaker)); + } + else { + schemaHolder.remove(new KeyspaceTable(current)); + } } }; } @@ -104,26 +126,4 @@ public void destroy() { logger.info("Clearing cdc keyspace / table map ... 
"); schemaHolder.tableToKVSchemaMap.clear(); } - - public static final class SchemaReinitializer { - public static synchronized void reinitialize() { - try { - clearWithoutAnnouncing(); - Schema.instance.loadFromDisk(false); - } - catch (final Throwable ex) { - logger.info("Error in reinitialization method", ex); - } - } - - public static synchronized void clearWithoutAnnouncing() { - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { - org.apache.cassandra.schema.KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspaceName); - ksm.tables.forEach(view -> Schema.instance.unload(view)); - // this method does not exist - // ksm.views.forEach(view -> Schema.instance.unload(view)); - Schema.instance.clearKeyspaceMetadata(ksm); - } - } - } } diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index b5285c8..20b3297 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import org.junit.Test; @@ -34,8 +35,7 @@ public void testProcess() throws Exception { schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - - assertNotNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); From 3ae29c1f13582f18aff806eec5e551aceb46f166 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Mon, 12 Apr 2021 09:11:11 +0200 Subject: [PATCH 08/13] fixed bug --- .../java/io/debezium/connector/cassandra/SchemaProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 8530172..3af0b56 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -109,7 +109,7 @@ public void initialize() { tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); logger.info("Adding cdc-enabled tables {}", cdcTables.keySet().stream().map(KeyspaceTable::toString).collect(joining(","))); - schemaHolder.tableToKVSchemaMap.putAll(cdcTables); + cdcTables.forEach(schemaHolder::add); logger.info("Registering schema change listener ..."); cassandraClient.getCluster().register(schemaChangeListener); From 14d9f2be73b7343505c5f12724c72aa84dd76fe6 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Mon, 12 Apr 2021 16:02:36 +0200 Subject: [PATCH 09/13] rewritten to build parallel in-memory schema structure from schema changes --- pom.xml | 2 +- .../cassandra/CommitLogReadHandlerImpl.java | 28 ++- .../connector/cassandra/SchemaHolder.java | 22 ++- .../connector/cassandra/SchemaProcessor.java | 165 +++++++++++++----- .../cassandra/SnapshotProcessor.java | 2 +- .../cassandra/SchemaProcessorTest.java | 13 +- 6 files changed, 158 insertions(+), 74 deletions(-) diff --git a/pom.xml b/pom.xml index a26daed..d3e88fc 100644 --- a/pom.xml 
+++ b/pom.xml @@ -119,7 +119,7 @@ org.apache.cassandra cassandra-all - 3.11.4 + 3.11.4-SNAPSHOT ch.qos.logback diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index a6b2ca4..346929c 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -214,18 +214,9 @@ public static boolean isUpdate(Row row) { } } - private boolean isTrackedByCDC(Mutation mutation) { - boolean trackedByCdc = false; - for (PartitionUpdate pu : mutation.getPartitionUpdates()) { - trackedByCdc |= schemaHolder.contains(pu.metadata().ksName, pu.metadata().cfName); - } - LOGGER.debug("Tracked by CDC: " + trackedByCdc); - return trackedByCdc; - } - @Override public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor descriptor) { - if (!isTrackedByCDC(mutation)) { + if (!mutation.trackedByCDC()) { return; } @@ -323,7 +314,11 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace */ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); @@ -371,14 +366,17 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo */ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); - Schema keySchema = schema.keySchema(); - Schema valueSchema = schema.valueSchema(); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); + Schema valueSchema = keyValueSchema.valueSchema(); RowData after = new RowData(); populatePartitionColumns(after, pu); populateClusteringColumns(after, row, pu); - populateRegularColumns(after, row, rowType, schema); + populateRegularColumns(after, row, rowType, keyValueSchema); long ts = rowType == DELETE ? 
row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 3e0940a..471693c 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -5,6 +5,9 @@ */ package io.debezium.connector.cassandra; +import static java.util.stream.Collectors.toList; + +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -46,8 +49,7 @@ public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStr this.sourceInfoStructMaker = sourceInfoStructMaker; } - public synchronized KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { - LOGGER.debug("content of tableToKVSchemaMap {}", tableToKVSchemaMap); + public synchronized KeyValueSchema getKeyValueSchema(KeyspaceTable kt) { return tableToKVSchemaMap.getOrDefault(kt, null); } @@ -58,12 +60,24 @@ public Set getCdcEnabledTableMetadataSet() { .collect(Collectors.toSet()); } - public synchronized void remove(KeyspaceTable kst) { + public synchronized void removeKeyspace(String keyspace) { + final List collect = tableToKVSchemaMap.keySet() + .stream() + .filter(keyValueSchema -> keyValueSchema.keyspace.equals(keyspace)) + .collect(toList()); + + collect.forEach(tableToKVSchemaMap::remove); + collect.forEach(collected -> keyspaceTableMap.removeAll(collected.keyspace)); + } + + public synchronized void removeTable(KeyspaceTable kst) { tableToKVSchemaMap.remove(kst); keyspaceTableMap.remove(kst.keyspace, kst.table); } - public synchronized void add(KeyspaceTable kst, KeyValueSchema kvs) { + // there is not "addKeyspace", it is not necessary + // as we will ever add a concrete table (with keyspace) but we will also dropping all tables when keyspace is dropped + public synchronized void addTable(KeyspaceTable kst, KeyValueSchema kvs) { tableToKVSchemaMap.put(kst, kvs); keyspaceTableMap.put(kst.keyspace, kst.table); } diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 3af0b56..2c22abf 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.cassandra; -import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toMap; import java.util.Collection; @@ -13,12 +12,19 @@ import java.util.Map; import java.util.stream.Collectors; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.schema.KeyspaceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.SchemaChangeListener; import com.datastax.driver.core.TableMetadata; + import io.debezium.connector.SourceInfoStructMaker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The schema processor is responsible for periodically @@ -45,71 +51,131 @@ public SchemaProcessor(CassandraConnectorContext context) { this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; schemaChangeListener = new NoOpSchemaChangeListener() { - /** - * We add only tables which are cdc 
enabled - * - * @param table table to be added - */ + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + Keyspace.openWithoutSSTables(keyspace.getName()); + logger.info("added keyspace {}", keyspace.asCQLQuery()); + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + Schema.instance.updateKeyspace(current.getName(), KeyspaceParams.create(current.isDurableWrites(), current.getReplication())); + logger.info("updated keyspace {}", current.asCQLQuery()); + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + schemaHolder.removeKeyspace(keyspace.getName()); + Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + logger.info("removed keyspace {}", keyspace.asCQLQuery()); + } + @Override public void onTableAdded(final TableMetadata table) { - if (table.getOptions().isCDC()) { - logger.info(String.format("CDC-enabled table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); - schemaHolder.add(new KeyspaceTable(table), - new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + logger.info(String.format("Table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); + schemaHolder.addTable(new KeyspaceTable(table), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + + final CFMetaData rawCFMetaData = CFMetaData.compile(table.asCQLQuery(), table.getKeyspace().getName()); + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newCFMetaData = rawCFMetaData.copy(table.getId()); + + Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false); + + final org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName); + if (current == null) { + throw new IllegalStateException(String.format("Keyspace %s doesn't exist", newCFMetaData.ksName)); } + + if (current.tables.get(table.getName()).isPresent()) { + logger.info(String.format("table %s.%s is already added!", table.getKeyspace(), table.getName())); + return; + } + + final java.util.function.Function f = ks -> ks + .withSwapped(ks.tables.with(newCFMetaData)); + + org.apache.cassandra.schema.KeyspaceMetadata transformed = f.apply(current); + + Schema.instance.setKeyspaceMetadata(transformed); + Schema.instance.load(newCFMetaData); + + logger.info("added table {}", table.asCQLQuery()); } @Override public void onTableRemoved(final TableMetadata table) { logger.info(String.format("Table %s.%s detected to be removed!", table.getKeyspace().getName(), table.getName())); - schemaHolder.remove(new KeyspaceTable(table)); + schemaHolder.removeTable(new KeyspaceTable(table)); + + final String ksName = table.getKeyspace().getName(); + final String tableName = table.getName(); + + final org.apache.cassandra.schema.KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(table.getKeyspace().getName()); + + if (oldKsm == null) { + throw new IllegalStateException(String.format("KeyspaceMetadata for keyspace %s is not found!", table.getKeyspace().getName())); + } + + final 
ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(tableName); + + if (cfs == null) { + throw new IllegalStateException(String.format("ColumnFamilyStore for %s.%s is not found!", table.getKeyspace(), table.getName())); + } + + // make sure all the indexes are dropped, or else. + cfs.indexManager.markAllIndexesRemoved(); + + // reinitialize the keyspace. + final CFMetaData cfm = oldKsm.tables.get(tableName).get(); + final org.apache.cassandra.schema.KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName)); + + Schema.instance.unload(cfm); + Schema.instance.setKeyspaceMetadata(newKsm); + + logger.info("removed table {}", table.asCQLQuery()); } - /** - * If a table is changed, it does not mean it's cdc state have to change, there might be just changes in columns ... - * - * However if cdc is not enabled in the current table, we just need to remove it completely - * If cdc is enabled, we just add it (or effectively replace, if it is already there) - * - * @param current current view of a table schema - * @param previous previos view of a table schema - */ @Override - public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + public void onTableChanged(final TableMetadata newTableMetadata, final TableMetadata oldTableMetaData) { logger.info(String.format("Detected alternation in schema of %s.%s (previous cdc = %s, current cdc = %s)", - current.getKeyspace().getName(), - current.getName(), - previous.getOptions().isCDC(), - current.getOptions().isCDC())); - - if (current.getOptions().isCDC()) { - schemaHolder.add(new KeyspaceTable(current), - new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, current, sourceInfoStructMaker)); - } - else { - schemaHolder.remove(new KeyspaceTable(current)); - } + newTableMetadata.getKeyspace().getName(), + newTableMetadata.getName(), + oldTableMetaData.getOptions().isCDC(), + newTableMetadata.getOptions().isCDC())); + + schemaHolder.addTable(new KeyspaceTable(newTableMetadata), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker)); + + final CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(), + newTableMetadata.getKeyspace().getName()); + + final CFMetaData oldMetadata = Schema.instance.getCFMetaData(oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName()); + + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId); + oldMetadata.apply(newMetadata); + + logger.info("changed table {}", newTableMetadata.asCQLQuery()); } }; } - private List getCdcEnabledTableMetadataList() { - return cassandraClient.getCluster().getMetadata().getKeyspaces().stream() - .map(KeyspaceMetadata::getTables) - .flatMap(Collection::stream) - .filter(tm -> tm.getOptions().isCDC()) - .collect(Collectors.toList()); - } - @Override public void initialize() { - final Map cdcTables = getCdcEnabledTableMetadataList() + final Map tables = getTableMetadata() .stream() .collect(toMap(KeyspaceTable::new, tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); - logger.info("Adding cdc-enabled tables {}", cdcTables.keySet().stream().map(KeyspaceTable::toString).collect(joining(","))); - cdcTables.forEach(schemaHolder::add); + tables.forEach(schemaHolder::addTable); logger.info("Registering schema change listener ..."); 
cassandraClient.getCluster().register(schemaChangeListener); @@ -126,4 +192,11 @@ public void destroy() { logger.info("Clearing cdc keyspace / table map ... "); schemaHolder.tableToKVSchemaMap.clear(); } + + private List getTableMetadata() { + return cassandraClient.getCluster().getMetadata().getKeyspaces().stream() + .map(KeyspaceMetadata::getTables) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } } diff --git a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java index 33c76c0..e838a47 100644 --- a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java @@ -198,7 +198,7 @@ private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetad private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException { String tableName = tableName(tableMetadata); KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata); - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index 20b3297..0f22366 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import org.junit.Test; @@ -35,12 +34,12 @@ public void testProcess() throws Exception { schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + assertNotNull(context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); assertEquals(1, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); @@ -50,14 +49,14 @@ public void testProcess() throws Exception { schemaProcessor.process(); assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." 
+ "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table2" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); @@ -68,8 +67,8 @@ public void testProcess() throws Exception { assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); TableMetadata expectedTm1 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table1"); TableMetadata expectedTm2 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table2"); - TableMetadata tm1 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); - TableMetadata tm2 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); + TableMetadata tm1 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); + TableMetadata tm2 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); assertEquals(expectedTm1, tm1); assertEquals(expectedTm2, tm2); deleteTestKeyspaceTables(); From 5083434e8d5e39d56d48181592e99a92dfddc319 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Thu, 1 Apr 2021 11:23:36 +0200 Subject: [PATCH 10/13] rewritten to build parallel in-memory schema structure from schema changes --- REPORT.adoc | 368 ++++++++++++++++++ REPORT_2.adoc | 152 ++++++++ pom.xml | 1 + .../connector/cassandra/CassandraClient.java | 4 + .../cassandra/CassandraConnectorContext.java | 2 +- .../cassandra/CommitLogReadHandlerImpl.java | 57 +-- .../cassandra/NoOpSchemaChangeListener.java | 117 ++++++ .../connector/cassandra/SchemaHolder.java | 104 ++--- .../connector/cassandra/SchemaProcessor.java | 176 ++++++++- .../cassandra/SnapshotProcessor.java | 2 +- .../cassandra/CommitLogProcessorTest.java | 6 +- .../cassandra/SchemaProcessorTest.java | 22 +- .../cassandra/SnapshotProcessorTest.java | 9 +- 13 files changed, 912 insertions(+), 108 deletions(-) create mode 100644 REPORT.adoc create mode 100644 REPORT_2.adoc create mode 100644 src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java diff --git a/REPORT.adoc b/REPORT.adoc new file mode 100644 index 0000000..3cb6f4f --- /dev/null +++ b/REPORT.adoc @@ -0,0 +1,368 @@ +== Debezium integration with Cassandra report - part 1 + +_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_ + +=== Nature of the problem + +The problem is rather straightforward however a reader / Cassandra user can be confused by seemingly +almost random behaviour of Debezium which brings even more complexity to trying to understand what is going on. + +All problems observed are a variation of this scenario and reducing everything to this problem and +trying to solve it will resolve all errorneous behavior. + +* I have `cdc_enabled: true` in `cassandra.yaml` +* I have a table `k1.t1` +* I have cdc functionality enabled on `k1.t1` by specifying `cdc = true` in `CREATE` statement +or by `ALTER`-ing it afterwards if that is not the case. + +The important point here to make is that it is required to have Debezium running _before_ this +all happens. 
In other words: + +* I start Cassandra without `k1.t1` +* I start Debezium +* After points above, I create table as described above. + +The result of this procedure is that a cdc-enabled table will indeed place logs into cdc-raw directory +to be picked up by Debezium, but Debezium will not react - it just deletes these logs and nothing +is eventually sent to a respective Kafka topic. + +What is even worse is that these commit logs in cdc directory are lost for ever as Debezium will delete them (by default, +there is a way how to _not_ delete them by implementing own `CommitLogTransfer` but by default +a commit log transfer implementation will just delete these logs for good). + +What is even more strange is that the other table on which a `cdc` is enabled is just pushing these +messages to a topic fine so the fact that the other table does not produce them is mind-boggling. + +To fully understand why this is happening, we need to explain what a write and read path looks like. + +=== On Mutation and CommitLog + +The building block of any change in Cassandra is `Mutation`. A mutation contains `PartitionUpdate`-s. +If `Mutation` happens to consist of one "change" only, then this `Mutation` happens to contain +just one `PartitionUpdate`. + +Cassandra appends `Mutation` objects as they come at the end of _commit log segment_. A commit log segment +is each individual file in commitlog directory. + +If CDC is enabled, once a commit log is "full" (for the lack of better words and for brevity), it is +just copied over to `cdc` dir. + +Debezium has a watcher on cdc dir and as soon as a commit log is copied there, Debezim detects it, +it will read whole log, reconstructs all `Mutation`-s, creates messages and sends them to Kafka. + +Hence we clearly see that this process has two parts: + +1. *serialize* `Mutation`-s from Java objects to binary blob and store it to a commit log segment +2. parse a commit log segment and *deserialize* all `Mutation`-s into Java objects and create Kafka messages + +=== Write path and serialisation + +A mutation is serialized by its `MutationSerializer`. It is not too much important where this +serializer is called from. + +[source,java] +---- + public static class MutationSerializer implements IVersionedSerializer + { + public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException + { + /// other code hidden for brevity + + int size = mutation.modifications.size(); + + if (version < MessagingService.VERSION_30) + { + ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out); + out.writeInt(size); + } + else + { + out.writeUnsignedVInt(size); + } + + assert size > 0; + for (Map.Entry entry : mutation.modifications.entrySet()) + PartitionUpdate.serializer.serialize(entry.getValue(), out, version); + } +---- + +Here we see that in order to serialize a `Mutation`, we need to iterate over all _modifications_ +a.k.a `PartitionUpdate`-s and we need to serialise these. The serialisation of a Mutation +is equal to the serialisation of all its modifications. 
+ +Let's take a look into `PartitionUpdate` serialisation: + +[source,java] +---- + public static class PartitionUpdateSerializer + { + public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException + { + try (UnfilteredRowIterator iter = update.unfilteredIterator()) + { + // other code hidden for brevity + // serialisation of metadata + CFMetaData.serializer.serialize(update.metadata(), out, version); + // serialisation of partition itself + UnfilteredRowIteratorSerializer.serializer.serialize(iter, + null, out, version, update.rowCount()); + } + } +---- + +`updata.metadata()` above returns `CFMetaData`. `CFMetaData` has a field `params` of +type `TableParams` and this object has a flag called `cdc` which is a boolean saying +if that particular `PartitionUpdate` relates to a table which is _cdc-enabled_. + +From above we see that `PartitionUpdateSerializer` serializes not only partition themselves +but it also serializes its metadata via `CFMetaDataSerializer`. Let's take a look into it: + +[source,java] +---- + public static class Serializer + { + public void serialize(CFMetaData metadata, + DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(metadata.cfId, out, version); + } +---- + +Here, the very important fact is that the serialization of `CFMetaData` in Cassandra terms +means that _it will take UUID of a table that PartitionUpdate is from and it will serialize it_. +*Just that UUID*. + +=== Read path and deserialisation + +Having write path covered, let's look how read path is treated. Read path is a different problem - +we parse binary commit log and we try to construct all `Mutation`-s from it. + +*It is important to realise that this is happening in Debezium context, not in Cassandra.* + +Debezium is using Cassandra's class `CommitLogReader` by which a commit log reading is conducted so +Debezium is not using anything custom but built-in Cassandra functionality. + +Cassandra's `CommitLogReader` is used in Debezium's `CommitLogProcessor`. It just scans new commit logs +as they are copied to cdc dir and it will use `CommitLogReader` in its `process` method. + +`CommitLogReader` is reading commit logs via method `readCommitLogSegment` which accepts `CommitLogReadHandler`. +Commit log handler is the custom implementation of Debezium to actually hook there its functionality to process +mutations as they come from reading a commit log segment. + +For the completeness, the chain of method calls to the place where handler is ultimately called is like + +1. `CommitLogReader#readCommitLogSegment` +2. in method from 1) there is call to private `CommitLogReader#readSection`, a commit log is not read all at once but it is read by chunks - _sections_. +3. in 2) we pass our handler to `CommitLogReader#readMutation` + +At the beginning of 3) we *deserialize* buffer into a mutation. + +[source,java] +---- + try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) + { + mutation = Mutation.serializer.deserialize(bufIn, + desc.getMessagingVersion(), + SerializationHelper.Flag.LOCAL); +---- + +Finally, at the very bottom we see the handling of just deserialized `Mutation` by our +custom handler. 
+ +[source,java] +---- +handler.handleMutation(mutation, size, entryLocation, desc); +---- + +The implementation of this handler in Debezium looks like this: + +[source,java] +---- + @Override + public void handleMutation(Mutation mutation, + int size, + int entryLocation, + CommitLogDescriptor descriptor) { + if (!mutation.trackedByCDC()) { + return; + } + + // other code + // here Mutation is eventually transformed to a Kafka message and sent +---- + +This is crucial. The problem is that *a mutation is not tracked by cdc* (empirically verified by putting heavy logging at all the places). + +In other words: we have verified that Cassandra serialized data as it is supposed to do but for some reason, its mutation which was previously marked as _cdc-enabled_ +is not deserialized in such a way that `trackedByCDC` would be `true` so that method +would not return immediately (hence nothing is sent to Kafka). + +Let's see the logic behind `Mutation#trackedByCDC` method + +[source,java] +---- + public boolean trackedByCDC() + { + return cdcEnabled; + } +---- + +It is just a getter. This flag is however set on _read path_ by +`MutationSerializer#deserialize`. At the end of that method it returns + +[source,java] +---- +return new Mutation(update.metadata().ksName, dk, modifications); +---- + +And finally, in its constructor we find: + +[source,java] +---- + protected Mutation(... params for constructor) + { + this.keyspaceName = keyspaceName; + this.key = key; + this.modifications = modifications; + for (PartitionUpdate pu : modifications.values()) + cdcEnabled |= pu.metadata().params.cdc; + } +---- + +Here we see that `cdcEnabled` flag will be `true` in case _whatever_ `PartitionUpdate` metadata has in their params `cdc` to be true. + +`PartitionUpdate#metadata` returns `CFMetaData` on deserialization, nothing wrong with that. + +Yet we clearly see that after everything is deserialized fully, that flag is still `false` ... + +=== The core of the problem + +The problems are two. The first problem is that the serialized object of a Mutation +does not contain its `TableParams` - or to better put it - `PartitionUpdate` of a +Mutation is not serialized in such a way that it would contain `cdc` flag as well. +We saw it contains only `cdIf` (uuid) and that is all. + +However, it is rather understandable that it is done like that because after a closer look, this information is not necessary. If we refresh the content of `MutationSerializer#deserialize`, it contains + +[source,java] +---- +PartitionUpdate.serializer.deserialize(in, version, flag, key); +---- + +Which in turn contains + +[source,java] +---- +CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); +---- + +Which finally calls: + +[source,java] +---- +UUID cfId = UUIDSerializer.serializer.deserialize(in, version); +CFMetaData metadata = Schema.instance.getCFMetaData(cfId); +---- + +Hence we see that all it takes to populate `PartitionUpdate` with `CFMetaData` +is to look what `cfId` we serialized and based on that id, we retrieve +metadata from `Schema`. + +The conclusion is rather clear - we have running node which serializes just fine +but we have deserialized mutations for which its retrieved `CFMetaData` contains +`cdc` flag which is `false` so its processing is skipped. 
+
+The reason this is happening is that when Debezium starts, it reads the Cassandra schema by doing this in the `CassandraConnectorContext` constructor:
+
+[source,java]
+----
+Schema.instance.loadDdlFromDisk(this.config.cassandraConfig());
+----
+
+which translates to
+
+[source,java]
+----
+    public void loadDdlFromDisk(String yamlConfig) {
+        // other stuff ...
+        DatabaseDescriptor.toolInitialization();
+        Schema.instance.loadFromDisk(false);
+    }
+----
+
+This is done *once, when Debezium starts*, and it is *never* refreshed afterwards. So
+if you create a table after Debezium has started, Debezium simply does not see it. The same happens when the table already exists
+but you alter it with `cdc = true` *after Debezium has started*.
+
+Debezium reuses Cassandra's internal code, but since Debezium is a different JVM / process from Cassandra,
+whatever happens in Cassandra after Debezium has started is not visible to Debezium;
+if you enable cdc on a table in Cassandra, Debezium just does not know about it.
+
+However, if you restart Debezium while `cdc` is already enabled,
+*it will read Cassandra's system keyspaces after these changes have been persisted to the system SSTables on disk*, so the messages will be sent to Kafka just fine.
+
+=== Proposed solution
+
+The only solution I see is to _reload the Cassandra schema in Debezium_
+before *each* commit log segment in the `cdc` dir is scanned / parsed.
+
+The reason it has to be done before processing every segment is that you cannot tell from the outside whether a given log contains mutations
+whose partition updates come from a table on which cdc was enabled only recently, so the schema has to be reloaded before every read.
+This schema refresh is not performance sensitive; it takes only milliseconds up to sub-second time.
+
+If we do that, on the deserialization path the deserialization of a `PartitionUpdate` will populate it with `CFMetaData` which reflects its true state,
+because Cassandra keeps that state in its system tables.
+
+The reloading of the schema can be done on demand; as said, Debezium has a watch on the cdc dir and it does this:
+
+[source,java]
+----
+void handleEvent(WatchEvent event, Path path) {
+    if (isRunning()) {
+        // this would be added
+        SchemaProcessor.SchemaReinitializer.reinitialize();
+
+        // this stays as is
+        processCommitLog(path.toFile());
+    }
+}
+----
+
+The implementation of the reinitializer would look like this (working example):
+
+[source,java]
+----
+public static final class SchemaReinitializer {
+    public static synchronized void reinitialize() {
+        try {
+            // give it some time to flush the system tables to disk so we can read them again
+            Thread.sleep(5000);
+            clearWithoutAnnouncing();
+            Schema.instance.loadFromDisk(false);
+        }
+        catch (final Throwable ex) {
+            logger.info("Error in reinitialisation method", ex);
+        }
+    }
+
+    public static synchronized void clearWithoutAnnouncing() {
+        for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) {
+            org.apache.cassandra.schema.KeyspaceMetadata ksm =
+                Schema.instance.getKSMetaData(keyspaceName);
+            ksm.tables.forEach(view -> Schema.instance.unload(view));
+            // this method does not exist in Cassandra
+            // ksm.views.forEach(view -> Schema.instance.unload(view));
+            Schema.instance.clearKeyspaceMetadata(ksm);
+        }
+        logger.info("clearing without announce done");
+    }
+}
+----
+
+There is also `Schema.instance.clear()`, but it additionally announces schema changes and it
+invokes parts of the Cassandra codebase which throw exceptions when called outside
+of Cassandra - remember, we are using Cassandra code but we do not actually run Cassandra in Debezium,
+so it would call code we do not want to run.
+
+Fortunately, all the methods we need are public, so we simply copy `Schema`'s `clear` method and remove the
+last, problematic line which announces a new schema version.
+
+Once this is done, if we alter a table in Cassandra, the `cdc` flag will be read correctly, so the Mutation will not be
+skipped from processing and the Kafka messages will be sent.
+
+The other solution would be to create a JVM agent instead of a standalone process. By doing so, we would run Debezium in the same JVM as Cassandra, so if Cassandra updates the cdc flag on some metadata, Debezium sees it instantly. This path is yet to be implemented if we choose so; I am not sure whether it is possible, but in principle it should be.
\ No newline at end of file
diff --git a/REPORT_2.adoc b/REPORT_2.adoc
new file mode 100644
index 0000000..0414828
--- /dev/null
+++ b/REPORT_2.adoc
@@ -0,0 +1,152 @@
+== Debezium integration with Cassandra report - part 2
+
+_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_
+
+=== Introduction
+
+In this document, we will look back at what we actually did in part 1 and
+we will build on that to improve the solution.
+
+=== Nature of the problem
+
+The problem with solution 1 is that it is not deterministic. The reason for
+the non-determinism seems to be that it is more or less unpredictable when
+Cassandra flushes / persists changes (e.g. a change of `cdc = true / false`) to disk,
+so it is unpredictable whether our schema refresh will pick these changes up.
+
+Hence we might see the following:
+
+1) A table is created with cdc = true.
+2) We detect the change in the driver's schema listener and we refresh the schema in Debezium via Cassandra's own mechanics.
+3) We think that our internal structure is indeed refreshed, but our `cdc` flag is still false.
+
+This means that even though the Cassandra process itself is aware that we have
+enabled cdc on a particular table, Debezium does not reflect this: sometimes the changes are flushed / persisted in time,
+but sometimes it takes longer for them to propagate, and by then it is too late for Debezium because the listener has already been invoked.
+
+=== Possible solutions
+
+There are, in general, two solutions to this problem:
+
+1) Faking what Cassandra does in Debezium so that it holds the same data structures too.
+
+This is a rather delicate operation / topic to deal with, but it is possible and we chose to go with
+this solution for the time being.
+
+It merges two main concepts:
+
+a) Debezium is informed about schema changes via a schema change listener registered on the driver
+b) once a respective method on the listener is invoked, we mimic the same code Cassandra would invoke, but
+in such a way that the parts which would be erroneous (because Debezium just does not run Cassandra) are
+skipped.
+
+By doing b), we internally hold a logical copy of what the real Cassandra is holding, and we keep
+Cassandra's internal structures (keyspaces, tables ...) synchronized by registering a
+schema change listener and applying the same changes to the "Cassandra" inside the Debezium process.
+
+Let's go through the core of this logic, starting with "onKeyspaceAdded":
+
+[source,java]
+----
+schemaChangeListener = new NoOpSchemaChangeListener() {
+    @Override
+    public void onKeyspaceAdded(final KeyspaceMetadata keyspace) {
+        Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(
+                keyspace.getName(),
+                KeyspaceParams.create(keyspace.isDurableWrites(),
+                        keyspace.getReplication())));
+        Keyspace.openWithoutSSTables(keyspace.getName());
+        logger.info("added keyspace {}", keyspace.asCQLQuery());
+    }
+----
+
+Here we pretend that a keyspace was opened. This populates Cassandra's internal structures so that
+the live Cassandra code inside Debezium "knows" which keyspace was added.
+
+On a keyspace's update, we do:
+
+[source,java]
+----
+@Override
+public void onKeyspaceChanged(KeyspaceMetadata current,
+                              KeyspaceMetadata previous) {
+    Schema.instance.updateKeyspace(current.getName(),
+            KeyspaceParams.create(current.isDurableWrites(),
+                    current.getReplication()));
+}
+----
+
+When a keyspace is removed, we do:
+
+[source,java]
+----
+@Override
+public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) {
+    schemaHolder.removeKeyspace(keyspace.getName());
+    // here KeyspaceMetadata is Cassandra's class, not the driver's one from the method argument
+    Schema.instance.clearKeyspaceMetadata(KeyspaceMetadata.create(
+            keyspace.getName(),
+            KeyspaceParams.create(keyspace.isDurableWrites(),
+                    keyspace.getReplication())));
+}
+----
+
+We remove the keyspace from our schema holder too. Think about it: if a whole keyspace is removed
+by "DROP KEYSPACE abc", all of its tables are removed with it, so we drop all tables of that keyspace
+from our schema holder as well.
+
+We leave the last three methods of the listener - onTableAdded, onTableChanged and onTableRemoved -
+for the reader to go through. The code you see is more or less what Cassandra does internally, but
+it is refactored in such a way that the parts which are not needed (nor desired to be executed) are just skipped.
+
+Please follow this https://github.com/instaclustr/debezium-connector-cassandra/blob/dd2/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java#L81-L168[link].
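+
+For a quick orientation, below is a condensed sketch of `onTableAdded`, abridged from the `SchemaProcessor` changes further down in this patch; the guard checks (missing keyspace, table already registered) and most of the logging are left out for brevity, so treat it as an illustration rather than the exact code:
+
+[source,java]
+----
+@Override
+public void onTableAdded(final TableMetadata table) {
+    // keep Debezium's own key/value schema map in sync with the newly created table
+    schemaHolder.addTable(new KeyspaceTable(table),
+            new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker));
+
+    // rebuild CFMetaData from the table's CQL; CFMetaData.compile generates a fresh cfId,
+    // so copy it with the id reported by the driver, otherwise commit log lookups would not match
+    final CFMetaData newCFMetaData = CFMetaData.compile(table.asCQLQuery(), table.getKeyspace().getName())
+            .copy(table.getId());
+
+    // register the table with the Cassandra structures running inside the Debezium process
+    Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false);
+
+    final org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName);
+    Schema.instance.setKeyspaceMetadata(current.withSwapped(current.tables.with(newCFMetaData)));
+    Schema.instance.load(newCFMetaData);
+}
+----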
+
+Once we put this into action, the metadata will be populated correctly, with the `cdc` flag set on `TableParams` and so on.
+A `Mutation` will be deserialised properly as well, because it reaches into the ColumnFamily's metadata which
+has `cdc = true`: we were notified about this change in the listener and we updated
+that table in the Cassandra code, so the following deserialisation of a Mutation, where this code
+is called, will not throw:
+
+[source,java]
+----
+public static class Serializer
+{
+    public void serialize(CFMetaData metadata, DataOutputPlus out, int version) throws IOException
+    {
+        UUIDSerializer.serializer.serialize(metadata.cfId, out, version);
+    }
+
+    public CFMetaData deserialize(DataInputPlus in, int version) throws IOException
+    {
+        UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
+        CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
+        if (metadata == null)
+        {
+            String message = String.format("Couldn't find table for cfId %s. If a table was just " +
+                    "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
+                    "agreement on table creation.", cfId);
+            throw new UnknownColumnFamilyException(message, cfId);
+        }
+
+        return metadata;
+    }
+----
+
+Keep in mind that we are not "initialising" Cassandra in any way. When Debezium starts, the Cassandra
+internals have already read the tables from disk and so on, so they are fully up, but we would never be notified
+about what happens afterwards (for example, that cdc was changed from false to true). That is why there is
+a schema change listener which synchronizes these changes. It is true that the listener notifies us about them,
+but the schema refresh alone does not always help: we would be notified about the changes yet we would have no
+way to make them visible to Cassandra's internal code. Only by invoking the core Cassandra structures and emulating
+that we run inside a proper Cassandra node does the deserialisation of such a mutation become possible; previously our
+cdc flag was always false (it was never updated), so the handling of such a mutation was effectively skipped.
+
+The second solution consists of turning Debezium into a JVM agent - this means that, by definition, it would see the same data structures as Cassandra.
+The problem we see with this solution is that it is rather tricky to do, because
+Debezium would suddenly have the same lifecycle as Cassandra (or the other way around - Cassandra
+would have the same lifecycle as Debezium), as they would be inherently tied together.
+
+Another problem we see is that the dependencies which Debezium uses are not compatible with
+what Cassandra uses and it would just not be possible to "merge" them.
By merely checking, +the probability this would be the case is quite high, there is Cassandra connector of diff --git a/pom.xml b/pom.xml index eb17f9c..f23f573 100644 --- a/pom.xml +++ b/pom.xml @@ -119,6 +119,7 @@ org.apache.cassandra cassandra-all + 3.11.4 ch.qos.logback diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java index b199924..aa398b8 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java @@ -95,6 +95,10 @@ public String getClusterName() { return cluster.getMetadata().getClusterName(); } + public Cluster getCluster() { + return cluster; + } + public boolean isQueryable() { return !cluster.isClosed() && !session.isClosed(); } diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java index 5d96bbb..0077eb5 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java @@ -53,7 +53,7 @@ public CassandraConnectorContext(CassandraConnectorConfig config) throws Excepti .build(); // Setting up schema holder ... - this.schemaHolder = new SchemaHolder(this.cassandraClient, this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); + this.schemaHolder = new SchemaHolder(this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); // Setting up a file-based offset manager ... this.offsetWriter = new FileOffsetWriter(this.config.offsetBackingStoreDir()); diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index f08c964..346929c 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -48,7 +48,7 @@ /** * Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code. - * + *

* This handler implementation processes each {@link Mutation} and invokes one of the registered partition handler * for each {@link PartitionUpdate} in the {@link Mutation} (a mutation could have multiple partitions if it is a batch update), * which in turn makes one or more record via the {@link RecordMaker} and enqueue the record into the {@link ChangeEventQueue}. @@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler { } /** - * A PartitionType represents the type of a PartitionUpdate. + * A PartitionType represents the type of a PartitionUpdate. */ enum PartitionType { /** @@ -86,7 +86,7 @@ enum PartitionType { PARTITION_KEY_ROW_DELETION, /** - * a partition-level deletion where partition key + clustering key = primary key + * a partition-level deletion where partition key + clustering key = primary key */ PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, @@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) { } /** - * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. + * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. */ enum RowType { /** @@ -303,18 +303,22 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace * Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation * of this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid deletion * event means a partition only has a single row, this implies there are no clustering keys. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); @@ -349,27 +353,30 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo * Handle a valid event resulted from a row-level modification by converting Cassandra representation of * this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid event * implies this must be an insert, update, or delete. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate clustering columns - * c. populate regular columns - * d. for deletions, populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate clustering columns + * c. populate regular columns + * d. for deletions, populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); - Schema keySchema = schema.keySchema(); - Schema valueSchema = schema.valueSchema(); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); + Schema valueSchema = keyValueSchema.valueSchema(); RowData after = new RowData(); populatePartitionColumns(after, pu); populateClusteringColumns(after, row, pu); - populateRegularColumns(after, row, rowType, schema); + populateRegularColumns(after, row, rowType, keyValueSchema); long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); diff --git a/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java new file mode 100644 index 0000000..1b45db4 --- /dev/null +++ b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java @@ -0,0 +1,117 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.cassandra; + +import com.datastax.driver.core.AggregateMetadata; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.FunctionMetadata; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.MaterializedViewMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.UserType; + +public class NoOpSchemaChangeListener implements SchemaChangeListener { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + + } + + @Override + public void onTableAdded(final TableMetadata table) { + + } + + @Override + public void onTableRemoved(final TableMetadata table) { + + } + + @Override + public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + + } + + @Override + public void onUserTypeAdded(final UserType type) { + + } + + @Override + public void onUserTypeRemoved(final UserType type) { + + } + + @Override + public void onUserTypeChanged(final UserType current, final UserType previous) { + + } + + @Override + public void onFunctionAdded(final FunctionMetadata function) { + + } + + @Override + public void onFunctionRemoved(final FunctionMetadata function) { + + } + + @Override + public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) { + + } + + @Override + public void onAggregateAdded(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateRemoved(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) { + + } + + @Override + public void onMaterializedViewAdded(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewRemoved(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) { + + } + + @Override + public void onRegister(final Cluster cluster) { + + } + + @Override + public void onUnregister(final Cluster cluster) { + + } +} diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 839e6db..471693c 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -5,8 +5,9 @@ */ package io.debezium.connector.cassandra; -import java.util.HashMap; -import java.util.HashSet; +import static java.util.stream.Collectors.toList; + +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -20,6 +21,9 @@ import com.datastax.driver.core.ColumnMetadata; import com.datastax.driver.core.TableMetadata; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException; @@ -31,35 +35,21 @@ * by {@link 
SchemaProcessor} periodically. */ public class SchemaHolder { - private static final String NAMESPACE = "io.debezium.connector.cassandra"; private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class); - private final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Multimap keyspaceTableMap = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); - private final CassandraClient cassandraClient; - private final String kafkaTopicPrefix; - private final SourceInfoStructMaker sourceInfoStructMaker; + public final String kafkaTopicPrefix; + public final SourceInfoStructMaker sourceInfoStructMaker; - public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { - this.cassandraClient = cassandraClient; + public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { this.kafkaTopicPrefix = kafkaTopicPrefix; this.sourceInfoStructMaker = sourceInfoStructMaker; - refreshSchemas(); - } - - public void refreshSchemas() { - LOGGER.info("Refreshing schemas..."); - Map latest = getLatestTableMetadatas(); - removeDeletedTableSchemas(latest); - createOrUpdateNewTableSchemas(latest); - LOGGER.info("Schemas are refreshed"); } - public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { - if (!tableToKVSchemaMap.containsKey(kt)) { - refreshSchema(kt); - } + public synchronized KeyValueSchema getKeyValueSchema(KeyspaceTable kt) { return tableToKVSchemaMap.getOrDefault(kt, null); } @@ -70,10 +60,36 @@ public Set getCdcEnabledTableMetadataSet() { .collect(Collectors.toSet()); } + public synchronized void removeKeyspace(String keyspace) { + final List collect = tableToKVSchemaMap.keySet() + .stream() + .filter(keyValueSchema -> keyValueSchema.keyspace.equals(keyspace)) + .collect(toList()); + + collect.forEach(tableToKVSchemaMap::remove); + collect.forEach(collected -> keyspaceTableMap.removeAll(collected.keyspace)); + } + + public synchronized void removeTable(KeyspaceTable kst) { + tableToKVSchemaMap.remove(kst); + keyspaceTableMap.remove(kst.keyspace, kst.table); + } + + // there is not "addKeyspace", it is not necessary + // as we will ever add a concrete table (with keyspace) but we will also dropping all tables when keyspace is dropped + public synchronized void addTable(KeyspaceTable kst, KeyValueSchema kvs) { + tableToKVSchemaMap.put(kst, kvs); + keyspaceTableMap.put(kst.keyspace, kst.table); + } + + public synchronized boolean contains(String keyspace, String table) { + return keyspaceTableMap.containsEntry(keyspace, table); + } + /** * Get the schema of an inner field based on the field name * @param fieldName the name of the field in the schema - * @param schema the schema where the field resides in + * @param schema the schema where the field resides in * @return Schema */ public static Schema getFieldSchema(String fieldName, Schema schema) { @@ -83,48 +99,6 @@ public static Schema getFieldSchema(String fieldName, Schema schema) { throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type()); } - private void refreshSchema(KeyspaceTable keyspaceTable) { - LOGGER.debug("Refreshing schema for {}", keyspaceTable); - TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? 
tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null; - TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table); - if (existing != latest) { - if (existing == null) { - tableToKVSchemaMap.put(keyspaceTable, new KeyValueSchema(kafkaTopicPrefix, latest, sourceInfoStructMaker)); - LOGGER.debug("Updated schema for {}", keyspaceTable); - } - if (latest == null) { - tableToKVSchemaMap.remove(keyspaceTable); - LOGGER.debug("Removed schema for {}", keyspaceTable); - } - } - } - - private Map getLatestTableMetadatas() { - Map latest = new HashMap<>(); - for (TableMetadata tm : cassandraClient.getCdcEnabledTableMetadataList()) { - latest.put(new KeyspaceTable(tm), tm); - } - return latest; - } - - private void removeDeletedTableSchemas(Map latestTableMetadataMap) { - Set existingTables = new HashSet<>(tableToKVSchemaMap.keySet()); - Set latestTables = latestTableMetadataMap.keySet(); - existingTables.removeAll(latestTables); - tableToKVSchemaMap.keySet().removeAll(existingTables); - } - - private void createOrUpdateNewTableSchemas(Map latestTableMetadataMap) { - latestTableMetadataMap.forEach((table, metadata) -> { - TableMetadata existingTableMetadata = tableToKVSchemaMap.containsKey(table) ? tableToKVSchemaMap.get(table).tableMetadata() : null; - if (existingTableMetadata == null || !existingTableMetadata.equals(metadata)) { - KeyValueSchema keyValueSchema = new KeyValueSchema(kafkaTopicPrefix, metadata, sourceInfoStructMaker); - tableToKVSchemaMap.put(table, keyValueSchema); - LOGGER.info("Updated schema for {}.", table); - } - }); - } - public static class KeyValueSchema { private final TableMetadata tableMetadata; private final Schema keySchema; diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 260e73e..2c22abf 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -5,6 +5,27 @@ */ package io.debezium.connector.cassandra; +import static java.util.stream.Collectors.toMap; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.schema.KeyspaceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; + +import io.debezium.connector.SourceInfoStructMaker; + /** * The schema processor is responsible for periodically * refreshing the table schemas in Cassandra. 
Cassandra @@ -13,16 +34,169 @@ */ public class SchemaProcessor extends AbstractProcessor { + private static final Logger logger = LoggerFactory.getLogger(SchemaProcessor.class); + private static final String NAME = "Schema Processor"; private final SchemaHolder schemaHolder; + private final CassandraClient cassandraClient; + private final String kafkaTopicPrefix; + private final SourceInfoStructMaker sourceInfoStructMaker; + private final SchemaChangeListener schemaChangeListener; public SchemaProcessor(CassandraConnectorContext context) { super(NAME, context.getCassandraConnectorConfig().schemaPollInterval()); schemaHolder = context.getSchemaHolder(); + this.cassandraClient = context.getCassandraClient(); + this.kafkaTopicPrefix = schemaHolder.kafkaTopicPrefix; + this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; + + schemaChangeListener = new NoOpSchemaChangeListener() { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + Keyspace.openWithoutSSTables(keyspace.getName()); + logger.info("added keyspace {}", keyspace.asCQLQuery()); + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + Schema.instance.updateKeyspace(current.getName(), KeyspaceParams.create(current.isDurableWrites(), current.getReplication())); + logger.info("updated keyspace {}", current.asCQLQuery()); + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + schemaHolder.removeKeyspace(keyspace.getName()); + Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + logger.info("removed keyspace {}", keyspace.asCQLQuery()); + } + + @Override + public void onTableAdded(final TableMetadata table) { + logger.info(String.format("Table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); + schemaHolder.addTable(new KeyspaceTable(table), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + + final CFMetaData rawCFMetaData = CFMetaData.compile(table.asCQLQuery(), table.getKeyspace().getName()); + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newCFMetaData = rawCFMetaData.copy(table.getId()); + + Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false); + + final org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName); + if (current == null) { + throw new IllegalStateException(String.format("Keyspace %s doesn't exist", newCFMetaData.ksName)); + } + + if (current.tables.get(table.getName()).isPresent()) { + logger.info(String.format("table %s.%s is already added!", table.getKeyspace(), table.getName())); + return; + } + + final java.util.function.Function f = ks -> ks + .withSwapped(ks.tables.with(newCFMetaData)); + + org.apache.cassandra.schema.KeyspaceMetadata transformed = f.apply(current); + + Schema.instance.setKeyspaceMetadata(transformed); + Schema.instance.load(newCFMetaData); + + logger.info("added table {}", table.asCQLQuery()); + } + + @Override + public void onTableRemoved(final TableMetadata table) { + logger.info(String.format("Table %s.%s detected to be 
removed!", table.getKeyspace().getName(), table.getName())); + schemaHolder.removeTable(new KeyspaceTable(table)); + + final String ksName = table.getKeyspace().getName(); + final String tableName = table.getName(); + + final org.apache.cassandra.schema.KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(table.getKeyspace().getName()); + + if (oldKsm == null) { + throw new IllegalStateException(String.format("KeyspaceMetadata for keyspace %s is not found!", table.getKeyspace().getName())); + } + + final ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(tableName); + + if (cfs == null) { + throw new IllegalStateException(String.format("ColumnFamilyStore for %s.%s is not found!", table.getKeyspace(), table.getName())); + } + + // make sure all the indexes are dropped, or else. + cfs.indexManager.markAllIndexesRemoved(); + + // reinitialize the keyspace. + final CFMetaData cfm = oldKsm.tables.get(tableName).get(); + final org.apache.cassandra.schema.KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName)); + + Schema.instance.unload(cfm); + Schema.instance.setKeyspaceMetadata(newKsm); + + logger.info("removed table {}", table.asCQLQuery()); + } + + @Override + public void onTableChanged(final TableMetadata newTableMetadata, final TableMetadata oldTableMetaData) { + logger.info(String.format("Detected alternation in schema of %s.%s (previous cdc = %s, current cdc = %s)", + newTableMetadata.getKeyspace().getName(), + newTableMetadata.getName(), + oldTableMetaData.getOptions().isCDC(), + newTableMetadata.getOptions().isCDC())); + + schemaHolder.addTable(new KeyspaceTable(newTableMetadata), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker)); + + final CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(), + newTableMetadata.getKeyspace().getName()); + + final CFMetaData oldMetadata = Schema.instance.getCFMetaData(oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName()); + + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId); + oldMetadata.apply(newMetadata); + + logger.info("changed table {}", newTableMetadata.asCQLQuery()); + } + }; + } + + @Override + public void initialize() { + final Map tables = getTableMetadata() + .stream() + .collect(toMap(KeyspaceTable::new, + tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); + + tables.forEach(schemaHolder::addTable); + + logger.info("Registering schema change listener ..."); + cassandraClient.getCluster().register(schemaChangeListener); } @Override public void process() { - schemaHolder.refreshSchemas(); + } + + @Override + public void destroy() { + logger.info("Unregistering schema change listener ..."); + cassandraClient.getCluster().unregister(schemaChangeListener); + logger.info("Clearing cdc keyspace / table map ... 
"); + schemaHolder.tableToKVSchemaMap.clear(); + } + + private List getTableMetadata() { + return cassandraClient.getCluster().getMetadata().getKeyspaces().stream() + .map(KeyspaceMetadata::getTables) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } } diff --git a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java index 33c76c0..e838a47 100644 --- a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java @@ -198,7 +198,7 @@ private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetad private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException { String tableName = tableName(tableMetadata); KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata); - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); diff --git a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java index 21276e3..60241d9 100644 --- a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java @@ -29,11 +29,14 @@ public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase { private CassandraConnectorContext context; private CommitLogProcessor commitLogProcessor; + private SchemaProcessor schemaProcessor; @Before public void setUp() throws Exception { context = generateTaskContext(); commitLogProcessor = new CommitLogProcessor(context); + schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); commitLogProcessor.initialize(); } @@ -41,6 +44,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { deleteTestOffsets(context); commitLogProcessor.destroy(); + schemaProcessor.destroy(); context.cleanUp(); } @@ -48,7 +52,7 @@ public void tearDown() throws Exception { public void testProcessCommitLogs() throws Exception { int commitLogRowSize = 10; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); + // context.getSchemaHolder().refreshSchemas(); // programmatically add insertion and deletion events into commit log, this is because running an 'INSERT' or 'DELETE' // cql against the embedded Cassandra does not modify the commit log file on disk. 
diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index 0d61a38..0f22366 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import org.junit.Test; @@ -19,6 +18,7 @@ public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testProcess() throws Exception { CassandraConnectorContext context = generateTaskContext(); SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); SchemaHolder.KeyValueSchema keyValueSchema; String namespacePrefix = "io.debezium.connector.cassandra" + "." + EmbeddedCassandraConnectorTestBase.TEST_KAFKA_TOPIC_PREFIX + "." @@ -29,48 +29,48 @@ public void testProcess() throws Exception { assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table1") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); + + Thread.sleep(5000); + schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + assertNotNull(context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); assertEquals(1, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); schemaProcessor.process(); - assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." 
+ "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table2" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table2" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table2") + " ADD c text"); schemaProcessor.process(); assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); TableMetadata expectedTm1 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table1"); TableMetadata expectedTm2 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table2"); - TableMetadata tm1 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); - TableMetadata tm2 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); + TableMetadata tm1 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); + TableMetadata tm2 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); assertEquals(expectedTm1, tm1); assertEquals(expectedTm2, tm2); - deleteTestKeyspaceTables(); context.cleanUp(); } diff --git a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index de95acc..b662cc6 100644 --- a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -31,12 +31,13 @@ public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testSnapshotTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); @@ -75,11 +76,12 @@ public void testSnapshotTable() throws Exception { public void testSnapshotSkipsNonCdcEnabledTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS 
" + keyspaceTable("non_cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("non_cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); } @@ -99,10 +101,11 @@ public void testSnapshotEmptyTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); AtomicBoolean globalTaskState = new AtomicBoolean(true); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); ChangeEventQueue queue = context.getQueue(); assertEquals(queue.totalCapacity(), queue.remainingCapacity()); From 0999ce9d0c65bbffb6aa3691d6ca4d38cdaa884d Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Tue, 13 Apr 2021 15:31:48 +1000 Subject: [PATCH 11/13] Fixed up the C* version and made an initial attempt to deal with the fact that DCC doesn't handle varints. --- pom.xml | 2 +- .../transforms/CassandraTypeConverter.java | 22 ++++++++++--------- .../transforms/CassandraTypeDeserializer.java | 2 ++ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index d3e88fc..a26daed 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ org.apache.cassandra cassandra-all - 3.11.4-SNAPSHOT + 3.11.4 ch.qos.logback diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java index 8019cd0..5ab83d5 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java @@ -8,6 +8,15 @@ import java.util.HashMap; import java.util.Map; +import com.datastax.driver.core.DataType; + +import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -20,6 +29,7 @@ import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.ShortType; import org.apache.cassandra.db.marshal.SimpleDateType; @@ -29,16 +39,6 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; -import com.datastax.driver.core.DataType; - -import 
io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; - public final class CassandraTypeConverter { private CassandraTypeConverter() { @@ -71,6 +71,8 @@ private CassandraTypeConverter() { typeMap.put(DataType.Name.TUPLE, new TupleTypeConverter()); typeMap.put(DataType.Name.UDT, new UserTypeConverter()); typeMap.put(DataType.Name.UUID, new BasicTypeConverter<>(UUIDType.instance)); + typeMap.put(DataType.Name.VARINT, new BasicTypeConverter<>(IntegerType.instance)); + typeMap.put(DataType.Name.VARCHAR, new BasicTypeConverter<>(UTF8Type.instance)); } public static AbstractType convert(DataType type) { diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java index 6a7a057..4e18d0c 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java @@ -23,6 +23,7 @@ import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.MapType; @@ -80,6 +81,7 @@ private CassandraTypeDeserializer() { tmp.put(DoubleType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(DecimalType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(Int32Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.INT_TYPE)); + tmp.put(IntegerType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(ShortType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.SHORT_TYPE)); tmp.put(LongType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(TimeType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); From ecc11ce8c1224c9a3ab0ca237c370b0e7a2fef3d Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Tue, 13 Apr 2021 15:31:48 +1000 Subject: [PATCH 12/13] Fixed up the C* version and made an initial attempt to deal with the fact that DCC doesn't handle varints. 
--- .../transforms/CassandraTypeConverter.java | 22 ++++++++++--------- .../transforms/CassandraTypeDeserializer.java | 2 ++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java index 8019cd0..5ab83d5 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java @@ -8,6 +8,15 @@ import java.util.HashMap; import java.util.Map; +import com.datastax.driver.core.DataType; + +import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -20,6 +29,7 @@ import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.ShortType; import org.apache.cassandra.db.marshal.SimpleDateType; @@ -29,16 +39,6 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; -import com.datastax.driver.core.DataType; - -import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; - public final class CassandraTypeConverter { private CassandraTypeConverter() { @@ -71,6 +71,8 @@ private CassandraTypeConverter() { typeMap.put(DataType.Name.TUPLE, new TupleTypeConverter()); typeMap.put(DataType.Name.UDT, new UserTypeConverter()); typeMap.put(DataType.Name.UUID, new BasicTypeConverter<>(UUIDType.instance)); + typeMap.put(DataType.Name.VARINT, new BasicTypeConverter<>(IntegerType.instance)); + typeMap.put(DataType.Name.VARCHAR, new BasicTypeConverter<>(UTF8Type.instance)); } public static AbstractType convert(DataType type) { diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java index 6a7a057..4e18d0c 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java @@ -23,6 +23,7 @@ import 
org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.MapType; @@ -80,6 +81,7 @@ private CassandraTypeDeserializer() { tmp.put(DoubleType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(DecimalType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(Int32Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.INT_TYPE)); + tmp.put(IntegerType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(ShortType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.SHORT_TYPE)); tmp.put(LongType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(TimeType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); From e876f6cb9d3b9318fa7762a2bd6b858b54725f30 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Tue, 13 Apr 2021 08:38:16 +0200 Subject: [PATCH 13/13] applied comments in review --- .../java/io/debezium/connector/cassandra/SchemaProcessor.java | 4 ++-- .../debezium/connector/cassandra/CommitLogProcessorTest.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 2c22abf..5a86aa1 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -99,10 +99,10 @@ public void onTableAdded(final TableMetadata table) { return; } - final java.util.function.Function f = ks -> ks + final java.util.function.Function transformationFunction = ks -> ks .withSwapped(ks.tables.with(newCFMetaData)); - org.apache.cassandra.schema.KeyspaceMetadata transformed = f.apply(current); + org.apache.cassandra.schema.KeyspaceMetadata transformed = transformationFunction.apply(current); Schema.instance.setKeyspaceMetadata(transformed); Schema.instance.load(newCFMetaData); diff --git a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java index 60241d9..a1c48f6 100644 --- a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java @@ -52,7 +52,6 @@ public void tearDown() throws Exception { public void testProcessCommitLogs() throws Exception { int commitLogRowSize = 10; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"); - // context.getSchemaHolder().refreshSchemas(); // programmatically add insertion and deletion events into commit log, this is because running an 'INSERT' or 'DELETE' // cql against the embedded Cassandra does not modify the commit log file on disk.