diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendChangeEvent.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendChangeEvent.java index 3332f2c..c3c2bed 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendChangeEvent.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendChangeEvent.java @@ -28,7 +28,7 @@ public class DatabendChangeEvent { protected final String destination; protected final JsonNode value; protected final JsonNode key; - final Schema schema; + public final Schema schema; public DatabendChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) { this.destination = destination; diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendUtil.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendUtil.java index e00236b..8abf382 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendUtil.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/DatabendUtil.java @@ -259,6 +259,14 @@ public static Map findDecimalFields(DatabendChangeEvent.Schema s return decimalFields; } + public static boolean isSchemaChanged(DatabendChangeEvent.Schema schema) { + String schemaNameStr = schema.keySchema().get("name").textValue(); + if (schemaNameStr.toLowerCase().contains("schemachange")) { + return true; + } + return false; + } + private static String createTableSQL(String schemaName, String originalSQL, DatabendChangeEvent.Schema schema) { //"CREATE TABLE debeziumcdc_customers_append (__deleted boolean, id bigint, first_name varchar, __op varchar, __source_ts_ms bigint);"; String[] parts = originalSQL.split("\\s", 4); diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/AppendTableWriter.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/AppendTableWriter.java index b4729b0..ccab567 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/AppendTableWriter.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/AppendTableWriter.java @@ -11,7 +11,7 @@ import java.sql.Connection; public class AppendTableWriter extends BaseTableWriter { - public AppendTableWriter(Connection connection, String identifierQuoteCharacter) { - super(connection, identifierQuoteCharacter); + public AppendTableWriter(Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) { + super(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled); } } diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/BaseTableWriter.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/BaseTableWriter.java index 94b959e..093d08a 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/BaseTableWriter.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/BaseTableWriter.java @@ -9,20 +9,20 @@ package io.debezium.server.databend.tablewriter; import com.fasterxml.jackson.databind.JsonNode; +import io.debezium.server.databend.DatabendChangeConsumer; import io.debezium.server.databend.DatabendChangeEvent; import io.debezium.server.databend.DatabendUtil; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.enterprise.context.Dependent; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; import java.sql.*; import java.sql.SQLException; -import java.util.Arrays; -import java.util.Base64; -import java.util.List; -import java.util.Map; +import java.util.*; import static io.debezium.server.databend.DatabendUtil.addParametersToStatement; @@ -31,28 +31,62 @@ public abstract class BaseTableWriter { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTableWriter.class); final Connection connection; final String identifierQuoteCharacter; + final boolean isSchemaEvolutionEnabled; - public BaseTableWriter(final Connection connection, String identifierQuoteCharacter) { + public BaseTableWriter(final Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) { this.connection = connection; this.identifierQuoteCharacter = identifierQuoteCharacter; + this.isSchemaEvolutionEnabled = isSchemaEvolutionEnabled; } public void addToTable(final RelationalTable table, final List events) { final String sql = table.prepareInsertStatement(this.identifierQuoteCharacter); int inserts = 0; + List schemaEvolutionEvents = new ArrayList<>(); try (PreparedStatement statement = connection.prepareStatement(sql)) { connection.setAutoCommit(false); for (DatabendChangeEvent event : events) { - addParametersToStatement(statement, event); - statement.addBatch(); - - int[] batchResult = statement.executeBatch(); - inserts = Arrays.stream(batchResult).sum(); - System.out.printf("insert rows %d%n", inserts); + if (DatabendUtil.isSchemaChanged(event.schema()) && isSchemaEvolutionEnabled) { + schemaEvolutionEvents.add(event); + } else { + addParametersToStatement(statement, event); + statement.addBatch(); + } } + + // Each batch needs to have the same schemas, so get the buffered records out + int[] batchResult = statement.executeBatch(); + inserts = Arrays.stream(batchResult).sum(); + System.out.printf("insert rows %d%n", inserts); } catch (SQLException e) { throw new RuntimeException(e.getMessage()); } + // handle schema evolution + try { + schemaEvolution(schemaEvolutionEvents); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + public void schemaEvolution(List events) { + for (DatabendChangeEvent event : events) { + Map values = event.valueAsMap(); + for (Map.Entry entry : values.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + System.out.println("Key: " + key + ", Value: " + value); + if (entry.getKey().contains("ddl")) { + String ddlSql = entry.getValue().toString(); + try (PreparedStatement statement = connection.prepareStatement(ddlSql)) { + System.out.println(ddlSql); + statement.execute(ddlSql); + } catch (SQLException e) { + throw new RuntimeException(e.getMessage()); + } + } + } + } } } diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/TableWriterFactory.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/TableWriterFactory.java index cce53eb..cf54882 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/TableWriterFactory.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/TableWriterFactory.java @@ -16,11 +16,14 @@ public class TableWriterFactory { @ConfigProperty(name = "debezium.sink.databend.identifier-quote-char", defaultValue = "") Optional identifierQuoteCharacter; + @ConfigProperty(name = "debezium.sink.databend.schema.evolution", defaultValue = "false") + boolean isSchemaEvolutionEnabled; + public BaseTableWriter get(final Connection connection) { if (upsert) { - return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes); + return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes, isSchemaEvolutionEnabled); } else { - return new AppendTableWriter(connection, identifierQuoteCharacter.orElse("")); + return new AppendTableWriter(connection, identifierQuoteCharacter.orElse(""),isSchemaEvolutionEnabled); } } } diff --git a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/UpsertTableWriter.java b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/UpsertTableWriter.java index 2ae7714..3f9925a 100644 --- a/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/UpsertTableWriter.java +++ b/debezium-server-databend-sink/src/main/java/io/debezium/server/databend/tablewriter/UpsertTableWriter.java @@ -36,10 +36,10 @@ public class UpsertTableWriter extends BaseTableWriter { final boolean upsertKeepDeletes; protected static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableWriter.class); - public UpsertTableWriter(Connection connection, String identifierQuoteCharacter, boolean upsertKeepDeletes) { - super(connection, identifierQuoteCharacter); + public UpsertTableWriter(Connection connection, String identifierQuoteCharacter, boolean upsertKeepDeletes, boolean isSchemaEvolutionEnabled) { + super(connection, identifierQuoteCharacter,isSchemaEvolutionEnabled); this.upsertKeepDeletes = upsertKeepDeletes; - appendTableWriter = new AppendTableWriter(connection, identifierQuoteCharacter); + appendTableWriter = new AppendTableWriter(connection, identifierQuoteCharacter,isSchemaEvolutionEnabled); } @Override @@ -56,6 +56,7 @@ public void deleteUpsert(final RelationalTable table, final List deleteEvents = new ArrayList<>(); + List schemaEvolutionEvents = new ArrayList<>(); try (PreparedStatement statement = connection.prepareStatement(upsertSql)) { connection.setAutoCommit(false); @@ -71,9 +72,12 @@ public void deleteUpsert(final RelationalTable table, final List events) throws Exception { @@ -104,7 +114,6 @@ public void deleteFromTable(final RelationalTable table, final List parameters) throws Exception { String primaryValue = ""; for (Map.Entry entry : parameters.entrySet()) {