Commit 3fbf17d

[Flink] Support Oracle CDC source (lakesoul-io#375)
* Support Oracle CDC source (flink-cdc 2.3.0)

Signed-off-by: ChenYunHey <[email protected]>

* Support Oracle CDC source (flink-cdc 2.3.0)

Signed-off-by: ChenYunHey <[email protected]>

* rebase main

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
ChenYunHey authored Dec 16, 2023
1 parent 74c28a2 commit 3fbf17d
Showing 5 changed files with 307 additions and 6 deletions.
6 changes: 5 additions & 1 deletion lakesoul-flink/pom.xml
@@ -127,7 +127,11 @@ SPDX-License-Identifier: Apache-2.0
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
New file: OracleDBManager.java (package com.dmetasoul.lakesoul.meta.external.oracle)
@@ -0,0 +1,137 @@
package com.dmetasoul.lakesoul.meta.external.oracle;

import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.external.DBConnector;
import com.dmetasoul.lakesoul.meta.external.ExternalDBManager;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class OracleDBManager implements ExternalDBManager {

private final String dbName;
private final DBManager lakesoulDBManager = new DBManager();
private final HashSet<String> includeTables = null;
public static final int DEFAULT_ORACLE_PORT = 1521;
private final DBConnector dbConnector;
public OracleDBManager(String dbName,
String user,
String passwd,
String host,
String port
) {
this.dbName = dbName;
DataBaseProperty dataBaseProperty = new DataBaseProperty();
dataBaseProperty.setDriver("oracle.jdbc.driver.OracleDriver");
String url = "jdbc:oracle:thin:@" + host + ":" + port + "/" + dbName;
dataBaseProperty.setUrl(url);
dataBaseProperty.setUsername(user);
dataBaseProperty.setPassword(passwd);
dbConnector = new DBConnector(dataBaseProperty);

}

public OracleDBManager(String dbName, DBConnector dbConnector) {
this.dbName = dbName;
this.dbConnector = dbConnector;
}

public boolean isOpen() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sql = "select status from v$instance";
boolean opened = false;
try {
conn = dbConnector.getConn();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
opened = rs.getString("STATUS").equals("OPEN");
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
dbConnector.closeConn(rs, pstmt, conn);
}
return opened;
}

public ResultSet showTable(String sql) {
Connection con = null;
PreparedStatement psmt = null;
ResultSet rs = null;
try {
con = dbConnector.getConn();
psmt = con.prepareStatement(sql);
rs = psmt.executeQuery();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return rs;
}

@Override
public List<String> listTables() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sql = "SELECT table_name FROM user_tables ORDER BY table_name";
List<String> list = new ArrayList<>();
try {
conn = dbConnector.getConn();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
String tableName = rs.getString("table_name");
list.add(tableName);
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
dbConnector.closeConn(rs, pstmt, conn);
}
return list;
}

@Override
public void importOrSyncLakeSoulTable(String tableName) {

}

@Override
public void importOrSyncLakeSoulNamespace(String namespace) {
if (lakesoulDBManager.getNamespaceByNamespace(namespace) != null) {
return;
}
lakesoulDBManager.createNewNamespace(namespace, new JSONObject().toJSONString(), "");
}

public String showCreateTable(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sql = String.format("select dbms_metadata.get_ddl('TABLE','%s') from dual", tableName);
String result = null;
try {
conn = dbConnector.getConn();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
result = rs.getString(String.format("DBMS_METADATA.GET_DDL('TABLE','%s')",tableName));
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
dbConnector.closeConn(rs, pstmt, conn);
}
return result;
}
}
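
A minimal usage sketch for the new manager class (not part of this commit): the service name, credentials, and host below are illustrative placeholders, and only the methods defined above are exercised.

// Usage sketch only; connection details are placeholders, not values from this commit.
import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;

public class OracleDBManagerExample {
    public static void main(String[] args) {
        OracleDBManager dbManager = new OracleDBManager(
                "ORCLCDB",                                           // Oracle service name (placeholder)
                "flinkuser",                                          // user (placeholder)
                "flinkpw",                                            // password (placeholder)
                "localhost",
                Integer.toString(OracleDBManager.DEFAULT_ORACLE_PORT));

        if (dbManager.isOpen()) {                                     // checks v$instance status == OPEN
            for (String table : dbManager.listTables()) {             // tables owned by the connected user
                System.out.println(table + ":\n" + dbManager.showCreateTable(table));
            }
            dbManager.importOrSyncLakeSoulNamespace("ORCLCDB");       // registers the namespace in LakeSoul meta once
        }
    }
}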
New file: OracleCdc.java (package org.apache.flink.lakesoul.entry)
@@ -0,0 +1,129 @@
package org.apache.flink.lakesoul.entry;

import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.BinaryDebeziumDeserializationSchema;
import org.apache.flink.lakesoul.types.BinarySourceRecord;
import org.apache.flink.lakesoul.types.BinarySourceRecordSerializer;
import org.apache.flink.lakesoul.types.LakeSoulRecordConvert;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.lakesoul.tool.JobOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulDDLSinkOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_PARALLELISM;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SERVER_TIME_ZONE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SOURCE_PARALLELISM;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.WAREHOUSE_PATH;

public class OracleCdc {
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
String dbName = parameter.get(SOURCE_DB_DB_NAME.key());
String userName = parameter.get(SOURCE_DB_USER.key());
String passWord = parameter.get(SOURCE_DB_PASSWORD.key());
String[] schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String host = parameter.get(SOURCE_DB_HOST.key());
int splitSize = parameter.getInt(SOURCE_DB_SPLIT_SIZE.key(), SOURCE_DB_SPLIT_SIZE.defaultValue());
int port = parameter.getInt(SOURCE_DB_PORT.key(), OracleDBManager.DEFAULT_ORACLE_PORT);
String databasePrefixPath = parameter.get(WAREHOUSE_PATH.key());
String serverTimezone = parameter.get(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue());
int sourceParallelism = parameter.getInt(SOURCE_PARALLELISM.key());
int bucketParallelism = parameter.getInt(BUCKET_PARALLELISM.key());
int checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(),
JOB_CHECKPOINT_INTERVAL.defaultValue());
String tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key());
String flinkCheckpoint = parameter.get(FLINK_CHECKPOINT.key());

String[] tables = tableList.split(",");
String[] userTables = new String[tables.length];
for (int i = 0; i < tables.length; i++) {
userTables[i] = tables[i].toUpperCase();
}

OracleDBManager dbManager = new OracleDBManager(dbName,
userName,
passWord,
host,
Integer.toString(port));

dbManager.importOrSyncLakeSoulNamespace(dbName);
Configuration conf = new Configuration();
conf.set(LakeSoulSinkOptions.USE_CDC, true);
conf.set(LakeSoulSinkOptions.isMultiTableSource, true);
conf.set(SOURCE_PARALLELISM, sourceParallelism);
conf.set(BUCKET_PARALLELISM, bucketParallelism);
conf.set(SERVER_TIME_ZONE, serverTimezone);
conf.set(WAREHOUSE_PATH, databasePrefixPath);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);

ParameterTool pt = ParameterTool.fromMap(conf.toMap());
env.getConfig().setGlobalJobParameters(pt);
env.enableCheckpointing(checkpointInterval);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(4034);
CheckpointingMode checkpointingMode = CheckpointingMode.EXACTLY_ONCE;
if (JOB_CHECKPOINT_MODE.defaultValue().equals("AT_LEAST_ONCE")) {
checkpointingMode = CheckpointingMode.AT_LEAST_ONCE;
}
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

env.getCheckpointConfig().setCheckpointStorage(flinkCheckpoint);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3,
Time.of(10, TimeUnit.MINUTES),
Time.of(20, TimeUnit.SECONDS)
));

LakeSoulRecordConvert lakeSoulRecordConvert = new LakeSoulRecordConvert(conf, conf.getString(SERVER_TIME_ZONE));

Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
JdbcIncrementalSource<BinarySourceRecord> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname(host)
.schemaList(schemaList)
.tableList(userTables)
.databaseList(dbName)
.port(port)
.username(userName)
.password(passWord)
.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(splitSize)
.build();

LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = conf;
LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(oracleChangeEventSource, context, lakeSoulRecordConvert);
DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("Oracle Source");
DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From Oracle Database " + dbName);
}
}
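
A launch sketch for the entry point (not part of this commit). Only the source_db.schemaList, source_db.schema_tables, and source_db.splitSize keys are confirmed by the LakeSoulSinkOptions diff below; every other key name is a placeholder for options defined in LakeSoulDDLSinkOptions / JobOptions, which are not shown in this commit and may differ.

// Launch sketch only. Keys marked "placeholder" are assumptions, not confirmed by this diff.
public class OracleCdcLaunchSketch {
    public static void main(String[] args) throws Exception {
        String[] jobArgs = new String[]{
                "--source_db.db_name", "ORCLCDB",           // placeholder key: Oracle service name
                "--source_db.host", "localhost",            // placeholder key
                "--source_db.port", "1521",                 // placeholder key (defaults to 1521)
                "--source_db.user", "flinkuser",            // placeholder key
                "--source_db.password", "flinkpw",          // placeholder key
                "--source_db.schemaList", "FLINKUSER",      // confirmed key (comma-separated schemas)
                "--source_db.schema_tables", "FLINKUSER.ORDERS,FLINKUSER.CUSTOMERS", // confirmed key
                "--source_db.splitSize", "1024",            // confirmed key, default 1024 rows per snapshot split
                "--warehouse_path", "file:///tmp/lakesoul", // placeholder key
                "--flink.checkpoint", "file:///tmp/ckpt",   // placeholder key
                "--source.parallelism", "2",                // placeholder key
                "--bucket.parallelism", "2"                 // placeholder key
        };
        org.apache.flink.lakesoul.entry.OracleCdc.main(jobArgs);
    }
}

In practice the same arguments would be passed on the flink run command line with OracleCdc as the main class; ParameterTool.fromArgs parses them as "--key value" pairs.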
@@ -136,6 +136,24 @@ public class LakeSoulSinkOptions
.defaultValue(null)
.withDescription("flink_warehouse_dir");

public static final ConfigOption<String> SOURCE_DB_SCHEMA_LIST = ConfigOptions
.key("source_db.schemaList")
.stringType()
.noDefaultValue()
.withDescription("source database schemaList");

public static final ConfigOption<String> SOURCE_DB_SCHEMA_TABLES = ConfigOptions
.key("source_db.schema_tables")
.stringType()
.defaultValue("")
.withDescription("list tables of a schema");

public static final ConfigOption<Integer> SOURCE_DB_SPLIT_SIZE = ConfigOptions
.key("source_db.splitSize")
.intType()
.defaultValue(1024)
.withDescription("The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");

}


Expand Down
@@ -13,6 +13,7 @@
import io.debezium.data.EnumSet;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.time.MicroDuration;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.data.SpecialValueDecimal;
@@ -42,10 +43,7 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN_DEFAULT;
@@ -240,6 +238,8 @@ private LogicalType otherLogicalType(Schema fieldSchema) {
return new VarCharType(nullable, Integer.MAX_VALUE);
case Time.SCHEMA_NAME:
case MicroTime.SCHEMA_NAME:
case MicroDuration.SCHEMA_NAME:
return new BigIntType(nullable);
case NanoTime.SCHEMA_NAME:
return new BigIntType(nullable);
case Timestamp.SCHEMA_NAME:
@@ -258,6 +258,8 @@ private LogicalType otherLogicalType(Schema fieldSchema) {
case ZonedTimestamp.SCHEMA_NAME:
return new LocalZonedTimestampType(nullable, LocalZonedTimestampType.DEFAULT_PRECISION);
case Geometry.LOGICAL_NAME:
case VariableScaleDecimal.LOGICAL_NAME:
return new DecimalType(nullable, 38, 30);
case Point.LOGICAL_NAME:
paras = fieldSchema.field("wkb").schema().parameters();
int byteLen = Integer.MAX_VALUE;
@@ -432,6 +434,12 @@ private void otherTypeWrite(BinaryRowWriter writer, int index,
case ZonedTimestamp.SCHEMA_NAME:
writeUTCTimeStamp(writer, index, fieldValue, fieldSchema);
break;
case VariableScaleDecimal.LOGICAL_NAME:
writeDecimal(writer, index, fieldValue, fieldSchema);
break;
case MicroDuration.SCHEMA_NAME:
writeLong(writer, index, fieldValue);
break;
// Geometry and Point can not support now
// case Geometry.LOGICAL_NAME:
// Object object = convertToGeometry(fieldValue, fieldSchema);
@@ -483,7 +491,12 @@ public Object convertToDecimal(Object dbzObj, Schema schema) {
}
}
Map<String, String> paras = schema.parameters();
return DecimalData.fromBigDecimal(bigDecimal, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
if (paras == null) {
return DecimalData.fromBigDecimal(bigDecimal, 38, 30);
} else {
return DecimalData.fromBigDecimal(bigDecimal, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
}
}

public void writeDecimal(BinaryRowWriter writer, int index, Object dbzObj, Schema schema) {
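
Context for the decimal fallback above, as a standalone sketch (not part of this commit): Oracle NUMBER columns declared without precision typically reach Debezium as VariableScaleDecimal, whose field schema carries no fixed precision/scale parameters, so the conversion now falls back to a fixed decimal type; 38 is Oracle's maximum NUMBER precision, and 30 is the scale chosen in this commit.

// Standalone sketch (not part of this commit) mirroring the fallback added in convertToDecimal.
import java.math.BigDecimal;
import java.util.Map;

import org.apache.flink.table.data.DecimalData;

public class DecimalFallbackSketch {
    static DecimalData toDecimalData(BigDecimal value, Map<String, String> schemaParameters) {
        if (schemaParameters == null) {
            // No precision/scale metadata on the Debezium schema: use the commit's fixed defaults.
            return DecimalData.fromBigDecimal(value, 38, 30);
        }
        return DecimalData.fromBigDecimal(value,
                Integer.parseInt(schemaParameters.get("connect.decimal.precision")),
                Integer.parseInt(schemaParameters.get("scale")));
    }

    public static void main(String[] args) {
        // With no schema parameters, precision 38 / scale 30 come from the fallback path.
        System.out.println(toDecimalData(new BigDecimal("123.456"), null));
    }
}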
