DCC Schema Refresh Fix #1

Open
wants to merge 15 commits into base: 1.5-without-insta-changes
368 changes: 368 additions & 0 deletions REPORT.adoc

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions REPORT_2.adoc
@@ -0,0 +1,152 @@
== Debezium integration with Cassandra report - part 2

_by Stefan Miklosovic / stefan dot miklosovic at instaclustr dot com_

=== Introduction

In this document, we will revisit what we actually did in part 1 and
build on it to improve the solution.

=== Nature of the problem

The problem with solution 1 is that it is not deterministic. The source of the
non-determinism seems to be that it is more or less unpredictable when
Cassandra flushes / persists changes (e.g. a change of `cdc` between `true` and `false`)
to disk, and hence whether our schema refresh will pick these changes up.

Hence we might see the following:

1) A table is created with `cdc = true`.
2) We detect the change in the driver's schema change listener and refresh the schema in Debezium via Cassandra's mechanics.
3) We think that our internal structures are indeed refreshed, but our `cdc` flag is still `false`.

This means that even though the Cassandra process is internally aware that we have
enabled cdc on a particular table, Debezium does not reflect it: sometimes the
changes are flushed / persisted in time, but sometimes it takes a while for them
to propagate, and by then it is too late for Debezium, because the listener has already been invoked.
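
To make this race concrete, here is a minimal sketch against the DataStax 3.x driver API; `refreshDebeziumSchema` is a hypothetical stand-in for the schema refresh performed by solution 1, not an actual Debezium method:

[source,java]
----
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.SchemaChangeListenerBase;
import com.datastax.driver.core.TableMetadata;

public class RacyRefreshExample {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        // The listener fires as soon as the driver learns about the new table;
        // Cassandra may not have flushed the table's params (cdc = true among
        // them) yet, so the refresh below can still observe cdc = false.
        cluster.register(new SchemaChangeListenerBase() {
            @Override
            public void onTableAdded(TableMetadata table) {
                refreshDebeziumSchema(table.getKeyspace().getName(), table.getName());
            }
        });
    }

    // Hypothetical stand-in: re-read Cassandra's (possibly stale) view of the table.
    private static void refreshDebeziumSchema(String keyspace, String table) {
    }
}
----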

=== Possible solutions

There are, in general, two solutions to this problem:

1) Faking what Cassandra does inside Debezium, so that Debezium holds the same data structures too.

This is a rather delicate operation / topic to deal with, but it is possible, and we chose to go with
this solution for the time being.

It merges two main concepts:

a) Debezium is informed about schema changes via the provided schema change listener registered on the driver;
b) once a respective method on the listener is invoked, we mimic the same code Cassandra would invoke, but
in such a way that the parts which would be erroneous (because Debezium simply does not run Cassandra) are
skipped.

By doing b), we internally hold a logical copy of what the real Cassandra holds, and we keep
Cassandra's internal structures (keyspaces, tables, ...) synchronized by registering a
schema change listener and applying the same changes to the "Cassandra" inside the Debezium process.

Let's go through the core of this logic, starting with `onKeyspaceAdded`:

[source,java]
----
schemaChangeListener = new NoOpSchemaChangeListener() {
    @Override
    public void onKeyspaceAdded(final KeyspaceMetadata keyspace) {
        Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(
                keyspace.getName(),
                KeyspaceParams.create(keyspace.isDurableWrites(),
                        keyspace.getReplication())));
        Keyspace.openWithoutSSTables(keyspace.getName());
        logger.info("added keyspace {}", keyspace.asCQLQuery());
    }
----

Here we fake the opening of a keyspace. This populates Cassandra's internal structures, so the
hot Cassandra code running inside Debezium "knows" which keyspace was added.

When a keyspace is updated, we do:

[source,java]
----
@Override
public void onKeyspaceChanged(KeyspaceMetadata current,
                              KeyspaceMetadata previous) {
    Schema.instance.updateKeyspace(current.getName(),
            KeyspaceParams.create(current.isDurableWrites(),
                    current.getReplication()));
}
----

When a keyspace is removed, we do:

[source,java]
----
@Override
public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) {
    schemaHolder.removeKeyspace(keyspace.getName());
    // here, KeyspaceMetadata is Cassandra's, not the driver's as in the method argument
    Schema.instance.clearKeyspaceMetadata(KeyspaceMetadata.create(
            keyspace.getName(),
            KeyspaceParams.create(keyspace.isDurableWrites(),
                    keyspace.getReplication())));
}
----

We remove the keyspace from our schema holder too. Think about it: if we remove a whole keyspace
via `DROP KEYSPACE abc`, all of its tables are removed with it, so we get rid of all tables of that keyspace
in our schema holder as well.
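
As a purely hypothetical illustration of that bookkeeping (the real `SchemaHolder` keeps key/value schemas per table and differs in detail), the eviction pattern looks like this:

[source,java]
----
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache keyed by "keyspace.table"; the real SchemaHolder differs.
class TableSchemaCache {

    private final Map<String, Object> schemasByTable = new ConcurrentHashMap<>();

    // DROP KEYSPACE abc implies dropping every table in it, so we evict
    // every cached entry belonging to that keyspace in one sweep.
    void removeKeyspace(String keyspace) {
        schemasByTable.keySet().removeIf(key -> key.startsWith(keyspace + "."));
    }
}
----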

We leave the last three methods of the listener - `onTableAdded`, `onTableChanged` and `onTableRemoved` -
for the reader to go through. The code is more or less what Cassandra does internally, but
refactored in such a way that the parts which are not needed (nor desired) are simply skipped.

Please follow this https://github.com/instaclustr/debezium-connector-cassandra/blob/dd2/src/main/java/io/debezium/connector/cassandra/SchemaProcessor.java#L81-L168[link].
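
For a flavour of those methods, here is a minimal sketch of what `onTableAdded` can look like, assuming Cassandra 3.11's `CFMetaData.compile` and `Schema.instance.load`; the linked `SchemaProcessor` is the authoritative version and does more (for instance, it also updates the schema holder):

[source,java]
----
@Override
public void onTableAdded(final TableMetadata table) {
    // Compile the driver's CQL view of the table into Cassandra's own
    // metadata and load it, so that Schema.instance can resolve the table
    // later, during commit log deserialisation.
    final CFMetaData cfMetaData = CFMetaData.compile(table.asCQLQuery(),
            table.getKeyspace().getName());
    Schema.instance.load(cfMetaData);
    logger.info("added table {}", table.asCQLQuery());
}
----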

Once we put this into action, the metadata will be populated correctly, with the `cdc` flag set on `TableParams` and so on.
A `Mutation` will be serialised properly as well, because serialisation reaches into the ColumnFamily's metadata,
which now has `cdc = true`: we were notified about the change in the listener and updated
that table in the Cassandra code. Hence the deserialisation of a `Mutation`, where the following code
is called, will not throw:

[source,java]
----
public static class Serializer
{
    public void serialize(CFMetaData metadata, DataOutputPlus out, int version) throws IOException
    {
        UUIDSerializer.serializer.serialize(metadata.cfId, out, version);
    }

    public CFMetaData deserialize(DataInputPlus in, int version) throws IOException
    {
        UUID cfId = UUIDSerializer.serializer.deserialize(in, version);
        CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
        if (metadata == null)
        {
            String message = String.format("Couldn't find table for cfId %s. If a table was just " +
                    "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
                    "agreement on table creation.", cfId);
            throw new UnknownColumnFamilyException(message, cfId);
        }

        return metadata;
    }
}
----

Keep in mind that we are not "initialising" Cassandra in any way. When Debezium starts,
Cassandra's internals will already have read the tables on disk and so on, so it will be fully running,
but we would never be notified about anything that happens afterwards (that cdc was changed from false to true, for example).
That is what the schema change listener synchronizes. We might be notified about such a change via the listener, it is true,
but a schema refresh does not always help: we would be notified about changes yet have no
way to make them visible to Cassandra's internal code. Only invoking the core Cassandra structures, emulating that
we are running in a proper Cassandra node, makes the deserialisation of a mutation possible; previously our
cdc flag was always false (it was never updated), so the handling of such a mutation was effectively skipped.

The second solution consists of turning Debezium into an agent: it would then, by definition, see the same data structures as Cassandra.
The problem we see with this solution is that it is rather tricky to do, because
Debezium would suddenly share the same lifecycle as Cassandra (or the other way around, Cassandra
would share the same lifecycle as Debezium), as the two would be inherently connected.

Another problem we see is that the dependencies Debezium uses are not compatible with
what Cassandra uses, so it might simply not be possible to "merge" them. From merely checking,
the probability that this would be the case is quite high; there is Cassandra connector of
7 changes: 4 additions & 3 deletions pom.xml
@@ -4,20 +4,20 @@
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.5.0.Final</version>
Author comment: Not sure why these .Finals are showing up as diff - they are the same on the target branch.

</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-connector-cassandra</artifactId>
<name>Debezium Connector for Cassandra</name>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.5.0.Final</version>
<packaging>jar</packaging>

<scm>
<connection>scm:git:[email protected]:debezium/debezium-connector-cassandra.git</connection>
<developerConnection>scm:git:[email protected]:debezium/debezium-connector-cassandra.git</developerConnection>
<url>https://github.com/debezium/debezium-connector-cassandra</url>
- <tag>HEAD</tag>
+ <tag>v1.5.0.Final</tag>
</scm>

<properties>
@@ -119,6 +119,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
+ <version>3.11.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
@@ -95,6 +95,10 @@ public String getClusterName() {
return cluster.getMetadata().getClusterName();
}

+ public Cluster getCluster() {
+     return cluster;
+ }

public boolean isQueryable() {
return !cluster.isClosed() && !session.isClosed();
}
@@ -53,7 +53,7 @@ public CassandraConnectorContext(CassandraConnectorConfig config) throws Excepti
.build();

// Setting up schema holder ...
- this.schemaHolder = new SchemaHolder(this.cassandraClient, this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker());
+ this.schemaHolder = new SchemaHolder(this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker());

// Setting up a file-based offset manager ...
this.offsetWriter = new FileOffsetWriter(this.config.offsetBackingStoreDir());
@@ -48,7 +48,7 @@

/**
* Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code.
- *
+ * <p>
* This handler implementation processes each {@link Mutation} and invokes one of the registered partition handler
* for each {@link PartitionUpdate} in the {@link Mutation} (a mutation could have multiple partitions if it is a batch update),
* which in turn makes one or more record via the {@link RecordMaker} and enqueue the record into the {@link ChangeEventQueue}.
@@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler {
}

/**
- * A PartitionType represents the type of a PartitionUpdate.
+ * A PartitionType represents the type of a PartitionUpdate.
*/
enum PartitionType {
/**
@@ -86,7 +86,7 @@ enum PartitionType {
PARTITION_KEY_ROW_DELETION,

/**
- * a partition-level deletion where partition key + clustering key = primary key
+ * a partition-level deletion where partition key + clustering key = primary key
*/
PARTITION_AND_CLUSTERING_KEY_ROW_DELETION,

@@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) {
}

/**
- * A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
+ * A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
*/
enum RowType {
/**
@@ -303,18 +303,22 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
* Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation
* of this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid deletion
* event means a partition only has a single row, this implies there are no clustering keys.
- *
+ * <p>
* The steps are:
- * (1) Populate the "source" field for this event
- * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
- * (3) Populate the "after" field for this event
- *     a. populate partition columns
- *     b. populate regular columns with null values
- * (4) Assemble a {@link Record} object from the populated data and queue the record
+ * (1) Populate the "source" field for this event
+ * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
+ * (3) Populate the "after" field for this event
+ *     a. populate partition columns
+ *     b. populate regular columns with null values
+ * (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

- SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
+ SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
+ if (keyValueSchema == null) {
+     return;
+ }

Schema keySchema = keyValueSchema.keySchema();
Schema valueSchema = keyValueSchema.valueSchema();

@@ -349,27 +353,30 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
* Handle a valid event resulted from a row-level modification by converting Cassandra representation of
* this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid event
* implies this must be an insert, update, or delete.
- *
+ * <p>
* The steps are:
- * (1) Populate the "source" field for this event
- * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
- * (3) Populate the "after" field for this event
- *     a. populate partition columns
- *     b. populate clustering columns
- *     c. populate regular columns
- *     d. for deletions, populate regular columns with null values
- * (4) Assemble a {@link Record} object from the populated data and queue the record
+ * (1) Populate the "source" field for this event
+ * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
+ * (3) Populate the "after" field for this event
+ *     a. populate partition columns
+ *     b. populate clustering columns
+ *     c. populate regular columns
+ *     d. for deletions, populate regular columns with null values
+ * (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

- SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
- Schema keySchema = schema.keySchema();
- Schema valueSchema = schema.valueSchema();
+ SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
+ if (keyValueSchema == null) {
+     return;
+ }
+ Schema keySchema = keyValueSchema.keySchema();
+ Schema valueSchema = keyValueSchema.valueSchema();

RowData after = new RowData();
populatePartitionColumns(after, pu);
populateClusteringColumns(after, row, pu);
- populateRegularColumns(after, row, rowType, schema);
+ populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();
