Skip to content

Commit

Permalink
feat: support auto schema evolution
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Dec 21, 2023
1 parent 47d1fe9 commit 64be2a8
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DatabendChangeEvent {
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
final Schema schema;
public final Schema schema;

public DatabendChangeEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
this.destination = destination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ public static Map<String, String> findDecimalFields(DatabendChangeEvent.Schema s
return decimalFields;
}

/**
 * Tells whether an event schema identifies a schema-change event.
 *
 * <p>The decision is a case-insensitive search for {@code "schemachange"} in the
 * {@code name} attribute of the key schema.
 *
 * @param schema the event schema to inspect; may be {@code null}
 * @return {@code true} if the key schema name contains {@code "schemachange"};
 *         {@code false} otherwise, including when the schema, its key schema, or the
 *         {@code name} attribute is absent
 */
public static boolean isSchemaChanged(DatabendChangeEvent.Schema schema) {
    // A missing schema or key schema cannot mark a schema change; previously this
    // dereferenced null and threw.
    if (schema == null || schema.keySchema() == null) {
        return false;
    }
    // path() never returns null and asText("") falls back to "" for missing/null nodes.
    final String schemaName = schema.keySchema().path("name").asText("");
    // Locale.ROOT keeps the comparison independent of the JVM default locale.
    return schemaName.toLowerCase(java.util.Locale.ROOT).contains("schemachange");
}

private static String createTableSQL(String schemaName, String originalSQL, DatabendChangeEvent.Schema schema) {
//"CREATE TABLE debeziumcdc_customers_append (__deleted boolean, id bigint, first_name varchar, __op varchar, __source_ts_ms bigint);";
String[] parts = originalSQL.split("\\s", 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.sql.Connection;

public class AppendTableWriter extends BaseTableWriter {
public AppendTableWriter(Connection connection, String identifierQuoteCharacter) {
super(connection, identifierQuoteCharacter);
public AppendTableWriter(Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) {
super(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
package io.debezium.server.databend.tablewriter;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.server.databend.DatabendChangeConsumer;
import io.debezium.server.databend.DatabendChangeEvent;
import io.debezium.server.databend.DatabendUtil;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.Dependent;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.*;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.*;

import static io.debezium.server.databend.DatabendUtil.addParametersToStatement;

Expand All @@ -31,28 +31,62 @@ public abstract class BaseTableWriter {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTableWriter.class);
final Connection connection;
final String identifierQuoteCharacter;
final boolean isSchemaEvolutionEnabled;

/**
 * Stores the collaborators shared by all table writers.
 *
 * @param connection               JDBC connection used for all statements
 * @param identifierQuoteCharacter character used to quote SQL identifiers
 * @param isSchemaEvolutionEnabled whether schema-change events should be applied as DDL
 */
public BaseTableWriter(final Connection connection, String identifierQuoteCharacter, boolean isSchemaEvolutionEnabled) {
    this.connection = connection;
    this.identifierQuoteCharacter = identifierQuoteCharacter;
    this.isSchemaEvolutionEnabled = isSchemaEvolutionEnabled;
}

/**
 * Inserts the given events into {@code table} as one JDBC batch and then applies any
 * schema-change events.
 *
 * <p>When schema evolution is enabled, events recognised by
 * {@link DatabendUtil#isSchemaChanged} are kept out of the insert batch and handed to
 * {@link #schemaEvolution(List)} after the batch has been executed. All other events are
 * bound to the prepared insert statement exactly once.
 *
 * @param table  target table; supplies the insert statement
 * @param events change events to write
 * @throws RuntimeException if the insert batch or a schema-change statement fails; the
 *                          underlying exception is preserved as the cause
 */
public void addToTable(final RelationalTable table, final List<DatabendChangeEvent> events) {
    final String sql = table.prepareInsertStatement(this.identifierQuoteCharacter);
    final List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
        connection.setAutoCommit(false);
        for (DatabendChangeEvent event : events) {
            // Check the cheap flag first so the schema is only inspected when needed.
            if (isSchemaEvolutionEnabled && DatabendUtil.isSchemaChanged(event.schema())) {
                schemaEvolutionEvents.add(event);
            } else {
                addParametersToStatement(statement, event);
                statement.addBatch();
            }
        }

        // Each batch needs to have the same schemas, so get the buffered records out
        final int[] batchResult = statement.executeBatch();
        final int inserts = Arrays.stream(batchResult).sum();
        LOGGER.info("insert rows {}", inserts);
    } catch (SQLException e) {
        // Keep the cause so the original stack trace is not lost.
        throw new RuntimeException(e.getMessage(), e);
    }

    // schemaEvolution already reports failures as RuntimeException, so no re-wrapping here.
    schemaEvolution(schemaEvolutionEvents);
}

/**
 * Executes the DDL statements carried by the given schema-change events.
 *
 * <p>For every event, each entry of {@code valueAsMap()} whose key contains {@code "ddl"}
 * is treated as a SQL statement and executed on the writer's connection. Entries with a
 * null or blank value are skipped.
 *
 * @param events schema-change events to apply
 * @throws RuntimeException if executing a statement fails; the {@link SQLException} is
 *                          preserved as the cause
 */
public void schemaEvolution(List<DatabendChangeEvent> events) {
    for (DatabendChangeEvent event : events) {
        for (Map.Entry<String, Object> entry : event.valueAsMap().entrySet()) {
            final Object value = entry.getValue();
            if (!entry.getKey().contains("ddl") || value == null) {
                continue;
            }
            final String ddlSql = value.toString();
            if (ddlSql.trim().isEmpty()) {
                continue;
            }
            LOGGER.info("Applying schema change: {}", ddlSql);
            // Statement.execute(String) must not be called on a PreparedStatement, so the
            // DDL is run through a plain Statement.
            try (Statement statement = connection.createStatement()) {
                statement.execute(ddlSql);
            } catch (SQLException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ public class TableWriterFactory {
@ConfigProperty(name = "debezium.sink.databend.identifier-quote-char", defaultValue = "")
Optional<String> identifierQuoteCharacter;

@ConfigProperty(name = "debezium.sink.databend.schema.evolution", defaultValue = "false")
boolean isSchemaEvolutionEnabled;

/**
 * Builds the table writer matching the configured mode.
 *
 * @param connection JDBC connection the writer will use
 * @return an {@link UpsertTableWriter} when {@code upsert} is set, otherwise an
 *         {@link AppendTableWriter}; both receive the configured identifier quote
 *         character (empty string when unset) and the schema-evolution flag
 */
public BaseTableWriter get(final Connection connection) {
    final String quoteCharacter = identifierQuoteCharacter.orElse("");
    if (upsert) {
        return new UpsertTableWriter(connection, quoteCharacter, upsertKeepDeletes, isSchemaEvolutionEnabled);
    }
    return new AppendTableWriter(connection, quoteCharacter, isSchemaEvolutionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class UpsertTableWriter extends BaseTableWriter {
final boolean upsertKeepDeletes;
protected static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableWriter.class);

/**
 * Creates the upsert writer together with the {@link AppendTableWriter} it holds.
 *
 * @param connection               JDBC connection used for all statements
 * @param identifierQuoteCharacter character used to quote SQL identifiers
 * @param upsertKeepDeletes        stored in {@code upsertKeepDeletes} for later use
 * @param isSchemaEvolutionEnabled whether schema-change events should be applied as DDL;
 *                                 also passed to the held append writer
 */
public UpsertTableWriter(Connection connection, String identifierQuoteCharacter, boolean upsertKeepDeletes, boolean isSchemaEvolutionEnabled) {
    super(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
    this.upsertKeepDeletes = upsertKeepDeletes;
    appendTableWriter = new AppendTableWriter(connection, identifierQuoteCharacter, isSchemaEvolutionEnabled);
}

@Override
Expand All @@ -56,6 +56,7 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
final String upsertSql = table.preparedUpsertStatement(this.identifierQuoteCharacter);
int inserts = 0;
List<DatabendChangeEvent> deleteEvents = new ArrayList<>();
List<DatabendChangeEvent> schemaEvolutionEvents = new ArrayList<>();

try (PreparedStatement statement = connection.prepareStatement(upsertSql)) {
connection.setAutoCommit(false);
Expand All @@ -71,9 +72,12 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
// here use soft delete
// if true delete, we can use this condition event.keyAsMap().containsKey(deleteColumn)
deleteEvents.add(event);
} else if (DatabendUtil.isSchemaChanged(event.schema())) {
schemaEvolutionEvents.add(event);
}
}

// Each batch needs to have the same schemas, so get the buffered records out
int[] batchResult = statement.executeBatch();
inserts = Arrays.stream(batchResult).sum();

Expand All @@ -89,6 +93,12 @@ public void deleteUpsert(final RelationalTable table, final List<DatabendChangeE
throw new RuntimeException(e.getMessage());
}

//handle schema changed events
try {
schemaEvolution(schemaEvolutionEvents);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}

public void deleteFromTable(final RelationalTable table, final List<DatabendChangeEvent> events) throws Exception {
Expand All @@ -104,7 +114,6 @@ public void deleteFromTable(final RelationalTable table, final List<DatabendChan
}
}


private String getPrimaryKeyValue(String primaryKey, Map<String, Object> parameters) throws Exception {
String primaryValue = "";
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
Expand Down

0 comments on commit 64be2a8

Please sign in to comment.