Skip to content

Commit

Permalink
Merge pull request #20 from databendcloud/feat/schema-evolution
Browse files Browse the repository at this point in the history
feat: support auto schema evolution
  • Loading branch information
hantmac authored Jan 26, 2024
2 parents 47d1fe9 + e3a5609 commit a36ea28
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DatabendChangeEvent {
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
final Schema schema;
public final Schema schema;

public DatabendChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
this.destination = destination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ public static void addParametersToStatement(PreparedStatement statement, Databen
Map<String, Object> values = event.valueAsMap();
//DatabendChangeEvent.Schema k = event.schema();
Map<String, String> decimalFields = DatabendUtil.findDecimalFields(event.schema());
System.out.println("valueSchema: " + event.schema.valueSchema());
System.out.println("keySchema: " + event.schema.keySchema());
System.out.println("valueAsMap" + event.valueAsMap());
System.out.println("keyAsMap" + event.keyAsMap());
int index = 1;
for (String key : values.keySet()) {
if (decimalFields.containsKey(key)) {
Expand Down Expand Up @@ -259,14 +255,24 @@ public static Map<String, String> findDecimalFields(DatabendChangeEvent.Schema s
return decimalFields;
}

/**
 * Returns {@code true} when the event is a Debezium schema-change event.
 * Debezium names the key schema of such events "...SchemaChangeKey", so the
 * check matches that suffix case-insensitively on the key schema's "name" node.
 *
 * @param schema event schema wrapper; may be {@code null}
 * @return {@code true} if the key-schema name marks a schema-change event
 */
public static boolean isSchemaChanged(DatabendChangeEvent.Schema schema) {
    if (schema == null || schema.keySchema() == null || schema.keySchema().get("name") == null) {
        return false;
    }
    // textValue() returns null for non-textual JSON nodes; treat that as "not a schema change"
    // instead of letting toLowerCase() throw an NPE.
    String schemaNameStr = schema.keySchema().get("name").textValue();
    return schemaNameStr != null && schemaNameStr.toLowerCase().contains("schemachangekey");
}

private static String createTableSQL(String schemaName, String originalSQL, DatabendChangeEvent.Schema schema) {
//"CREATE TABLE debeziumcdc_customers_append (__deleted boolean, id bigint, first_name varchar, __op varchar, __source_ts_ms bigint);";
String[] parts = originalSQL.split("\\s", 4);
parts[2] = schemaName + "." + parts[2];
//
String modifiedSQL = String.join(" ", parts);
// String modifiedSQL = originalSQL;
System.out.println("sjh" + modifiedSQL);
// replace `decimal` with `decimal(precision,scale)` by handling schema.valueSchema()
for (JsonNode jsonSchemaFieldNode : schema.valueSchema().get("fields")) {
// if the field is decimal, replace it with decimal(precision,scale)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.sql.Connection;

public class AppendTableWriter extends BaseTableWriter {
public AppendTableWriter(Connection connection, String identifierQuoteCharacter) {
super(connection, identifierQuoteCharacter);
public AppendTableWriter(Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) {
super(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@
package io.debezium.server.databend.tablewriter;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.server.databend.DatabendChangeConsumer;
import io.debezium.server.databend.DatabendChangeEvent;
import io.debezium.server.databend.DatabendUtil;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.Dependent;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.*;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.debezium.server.databend.DatabendUtil.addParametersToStatement;

Expand All @@ -31,29 +33,83 @@ public abstract class BaseTableWriter {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTableWriter.class);
final Connection connection;
final String identifierQuoteCharacter;
final boolean isSchemaEvolutionEnabled;

public BaseTableWriter(final Connection connection, String identifierQuoteCharacter) {
public BaseTableWriter(final Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) {
this.connection = connection;
this.identifierQuoteCharacter = identifierQuoteCharacter;
this.isSchemaEvolutionEnabled = isSchemaEvolutionEnabled;
}

/**
 * Inserts the given change events into {@code table} as one JDBC batch.
 * When schema evolution is enabled, schema-change events are filtered out of
 * the insert batch (every batch must share a single schema) and replayed
 * afterwards as DDL via {@link #schemaEvolution}.
 *
 * @param table  target sink table
 * @param events change events to insert
 * @throws RuntimeException wrapping any {@link SQLException} from the batch
 *                          insert or any failure during schema evolution
 */
public void addToTable(final RelationalTable table, final List<DatabendChangeEvent> events) {
    final String sql = table.prepareInsertStatement(this.identifierQuoteCharacter);
    int inserts = 0;
    // Schema-change events are buffered and applied after the data batch.
    List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
        connection.setAutoCommit(false);
        for (DatabendChangeEvent event : events) {
            if (isSchemaEvolutionEnabled && DatabendUtil.isSchemaChanged(event.schema())) {
                schemaEvolutionEvents.add(event);
            } else {
                addParametersToStatement(statement, event);
                statement.addBatch();
            }
        }

        // Each batch needs to have the same schema, so get the buffered records out.
        int[] batchResult = statement.executeBatch();
        inserts = Arrays.stream(batchResult).sum();
        LOGGER.debug("Inserted {} rows into {}", inserts, table.tableName);
    } catch (SQLException e) {
        // Keep the original exception as the cause; e.getMessage() alone loses the stack trace.
        throw new RuntimeException("Batch insert into " + table.tableName + " failed", e);
    }
    // Handle schema evolution after the data has been flushed.
    try {
        schemaEvolution(table, schemaEvolutionEvents);
    } catch (Exception e) {
        throw new RuntimeException("Schema evolution failed for " + table.tableName, e);
    }
}

/**
 * Replays the DDL carried by Debezium schema-change events against the sink.
 * Only "alter table" statements are applied; the table name in the DDL is
 * re-qualified with the sink database so the statement targets the replicated
 * table rather than the source one.
 *
 * @param table  sink table whose database qualifies the DDL
 * @param events schema-change events (value map is expected to contain a "ddl" key)
 * @throws RuntimeException wrapping any {@link SQLException} from executing the DDL
 */
public void schemaEvolution(RelationalTable table, List<DatabendChangeEvent> events) {
    for (DatabendChangeEvent event : events) {
        Map<String, Object> values = event.valueAsMap();
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            String ddl = String.valueOf(entry.getValue());
            if (entry.getKey().contains("ddl") && ddl.toLowerCase().contains("alter table")) {
                String tableName = getFirstWordAfterAlterTable(ddl);
                String ddlSql = replaceFirstWordAfterTable(ddl, table.databaseName + "." + tableName);
                try (PreparedStatement statement = connection.prepareStatement(ddlSql)) {
                    LOGGER.info("Applying schema evolution DDL: {}", ddlSql);
                    // JDBC forbids Statement.execute(String) on a PreparedStatement;
                    // the statement was already prepared with ddlSql, so run execute().
                    statement.execute();
                } catch (SQLException e) {
                    // Preserve the cause; include the DDL for diagnosis.
                    throw new RuntimeException("Failed to execute DDL: " + ddlSql, e);
                }
            }
        }
    }
}

// Matches the identifier immediately following "table " (lowercase, as emitted
// by the sources this sink handles). Compiled once instead of per call.
private static final Pattern TABLE_NAME_PATTERN = Pattern.compile("(?<=table )\\w+");

/**
 * Replaces the first word following "table " in {@code statement} with
 * {@code newTableName}. Returns {@code statement} unchanged if either
 * argument is {@code null} or no match is found.
 *
 * @param statement    DDL statement, e.g. "alter table products add column a int"
 * @param newTableName replacement table name (may be qualified, e.g. "db.products")
 * @return the statement with the table name replaced
 */
public static String replaceFirstWordAfterTable(String statement, String newTableName) {
    if (statement == null || newTableName == null) {
        return statement;
    }
    Matcher matcher = TABLE_NAME_PATTERN.matcher(statement);
    // quoteReplacement: a replacement containing '$' or '\' would otherwise be
    // interpreted as a group reference/escape and corrupt the result.
    return matcher.replaceFirst(Matcher.quoteReplacement(newTableName));
}

/**
 * Extracts the table name from an "alter table ..." statement, i.e. the third
 * whitespace-delimited token. Returns {@code null} when the input is
 * {@code null} or has fewer than three tokens.
 *
 * @param alterStatement DDL statement, e.g. "alter table products add column a int"
 * @return the table name, or {@code null} if it cannot be determined
 */
public static String getFirstWordAfterAlterTable(String alterStatement) {
    if (alterStatement == null) {
        return null;
    }
    // Split on runs of whitespace: split(" ") produces empty tokens when the
    // DDL contains consecutive spaces/tabs, which made parts[2] wrong.
    String[] parts = alterStatement.trim().split("\\s+");
    if (parts.length >= 3) {
        return parts[2];
    }
    return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RelationalTable {
protected static final Logger LOGGER = LoggerFactory.getLogger(RelationalTable.class);

public final String tableName;
private final String databaseName;
public final String databaseName;
public final Map<String, DatabendRawType> columns = new HashMap<>();
public final Map<String, Integer> primaryKeysMap = new HashMap<>();
public final String primaryKey;
Expand Down Expand Up @@ -59,7 +59,7 @@ public RelationalTable(String primaryKey, String databaseName, String tableName,
if (!primaryKey.isEmpty()) {
primaryKeysMap.put(primaryKey, 1);
}
LOGGER.warn("Loaded Databend table {}.{} \nColumns:{} \nPK:{}", schema, table, columns, primaryKeysMap);
LOGGER.warn("Loaded Databend table {}.{} \nColumns:{} \nPK:{}", schema, table, columns, primaryKeysMap);
}

if (numTablesFound == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ public class TableWriterFactory {
@ConfigProperty(name = "debezium.sink.databend.identifier-quote-char", defaultValue = "")
Optional<String> identifierQuoteCharacter;

@ConfigProperty(name = "debezium.sink.databend.schema.evolution", defaultValue = "false")
boolean isSchemaEvolutionEnabled;

public BaseTableWriter get(final Connection connection) {
if (upsert) {
return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes);
return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes, isSchemaEvolutionEnabled);
} else {
return new AppendTableWriter(connection, identifierQuoteCharacter.orElse(""));
return new AppendTableWriter(connection, identifierQuoteCharacter.orElse(""),isSchemaEvolutionEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class UpsertTableWriter extends BaseTableWriter {
final boolean upsertKeepDeletes;
protected static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableWriter.class);

public UpsertTableWriter(Connection connection, String identifierQuoteCharacter, boolean upsertKeepDeletes) {
super(connection, identifierQuoteCharacter);
public UpsertTableWriter(Connection connection, String identifierQuoteCharacter, boolean upsertKeepDeletes, boolean isSchemaEvolutionEnabled) {
super(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
this.upsertKeepDeletes = upsertKeepDeletes;
appendTableWriter = new AppendTableWriter(connection, identifierQuoteCharacter);
appendTableWriter = new AppendTableWriter(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
}

@Override
Expand All @@ -56,6 +56,7 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
final String upsertSql = table.preparedUpsertStatement(this.identifierQuoteCharacter);
int inserts = 0;
List<DatabendChangeEvent> deleteEvents = new ArrayList<>();
List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();

try (PreparedStatement statement = connection.prepareStatement(upsertSql)) {
connection.setAutoCommit(false);
Expand All @@ -71,9 +72,12 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
// here use soft delete
// if true delete, we can use this condition event.keyAsMap().containsKey(deleteColumn)
deleteEvents.add(event);
} else if (DatabendUtil.isSchemaChanged(event.schema()) && isSchemaEvolutionEnabled) {
schemaEvolutionEvents.add(event);
}
}

// Each batch needs to have the same schemas, so get the buffered records out
int[] batchResult = statement.executeBatch();
inserts = Arrays.stream(batchResult).sum();

Expand All @@ -89,6 +93,12 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
throw new RuntimeException(e.getMessage());
}

//handle schema changed events
try {
schemaEvolution(table, schemaEvolutionEvents);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}

public void deleteFromTable(final RelationalTable table, final List<DatabendChangeEvent> events) throws Exception {
Expand All @@ -104,7 +114,6 @@ public void deleteFromTable(final RelationalTable table, final List<DatabendChan
}
}


private String getPrimaryKeyValue(String primaryKey, Map<String, Object> parameters) throws Exception {
String primaryValue = "";
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ debezium.source.database.history=io.debezium.relational.history.FileDatabaseHist
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap,a
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.a.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
debezium.transforms.a.target.type=string
debezium.transforms.a.field=a
#debezium.transforms=unwrap,a
#debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#debezium.transforms.a.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
#debezium.transforms.a.target.type=string
#debezium.transforms.a.field=a
# datetime format
debezium.transforms.a.format=yyyy-MM-dd
debezium.source.time.precision.mode=connect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.debezium.databend.tablewriter;

import org.junit.Assert;
import org.junit.jupiter.api.Test;

import static io.debezium.server.databend.tablewriter.BaseTableWriter.replaceFirstWordAfterTable;

/**
 * Unit tests for {@code BaseTableWriter.replaceFirstWordAfterTable}.
 */
public class TableWriterTest {
    @Test
    public void testFirstWordAfterTable() throws Exception {
        String statement = "alter table products add column a int";
        String newStatement = replaceFirstWordAfterTable(statement, "newTable");
        // JUnit's assertEquals signature is (expected, actual); the arguments were
        // swapped, which yields misleading failure messages.
        Assert.assertEquals("alter table newTable add column a int", newStatement);

        statement = "alter table products drop column a";
        newStatement = replaceFirstWordAfterTable(statement, "yyy");
        Assert.assertEquals("alter table yyy drop column a", newStatement);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,44 @@

import org.apache.kafka.connect.source.SourceRecord;

@SuppressWarnings("unchecked")
public class TestUtil {
static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
static final SecureRandom rnd = new SecureRandom();


public static int randomInt(int low, int high) {
return rnd.nextInt(high - low) + low;
}

public static String randomString(int len) {
StringBuilder sb = new StringBuilder(len);
for (int i = 0; i < len; i++)
sb.append(AB.charAt(rnd.nextInt(AB.length())));
return sb.toString();
}

public static DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> getCommitter() {
return new DebeziumEngine.RecordCommitter() {
public synchronized void markProcessed(SourceRecord record) {
}

@Override
public void markProcessed(Object record) {
}

public synchronized void markBatchFinished() {
}

@Override
public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) {
}

@Override
public DebeziumEngine.Offsets buildOffsets() {
return null;
}
};
}
static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
static final SecureRandom rnd = new SecureRandom();


public static int randomInt(int low, int high) {
return rnd.nextInt(high - low) + low;
}

public static String randomString(int len) {
StringBuilder sb = new StringBuilder(len);
for (int i = 0; i < len; i++)
sb.append(AB.charAt(rnd.nextInt(AB.length())));
return sb.toString();
}

public static DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> getCommitter() {
return new DebeziumEngine.RecordCommitter() {
public synchronized void markProcessed(SourceRecord record) {
}

@Override
public void markProcessed(Object record) {
}

public synchronized void markBatchFinished() {
}

@Override
public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) {
}

@Override
public DebeziumEngine.Offsets buildOffsets() {
return null;
}
};
}

}

0 comments on commit a36ea28

Please sign in to comment.