Skip to content

Commit

Permalink
fix: auto generate target table name
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Dec 14, 2023
1 parent be89b88 commit c7e8b26
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
}

public String mapDestination(String destination) {
System.out.println("destination is :" + destination);
if (tableName.isPresent()) {
return tablePrefix.orElse("") + tableName.orElse("");
}
final String getTableName = destination
.replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse(""))
.replace(".", "_");
return tablePrefix.orElse("") + getTableName;
String[] parts = getTableName.split("_");
String tableName = parts[parts.length - 1];
return tablePrefix.orElse("") + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ public static void createTable(String schemaName, String tableName, Connection c
*/
Map<String, DataType<?>> fl = DatabendUtil.fields(schema.valueSchema());
List<Field<?>> fields = new ArrayList<>();
// System.out.println("valueSchema: " + schema.valueSchema());
System.out.println("valueSchema: " + schema.valueSchema());

// fl.forEach((k, v) -> fields.add(DSL.field(DSL.name(k), v)));
for (Map.Entry<String, DataType<?>> entry : fl.entrySet()) {
String k = entry.getKey();
DataType<?> dataType = entry.getValue();
// System.out.println("k: " + k + ", v: " + dataType);
System.out.println("k: " + k + ", v: " + dataType);

if (dataType.toString().contains("decimal")) {
DataType<BigDecimal> decimalType = (DataType<BigDecimal>) dataType;
Expand Down Expand Up @@ -205,6 +205,10 @@ public static void addParametersToStatement(PreparedStatement statement, Databen
Map<String, Object> values = event.valueAsMap();
//DatabendChangeEvent.Schema k = event.schema();
Map<String, String> decimalFields = DatabendUtil.findDecimalFields(event.schema());
System.out.println("valueSchema: " + event.schema.valueSchema());
System.out.println("keySchema: " + event.schema.keySchema());
System.out.println("valueAsMap"+event.valueAsMap());
System.out.println("keyAsMap"+event.keyAsMap());
int index = 1;
for (String key : values.keySet()) {
if (decimalFields.containsKey(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,28 @@

import io.debezium.databend.testresources.BaseDbTest;
import io.debezium.databend.testresources.SourcePostgresqlDB;
import io.debezium.server.databend.DatabendChangeConsumer;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

import java.time.Duration;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.inject.Inject;

/**
* Integration test that verifies basic reading from PostgreSQL database and writing to databend destination.
*
* @author hantmac
* @author hantmac
*/
@QuarkusTest
@QuarkusTestResource(SourcePostgresqlDB.class)
public class DatabendChangeConsumerTest extends BaseDbTest {

@Inject
DatabendChangeConsumer consumer;

// @Test
// public void testSchemaChanges() throws Exception {
Expand Down Expand Up @@ -88,66 +93,73 @@ public class DatabendChangeConsumerTest extends BaseDbTest {
//
// }

@Test
public void testConsumingVariousDataTypes() throws Exception {
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
" c_int INTEGER,\n" +
" c_date DATE,\n" +
" c_timestamp TIMESTAMP,\n" +
" c_timestamptz TIMESTAMPTZ,\n" +
" c_float FLOAT,\n" +
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_json JSON,\n" +
" c_jsonb JSONB\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid," +
"c_json, c_jsonb) " +
"VALUES (1, null, null, null,null,null,null," +
"null,null,null,null,null,null," +
"null,null)," +
"(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," +
"'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," +
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID," +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" +
")";
SourcePostgresqlDB.runSQL(sql);
}
@Test
public void testConsumingVariousDataTypes() throws Exception {
String sql = "\n" +
" DROP TABLE IF EXISTS inventory.data_types;\n" +
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
" c_id INTEGER ,\n" +
" c_text TEXT,\n" +
" c_varchar VARCHAR,\n" +
" c_int INTEGER,\n" +
" c_date DATE,\n" +
" c_timestamp TIMESTAMP,\n" +
" c_timestamptz TIMESTAMPTZ,\n" +
" c_float FLOAT,\n" +
" c_decimal DECIMAL(18,4),\n" +
" c_numeric NUMERIC(18,4),\n" +
" c_interval INTERVAL,\n" +
" c_boolean BOOLEAN,\n" +
" c_uuid UUID,\n" +
" c_json JSON,\n" +
" c_jsonb JSONB\n" +
" );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_types (" +
"c_id, " +
"c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " +
"c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid," +
"c_json, c_jsonb) " +
"VALUES (1, null, null, null,null,null,null," +
"null,null,null,null,null,null," +
"null,null)," +
"(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," +
"'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," +
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID," +
"'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" +
")";
SourcePostgresqlDB.runSQL(sql);
}

@Test
public void testTargetTableName() throws Exception {
String destination = "server_name_databaseName_realTableName";
String realTableName = consumer.mapDestination(destination);
Assertions.assertEquals("realTableName", realTableName);
}

@Test
public void testConsumingArrayDataType() throws Exception {
String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" +
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);
}
@Test
public void testConsumingArrayDataType() throws Exception {
String sql = " DROP TABLE IF EXISTS inventory.array_data;\n" +
" CREATE TABLE IF NOT EXISTS inventory.array_data (\n" +
" name text,\n" +
" pay_by_quarter integer[],\n" +
" schedule text[][]\n" +
" );\n" +
" INSERT INTO inventory.array_data\n" +
" VALUES " +
"('Carol2',\n" +
" ARRAY[20000, 25000, 25000, 25000],\n" +
" ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" +
"('Bill',\n" +
" '{10000, 10000, 10000, 10000}',\n" +
" '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" +
" ('Carol1',\n" +
" '{20000, 25000, 25000, 25000}',\n" +
" '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" +
";";
SourcePostgresqlDB.runSQL(sql);
}


// @Test
Expand Down

0 comments on commit c7e8b26

Please sign in to comment.