diff --git a/REPORT.adoc b/REPORT.adoc
new file mode 100644
index 0000000..3cb6f4f
--- /dev/null
+++ b/REPORT.adoc
@@ -0,0 +1,368 @@
+== Debezium integration with Cassandra report - part 1
+
+_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_
+
+=== Nature of the problem
+
+The problem is rather straightforward; however, a reader / Cassandra user can be confused by the seemingly
+almost random behaviour of Debezium, which makes it even harder to understand what is going on.
+
+All observed problems are a variation of the following scenario; reducing everything to this scenario and
+solving it will resolve all erroneous behaviour.
+
+* I have `cdc_enabled: true` in `cassandra.yaml`
+* I have a table `k1.t1`
+* I have cdc functionality enabled on `k1.t1` by specifying `cdc = true` in the `CREATE` statement
+or by `ALTER`-ing it afterwards if that is not the case.
+
+The important point to make here is that Debezium has to be running _before_ all
+of this happens. In other words:
+
+* I start Cassandra without `k1.t1`
+* I start Debezium
+* After the points above, I create the table as described above.
+
+The result of this procedure is that a cdc-enabled table will indeed place commit logs into the `cdc_raw` directory
+to be picked up by Debezium, but Debezium will not react - it just deletes these logs and nothing
+is ever sent to the respective Kafka topic.
+
+What is even worse is that these commit logs in the cdc directory are lost forever as Debezium will delete them (by default;
+there is a way to _not_ delete them by implementing your own `CommitLogTransfer`, but the default
+commit log transfer implementation will just delete these logs for good).
+
+What is even stranger is that another table which already had `cdc` enabled keeps pushing these
+messages to its topic just fine, so the fact that the new table does not produce them is mind-boggling.
+
+To fully understand why this is happening, we need to explain what the write and read paths look like.
+
+=== On Mutation and CommitLog
+
+The building block of any change in Cassandra is a `Mutation`. A mutation contains `PartitionUpdate`-s.
+If a `Mutation` happens to consist of only one "change", then it contains
+just one `PartitionUpdate`.
+
+Cassandra appends `Mutation` objects, as they come, to the end of a _commit log segment_. A commit log segment
+is an individual file in the commitlog directory.
+
+If CDC is enabled, once a commit log is "full" (for lack of a better word and for brevity), it is
+just copied over to the `cdc` dir.
+
+Debezium has a watcher on the cdc dir and as soon as a commit log is copied there, Debezium detects it,
+reads the whole log, reconstructs all `Mutation`-s, creates messages and sends them to Kafka.
+
+Hence we clearly see that this process has two parts:
+
+1. *serialize* `Mutation`-s from Java objects to a binary blob and store it in a commit log segment
+2. parse a commit log segment, *deserialize* all `Mutation`-s into Java objects and create Kafka messages
+
+=== Write path and serialisation
+
+A mutation is serialized by its `MutationSerializer`. It is not too important where this
+serializer is called from.
+
+[source,java]
+----
+    public static class MutationSerializer implements IVersionedSerializer<Mutation>
+    {
+        public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
+        {
+            /// other code hidden for brevity
+
+            int size = mutation.modifications.size();
+
+            if (version < MessagingService.VERSION_30)
+            {
+                ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out);
+                out.writeInt(size);
+            }
+            else
+            {
+                out.writeUnsignedVInt(size);
+            }
+
+            assert size > 0;
+            for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+        }
+----
+
+Here we see that in order to serialize a `Mutation`, we need to iterate over all its _modifications_,
+a.k.a. `PartitionUpdate`-s, and serialise each of them. The serialisation of a `Mutation`
+is equal to the serialisation of all its modifications.
+
+Let's take a look at `PartitionUpdate` serialisation:
+
+[source,java]
+----
+    public static class PartitionUpdateSerializer
+    {
+        public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
+        {
+            try (UnfilteredRowIterator iter = update.unfilteredIterator())
+            {
+                // other code hidden for brevity
+                // serialisation of metadata
+                CFMetaData.serializer.serialize(update.metadata(), out, version);
+                // serialisation of the partition itself
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter,
+                        null, out, version, update.rowCount());
+            }
+        }
+----
+
+`update.metadata()` above returns `CFMetaData`. `CFMetaData` has a field `params` of
+type `TableParams` and this object has a flag called `cdc`, a boolean saying
+whether that particular `PartitionUpdate` relates to a table which is _cdc-enabled_.
+
+From the above we see that `PartitionUpdateSerializer` serializes not only the partition itself
+but also its metadata via `CFMetaData.Serializer`. Let's take a look at it:
+
+[source,java]
+----
+    public static class Serializer
+    {
+        public void serialize(CFMetaData metadata,
+                              DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(metadata.cfId, out, version);
+        }
+----
+
+Here, the very important fact is that the serialization of `CFMetaData` in Cassandra terms
+means that _it will take the UUID of the table that the PartitionUpdate belongs to and serialize it_.
+*Just that UUID*.
+
+=== Read path and deserialisation
+
+Having covered the write path, let's look at how the read path is treated. The read path is a different problem -
+we parse a binary commit log and try to reconstruct all `Mutation`-s from it.
+
+*It is important to realise that this is happening in the Debezium context, not in Cassandra.*
+
+Debezium uses Cassandra's `CommitLogReader` class to read commit logs, so
+Debezium is not using anything custom but built-in Cassandra functionality.
+
+Cassandra's `CommitLogReader` is used in Debezium's `CommitLogProcessor`, which scans new commit logs
+as they are copied to the cdc dir and uses `CommitLogReader` in its `process` method.
+
+`CommitLogReader` reads commit logs via the method `readCommitLogSegment`, which accepts a `CommitLogReadHandler`.
+The commit log handler is Debezium's custom implementation which hooks in its functionality to process
+mutations as they come from reading a commit log segment.
+
+For completeness, the chain of method calls down to the place where the handler is ultimately called is:
+
+1. `CommitLogReader#readCommitLogSegment`
+2. in the method from 1) there is a call to the private `CommitLogReader#readSection` - a commit log is not read all at once but in chunks - _sections_.
+3. in 2) we pass our handler to `CommitLogReader#readMutation`
+
+At the beginning of 3) we *deserialize* the buffer into a mutation.
+
+[source,java]
+----
+        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+        {
+            mutation = Mutation.serializer.deserialize(bufIn,
+                                                       desc.getMessagingVersion(),
+                                                       SerializationHelper.Flag.LOCAL);
+----
+
+Finally, at the very bottom, we see the handling of the just-deserialized `Mutation` by our
+custom handler.
+
+[source,java]
+----
+handler.handleMutation(mutation, size, entryLocation, desc);
+----
+
+The implementation of this handler in Debezium looks like this:
+
+[source,java]
+----
+    @Override
+    public void handleMutation(Mutation mutation,
+                               int size,
+                               int entryLocation,
+                               CommitLogDescriptor descriptor) {
+        if (!mutation.trackedByCDC()) {
+            return;
+        }
+
+        // other code
+        // here the Mutation is eventually transformed into a Kafka message and sent
+----
+
+This is crucial. The problem is that *the mutation is not tracked by cdc* (empirically verified by putting heavy logging in all the relevant places).
+
+In other words: we have verified that Cassandra serialized the data as it is supposed to, but for some reason a mutation which was previously marked as _cdc-enabled_
+is not deserialized in such a way that `trackedByCDC` would be `true`, so the method above
+returns immediately (hence nothing is sent to Kafka).
+
+Let's see the logic behind the `Mutation#trackedByCDC` method:
+
+[source,java]
+----
+    public boolean trackedByCDC()
+    {
+        return cdcEnabled;
+    }
+----
+
+It is just a getter. This flag is, however, set on the _read path_ by
+`MutationSerializer#deserialize`. At the end of that method it returns:
+
+[source,java]
+----
+return new Mutation(update.metadata().ksName, dk, modifications);
+----
+
+And finally, in its constructor we find:
+
+[source,java]
+----
+    protected Mutation(... params for constructor)
+    {
+        this.keyspaceName = keyspaceName;
+        this.key = key;
+        this.modifications = modifications;
+        for (PartitionUpdate pu : modifications.values())
+            cdcEnabled |= pu.metadata().params.cdc;
+    }
+----
+
+Here we see that the `cdcEnabled` flag will be `true` if _any_ `PartitionUpdate` has `cdc` set to `true` in its metadata params.
+
+`PartitionUpdate#metadata` returns `CFMetaData` on deserialization, nothing wrong with that.
+
+Yet we clearly see that after everything is fully deserialized, that flag is still `false` ...
+
+=== The core of the problem
+
+There are two problems. The first one is that the serialized form of a `Mutation`
+does not contain its `TableParams` - or to put it better - a `PartitionUpdate` of a
+`Mutation` is not serialized in such a way that it would contain the `cdc` flag as well.
+We saw that it contains only `cfId` (a UUID) and that is all.
+
+However, it is rather understandable that it is done like that because, after a closer look, this information is not necessary.
+If we revisit the content of `MutationSerializer#deserialize`, it contains:
+
+[source,java]
+----
+PartitionUpdate.serializer.deserialize(in, version, flag, key);
+----
+
+which in turn contains:
+
+[source,java]
+----
+CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+----
+
+which finally calls:
+
+[source,java]
+----
+UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
+CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
+----
+
+Hence we see that all it takes to populate a `PartitionUpdate` with `CFMetaData`
+is to look at which `cfId` we serialized and, based on that id, retrieve the
+metadata from `Schema`.
+
+The conclusion is rather clear - we have a running node which serializes just fine,
+but we have deserialized mutations whose retrieved `CFMetaData` contains a
+`cdc` flag which is `false`, so their processing is skipped.
+
+The reason this is happening is that when Debezium starts, it reads the Cassandra schema by doing this in the `CassandraConnectorContext` constructor:
+
+[source,java]
+----
+Schema.instance.loadDdlFromDisk(this.config.cassandraConfig());
+----
+
+which translates to:
+
+[source,java]
+----
+    public void loadDdlFromDisk(String yamlConfig) {
+        // other stuff ...
+        DatabaseDescriptor.toolInitialization();
+        Schema.instance.loadFromDisk(false);
+    }
+----
+
+This is done *once when Debezium starts* and it is *not* refreshed afterwards. So
+if you create a table after Debezium starts, Debezium just does not see it. The same happens when the table already exists but you alter it with `cdc = true` *after Debezium started*.
+
+Debezium internally uses Cassandra code, but since Debezium runs in a different JVM / process from Cassandra,
+what happens in Cassandra after Debezium is started is not visible to Debezium -
+it is just a completely different JVM process, and if you enable cdc in Cassandra, Debezium simply does not know about it.
+
+However, if you restart Debezium while `cdc` is already enabled,
+*it will read Cassandra's system keyspaces after Cassandra has persisted these changes to disk into system SSTables*, so it will send the messages to Kafka just fine.
+
+=== Proposed solution
+
+The only solution I see is to _reload the Cassandra schema in Debezium_
+before *each* commit log segment in the `cdc` dir is scanned / parsed.
+
+The reason it should be done before processing each log is that you cannot possibly know from the outside, before you read it, whether that log contains mutations with partition updates coming from a table for which you enabled cdc only recently, so you have to do it before reading it in every case. This schema refresh is not performance sensitive; it only takes a few milliseconds / sub-second time.
+
+If we do that, on the deserialization path the deserialization of a `PartitionUpdate` will populate it with `CFMetaData` which reflects its true state, because Cassandra has it in its system tables.
+
+The reloading of the schema can be done on demand. As said, Debezium has a watch on the cdc dir and it does this:
+
+[source,java]
+----
+void handleEvent(WatchEvent<?> event, Path path) {
+    if (isRunning()) {
+        // this would be added
+        SchemaProcessor.SchemaReinitializer.reinitialize();
+
+        // this stays as is
+        processCommitLog(path.toFile());
+    }
+}
+----
+
+The implementation of the reinitializer would look like this (working example):
+
+[source,java]
+----
+    public static final class SchemaReinitializer {
+        public static synchronized void reinitialize() {
+            try {
+                // give it some time to flush system tables to disk so we can read them again
+                Thread.sleep(5000);
+                clearWithoutAnnouncing();
+                Schema.instance.loadFromDisk(false);
+            }
+            catch (final Throwable ex) {
+                logger.info("Error in reinitialisation method", ex);
+            }
+        }
+
+        public static synchronized void clearWithoutAnnouncing() {
+            for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) {
+                org.apache.cassandra.schema.KeyspaceMetadata ksm =
+                        Schema.instance.getKSMetaData(keyspaceName);
+                ksm.tables.forEach(table -> Schema.instance.unload(table));
+                // this method does not exist in Cassandra
+                // ksm.views.forEach(view -> Schema.instance.unload(view));
+                Schema.instance.clearKeyspaceMetadata(ksm);
+            }
+            logger.info("clearing without announce done");
+        }
+    }
+----
+
+There is also `Schema.instance.clear()`, but it also announces schema changes and it
+invokes parts of the Cassandra codebase which throw exceptions when called outside
+of Cassandra - remember, we are using Cassandra code but we do not actually run Cassandra in Debezium,
+so it would call code we do not want.
+
+Fortunately, all the methods we need are public, so we just copy `Schema#clear` but remove the
+last line announcing the new version, which is the problematic part.
+
+Once this is done, if we alter a table in Cassandra, `cdc` will be parsed correctly, so the `Mutation` will not be
+skipped from processing and Kafka messages will be sent over.
+
+The other solution would be to create a JVM agent instead of a standalone process. By doing so, we would run Debezium in the same JVM as Cassandra, so if Cassandra updates the cdc flag on some metadata, Debezium sees it instantly. This path is yet to be implemented if we choose to go that way; I am not sure if it is possible but in principle it should be.
\ No newline at end of file
diff --git a/REPORT_2.adoc b/REPORT_2.adoc
new file mode 100644
index 0000000..0414828
--- /dev/null
+++ b/REPORT_2.adoc
@@ -0,0 +1,152 @@
+== Debezium integration with Cassandra report - part 2
+
+_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_
+
+=== Introduction
+
+In this document, we will look back at what we actually did in part 1 and
+we will build on that to improve the solution.
+
+=== Nature of the problem
+
+The problem with solution 1 is that it is not deterministic. The reason for the
+non-determinism seems to be that it is more or less unpredictable when
+Cassandra flushes / persists changes (e.g. in `cdc = true / false`) to disk, so that our
+schema refresh would pick these changes up.
+
+Hence we might see the following:
+
+1) A table is created with `cdc = true`.
+2) We detect the change in the driver's schema listener and refresh the schema in Debezium via Cassandra's mechanics.
+3) We think that our internal structure is indeed refreshed, but our `cdc` is still false.
+
+This means that even though the Cassandra process is internally aware of the fact that we have
+enabled cdc on a particular table, Debezium does not reflect this, because
+sometimes the changes are flushed / persisted just in time but sometimes it takes a while
+to propagate them, and by then it might be too late for Debezium as the listener was already invoked.
+
+=== Possible solutions
+
+There are, in general, two solutions to this problem:
+
+1) Faking what Cassandra does in Debezium so that Debezium holds the same data structures too.
+
+This is a rather delicate operation / topic to deal with, but it is possible and we chose to go with
+this solution for the time being.
+
+It merges two main concepts:
+
+a) Debezium is informed about schema changes via a schema change listener registered on the driver.
+b) Once a respective method on the listener is invoked, we mimic the same code that Cassandra would invoke, but
+in such a way that the parts which would be erroneous (because Debezium just does not run Cassandra) are
+skipped.
+
+By doing b), we internally hold a logical copy of what the real Cassandra is holding, and we
+synchronize Cassandra's internal structures (keyspaces, tables ...) by registering a
+schema change listener and applying the same changes to the "Cassandra" in the Debezium process.
+
+Let's go through the core of this logic, starting with `onKeyspaceAdded`:
+
+[source,java]
+----
+schemaChangeListener = new NoOpSchemaChangeListener() {
+    @Override
+    public void onKeyspaceAdded(final KeyspaceMetadata keyspace) {
+        Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(
+                keyspace.getName(),
+                KeyspaceParams.create(keyspace.isDurableWrites(),
+                        keyspace.getReplication())));
+        Keyspace.openWithoutSSTables(keyspace.getName());
+        logger.info("added keyspace {}", keyspace.asCQLQuery());
+    }
+----
+
+Here we fake that we opened a keyspace. This populates some of Cassandra's internal structures so that
+the live Cassandra code in Debezium "knows" which keyspace was added.
+
+On a keyspace update, we do:
+
+[source,java]
+----
+@Override
+public void onKeyspaceChanged(KeyspaceMetadata current,
+                              KeyspaceMetadata previous) {
+    Schema.instance.updateKeyspace(current.getName(),
+            KeyspaceParams.create(current.isDurableWrites(),
+                    current.getReplication()));
+}
+----
+
+When a keyspace is removed, we do:
+
+[source,java]
+----
+@Override
+public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) {
+    schemaHolder.removeKeyspace(keyspace.getName());
+    // here KeyspaceMetadata is Cassandra's, not the driver's as in the method argument
+    Schema.instance.clearKeyspaceMetadata(KeyspaceMetadata.create(
+            keyspace.getName(),
+            KeyspaceParams.create(keyspace.isDurableWrites(),
+                    keyspace.getReplication())));
+}
+----
+
+We remove the keyspace from our schema holder too. Think about it: if we removed the whole keyspace
+with "DROP KEYSPACE abc", all its tables are removed too, so we also get rid of all tables of that keyspace
+in our schema holder.
+
+We have left the last three methods of the listener - `onTableAdded`, `onTableChanged` and `onTableRemoved` -
+for the reader to go through; a condensed sketch of `onTableChanged` follows below. The code is more or less what Cassandra does internally, but
+it is refactored in such a way that the parts which are not needed (nor desired to be done) are just skipped.
+
+Please follow this https://github.com/instaclustr/debezium-connector-cassandra/blob/dd2/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java#L81-L168[link].
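+
+To give a flavour of what happens in those methods, here is a condensed sketch of the `onTableChanged`
+callback, trimmed down from the implementation linked above (and included in the patch below) - logging and
+error handling are omitted, only the essential steps remain:
+
+[source,java]
+----
+@Override
+public void onTableChanged(final TableMetadata newTableMetadata, final TableMetadata oldTableMetaData) {
+    // keep Debezium's own schema holder in sync with the newest driver metadata
+    schemaHolder.addTable(new KeyspaceTable(newTableMetadata),
+            new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker));
+
+    // recompile the table's CQL into CFMetaData, just as Cassandra itself would
+    final CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(),
+            newTableMetadata.getKeyspace().getName());
+    final CFMetaData oldMetadata = Schema.instance.getCFMetaData(
+            oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName());
+
+    // copy because CFMetaData.compile generates a new cfId which would not match the id of the old metadata
+    final CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId);
+    // apply the change (e.g. cdc = false -> true) to the metadata held by the Cassandra code inside Debezium
+    oldMetadata.apply(newMetadata);
+}
+----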
+
+Once we put this into action, the metadata will be populated correctly, with the `cdc` flag on `TableParams` and so on.
+A `Mutation` will be deserialised properly as well, because it will reach into the column family's metadata which
+has `cdc = true` - we were notified about this change in the listener and we updated
+that table in the Cassandra code - so the following deserialisation of a `Mutation`, where this code
+is called, will not throw:
+
+[source,java]
+----
+public static class Serializer
+{
+    public void serialize(CFMetaData metadata, DataOutputPlus out, int version) throws IOException
+    {
+        UUIDSerializer.serializer.serialize(metadata.cfId, out, version);
+    }
+
+    public CFMetaData deserialize(DataInputPlus in, int version) throws IOException
+    {
+        UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
+        CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
+        if (metadata == null)
+        {
+            String message = String.format("Couldn't find table for cfId %s. If a table was just " +
+                    "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
+                    "agreement on table creation.", cfId);
+            throw new UnknownColumnFamilyException(message, cfId);
+        }
+
+        return metadata;
+    }
+----
+
+Keep in mind that we are not "initialising" Cassandra in any way: when Debezium starts, the
+Cassandra internals will already have read the tables from disk and so on, so it will be fully running,
+but we would never be notified about what happens afterwards (that cdc was changed from false to true, for example). For
+that reason there is a schema change listener which synchronizes it. We might be notified about the change via the listener, that is true,
+but the schema refresh from part 1 does not always help - we would end up being notified about changes but would have no
+way to make these changes visible to Cassandra's internal code. Only invoking the core Cassandra structures and emulating that
+we are running in a proper Cassandra node makes the deserialisation of a mutation possible, because previously our
+cdc flag was always false (it was not being updated), so the handling of such mutations was effectively skipped.
+
+The second solution consists of making Debezium a JVM agent - this means that it would see the same data structures as Cassandra,
+by definition. The problem we see with this solution is that it is rather tricky to do, because
+Debezium would suddenly have the same lifecycle as Cassandra (or the other way around - Cassandra
+would have the same lifecycle as Debezium) - they would be inherently connected together.
+
+Another problem we see is that the dependencies which Debezium uses are not compatible with
+what Cassandra uses and it would just not be possible to "merge" them.
By merely checking, +the probability this would be the case is quite high, there is Cassandra connector of diff --git a/pom.xml b/pom.xml index d62cf6d..a26daed 100644 --- a/pom.xml +++ b/pom.xml @@ -4,20 +4,20 @@ io.debezium debezium-parent - 1.5.0-SNAPSHOT + 1.5.0.Final 4.0.0 debezium-connector-cassandra Debezium Connector for Cassandra - 1.5.0-SNAPSHOT + 1.5.0.Final jar scm:git:git@github.com:debezium/debezium-connector-cassandra.git scm:git:git@github.com:debezium/debezium-connector-cassandra.git https://github.com/debezium/debezium-connector-cassandra - HEAD + v1.5.0.Final @@ -119,6 +119,7 @@ org.apache.cassandra cassandra-all + 3.11.4 ch.qos.logback diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java index b199924..aa398b8 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraClient.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraClient.java @@ -95,6 +95,10 @@ public String getClusterName() { return cluster.getMetadata().getClusterName(); } + public Cluster getCluster() { + return cluster; + } + public boolean isQueryable() { return !cluster.isClosed() && !session.isClosed(); } diff --git a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java index 5d96bbb..0077eb5 100644 --- a/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java +++ b/src/main/java/io/debezium/connector/cassandra/CassandraConnectorContext.java @@ -53,7 +53,7 @@ public CassandraConnectorContext(CassandraConnectorConfig config) throws Excepti .build(); // Setting up schema holder ... - this.schemaHolder = new SchemaHolder(this.cassandraClient, this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); + this.schemaHolder = new SchemaHolder(this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker()); // Setting up a file-based offset manager ... this.offsetWriter = new FileOffsetWriter(this.config.offsetBackingStoreDir()); diff --git a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index f08c964..346929c 100644 --- a/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -48,7 +48,7 @@ /** * Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code. - * + *

* This handler implementation processes each {@link Mutation} and invokes one of the registered partition handler * for each {@link PartitionUpdate} in the {@link Mutation} (a mutation could have multiple partitions if it is a batch update), * which in turn makes one or more record via the {@link RecordMaker} and enqueue the record into the {@link ChangeEventQueue}. @@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler { } /** - * A PartitionType represents the type of a PartitionUpdate. + * A PartitionType represents the type of a PartitionUpdate. */ enum PartitionType { /** @@ -86,7 +86,7 @@ enum PartitionType { PARTITION_KEY_ROW_DELETION, /** - * a partition-level deletion where partition key + clustering key = primary key + * a partition-level deletion where partition key + clustering key = primary key */ PARTITION_AND_CLUSTERING_KEY_ROW_DELETION, @@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) { } /** - * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. + * A RowType represents different types of {@link Row}-level modifications in a Cassandra table. */ enum RowType { /** @@ -303,18 +303,22 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace * Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation * of this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid deletion * event means a partition only has a single row, this implies there are no clustering keys. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); @@ -349,27 +353,30 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo * Handle a valid event resulted from a row-level modification by converting Cassandra representation of * this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid event * implies this must be an insert, update, or delete. - * + *

* The steps are: - * (1) Populate the "source" field for this event - * (2) Fetch the cached key/value schemas from {@link SchemaHolder} - * (3) Populate the "after" field for this event - * a. populate partition columns - * b. populate clustering columns - * c. populate regular columns - * d. for deletions, populate regular columns with null values - * (4) Assemble a {@link Record} object from the populated data and queue the record + * (1) Populate the "source" field for this event + * (2) Fetch the cached key/value schemas from {@link SchemaHolder} + * (3) Populate the "after" field for this event + * a. populate partition columns + * b. populate clustering columns + * c. populate regular columns + * d. for deletions, populate regular columns with null values + * (4) Assemble a {@link Record} object from the populated data and queue the record */ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { - SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); - Schema keySchema = schema.keySchema(); - Schema valueSchema = schema.valueSchema(); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); + if (keyValueSchema == null) { + return; + } + Schema keySchema = keyValueSchema.keySchema(); + Schema valueSchema = keyValueSchema.valueSchema(); RowData after = new RowData(); populatePartitionColumns(after, pu); populateClusteringColumns(after, row, pu); - populateRegularColumns(after, row, rowType, schema); + populateRegularColumns(after, row, rowType, keyValueSchema); long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); diff --git a/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java new file mode 100644 index 0000000..1b45db4 --- /dev/null +++ b/src/main/java/io/debezium/connector/cassandra/NoOpSchemaChangeListener.java @@ -0,0 +1,117 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.cassandra; + +import com.datastax.driver.core.AggregateMetadata; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.FunctionMetadata; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.MaterializedViewMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.UserType; + +public class NoOpSchemaChangeListener implements SchemaChangeListener { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + + } + + @Override + public void onTableAdded(final TableMetadata table) { + + } + + @Override + public void onTableRemoved(final TableMetadata table) { + + } + + @Override + public void onTableChanged(final TableMetadata current, final TableMetadata previous) { + + } + + @Override + public void onUserTypeAdded(final UserType type) { + + } + + @Override + public void onUserTypeRemoved(final UserType type) { + + } + + @Override + public void onUserTypeChanged(final UserType current, final UserType previous) { + + } + + @Override + public void onFunctionAdded(final FunctionMetadata function) { + + } + + @Override + public void onFunctionRemoved(final FunctionMetadata function) { + + } + + @Override + public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) { + + } + + @Override + public void onAggregateAdded(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateRemoved(final AggregateMetadata aggregate) { + + } + + @Override + public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) { + + } + + @Override + public void onMaterializedViewAdded(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewRemoved(final MaterializedViewMetadata view) { + + } + + @Override + public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) { + + } + + @Override + public void onRegister(final Cluster cluster) { + + } + + @Override + public void onUnregister(final Cluster cluster) { + + } +} diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java index 839e6db..471693c 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaHolder.java @@ -5,8 +5,9 @@ */ package io.debezium.connector.cassandra; -import java.util.HashMap; -import java.util.HashSet; +import static java.util.stream.Collectors.toList; + +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -20,6 +21,9 @@ import com.datastax.driver.core.ColumnMetadata; import com.datastax.driver.core.TableMetadata; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException; @@ -31,35 +35,21 @@ * by {@link 
SchemaProcessor} periodically. */ public class SchemaHolder { - private static final String NAMESPACE = "io.debezium.connector.cassandra"; private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class); - private final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Map tableToKVSchemaMap = new ConcurrentHashMap<>(); + public final Multimap keyspaceTableMap = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); - private final CassandraClient cassandraClient; - private final String kafkaTopicPrefix; - private final SourceInfoStructMaker sourceInfoStructMaker; + public final String kafkaTopicPrefix; + public final SourceInfoStructMaker sourceInfoStructMaker; - public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { - this.cassandraClient = cassandraClient; + public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) { this.kafkaTopicPrefix = kafkaTopicPrefix; this.sourceInfoStructMaker = sourceInfoStructMaker; - refreshSchemas(); - } - - public void refreshSchemas() { - LOGGER.info("Refreshing schemas..."); - Map latest = getLatestTableMetadatas(); - removeDeletedTableSchemas(latest); - createOrUpdateNewTableSchemas(latest); - LOGGER.info("Schemas are refreshed"); } - public KeyValueSchema getOrUpdateKeyValueSchema(KeyspaceTable kt) { - if (!tableToKVSchemaMap.containsKey(kt)) { - refreshSchema(kt); - } + public synchronized KeyValueSchema getKeyValueSchema(KeyspaceTable kt) { return tableToKVSchemaMap.getOrDefault(kt, null); } @@ -70,10 +60,36 @@ public Set getCdcEnabledTableMetadataSet() { .collect(Collectors.toSet()); } + public synchronized void removeKeyspace(String keyspace) { + final List collect = tableToKVSchemaMap.keySet() + .stream() + .filter(keyValueSchema -> keyValueSchema.keyspace.equals(keyspace)) + .collect(toList()); + + collect.forEach(tableToKVSchemaMap::remove); + collect.forEach(collected -> keyspaceTableMap.removeAll(collected.keyspace)); + } + + public synchronized void removeTable(KeyspaceTable kst) { + tableToKVSchemaMap.remove(kst); + keyspaceTableMap.remove(kst.keyspace, kst.table); + } + + // there is not "addKeyspace", it is not necessary + // as we will ever add a concrete table (with keyspace) but we will also dropping all tables when keyspace is dropped + public synchronized void addTable(KeyspaceTable kst, KeyValueSchema kvs) { + tableToKVSchemaMap.put(kst, kvs); + keyspaceTableMap.put(kst.keyspace, kst.table); + } + + public synchronized boolean contains(String keyspace, String table) { + return keyspaceTableMap.containsEntry(keyspace, table); + } + /** * Get the schema of an inner field based on the field name * @param fieldName the name of the field in the schema - * @param schema the schema where the field resides in + * @param schema the schema where the field resides in * @return Schema */ public static Schema getFieldSchema(String fieldName, Schema schema) { @@ -83,48 +99,6 @@ public static Schema getFieldSchema(String fieldName, Schema schema) { throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type()); } - private void refreshSchema(KeyspaceTable keyspaceTable) { - LOGGER.debug("Refreshing schema for {}", keyspaceTable); - TableMetadata existing = tableToKVSchemaMap.containsKey(keyspaceTable) ? 
tableToKVSchemaMap.get(keyspaceTable).tableMetadata() : null; - TableMetadata latest = cassandraClient.getCdcEnabledTableMetadata(keyspaceTable.keyspace, keyspaceTable.table); - if (existing != latest) { - if (existing == null) { - tableToKVSchemaMap.put(keyspaceTable, new KeyValueSchema(kafkaTopicPrefix, latest, sourceInfoStructMaker)); - LOGGER.debug("Updated schema for {}", keyspaceTable); - } - if (latest == null) { - tableToKVSchemaMap.remove(keyspaceTable); - LOGGER.debug("Removed schema for {}", keyspaceTable); - } - } - } - - private Map getLatestTableMetadatas() { - Map latest = new HashMap<>(); - for (TableMetadata tm : cassandraClient.getCdcEnabledTableMetadataList()) { - latest.put(new KeyspaceTable(tm), tm); - } - return latest; - } - - private void removeDeletedTableSchemas(Map latestTableMetadataMap) { - Set existingTables = new HashSet<>(tableToKVSchemaMap.keySet()); - Set latestTables = latestTableMetadataMap.keySet(); - existingTables.removeAll(latestTables); - tableToKVSchemaMap.keySet().removeAll(existingTables); - } - - private void createOrUpdateNewTableSchemas(Map latestTableMetadataMap) { - latestTableMetadataMap.forEach((table, metadata) -> { - TableMetadata existingTableMetadata = tableToKVSchemaMap.containsKey(table) ? tableToKVSchemaMap.get(table).tableMetadata() : null; - if (existingTableMetadata == null || !existingTableMetadata.equals(metadata)) { - KeyValueSchema keyValueSchema = new KeyValueSchema(kafkaTopicPrefix, metadata, sourceInfoStructMaker); - tableToKVSchemaMap.put(table, keyValueSchema); - LOGGER.info("Updated schema for {}.", table); - } - }); - } - public static class KeyValueSchema { private final TableMetadata tableMetadata; private final Schema keySchema; diff --git a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java index 260e73e..5a86aa1 100644 --- a/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java @@ -5,6 +5,27 @@ */ package io.debezium.connector.cassandra; +import static java.util.stream.Collectors.toMap; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.schema.KeyspaceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.SchemaChangeListener; +import com.datastax.driver.core.TableMetadata; + +import io.debezium.connector.SourceInfoStructMaker; + /** * The schema processor is responsible for periodically * refreshing the table schemas in Cassandra. 
Cassandra @@ -13,16 +34,169 @@ */ public class SchemaProcessor extends AbstractProcessor { + private static final Logger logger = LoggerFactory.getLogger(SchemaProcessor.class); + private static final String NAME = "Schema Processor"; private final SchemaHolder schemaHolder; + private final CassandraClient cassandraClient; + private final String kafkaTopicPrefix; + private final SourceInfoStructMaker sourceInfoStructMaker; + private final SchemaChangeListener schemaChangeListener; public SchemaProcessor(CassandraConnectorContext context) { super(NAME, context.getCassandraConnectorConfig().schemaPollInterval()); schemaHolder = context.getSchemaHolder(); + this.cassandraClient = context.getCassandraClient(); + this.kafkaTopicPrefix = schemaHolder.kafkaTopicPrefix; + this.sourceInfoStructMaker = schemaHolder.sourceInfoStructMaker; + + schemaChangeListener = new NoOpSchemaChangeListener() { + @Override + public void onKeyspaceAdded(final KeyspaceMetadata keyspace) { + Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + Keyspace.openWithoutSSTables(keyspace.getName()); + logger.info("added keyspace {}", keyspace.asCQLQuery()); + } + + @Override + public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) { + Schema.instance.updateKeyspace(current.getName(), KeyspaceParams.create(current.isDurableWrites(), current.getReplication())); + logger.info("updated keyspace {}", current.asCQLQuery()); + } + + @Override + public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) { + schemaHolder.removeKeyspace(keyspace.getName()); + Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create( + keyspace.getName(), + KeyspaceParams.create(keyspace.isDurableWrites(), + keyspace.getReplication()))); + logger.info("removed keyspace {}", keyspace.asCQLQuery()); + } + + @Override + public void onTableAdded(final TableMetadata table) { + logger.info(String.format("Table %s.%s detected to be added!", table.getKeyspace().getName(), table.getName())); + schemaHolder.addTable(new KeyspaceTable(table), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, table, sourceInfoStructMaker)); + + final CFMetaData rawCFMetaData = CFMetaData.compile(table.asCQLQuery(), table.getKeyspace().getName()); + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newCFMetaData = rawCFMetaData.copy(table.getId()); + + Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false); + + final org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName); + if (current == null) { + throw new IllegalStateException(String.format("Keyspace %s doesn't exist", newCFMetaData.ksName)); + } + + if (current.tables.get(table.getName()).isPresent()) { + logger.info(String.format("table %s.%s is already added!", table.getKeyspace(), table.getName())); + return; + } + + final java.util.function.Function transformationFunction = ks -> ks + .withSwapped(ks.tables.with(newCFMetaData)); + + org.apache.cassandra.schema.KeyspaceMetadata transformed = transformationFunction.apply(current); + + Schema.instance.setKeyspaceMetadata(transformed); + Schema.instance.load(newCFMetaData); + + logger.info("added table {}", table.asCQLQuery()); + } + + @Override + public void onTableRemoved(final TableMetadata table) { + 
logger.info(String.format("Table %s.%s detected to be removed!", table.getKeyspace().getName(), table.getName())); + schemaHolder.removeTable(new KeyspaceTable(table)); + + final String ksName = table.getKeyspace().getName(); + final String tableName = table.getName(); + + final org.apache.cassandra.schema.KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(table.getKeyspace().getName()); + + if (oldKsm == null) { + throw new IllegalStateException(String.format("KeyspaceMetadata for keyspace %s is not found!", table.getKeyspace().getName())); + } + + final ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(tableName); + + if (cfs == null) { + throw new IllegalStateException(String.format("ColumnFamilyStore for %s.%s is not found!", table.getKeyspace(), table.getName())); + } + + // make sure all the indexes are dropped, or else. + cfs.indexManager.markAllIndexesRemoved(); + + // reinitialize the keyspace. + final CFMetaData cfm = oldKsm.tables.get(tableName).get(); + final org.apache.cassandra.schema.KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName)); + + Schema.instance.unload(cfm); + Schema.instance.setKeyspaceMetadata(newKsm); + + logger.info("removed table {}", table.asCQLQuery()); + } + + @Override + public void onTableChanged(final TableMetadata newTableMetadata, final TableMetadata oldTableMetaData) { + logger.info(String.format("Detected alternation in schema of %s.%s (previous cdc = %s, current cdc = %s)", + newTableMetadata.getKeyspace().getName(), + newTableMetadata.getName(), + oldTableMetaData.getOptions().isCDC(), + newTableMetadata.getOptions().isCDC())); + + schemaHolder.addTable(new KeyspaceTable(newTableMetadata), + new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker)); + + final CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(), + newTableMetadata.getKeyspace().getName()); + + final CFMetaData oldMetadata = Schema.instance.getCFMetaData(oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName()); + + // we need to copy because CFMetaData.compile will generate new cfId which would not match id of old metadata + final CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId); + oldMetadata.apply(newMetadata); + + logger.info("changed table {}", newTableMetadata.asCQLQuery()); + } + }; + } + + @Override + public void initialize() { + final Map tables = getTableMetadata() + .stream() + .collect(toMap(KeyspaceTable::new, + tableMetadata -> new SchemaHolder.KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker))); + + tables.forEach(schemaHolder::addTable); + + logger.info("Registering schema change listener ..."); + cassandraClient.getCluster().register(schemaChangeListener); } @Override public void process() { - schemaHolder.refreshSchemas(); + } + + @Override + public void destroy() { + logger.info("Unregistering schema change listener ..."); + cassandraClient.getCluster().unregister(schemaChangeListener); + logger.info("Clearing cdc keyspace / table map ... 
"); + schemaHolder.tableToKVSchemaMap.clear(); + } + + private List getTableMetadata() { + return cassandraClient.getCluster().getMetadata().getKeyspaces().stream() + .map(KeyspaceMetadata::getTables) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } } diff --git a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java index 33c76c0..e838a47 100644 --- a/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java +++ b/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java @@ -198,7 +198,7 @@ private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetad private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException { String tableName = tableName(tableMetadata); KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata); - SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable); + SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable); Schema keySchema = keyValueSchema.keySchema(); Schema valueSchema = keyValueSchema.valueSchema(); diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java index 8019cd0..5ab83d5 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java @@ -8,6 +8,15 @@ import java.util.HashMap; import java.util.Map; +import com.datastax.driver.core.DataType; + +import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; +import io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BooleanType; @@ -20,6 +29,7 @@ import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.ShortType; import org.apache.cassandra.db.marshal.SimpleDateType; @@ -29,16 +39,6 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; -import com.datastax.driver.core.DataType; - -import io.debezium.connector.cassandra.transforms.type.converter.BasicTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.ListTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.MapTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.SetTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TupleTypeConverter; -import io.debezium.connector.cassandra.transforms.type.converter.TypeConverter; -import 
io.debezium.connector.cassandra.transforms.type.converter.UserTypeConverter; - public final class CassandraTypeConverter { private CassandraTypeConverter() { @@ -71,6 +71,8 @@ private CassandraTypeConverter() { typeMap.put(DataType.Name.TUPLE, new TupleTypeConverter()); typeMap.put(DataType.Name.UDT, new UserTypeConverter()); typeMap.put(DataType.Name.UUID, new BasicTypeConverter<>(UUIDType.instance)); + typeMap.put(DataType.Name.VARINT, new BasicTypeConverter<>(IntegerType.instance)); + typeMap.put(DataType.Name.VARCHAR, new BasicTypeConverter<>(UTF8Type.instance)); } public static AbstractType convert(DataType type) { diff --git a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java index 6a7a057..4e18d0c 100644 --- a/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java +++ b/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java @@ -23,6 +23,7 @@ import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.MapType; @@ -80,6 +81,7 @@ private CassandraTypeDeserializer() { tmp.put(DoubleType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(DecimalType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(Int32Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.INT_TYPE)); + tmp.put(IntegerType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(ShortType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.SHORT_TYPE)); tmp.put(LongType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(TimeType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); diff --git a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java index 21276e3..a1c48f6 100644 --- a/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/CommitLogProcessorTest.java @@ -29,11 +29,14 @@ public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase { private CassandraConnectorContext context; private CommitLogProcessor commitLogProcessor; + private SchemaProcessor schemaProcessor; @Before public void setUp() throws Exception { context = generateTaskContext(); commitLogProcessor = new CommitLogProcessor(context); + schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); commitLogProcessor.initialize(); } @@ -41,6 +44,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { deleteTestOffsets(context); commitLogProcessor.destroy(); + schemaProcessor.destroy(); context.cleanUp(); } @@ -48,7 +52,6 @@ public void tearDown() throws Exception { public void testProcessCommitLogs() throws Exception { int commitLogRowSize = 10; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); // 
programmatically add insertion and deletion events into commit log, this is because running an 'INSERT' or 'DELETE' // cql against the embedded Cassandra does not modify the commit log file on disk. diff --git a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java index 0d61a38..0f22366 100644 --- a/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SchemaProcessorTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import org.junit.Test; @@ -19,6 +18,7 @@ public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testProcess() throws Exception { CassandraConnectorContext context = generateTaskContext(); SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); SchemaHolder.KeyValueSchema keyValueSchema; String namespacePrefix = "io.debezium.connector.cassandra" + "." + EmbeddedCassandraConnectorTestBase.TEST_KAFKA_TOPIC_PREFIX + "." @@ -29,48 +29,48 @@ public void testProcess() throws Exception { assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table1") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); + + Thread.sleep(5000); + schemaProcessor.process(); assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); + assertNotNull(context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"))); context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;"); schemaProcessor.process(); assertEquals(1, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); schemaProcessor.process(); - assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); + assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table1" + "." 
+ "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); + keyValueSchema = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")); assertNotNull(keyValueSchema); expectedKeySchemaName = namespacePrefix + "." + "table2" + "." + "Key"; assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name()); expectedValueSchemaName = namespacePrefix + "." + "table2" + "." + "Value"; assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name()); - context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table2") + " ADD c text"); schemaProcessor.process(); assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size()); TableMetadata expectedTm1 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table1"); TableMetadata expectedTm2 = context.getCassandraClient().getCdcEnabledTableMetadata(TEST_KEYSPACE, "table2"); - TableMetadata tm1 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); - TableMetadata tm2 = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); + TableMetadata tm1 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")).tableMetadata(); + TableMetadata tm2 = context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")).tableMetadata(); assertEquals(expectedTm1, tm1); assertEquals(expectedTm2, tm2); - deleteTestKeyspaceTables(); context.cleanUp(); } diff --git a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index de95acc..b662cc6 100644 --- a/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -31,12 +31,13 @@ public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase { public void testSnapshotTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); @@ -75,11 +76,12 @@ public void testSnapshotTable() throws Exception { public void testSnapshotSkipsNonCdcEnabledTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS 
" + keyspaceTable("non_cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;"); - context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("non_cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); } @@ -99,10 +101,11 @@ public void testSnapshotEmptyTable() throws Exception { CassandraConnectorContext context = generateTaskContext(); AtomicBoolean globalTaskState = new AtomicBoolean(true); SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context)); + SchemaProcessor schemaProcessor = new SchemaProcessor(context); + schemaProcessor.initialize(); when(snapshotProcessor.isRunning()).thenReturn(true); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); - context.getSchemaHolder().refreshSchemas(); ChangeEventQueue queue = context.getQueue(); assertEquals(queue.totalCapacity(), queue.remainingCapacity());