DCC Schema Refresh Fix #1

Open: wants to merge 15 commits into base branch 1.5-without-insta-changes
Changes from 9 commits
374 changes: 374 additions & 0 deletions REPORT.adoc

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pom.xml
@@ -4,20 +4,20 @@
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
-<version>1.5.0-SNAPSHOT</version>
+<version>1.5.0.Final</version>
Author commented:

Not sure why these .Finals are showing up as diff - they are the same on the target branch.

</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-connector-cassandra</artifactId>
<name>Debezium Connector for Cassandra</name>
-<version>1.5.0-SNAPSHOT</version>
+<version>1.5.0.Final</version>
<packaging>jar</packaging>

<scm>
<connection>scm:git:[email protected]:debezium/debezium-connector-cassandra.git</connection>
<developerConnection>scm:git:[email protected]:debezium/debezium-connector-cassandra.git</developerConnection>
<url>https://github.com/debezium/debezium-connector-cassandra</url>
-<tag>HEAD</tag>
+<tag>v1.5.0.Final</tag>
</scm>

<properties>
@@ -119,6 +119,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
+<version>3.11.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
CassandraClient.java
@@ -95,6 +95,10 @@ public String getClusterName() {
return cluster.getMetadata().getClusterName();
}

+public Cluster getCluster() {
+    return cluster;
+}

public boolean isQueryable() {
return !cluster.isClosed() && !session.isClosed();
}
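Exposing the underlying driver Cluster presumably lets other components subscribe to driver-level schema events instead of re-querying table metadata on demand. A minimal sketch of such wiring, assuming the DataStax 3.x driver API (Cluster.register and Cluster.unregister accept a SchemaChangeListener); the listener variable and the shutdown call are illustrative, not taken from this PR:

    Cluster cluster = cassandraClient.getCluster();
    SchemaChangeListener listener = new NoOpSchemaChangeListener() {
        // override only the callbacks of interest, e.g. onTableChanged(...)
    };
    cluster.register(listener);      // start receiving keyspace/table/type change notifications
    // ... later, on shutdown:
    cluster.unregister(listener);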
CassandraConnectorContext.java
@@ -53,7 +53,7 @@ public CassandraConnectorContext(CassandraConnectorConfig config) throws Excepti
.build();

// Setting up schema holder ...
-this.schemaHolder = new SchemaHolder(this.cassandraClient, this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker());
+this.schemaHolder = new SchemaHolder(this.config.kafkaTopicPrefix(), this.config.getSourceInfoStructMaker());

// Setting up a file-based offset manager ...
this.offsetWriter = new FileOffsetWriter(this.config.offsetBackingStoreDir());
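Dropping the CassandraClient argument suggests the SchemaHolder no longer queries Cassandra itself and instead only serves whatever has been put into its cache. A minimal sketch of the shape this implies, assuming a simple map-backed cache (field names, the map type and the refresh path are assumptions, not the actual implementation):

    public class SchemaHolder {
        private final Map<KeyspaceTable, KeyValueSchema> schemaCache = new ConcurrentHashMap<>();
        private final String kafkaTopicPrefix;
        private final SourceInfoStructMaker sourceInfoStructMaker;

        public SchemaHolder(String kafkaTopicPrefix, SourceInfoStructMaker sourceInfoStructMaker) {
            this.kafkaTopicPrefix = kafkaTopicPrefix;
            this.sourceInfoStructMaker = sourceInfoStructMaker;
        }

        public KeyValueSchema getKeyValueSchema(KeyspaceTable keyspaceTable) {
            // Pure lookup: returns null for tables not (yet) in the cache, which is why
            // the commit-log handlers below now guard against a null result.
            return schemaCache.get(keyspaceTable);
        }
    }

Map and ConcurrentHashMap come from java.util and java.util.concurrent; SourceInfoStructMaker is the Debezium core type already used via config.getSourceInfoStructMaker() above.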
CommitLogReadHandlerImpl.java
@@ -48,7 +48,7 @@

/**
* Handler that implements {@link CommitLogReadHandler} interface provided by Cassandra source code.
- *
+ * <p>
* This handler implementation processes each {@link Mutation} and invokes one of the registered partition handler
* for each {@link PartitionUpdate} in the {@link Mutation} (a mutation could have multiple partitions if it is a batch update),
* which in turn makes one or more record via the {@link RecordMaker} and enqueue the record into the {@link ChangeEventQueue}.
@@ -77,7 +77,7 @@ public class CommitLogReadHandlerImpl implements CommitLogReadHandler {
}

/**
- * A PartitionType represents the type of a PartitionUpdate.
+ * A PartitionType represents the type of a PartitionUpdate.
*/
enum PartitionType {
/**
@@ -86,7 +86,7 @@ enum PartitionType {
PARTITION_KEY_ROW_DELETION,

/**
- * a partition-level deletion where partition key + clustering key = primary key
+ * a partition-level deletion where partition key + clustering key = primary key
*/
PARTITION_AND_CLUSTERING_KEY_ROW_DELETION,

@@ -147,7 +147,7 @@ public static boolean isPartitionDeletion(PartitionUpdate pu) {
}

/**
- * A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
+ * A RowType represents different types of {@link Row}-level modifications in a Cassandra table.
*/
enum RowType {
/**
@@ -303,18 +303,22 @@ private void process(PartitionUpdate pu, OffsetPosition offsetPosition, Keyspace
* Handle a valid deletion event resulted from a partition-level deletion by converting Cassandra representation
* of this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid deletion
* event means a partition only has a single row, this implies there are no clustering keys.
- *
+ * <p>
* The steps are:
- * (1) Populate the "source" field for this event
- * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
- * (3) Populate the "after" field for this event
- * a. populate partition columns
- * b. populate regular columns with null values
- * (4) Assemble a {@link Record} object from the populated data and queue the record
+ * (1) Populate the "source" field for this event
+ * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
+ * (3) Populate the "after" field for this event
+ * a. populate partition columns
+ * b. populate regular columns with null values
+ * (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

-SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
+SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
+if (keyValueSchema == null) {
+    return;
+}

Schema keySchema = keyValueSchema.keySchema();
Schema valueSchema = keyValueSchema.valueSchema();

@@ -349,27 +353,30 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
* Handle a valid event resulted from a row-level modification by converting Cassandra representation of
* this event into a {@link Record} object and queue the record to {@link ChangeEventQueue}. A valid event
* implies this must be an insert, update, or delete.
- *
+ * <p>
* The steps are:
- * (1) Populate the "source" field for this event
- * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
- * (3) Populate the "after" field for this event
- * a. populate partition columns
- * b. populate clustering columns
- * c. populate regular columns
- * d. for deletions, populate regular columns with null values
- * (4) Assemble a {@link Record} object from the populated data and queue the record
+ * (1) Populate the "source" field for this event
+ * (2) Fetch the cached key/value schemas from {@link SchemaHolder}
+ * (3) Populate the "after" field for this event
+ * a. populate partition columns
+ * b. populate clustering columns
+ * c. populate regular columns
+ * d. for deletions, populate regular columns with null values
+ * (4) Assemble a {@link Record} object from the populated data and queue the record
*/
private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {

-SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
-Schema keySchema = schema.keySchema();
-Schema valueSchema = schema.valueSchema();
+SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
+if (keyValueSchema == null) {
+    return;
+}
+Schema keySchema = keyValueSchema.keySchema();
+Schema valueSchema = keyValueSchema.valueSchema();

RowData after = new RowData();
populatePartitionColumns(after, pu);
populateClusteringColumns(after, row, pu);
-populateRegularColumns(after, row, rowType, schema);
+populateRegularColumns(after, row, rowType, keyValueSchema);

long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();
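The change common to handlePartitionDeletion and handleRowModifications is easiest to see side by side; the comments are interpretation, not text from the PR:

    // Before: a cache miss triggered an inline schema read while processing the commit log.
    SchemaHolder.KeyValueSchema schema = schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);

    // After: a pure lookup. Mutations for tables without a cached schema are skipped, and
    // the cache is presumably kept current elsewhere (e.g. by a listener registered on the
    // driver Cluster exposed through CassandraClient.getCluster()).
    SchemaHolder.KeyValueSchema keyValueSchema = schemaHolder.getKeyValueSchema(keyspaceTable);
    if (keyValueSchema == null) {
        return;
    }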

NoOpSchemaChangeListener.java (new file)
@@ -0,0 +1,117 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.cassandra;

import com.datastax.driver.core.AggregateMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.FunctionMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.MaterializedViewMetadata;
import com.datastax.driver.core.SchemaChangeListener;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.UserType;

public class NoOpSchemaChangeListener implements SchemaChangeListener {
@Override
public void onKeyspaceAdded(final KeyspaceMetadata keyspace) {

}

@Override
public void onKeyspaceRemoved(final KeyspaceMetadata keyspace) {

}

@Override
public void onKeyspaceChanged(final KeyspaceMetadata current, final KeyspaceMetadata previous) {

}

@Override
public void onTableAdded(final TableMetadata table) {

}

@Override
public void onTableRemoved(final TableMetadata table) {

}

@Override
public void onTableChanged(final TableMetadata current, final TableMetadata previous) {

}

@Override
public void onUserTypeAdded(final UserType type) {

}

@Override
public void onUserTypeRemoved(final UserType type) {

}

@Override
public void onUserTypeChanged(final UserType current, final UserType previous) {

}

@Override
public void onFunctionAdded(final FunctionMetadata function) {

}

@Override
public void onFunctionRemoved(final FunctionMetadata function) {

}

@Override
public void onFunctionChanged(final FunctionMetadata current, final FunctionMetadata previous) {

}

@Override
public void onAggregateAdded(final AggregateMetadata aggregate) {

}

@Override
public void onAggregateRemoved(final AggregateMetadata aggregate) {

}

@Override
public void onAggregateChanged(final AggregateMetadata current, final AggregateMetadata previous) {

}

@Override
public void onMaterializedViewAdded(final MaterializedViewMetadata view) {

}

@Override
public void onMaterializedViewRemoved(final MaterializedViewMetadata view) {

}

@Override
public void onMaterializedViewChanged(final MaterializedViewMetadata current, final MaterializedViewMetadata previous) {

}

@Override
public void onRegister(final Cluster cluster) {

}

@Override
public void onUnregister(final Cluster cluster) {

}
}
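SchemaChangeListener declares a callback for every schema element (keyspaces, tables, user types, functions, aggregates, materialized views), so implementing it directly is noisy. This class acts as a no-op adapter: a subclass overrides only the callbacks it cares about. A hedged example of how it might be combined with CassandraClient.getCluster() above (the subclass name and the refresh actions are illustrative, not part of this diff):

    public class TableSchemaListener extends NoOpSchemaChangeListener {
        @Override
        public void onTableAdded(final TableMetadata table) {
            // e.g. build and cache key/value schemas for the new table
        }

        @Override
        public void onTableRemoved(final TableMetadata table) {
            // e.g. drop the cached schemas for the removed table
        }

        @Override
        public void onTableChanged(final TableMetadata current, final TableMetadata previous) {
            // e.g. rebuild the cached schemas after an ALTER TABLE
        }
    }

    // registered on the driver Cluster exposed by the connector's client:
    cassandraClient.getCluster().register(new TableSchemaListener());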