[Flink] Add JDBC CDC sources and sinks (lakesoul-io#381)
* add spark call sql syntax

Signed-off-by: maosen <[email protected]>

* fix bug

Signed-off-by: maosen <[email protected]>

* postgres cdc

Signed-off-by: maosen <[email protected]>

* support oracle

Signed-off-by: ChenYunHey <[email protected]>

* rebase main and set cdc version to 2.4.2

Signed-off-by: ChenYunHey <[email protected]>

* support serverTimeZone

Signed-off-by: ChenYunHey <[email protected]>

* delete scope

Signed-off-by: ChenYunHey <[email protected]>

* scope-compile

Signed-off-by: ChenYunHey <[email protected]>

* add parameter plugName for the Postgres source

Signed-off-by: ChenYunHey <[email protected]>

* include pg and oracle cdc

Signed-off-by: ChenYunHey <[email protected]>

* support both into-lake and out-of-lake sync

Signed-off-by: ChenYunHey <[email protected]>

* fix pom.xml

Signed-off-by: ChenYunHey <[email protected]>

* jdbc cdc doc

Signed-off-by: ChenYunHey <[email protected]>

* complete jdbcCDC doc

Signed-off-by: ChenYunHey <[email protected]>

* add out-of-lake docs

Signed-off-by: ChenYunHey <[email protected]>

* fix code

Signed-off-by: ChenYunHey <[email protected]>

* switch LocalEnvironment to RemoteEnvironment

Signed-off-by: ChenYunHey <[email protected]>

* include flink-connector-jdbc and flink-doris-connector

Signed-off-by: ChenYunHey <[email protected]>

* Add additional documentation

Signed-off-by: ChenYunHey <[email protected]>

* support ADB

Signed-off-by: ChenYunHey <[email protected]>

* fix primary key fetching logic

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: maosen <[email protected]>
Signed-off-by: ChenYunHey <[email protected]>
Co-authored-by: maosen <[email protected]>
Co-authored-by: moresun <[email protected]>
3 people authored Jan 9, 2024
1 parent b730962 commit bad1f31
Showing 20 changed files with 889 additions and 389 deletions.
15 changes: 12 additions & 3 deletions lakesoul-flink/pom.xml
@@ -23,6 +23,7 @@ SPDX-License-Identifier: Apache-2.0
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<log4j.version>2.17.2</log4j.version>
<cdc.version>2.4.2</cdc.version>
<local.scope>provided</local.scope>
</properties>

@@ -82,7 +83,6 @@ SPDX-License-Identifier: Apache-2.0
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
@@ -121,15 +121,20 @@ SPDX-License-Identifier: Apache-2.0
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
@@ -531,6 +536,10 @@ SPDX-License-Identifier: Apache-2.0
<includes>
<include>com.dmetasoul:lakesoul-flink</include>
<include>com.ververica:flink-sql-connector-mysql-cdc</include>
<include>com.ververica:flink-sql-connector-postgres-cdc</include>
<include>com.ververica:flink-sql-connector-oracle-cdc</include>
<include>org.apache.flink:flink-connector-jdbc</include>
<include>org.apache.flink:flink-doris-connector-1.17</include>
<include>com.dmetasoul:lakesoul-common</include>
<include>com.dmetasoul:lakesoul-io-java</include>
<include>com.github.jnr:*</include>
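The shaded artifacts above come from the Ververica Flink CDC 2.4.x line, pinned through the new cdc.version property. As rough orientation only (this is the upstream connector API, not the entrypoint added by this commit), a minimal sketch of the kind of source these connectors expose follows; all connection settings are hypothetical, and serverTimeZone is the option one of the commits above adds support for:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcSketch {
    public static void main(String[] args) throws Exception {
        // Build a MySQL CDC source from the shaded connector; every value below is a placeholder.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test_db")            // hypothetical database
                .tableList("test_db.orders")        // hypothetical table
                .username("root")
                .password("secret")
                .serverTimeZone("UTC")              // option surfaced by this commit series
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // CDC sources rely on checkpointing for exactly-once reads
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-sketch").print();
        env.execute("mysql-cdc-sketch");
    }
}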
ExternalDBManager.java
@@ -8,11 +8,5 @@
import java.util.List;

public interface ExternalDBManager {

List<String> listTables();


void importOrSyncLakeSoulTable(String tableName) throws IOException;

void importOrSyncLakeSoulNamespace(String namespace);
}
NameSpaceManager.java (new file)
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0
package com.dmetasoul.lakesoul.meta.external;

import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.DBManager;

public class NameSpaceManager implements ExternalDBManager {
private final DBManager lakesoulDBManager = new DBManager();

@Override
public void importOrSyncLakeSoulNamespace(String namespace) {
if (lakesoulDBManager.getNamespaceByNamespace(namespace) != null) {
return;
}
lakesoulDBManager.createNewNamespace(namespace, new JSONObject().toJSONString(), "");
}
}
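A minimal usage sketch of the new class; the namespace name is hypothetical:

import com.dmetasoul.lakesoul.meta.external.ExternalDBManager;
import com.dmetasoul.lakesoul.meta.external.NameSpaceManager;

public class NamespaceSyncSketch {
    public static void main(String[] args) {
        // Idempotent: returns early if the namespace is already registered in LakeSoul meta.
        ExternalDBManager manager = new NameSpaceManager();
        manager.importOrSyncLakeSoulNamespace("cdc_source_db"); // hypothetical namespace
    }
}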
MysqlDBManager.java
@@ -6,30 +6,15 @@

import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.entity.TableNameId;
import com.dmetasoul.lakesoul.meta.external.DBConnector;
import com.dmetasoul.lakesoul.meta.external.ExternalDBManager;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Table;
import io.debezium.relational.Tables;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN_DEFAULT;

public class MysqlDBManager implements ExternalDBManager {

public static final int DEFAULT_MYSQL_PORT = 3306;
@@ -42,7 +27,6 @@ public class MysqlDBManager implements ExternalDBManager {
private final int hashBucketNum;
private final boolean useCdc;
MysqlDataTypeConverter converter = new MysqlDataTypeConverter();
MySqlAntlrDdlParser parser = new MySqlAntlrDdlParser();
private final HashSet<String> excludeTables;
private final HashSet<String> includeTables;
private final String[] filterTables = new String[]{"sys_config"};
@@ -73,79 +57,6 @@ public MysqlDBManager(String dbName, String user, String passwd, String host, St
this.useCdc = useCdc;
}


@Override
public List<String> listTables() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sql = "show tables";
List<String> list = new ArrayList<>();
try {
conn = dbConnector.getConn();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
String tableName = rs.getString(String.format("Tables_in_%s", dbName));
if (includeTables.contains(tableName) | !excludeTables.contains(tableName)) {
list.add(tableName);
}
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
dbConnector.closeConn(rs, pstmt, conn);
}
return list;
}


@Override
public void importOrSyncLakeSoulTable(String tableName) throws IOException {
if (!includeTables.contains(tableName) && excludeTables.contains(tableName)) {
System.out.printf("Table %s is excluded by exclude table list%n", tableName);
return;
}
String mysqlDDL = showCreateTable(tableName);
Tuple2<StructType, List<String>> schemaAndPK = ddlToSparkSchema(tableName, mysqlDDL);
if (schemaAndPK.f1.isEmpty()) {
throw new IllegalStateException(
String.format("Table %s has no primary key, table with no Primary Keys is not supported",
tableName));
}

boolean exists = lakesoulDBManager.isTableExistsByTableName(tableName, dbName);
if (exists) {
// sync lakesoul table schema only
TableNameId tableId = lakesoulDBManager.shortTableName(tableName, dbName);
String newTableSchema = schemaAndPK.f0.json();

lakesoulDBManager.updateTableSchema(tableId.getTableId(), newTableSchema);
} else {
// import new lakesoul table with schema, pks and properties
try {
String tableId = EXTERNAL_MYSQL_TABLE_PREFIX + UUID.randomUUID();

String qualifiedPath =
FlinkUtil.makeQualifiedPath(new Path(new Path(lakesoulTablePathPrefix, dbName), tableName))
.toString();


String tableSchema = schemaAndPK.f0.json();
List<String> priKeys = schemaAndPK.f1;
String partitionsInTableInfo = DBUtil.formatTableInfoPartitionsField(priKeys, Collections.emptyList());
JSONObject json = new JSONObject();
json.put("hashBucketNum", String.valueOf(hashBucketNum));
json.put("lakesoul_cdc_change_column", CDC_CHANGE_COLUMN_DEFAULT);

lakesoulDBManager.createNewTable(tableId, dbName, tableName, qualifiedPath, tableSchema, json,
partitionsInTableInfo);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public void importOrSyncLakeSoulNamespace(String namespace) {
if (lakesoulDBManager.getNamespaceByNamespace(namespace) != null) {
@@ -175,23 +86,4 @@ public String showCreateTable(String tableName) {
return result;
}

public Tuple2<StructType, List<String>> ddlToSparkSchema(String tableName, String ddl) {
final StructType[] stNew = {new StructType()};

parser.parse(ddl, new Tables());
Table table = parser.databaseTables().forTable(null, null, tableName);
table.columns().forEach(col -> {
String name = col.name();
DataType datatype = converter.schemaBuilder(col);
if (datatype == null) {
throw new IllegalStateException("Unhandled data types");
}
stNew[0] = stNew[0].add(name, datatype, col.isOptional());
});
// if useCdc, add the lakesoul cdc column
if (useCdc) {
stNew[0] = stNew[0].add(CDC_CHANGE_COLUMN_DEFAULT, DataTypes.StringType, true);
}
return Tuple2.of(stNew[0], table.primaryKeyColumnNames());
}
}
MysqlDataTypeConverter.java
@@ -5,7 +5,6 @@
package com.dmetasoul.lakesoul.meta.external.mysql;

import com.dmetasoul.lakesoul.meta.external.jdbc.JdbcDataTypeConverter;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Column;
import io.debezium.util.Strings;
import org.apache.spark.sql.types.DataType;
@@ -94,12 +93,5 @@ protected boolean isGeometryCollection(String upperCaseTypeName) {
|| upperCaseTypeName.endsWith(".GEOMCOLLECTION");
}

protected List<String> extractEnumAndSetOptions(Column column) {
return MySqlAntlrDdlParser.extractEnumAndSetOptions(column.enumValues());
}

protected String extractEnumAndSetOptionsAsString(Column column) {
return Strings.join(",", extractEnumAndSetOptions(column));
}

}
OracleDBManager.java
@@ -1,5 +1,12 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0
package com.dmetasoul.lakesoul.meta.external.oracle;

import com.dmetasoul.lakesoul.meta.external.ExternalDBManager;

import java.io.IOException;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
@@ -13,21 +20,25 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class OracleDBManager implements ExternalDBManager {

private final String dbName;

private final DBManager lakesoulDBManager = new DBManager();

private final HashSet<String> includeTables = null;

public static final int DEFAULT_ORACLE_PORT = 1521;
private final DBConnector dbConnector;

public OracleDBManager(String dbName,
String user,
String passwd,
String host,
String port
) {
this.dbName = dbName;


DataBaseProperty dataBaseProperty = new DataBaseProperty();
dataBaseProperty.setDriver("oracle.jdbc.driver.OracleDriver");
String url = "jdbc:oracle:thin:@" + host + ":" + port + "/" + dbName;
@@ -64,21 +75,6 @@ public boolean isOpen() {
return opened;
}

public ResultSet showTable(String sql){
Connection con = null;
PreparedStatement psmt = null;
ResultSet rs = null;
try {
con = dbConnector.getConn();
psmt = con.prepareStatement(sql);
rs = psmt.executeQuery();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return rs;
}

@Override
public List<String> listTables() {
Connection conn = null;
PreparedStatement pstmt = null;
@@ -92,6 +88,7 @@ public List<String> listTables() {
while (rs.next()) {
String tableName = rs.getString("table_name");
list.add(tableName);

}
} catch (SQLException e) {
e.printStackTrace();
Expand All @@ -101,11 +98,6 @@ public List<String> listTables() {
return list;
}

@Override
public void importOrSyncLakeSoulTable(String tableName) {

}

@Override
public void importOrSyncLakeSoulNamespace(String namespace) {
if (lakesoulDBManager.getNamespaceByNamespace(namespace) != null){
@@ -134,4 +126,4 @@ public String showCreateTable(String tableName) {
}
return result;
}
}
}
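For completeness, a minimal sketch driving OracleDBManager through the constructor kept above; host, credentials, and names are all hypothetical:

import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;

public class OracleSyncSketch {
    public static void main(String[] args) {
        // "1521" matches DEFAULT_ORACLE_PORT declared in the class; port is passed as a String.
        OracleDBManager manager = new OracleDBManager(
                "ORCLPDB1",   // hypothetical service name
                "flinkuser",  // hypothetical credentials
                "flinkpw",
                "localhost",
                "1521");
        if (manager.isOpen()) {
            manager.importOrSyncLakeSoulNamespace("FLINKUSER"); // hypothetical namespace
            manager.listTables().forEach(System.out::println);
        }
    }
}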